Skip to content

Commit fe58e2a

Browse files
author
vlad
committed
created builder trigger hook, added monitoring, added delete method to the proto
1 parent 584f1f2 commit fe58e2a

14 files changed

+725
-130
lines changed

Dockerfile

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
FROM golang:1.15
2+
3+
WORKDIR /app
4+
5+
COPY ./ /app
6+
7+
RUN go mod download
8+
9+
ENTRYPOINT go run main.go

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
`cd proto`
22

3-
`protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative task.proto`
3+
`protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./proto/task.proto`

go.mod

+6-1
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@ go 1.15
44

55
require (
66
github.com/golang/protobuf v1.4.3
7-
github.com/pvelx/triggerHook v0.0.0-20201201162513-7a80e07e5224
7+
github.com/influxdata/influxdb v1.8.3
8+
github.com/pkg/errors v0.9.1
9+
github.com/pvelx/triggerHook v0.0.0-00010101000000-000000000000
10+
github.com/streadway/amqp v1.0.0
811
golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7 // indirect
912
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3 // indirect
1013
golang.org/x/text v0.3.4 // indirect
1114
google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e // indirect
1215
google.golang.org/grpc v1.33.2
1316
google.golang.org/protobuf v1.25.0
1417
)
18+
19+
replace github.com/pvelx/triggerHook => ../trigger-hook

go.sum

+314-2
Large diffs are not rendered by default.

main.go

+52-16
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,70 @@
11
package main
22

33
import (
4-
"github.com/pvelx/triggerHook"
5-
"github.com/pvelx/triggerHookExample/proto"
6-
"github.com/pvelx/triggerHookExample/sendingTransport"
7-
"github.com/pvelx/triggerHookExample/task_server"
8-
"google.golang.org/grpc"
4+
"encoding/json"
5+
"github.com/pvelx/triggerHook/domain"
6+
"github.com/streadway/amqp"
97
"log"
10-
"net"
118
)
129

13-
var tasksDeferredService = triggerHook.Default()
14-
1510
const (
1611
port = ":50051"
1712
)
1813

1914
func main() {
20-
tasksDeferredService.SetTransport(sendingTransport.NewAmqpTransport())
21-
go tasksDeferredService.Run()
2215

23-
taskServer := task_server.New(tasksDeferredService)
16+
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
17+
if err != nil {
18+
log.Fatalf("Dial: %s", err)
19+
}
20+
defer conn.Close()
2421

25-
lis, err := net.Listen("tcp", port)
22+
channel, err := conn.Channel()
2623
if err != nil {
27-
log.Fatalf("failed to listen: %v", err)
24+
log.Fatalf("Error open channel:%v", err)
25+
}
26+
27+
transport := func(task domain.Task) {
28+
taskJson, err := json.Marshal(task)
29+
if err != nil {
30+
log.Fatal(err)
31+
}
32+
33+
if err = channel.Publish(
34+
"test-exchange", // publish to an exchange
35+
"", // routing to 0 or more queues
36+
true, // mandatory
37+
false, // immediate
38+
amqp.Publishing{
39+
Headers: amqp.Table{},
40+
ContentType: "application/json",
41+
ContentEncoding: "",
42+
Body: taskJson,
43+
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
44+
Priority: 0, // 0-9
45+
},
46+
); err != nil {
47+
log.Fatalf("Exchange Publish: %s", err)
48+
}
2849
}
29-
s := grpc.NewServer()
30-
proto.RegisterTaskServer(s, taskServer)
31-
if err := s.Serve(lis); err != nil {
50+
51+
monitoring := NewMonitoring()
52+
tasksDeferredService := BuildTriggerHook(monitoring, transport)
53+
taskServer := NewTaskServer(tasksDeferredService)
54+
55+
go func() {
56+
if err := tasksDeferredService.Run(); err != nil {
57+
log.Fatalf("failed run trigger hook: %v", err)
58+
}
59+
}()
60+
61+
go func() {
62+
if err := monitoring.Run(); err != nil {
63+
log.Fatalf("failed run monitoring: %v", err)
64+
}
65+
}()
66+
67+
if err := RunGrpcServer(taskServer); err != nil {
3268
log.Fatalf("failed to serve: %v", err)
3369
}
3470
}

monitoring.go

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package main
2+
3+
import (
4+
"github.com/influxdata/influxdb/client/v2"
5+
"github.com/pvelx/triggerHook/contracts"
6+
"log"
7+
"time"
8+
)
9+
10+
var sampleSize = 1000
11+
var chPointCap = 10000
12+
13+
type Monitoring struct {
14+
connection client.Client
15+
chPoint chan *client.Point
16+
}
17+
18+
func NewMonitoring() *Monitoring {
19+
c, err := client.NewHTTPClient(client.HTTPConfig{
20+
Addr: "http://localhost:8086",
21+
Username: "monitor",
22+
Password: "secret",
23+
})
24+
if err != nil {
25+
log.Fatalln("Error: ", err)
26+
}
27+
28+
return &Monitoring{
29+
connection: c,
30+
chPoint: make(chan *client.Point, chPointCap),
31+
}
32+
}
33+
34+
func (m *Monitoring) Run() error {
35+
for {
36+
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
37+
Database: "trigger_hook",
38+
Precision: "s",
39+
})
40+
if err != nil {
41+
return err
42+
}
43+
expire := time.After(5 * time.Second)
44+
for {
45+
select {
46+
case point := <-m.chPoint:
47+
bp.AddPoint(point)
48+
49+
if len(bp.Points()) == sampleSize {
50+
goto done
51+
}
52+
53+
case <-expire:
54+
goto done
55+
}
56+
}
57+
58+
done:
59+
if len(bp.Points()) > 0 {
60+
err = m.connection.Write(bp)
61+
if err != nil {
62+
return err
63+
}
64+
}
65+
}
66+
}
67+
68+
func (m *Monitoring) AddMeasurement(name string, event contracts.MeasurementEvent) {
69+
point, err := client.NewPoint(
70+
name,
71+
nil,
72+
map[string]interface{}{
73+
"value": event.Measurement,
74+
},
75+
event.Time,
76+
)
77+
if err != nil {
78+
log.Fatalln("Error: ", err)
79+
}
80+
81+
m.chPoint <- point
82+
}

0 commit comments

Comments
 (0)