Skip to content

Commit b35098a

Browse files
use compressed buffer
1 parent f267090 commit b35098a

File tree

11 files changed

+165
-260
lines changed

11 files changed

+165
-260
lines changed

cmd/collector/main.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ import (
1111

1212
"github.com/langchain-ai/langsmith-collector-proxy/internal/aggregator"
1313
"github.com/langchain-ai/langsmith-collector-proxy/internal/config"
14-
"github.com/langchain-ai/langsmith-collector-proxy/internal/queue"
14+
"github.com/langchain-ai/langsmith-collector-proxy/internal/model"
1515
"github.com/langchain-ai/langsmith-collector-proxy/internal/server"
1616
"github.com/langchain-ai/langsmith-collector-proxy/internal/uploader"
1717
)
1818

1919
func main() {
2020
cfg := config.Load()
2121

22-
qm := queue.New(cfg.QueueSize)
23-
trSrv := server.NewRouter(cfg, qm)
22+
ch := make(chan *model.Run, 1024)
23+
trSrv := server.NewRouter(cfg, ch)
2424

2525
up := uploader.New(uploader.Config{
2626
BaseURL: cfg.LangsmithHost,
@@ -30,13 +30,12 @@ func main() {
3030
InFlight: 100,
3131
APIKey: cfg.DefaultAPIKey,
3232
})
33-
up.Start()
34-
defer up.Stop()
3533

36-
agg := aggregator.New(qm, up, aggregator.Config{
37-
BatchSize: cfg.BatchSize,
38-
FlushInterval: cfg.FlushInterval,
39-
})
34+
agg := aggregator.New(up, aggregator.Config{
35+
BatchSize: cfg.BatchSize,
36+
FlushInterval: cfg.FlushInterval,
37+
MaxBufferBytes: cfg.MaxBufferBytes,
38+
}, ch)
4039
agg.Start()
4140
defer agg.Stop()
4241

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/go-chi/chi/v5 v5.2.1
88
github.com/google/uuid v1.6.0
99
go.opentelemetry.io/proto/otlp v1.6.0
10+
golang.org/x/sync v0.14.0
1011
google.golang.org/protobuf v1.36.6
1112
)
1213

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ go.opentelemetry.io/proto/otlp v1.6.0 h1:jQjP+AQyTf+Fe7OKj/MfkDrmK4MNVtw2NpXsf9f
3030
go.opentelemetry.io/proto/otlp v1.6.0/go.mod h1:cicgGehlFuNdgZkcALOCh3VE6K/u2tAjzlRhDwmVpZc=
3131
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
3232
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
33+
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
34+
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
3335
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
3436
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
3537
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=

internal/aggregator/aggregator.go

Lines changed: 32 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,59 +7,44 @@ import (
77
"time"
88

99
"github.com/langchain-ai/langsmith-collector-proxy/internal/model"
10-
"github.com/langchain-ai/langsmith-collector-proxy/internal/queue"
1110
"github.com/langchain-ai/langsmith-collector-proxy/internal/serializer"
1211
"github.com/langchain-ai/langsmith-collector-proxy/internal/uploader"
1312
"github.com/langchain-ai/langsmith-collector-proxy/internal/util"
1413
)
1514

1615
type Config struct {
17-
BatchSize int
18-
FlushInterval time.Duration
19-
20-
GCInterval time.Duration
21-
EntryTTL time.Duration
16+
BatchSize int
17+
FlushInterval time.Duration
18+
MaxBufferBytes int
19+
GCInterval time.Duration
20+
EntryTTL time.Duration
2221
}
2322

2423
type Aggregator struct {
25-
qm *queue.QueueManager
24+
ch chan *model.Run
2625
cfg Config
2726
up *uploader.Uploader
2827
cancel context.CancelFunc
2928
}
3029

31-
func New(qm *queue.QueueManager, up *uploader.Uploader, cfg Config) *Aggregator {
30+
func New(up *uploader.Uploader, cfg Config, ch chan *model.Run) *Aggregator {
3231
if cfg.GCInterval == 0 {
3332
cfg.GCInterval = 2 * time.Minute
3433
}
3534
if cfg.EntryTTL == 0 {
3635
cfg.EntryTTL = 5 * time.Minute
3736
}
38-
return &Aggregator{qm: qm, cfg: cfg, up: up}
37+
if cfg.MaxBufferBytes == 0 {
38+
cfg.MaxBufferBytes = 10 * 1024 * 1024 // 10MB
39+
}
40+
return &Aggregator{up: up, cfg: cfg, ch: ch}
3941
}
4042

4143
func (a *Aggregator) Start() {
4244
ctx, cancel := context.WithCancel(context.Background())
4345
a.cancel = cancel
4446

45-
started := map[string]struct{}{}
46-
go func() {
47-
tick := time.NewTicker(1 * time.Second)
48-
defer tick.Stop()
49-
for {
50-
select {
51-
case <-ctx.Done():
52-
return
53-
case <-tick.C:
54-
for project, ch := range a.qm.All() {
55-
if _, ok := started[project]; !ok {
56-
started[project] = struct{}{}
57-
go a.worker(ctx, project, ch)
58-
}
59-
}
60-
}
61-
}
62-
}()
47+
go a.worker(ctx, a.ch)
6348
}
6449

6550
func (a *Aggregator) Stop() {
@@ -68,10 +53,9 @@ func (a *Aggregator) Stop() {
6853
}
6954
}
7055

71-
func (a *Aggregator) worker(ctx context.Context, project string, ch <-chan *model.Run) {
72-
batch := make([]*model.Run, 0, a.cfg.BatchSize)
73-
flushTicker := time.NewTicker(a.cfg.FlushInterval)
74-
defer flushTicker.Stop()
56+
func (a *Aggregator) worker(ctx context.Context, ch <-chan *model.Run) {
57+
sc := serializer.NewStreamingCompressor()
58+
var scMu sync.Mutex
7559

7660
type entry struct {
7761
dotted string
@@ -82,28 +66,36 @@ func (a *Aggregator) worker(ctx context.Context, project string, ch <-chan *mode
8266
waitingChildren := make(map[string][]*model.Run) // parentID → parked kids
8367
waitingSince := make(map[string]time.Time) // parentID → first-park time
8468

85-
var mu sync.Mutex
86-
8769
flush := func() {
88-
if len(batch) == 0 {
70+
scMu.Lock()
71+
defer scMu.Unlock()
72+
if sc == nil || sc.RunCount() == 0 {
8973
return
9074
}
91-
comp, boundary, _, err := serializer.SerialiseAndCompress(batch)
75+
comp, boundary, _, err := sc.Close()
9276
if err != nil {
9377
slog.Error("serialize", "err", err)
9478
return
9579
}
96-
a.up.Enqueue(uploader.Batch{Project: project, Data: comp, Boundary: boundary})
97-
batch = batch[:0]
80+
go a.up.Send(ctx, uploader.Batch{Data: comp, Boundary: boundary})
81+
sc = serializer.NewStreamingCompressor()
9882
}
9983

10084
add := func(r *model.Run) {
101-
batch = append(batch, r)
102-
if len(batch) >= a.cfg.BatchSize {
103-
flush()
85+
scMu.Lock()
86+
defer scMu.Unlock()
87+
if err := sc.AddRun(r); err != nil {
88+
slog.Error("stream add", "err", err)
89+
return
90+
}
91+
if sc.RunCount() >= a.cfg.BatchSize || sc.Uncompressed() >= a.cfg.MaxBufferBytes {
92+
go flush()
10493
}
10594
}
10695

96+
flushTicker := time.NewTicker(a.cfg.FlushInterval)
97+
defer flushTicker.Stop()
98+
10799
var cascade func(parentID, parentDotted string)
108100
cascade = func(parentID, parentDotted string) {
109101
kids := waitingChildren[parentID]
@@ -157,16 +149,13 @@ func (a *Aggregator) worker(ctx context.Context, project string, ch <-chan *mode
157149
for {
158150
select {
159151
case <-ctx.Done():
160-
mu.Lock()
161152
flush()
162-
mu.Unlock()
163153
return
164154

165155
case r := <-ch:
166156
if r == nil {
167157
continue
168158
}
169-
mu.Lock()
170159
switch {
171160
case r.ParentRunID == nil || *r.ParentRunID == "":
172161
d := util.NewDottedOrder(*r.ID)
@@ -190,17 +179,12 @@ func (a *Aggregator) worker(ctx context.Context, project string, ch <-chan *mode
190179
}
191180
}
192181
}
193-
mu.Unlock()
194182

195183
case <-flushTicker.C:
196-
mu.Lock()
197184
flush()
198-
mu.Unlock()
199185

200186
case <-gcTicker.C:
201-
mu.Lock()
202187
gc()
203-
mu.Unlock()
204188
}
205189
}
206190
}

internal/config/config.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ import (
99
type Config struct {
1010
Port string
1111
MaxBodyBytes int64
12-
QueueSize int
1312
LangsmithHost string
1413
DefaultAPIKey string
1514
DefaultProject string
1615
BatchSize int
1716
FlushInterval time.Duration
17+
MaxBufferBytes int
1818
MaxRetries int
1919
BackoffInitial time.Duration
2020
}
@@ -44,10 +44,16 @@ func Load() Config {
4444
LangsmithHost: env("LANGSMITH_ENDPOINT", "https://api.smith.langchain.com"),
4545
DefaultAPIKey: env("LANGSMITH_API_KEY", ""),
4646
DefaultProject: env("LANGSMITH_PROJECT", ""),
47-
// Collector Config
48-
QueueSize: int(envInt64("QUEUE_SIZE", 1000)),
49-
BatchSize: int(envInt64("BATCH_SIZE", 100)), // default 100 runs
50-
FlushInterval: time.Duration(envInt64("FLUSH_INTERVAL_MS", 5000)) * time.Millisecond,
47+
// Collector Config. The following values control how frequently and how many runs are sent to LangSmith.
48+
// The collector will buffer zstd compressed runs in memory until one of the following conditions is met:
49+
// 1. The buffer size exceeds MaxBufferBytes.
50+
// 2. The flush interval is reached.
51+
// 3. The batch size is reached.
52+
// Because zstd compression can be extremely effective, we use the uncompressed size of the buffer to determine how much data
53+
// will be sent to LangSmith.
54+
BatchSize: int(envInt64("BATCH_SIZE", 100)), // default 100 runs
55+
FlushInterval: time.Duration(envInt64("FLUSH_INTERVAL_MS", 5000)) * time.Millisecond,
56+
MaxBufferBytes: int(envInt64("MAX_BUFFER_BYTES", 10*1024*1024)), // default 10MB
5157
// Uploader Config
5258
MaxRetries: int(envInt64("MAX_RETRIES", 3)),
5359
BackoffInitial: time.Duration(envInt64("RETRY_BACKOFF_MS", 100)) * time.Millisecond,

internal/handler/traces.go

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ import (
99
"net/http"
1010
"strings"
1111

12-
"github.com/langchain-ai/langsmith-collector-proxy/internal/contextkey"
13-
"github.com/langchain-ai/langsmith-collector-proxy/internal/queue"
12+
"github.com/langchain-ai/langsmith-collector-proxy/internal/model"
1413
"github.com/langchain-ai/langsmith-collector-proxy/internal/translator"
1514
collectortracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
1615
"google.golang.org/protobuf/encoding/protojson"
@@ -41,14 +40,14 @@ func handleError(w http.ResponseWriter, r *http.Request, status int, err error)
4140
}
4241

4342
// TracesHandler ingests OTLP/HTTP requests.
44-
func TracesHandler(maxBody int64, tr *translator.Translator, qm *queue.QueueManager) http.HandlerFunc {
43+
func TracesHandler(maxBody int64, tr *translator.Translator, ch chan *model.Run) http.HandlerFunc {
4544
return func(w http.ResponseWriter, r *http.Request) {
4645
r.Body = http.MaxBytesReader(w, r.Body, maxBody)
4746
defer r.Body.Close()
4847
var reader io.Reader = r.Body
4948
switch strings.ToLower(strings.TrimSpace(r.Header.Get("Content-Encoding"))) {
5049
case "", "identity":
51-
case "gzip":
50+
case "gzip": // todo add comment about this ( add comments in genral)
5251
gzr, err := gzip.NewReader(reader)
5352
if err != nil {
5453
handleError(w, r, http.StatusBadRequest, err)
@@ -82,24 +81,12 @@ func TracesHandler(maxBody int64, tr *translator.Translator, qm *queue.QueueMana
8281

8382
runs := tr.Translate(&req)
8483

85-
project := r.Header.Get("Langsmith-Project")
86-
if project == "" {
87-
if v := r.Context().Value(contextkey.ProjectIDKey); v != nil {
88-
project, _ = v.(string)
89-
}
90-
}
84+
project := r.Header.Get("Langsmith-Project") // todo: remove queues by project
9185
for _, run := range runs {
9286
if project != "" {
9387
run.SessionName = &project
9488
}
95-
if err := qm.Enqueue(r.Context(), project, run); err != nil {
96-
if errors.Is(err, queue.ErrContextDone) {
97-
handleError(w, r, 499, err)
98-
} else {
99-
handleError(w, r, http.StatusServiceUnavailable, err)
100-
}
101-
return
102-
}
89+
ch <- run
10390
}
10491
w.WriteHeader(http.StatusAccepted)
10592
}

internal/queue/queue.go

Lines changed: 0 additions & 60 deletions
This file was deleted.

0 commit comments

Comments
 (0)