Skip to content

Commit 7489b09

Browse files
committed
FIX: Fix shared channel
1 parent 36bac42 commit 7489b09

18 files changed

+1085
-1228
lines changed

CHANGELOG.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
# Changelog
22

3-
## 0.35.0 - TBD
3+
## 0.34.1 - TBD
44

55
### Enhancements
66
- Added `InstitutionalPrioritization` variant to `MatchingAlgorithm`
77

8+
### Bug fixes
9+
- Improved memory usage of historical streaming requests (`TimeseriesGetRange`)
10+
811
## 0.34.0 - 2025-04-22
912

1013
### Enhancements

cmake/SourcesAndHeaders.cmake

+4-2
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ set(headers
77
include/databento/dbn_decoder.hpp
88
include/databento/dbn_encoder.hpp
99
include/databento/dbn_file_store.hpp
10+
include/databento/detail/buffer.hpp
11+
include/databento/detail/dbn_buffer_decoder.hpp
1012
include/databento/detail/http_client.hpp
1113
include/databento/detail/json_helpers.hpp
1214
include/databento/detail/scoped_fd.hpp
1315
include/databento/detail/scoped_thread.hpp
14-
include/databento/detail/shared_channel.hpp
1516
include/databento/detail/tcp_client.hpp
1617
include/databento/detail/zstd_stream.hpp
1718
include/databento/enums.hpp
@@ -47,10 +48,11 @@ set(sources
4748
src/dbn_decoder.cpp
4849
src/dbn_encoder.cpp
4950
src/dbn_file_store.cpp
51+
src/detail/buffer.cpp
52+
src/detail/dbn_buffer_decoder.cpp
5053
src/detail/http_client.cpp
5154
src/detail/json_helpers.cpp
5255
src/detail/scoped_fd.cpp
53-
src/detail/shared_channel.cpp
5456
src/detail/tcp_client.cpp
5557
src/detail/zstd_stream.cpp
5658
src/enums.cpp

include/databento/dbn_decoder.hpp

+11-16
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#include <string>
77

88
#include "databento/dbn.hpp"
9-
#include "databento/detail/shared_channel.hpp"
109
#include "databento/enums.hpp" // Upgrade Policy
1110
#include "databento/file_stream.hpp"
1211
#include "databento/ireadable.hpp"
@@ -18,7 +17,6 @@ namespace databento {
1817
// handled. Defaults to upgrading DBNv1 data to version 2 (the current version).
1918
class DbnDecoder {
2019
public:
21-
DbnDecoder(ILogReceiver* log_receiver, detail::SharedChannel channel);
2220
DbnDecoder(ILogReceiver* log_receiver, InFileStream file_stream);
2321
DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr<IReadable> input);
2422
DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr<IReadable> input,
@@ -27,7 +25,8 @@ class DbnDecoder {
2725
static std::pair<std::uint8_t, std::size_t> DecodeMetadataVersionAndSize(
2826
const std::byte* buffer, std::size_t size);
2927
static Metadata DecodeMetadataFields(std::uint8_t version,
30-
const std::vector<std::byte>& buffer);
28+
const std::byte* buffer,
29+
const std::byte* buffer_end);
3130
// Decodes a record possibly applying upgrading the data according to the
3231
// given version and upgrade policy. If an upgrade is applied,
3332
// compat_buffer is modified.
@@ -42,21 +41,17 @@ class DbnDecoder {
4241
const Record* DecodeRecord();
4342

4443
private:
45-
static std::string DecodeSymbol(
46-
std::size_t symbol_cstr_len,
47-
std::vector<std::byte>::const_iterator& buffer_it);
44+
static std::string DecodeSymbol(std::size_t symbol_cstr_len,
45+
const std::byte*& buffer);
4846
static std::vector<std::string> DecodeRepeatedSymbol(
49-
std::size_t symbol_cstr_len,
50-
std::vector<std::byte>::const_iterator& buffer_it,
51-
std::vector<std::byte>::const_iterator buffer_end_it);
47+
std::size_t symbol_cstr_len, const std::byte*& buffer,
48+
const std::byte* buffer_end);
5249
static std::vector<SymbolMapping> DecodeSymbolMappings(
53-
std::size_t symbol_cstr_len,
54-
std::vector<std::byte>::const_iterator& buffer_it,
55-
std::vector<std::byte>::const_iterator buffer_end_it);
56-
static SymbolMapping DecodeSymbolMapping(
57-
std::size_t symbol_cstr_len,
58-
std::vector<std::byte>::const_iterator& buffer_it,
59-
std::vector<std::byte>::const_iterator buffer_end_it);
50+
std::size_t symbol_cstr_len, const std::byte*& buffer,
51+
const std::byte* buffer_end);
52+
static SymbolMapping DecodeSymbolMapping(std::size_t symbol_cstr_len,
53+
const std::byte*& buffer,
54+
const std::byte* buffer_end);
6055
bool DetectCompression();
6156
std::size_t FillBuffer();
6257
std::size_t GetReadBufferSize() const;

include/databento/detail/buffer.hpp

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#pragma once
2+
3+
#include <cstddef>
4+
#include <memory>
5+
6+
#include "databento/ireadable.hpp"
7+
#include "databento/iwritable.hpp"
8+
9+
namespace databento::detail {
10+
class Buffer : public IReadable, public IWritable {
11+
public:
12+
Buffer() : Buffer(64 * std::size_t{1 << 10}) {}
13+
explicit Buffer(std::size_t init_capacity)
14+
: buf_{std::make_unique<std::byte[]>(init_capacity)},
15+
end_{buf_.get() + init_capacity},
16+
read_pos_{buf_.get()},
17+
write_pos_{buf_.get()} {}
18+
19+
size_t Write(const char* data, std::size_t length);
20+
size_t Write(const std::byte* data, std::size_t length);
21+
void WriteAll(const char* data, std::size_t length);
22+
void WriteAll(const std::byte* data, std::size_t length) override;
23+
24+
std::byte*& WriteBegin() { return write_pos_; }
25+
std::byte* WriteEnd() const { return end_; }
26+
std::size_t WriteCapacity() const {
27+
return static_cast<std::size_t>(end_ - write_pos_);
28+
}
29+
30+
/// Will throw if `length > ReadCapacity()`.
31+
void ReadExact(std::byte* buffer, std::size_t length) override;
32+
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
33+
34+
std::byte*& ReadBegin() { return read_pos_; }
35+
std::byte* ReadEnd() const { return write_pos_; }
36+
std::size_t ReadCapacity() const {
37+
return static_cast<std::size_t>(write_pos_ - read_pos_);
38+
}
39+
40+
std::size_t Capacity() const {
41+
return static_cast<std::size_t>(end_ - buf_.get());
42+
}
43+
void Clear() {
44+
read_pos_ = buf_.get();
45+
write_pos_ = buf_.get();
46+
}
47+
void Reserve(std::size_t capacity);
48+
void Shift();
49+
50+
private:
51+
std::unique_ptr<std::byte[]> buf_;
52+
std::byte* end_;
53+
std::byte* read_pos_{};
54+
std::byte* write_pos_{};
55+
};
56+
} // namespace databento::detail
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#pragma once
2+
3+
#include <cstddef>
4+
#include <cstdint>
5+
#include <memory>
6+
7+
#include "databento/detail/buffer.hpp"
8+
#include "databento/detail/zstd_stream.hpp"
9+
#include "databento/ireadable.hpp"
10+
#include "databento/record.hpp"
11+
#include "databento/timeseries.hpp"
12+
13+
namespace databento::detail {
14+
class DbnBufferDecoder {
15+
public:
16+
// The instance cannot outlive the lifetime of these references.
17+
DbnBufferDecoder(const MetadataCallback& metadata_callback,
18+
const RecordCallback& record_callback)
19+
: metadata_callback_{metadata_callback},
20+
record_callback_{record_callback},
21+
zstd_stream_{InitZstdBuffer()} {}
22+
23+
KeepGoing Process(const char* data, std::size_t length);
24+
25+
private:
26+
enum class DecoderState : std::uint8_t {
27+
Init,
28+
Metadata,
29+
Records,
30+
};
31+
32+
std::unique_ptr<IReadable> InitZstdBuffer() {
33+
auto zstd_buffer = std::make_unique<Buffer>();
34+
zstd_buffer_ = zstd_buffer.get();
35+
return zstd_buffer;
36+
}
37+
38+
const MetadataCallback& metadata_callback_;
39+
const RecordCallback& record_callback_;
40+
ZstdDecodeStream zstd_stream_;
41+
Buffer* zstd_buffer_;
42+
Buffer dbn_buffer_{};
43+
std::size_t bytes_needed_{};
44+
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
45+
std::uint8_t input_version_{};
46+
bool ts_out_;
47+
DecoderState state_{DecoderState::Init};
48+
};
49+
} // namespace databento::detail

include/databento/detail/shared_channel.hpp

-29
This file was deleted.

include/databento/live_blocking.hpp

+4-6
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
#include <utility> // pair
1010
#include <vector>
1111

12-
#include "databento/datetime.hpp" // UnixNanos
13-
#include "databento/dbn.hpp" // Metadata
12+
#include "databento/datetime.hpp" // UnixNanos
13+
#include "databento/dbn.hpp" // Metadata
14+
#include "databento/detail/buffer.hpp"
1415
#include "databento/detail/tcp_client.hpp" // TcpClient
1516
#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy
1617
#include "databento/live_subscription.hpp"
@@ -118,10 +119,7 @@ class LiveBlocking {
118119
detail::TcpClient client_;
119120
std::uint32_t sub_counter_{};
120121
std::vector<LiveSubscription> subscriptions_;
121-
// Must be 8-byte aligned for records
122-
alignas(RecordHeader) std::array<std::byte, kMaxStrLen> read_buffer_{};
123-
std::size_t buffer_size_{};
124-
std::size_t buffer_idx_{};
122+
detail::Buffer buffer_{};
125123
// Must be 8-byte aligned for records
126124
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
127125
std::uint64_t session_id_;

0 commit comments

Comments
 (0)