Skip to content

Commit 786503f

Browse files
author
vlad
committed
small refactoring, move env getters to main
1 parent 8f1f043 commit 786503f

File tree

3 files changed

+13
-84
lines changed

3 files changed

+13
-84
lines changed

main.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@ var (
1616
mysqlPassword = os.Getenv("DATABASE_PASSWORD")
1717
mysqlHost = os.Getenv("DATABASE_HOST")
1818
mysqlDbName = os.Getenv("DATABASE_NAME")
19+
20+
influxDbDns = os.Getenv("INFLUX_DB_DNS")
21+
influxDbUsername = os.Getenv("INFLUX_DB_USERNAME")
22+
influxDbPassword = os.Getenv("INFLUX_DB_PASSWORD")
23+
influxDbName = os.Getenv("INFLUX_DB_NAME")
1924
)
2025

2126
func main() {
22-
monitoring := NewMonitoring()
27+
monitoring := NewMonitoring(influxDbDns, influxDbUsername, influxDbPassword, influxDbName)
2328
tasksDeferredService := BuildTriggerHook(monitoring, connection.Options{
2429
User: mysqlUser,
2530
Password: mysqlPassword,

monitoring.go

+7-11
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,23 @@ package main
22

33
import (
44
"log"
5-
"os"
65
"time"
76

87
"github.com/influxdata/influxdb/client/v2"
98
"github.com/pvelx/triggerhook/contracts"
109
)
1110

12-
var (
13-
influxDbDns = os.Getenv("INFLUX_DB_DNS")
14-
influxDbUsername = os.Getenv("INFLUX_DB_USERNAME")
15-
influxDbPassword = os.Getenv("INFLUX_DB_PASSWORD")
16-
)
17-
1811
var sampleSize = 1000
1912
var chPointCap = 10000
13+
var periodSending = 5 * time.Second
2014

2115
type Monitoring struct {
16+
database string
2217
connection client.Client
2318
chPoint chan *client.Point
2419
}
2520

26-
func NewMonitoring() *Monitoring {
21+
func NewMonitoring(influxDbDns, influxDbUsername, influxDbPassword, influxDbName string) *Monitoring {
2722
c, err := client.NewHTTPClient(client.HTTPConfig{
2823
Addr: influxDbDns,
2924
Username: influxDbUsername,
@@ -34,6 +29,7 @@ func NewMonitoring() *Monitoring {
3429
}
3530

3631
return &Monitoring{
32+
database: influxDbName,
3733
connection: c,
3834
chPoint: make(chan *client.Point, chPointCap),
3935
}
@@ -42,13 +38,13 @@ func NewMonitoring() *Monitoring {
4238
func (m *Monitoring) Run() error {
4339
for {
4440
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
45-
Database: "trigger_hook",
41+
Database: m.database,
4642
Precision: "ms",
4743
})
4844
if err != nil {
4945
return err
5046
}
51-
expire := time.After(5 * time.Second)
47+
expire := time.After(periodSending)
5248
for {
5349
select {
5450
case point := <-m.chPoint:
@@ -83,7 +79,7 @@ func (m *Monitoring) AddMeasurement(name string, event contracts.MeasurementEven
8379
event.Time,
8480
)
8581
if err != nil {
86-
log.Fatalln("Error: ", err)
82+
log.Println("Error: ", err)
8783
}
8884

8985
m.chPoint <- point

rabbitmq.go

-72
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ var (
3737
errShutdown = errors.New("session is shutting down")
3838
)
3939

40-
// New creates a new consumer state instance, and automatically
41-
// attempts to connect to the server.
4240
func New(name string, addr string) *Session {
4341
session := Session{
4442
logger: log.New(os.Stdout, "", log.LstdFlags),
@@ -49,8 +47,6 @@ func New(name string, addr string) *Session {
4947
return &session
5048
}
5149

52-
// handleReconnect will wait for a connection error on
53-
// notifyConnClose, and then continuously attempt to reconnect.
5450
func (session *Session) handleReconnect(addr string) {
5551
for {
5652
session.isReady = false
@@ -75,7 +71,6 @@ func (session *Session) handleReconnect(addr string) {
7571
}
7672
}
7773

78-
// connect will create a new AMQP connection
7974
func (session *Session) connect(addr string) (*amqp.Connection, error) {
8075
conn, err := amqp.Dial(addr)
8176

@@ -88,8 +83,6 @@ func (session *Session) connect(addr string) (*amqp.Connection, error) {
8883
return conn, nil
8984
}
9085

91-
// handleReconnect will wait for a channel error
92-
// and then continuously attempt to re-initialize both channels
9386
func (session *Session) handleReInit(conn *amqp.Connection) bool {
9487
for {
9588
session.isReady = false
@@ -119,7 +112,6 @@ func (session *Session) handleReInit(conn *amqp.Connection) bool {
119112
}
120113
}
121114

122-
// init will initialize channel & declare queue
123115
func (session *Session) init(conn *amqp.Connection) error {
124116
ch, err := conn.Channel()
125117

@@ -129,18 +121,6 @@ func (session *Session) init(conn *amqp.Connection) error {
129121

130122
err = ch.Confirm(false)
131123

132-
if err != nil {
133-
return err
134-
}
135-
_, err = ch.QueueDeclare(
136-
session.name,
137-
true, // Durable
138-
false, // Delete when unused
139-
false, // Exclusive
140-
false, // No-wait
141-
nil,
142-
)
143-
144124
if err != nil {
145125
return err
146126
}
@@ -149,54 +129,15 @@ func (session *Session) init(conn *amqp.Connection) error {
149129
session.isReady = true
150130
log.Println("Setup!")
151131

152-
/*
153-
if err := channel.ExchangeDeclare(
154-
a.config.Create.ExchangeName,
155-
a.config.Create.ExchangeType,
156-
true,
157-
false,
158-
false,
159-
false,
160-
nil,
161-
); err != nil {
162-
return errors.Wrap(err, "failed to declare exchange")
163-
}
164-
165-
if _, err := channel.QueueDeclare(
166-
a.config.Create.QueueName,
167-
true,
168-
false,
169-
false,
170-
false,
171-
amqp.Table{"x-queue-mode": "lazy"},
172-
); err != nil {
173-
return errors.Wrap(err, "failed to declare queue")
174-
}
175-
176-
if err := channel.QueueBind(
177-
a.config.Create.QueueName,
178-
a.config.Create.RoutingKey,
179-
a.config.Create.ExchangeName,
180-
false,
181-
nil,
182-
); err != nil {
183-
return errors.Wrap(err, "failed to bind queue")
184-
}
185-
186-
*/
187132
return nil
188133
}
189134

190-
// changeConnection takes a new connection to the queue,
191-
// and updates the close listener to reflect this.
192135
func (session *Session) changeConnection(connection *amqp.Connection) {
193136
session.connection = connection
194137
session.notifyConnClose = make(chan *amqp.Error)
195138
session.connection.NotifyClose(session.notifyConnClose)
196139
}
197140

198-
// changeChannel takes a new channel to the queue,
199-
// and updates the channel listeners to reflect this.
200141
func (session *Session) changeChannel(channel *amqp.Channel) {
201142
session.channel = channel
202143
session.notifyChanClose = make(chan *amqp.Error)
@@ -205,11 +146,6 @@ func (session *Session) changeChannel(channel *amqp.Channel) {
205146
session.channel.NotifyPublish(session.notifyConfirm)
206147
}
207148

208-
// Push will push data onto the queue, and wait for a confirm.
209-
// If no confirms are received until within the resendTimeout,
210-
// it continuously re-sends messages until a confirm is received.
211-
// This will block until the server sends a confirm. Errors are
212-
// only returned if the push action itself fails, see UnsafePush.
213149
func (session *Session) Push(data []byte) error {
214150
if !session.isReady {
215151
return errors.New("failed to push: not connected")
@@ -228,21 +164,14 @@ func (session *Session) Push(data []byte) error {
228164
select {
229165
case confirm := <-session.notifyConfirm:
230166
if confirm.Ack {
231-
//session.logger.Println("Push confirmed!")
232167
return nil
233168
} else {
234-
//session.logger.Println("Push error confirmed!")
235169
}
236170
case <-time.After(resendDelay):
237171
}
238-
//session.logger.Println("Push didn't confirm. Retrying...")
239172
}
240173
}
241174

242-
// UnsafePush will push to the queue without checking for
243-
// confirmation. It returns an error if it fails to connect.
244-
// No guarantees are provided for whether the server will
245-
// recieve the message.
246175
func (session *Session) UnsafePush(data []byte) error {
247176
if !session.isReady {
248177
return errNotConnected
@@ -264,7 +193,6 @@ func (session *Session) UnsafePush(data []byte) error {
264193
return err
265194
}
266195

267-
// Close will cleanly shutdown the channel and connection.
268196
func (session *Session) Close() error {
269197
if !session.isReady {
270198
return errAlreadyClosed

0 commit comments

Comments
 (0)