Skip to content

Commit 79ab9fc

Browse files
final cleanup
1 parent 906823e commit 79ab9fc

File tree

3 files changed

+36
-25
lines changed

3 files changed

+36
-25
lines changed

internal/aggregator/aggregator.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ func (a *Aggregator) worker(ctx context.Context, ch <-chan *model.Run) {
9191
}
9292
comp, boundary, _, err := sc.Close()
9393
if err != nil {
94-
slog.Error("serialize", "err", err)
94+
slog.Error("Failed to flush runs", "err", err)
9595
return
9696
}
9797
if len(comp) > 0 {
98-
go a.up.Send(ctx, uploader.Batch{Data: comp, Boundary: boundary})
98+
go a.up.Send(context.Background(), uploader.Batch{Data: comp, Boundary: boundary})
9999
}
100100
}
101101

@@ -104,6 +104,7 @@ func (a *Aggregator) worker(ctx context.Context, ch <-chan *model.Run) {
104104

105105
scMu.Lock()
106106
if err := sc.AddRun(r); err != nil {
107+
slog.Error("failed to queue run", "err", err)
107108
scMu.Unlock()
108109
return
109110
}

internal/handler/traces.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ func TracesHandler(maxBody int64, tr *translator.Translator, ch chan *model.Run)
6666

6767
switch {
6868
case strings.Contains(ct, "json"):
69-
if err := decodeJSON(r.Body, &req); err != nil {
69+
if err := decodeJSON(reader, &req); err != nil {
7070
handleError(w, r, http.StatusBadRequest, err)
7171
return
7272
}
7373
case strings.Contains(ct, "protobuf"), strings.HasSuffix(ct, "+proto"):
74-
if err := decodeProto(r.Body, &req); err != nil {
74+
if err := decodeProto(reader, &req); err != nil {
7575
handleError(w, r, http.StatusBadRequest, err)
7676
return
7777
}

internal/serializer/stream.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import (
55
"encoding/json"
66
"fmt"
77
"io"
8+
"log/slog"
9+
"math/rand/v2"
10+
"strconv"
811

912
"github.com/DataDog/zstd"
10-
"github.com/google/uuid"
1113
"github.com/langchain-ai/langsmith-collector-proxy/internal/model"
1214
)
1315

@@ -29,12 +31,12 @@ type StreamingCompressor struct {
2931
}
3032

3133
func NewStreamingCompressor() *StreamingCompressor {
32-
var buf bytes.Buffer
33-
zw := zstd.NewWriter(&buf)
34+
buf := &bytes.Buffer{}
35+
zw := zstd.NewWriter(buf)
3436
return &StreamingCompressor{
35-
boundary: "BOUNDARY-" + uuid.NewString(),
37+
boundary: "----LangSmithFormBoundary-" + strconv.FormatUint(rand.Uint64(), 36),
3638
w: zw,
37-
buf: &buf,
39+
buf: buf,
3840
}
3941
}
4042

@@ -78,24 +80,32 @@ func (sc *StreamingCompressor) AddRun(r *model.Run) error {
7880
}
7981

8082
func (sc *StreamingCompressor) Close() ([]byte, string, int, error) {
81-
// Write the final multipart boundary.
82-
if sc.runCount == 0 || sc.uncompressed == 0 {
83-
return nil, "", 0, nil
84-
}
85-
if _, err := sc.w.Write([]byte(fmt.Sprintf("--%s--\r\n", sc.boundary))); err != nil {
86-
return nil, "", 0, err
87-
}
88-
// Close the zstd writer.
89-
if err := sc.w.Close(); err != nil {
90-
return nil, "", 0, err
83+
// Write the final multipart boundary and reset state.
84+
hasData := sc.runCount > 0 || sc.uncompressed > 0
85+
var (
86+
outBytes []byte
87+
boundary string
88+
uncompressed int
89+
err error
90+
)
91+
92+
if hasData {
93+
if _, err = sc.w.Write([]byte(fmt.Sprintf("--%s--\r\n", sc.boundary))); err == nil {
94+
err = sc.w.Close()
95+
}
96+
if err == nil {
97+
outBytes = sc.buf.Bytes()
98+
boundary = sc.boundary
99+
uncompressed = sc.uncompressed
100+
}
101+
} else {
102+
slog.Error("Failed to write final multipart boundary", "err", err)
91103
}
92-
out := sc.buf.Bytes()
93-
uncompressed := sc.uncompressed
94-
// reset the buffer and writer for the next upload
95-
sc.buf = bytes.NewBuffer(nil)
104+
sc.buf = &bytes.Buffer{}
105+
sc.boundary = "----LangSmithFormBoundary-" + strconv.FormatUint(rand.Uint64(), 36)
96106
sc.w = zstd.NewWriter(sc.buf)
97-
sc.runCount, sc.uncompressed = 0, 0
98-
return out, sc.boundary, uncompressed, nil
107+
sc.runCount = 0
108+
return outBytes, boundary, uncompressed, err
99109
}
100110

101111
func (sc *StreamingCompressor) Uncompressed() int {

0 commit comments

Comments
 (0)