Skip to content

Commit a3ff00f

Browse files
perf improvements
1 parent 1b51311 commit a3ff00f

File tree

3 files changed

+21
-11
lines changed

3 files changed

+21
-11
lines changed

internal/aggregator/aggregator.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,22 @@ func (a *Aggregator) worker(ctx context.Context, ch <-chan *model.Run) {
9797
if len(comp) > 0 {
9898
go a.up.Send(ctx, uploader.Batch{Data: comp, Boundary: boundary})
9999
}
100-
sc = serializer.NewStreamingCompressor()
101100
}
102101

103102
add := func(r *model.Run) {
103+
needFlush := false
104+
104105
scMu.Lock()
105-
defer scMu.Unlock()
106106
if err := sc.AddRun(r); err != nil {
107-
slog.Error("stream add", "err", err)
107+
scMu.Unlock()
108108
return
109109
}
110-
if sc.RunCount() >= a.cfg.BatchSize || sc.Uncompressed() >= a.cfg.MaxBufferBytes {
111-
go flush()
110+
needFlush = sc.RunCount() >= a.cfg.BatchSize ||
111+
sc.Uncompressed() >= a.cfg.MaxBufferBytes
112+
scMu.Unlock()
113+
114+
if needFlush {
115+
flush()
112116
}
113117
}
114118

internal/serializer/stream.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import (
1313

1414
// StreamingCompressor uses zstd + multipart/form-data to format and compress traces.
1515
//
16-
// Treat StreamingCompressor as a short-lived object for a single upload. Call Close()
17-
// when done to get the final bytes and cleanup resources. Then create a new StreamingCompressor
18-
// for the next upload.
16+
// Treat StreamingCompressor as a long-lived object for a multiple uploads. Call Close()
17+
// when done with a single upload to get the final bytes and cleanup resources. You should
18+
// reuse the StreamingCompressor object for multiple uploads.
1919
//
2020
// When interacting with the underlying buffer, the StreamingCompressor is not thread-safe.
2121
// It is the responsibility of the caller to ensure thread-safety by using a mutex.
@@ -89,7 +89,13 @@ func (sc *StreamingCompressor) Close() ([]byte, string, int, error) {
8989
if err := sc.w.Close(); err != nil {
9090
return nil, "", 0, err
9191
}
92-
return sc.buf.Bytes(), sc.boundary, sc.uncompressed, nil
92+
out := sc.buf.Bytes()
93+
uncompressed := sc.uncompressed
94+
// reset the buffer and writer for the next upload
95+
sc.buf = bytes.NewBuffer(nil)
96+
sc.w = zstd.NewWriter(sc.buf)
97+
sc.runCount, sc.uncompressed = 0, 0
98+
return out, sc.boundary, uncompressed, nil
9399
}
94100

95101
func (sc *StreamingCompressor) Uncompressed() int {

internal/translator/translator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type spanEntry struct {
1515
}
1616

1717
type Translator struct {
18+
converter *GenAiConverter
1819
mu sync.RWMutex
1920
span2trace map[string]spanEntry
2021
ttl time.Duration
@@ -50,7 +51,6 @@ func (t *Translator) Translate(req *collectortracepb.ExportTraceServiceRequest)
5051
}
5152

5253
runs := make([]*model.Run, 0, total)
53-
converter := &GenAiConverter{}
5454
for _, rs := range req.ResourceSpans {
5555
for _, ss := range rs.ScopeSpans {
5656
for _, span := range ss.Spans {
@@ -63,7 +63,7 @@ func (t *Translator) Translate(req *collectortracepb.ExportTraceServiceRequest)
6363
}
6464
}
6565
}
66-
run, err := converter.ConvertSpan(span, false)
66+
run, err := t.converter.ConvertSpan(span, false)
6767
if err != nil || run == nil {
6868
continue
6969
}

0 commit comments

Comments
 (0)