From a794f69623f272c2eb17ba2f78ede772ac42192e Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 31 Mar 2026 08:43:34 -0500 Subject: [PATCH 1/8] FIX: Fix Python type stubs --- src/enums.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enums.cpp b/src/enums.cpp index d3ab534..8fd3148 100644 --- a/src/enums.cpp +++ b/src/enums.cpp @@ -114,7 +114,7 @@ const char* ToString(SlowReaderBehavior slow_reader_behavior) { return "warn"; } case SlowReaderBehavior::Skip: { - return "skip"; + return "drop"; } default: { return "Unknown"; From c2a4f8630255510fb948d0656d8060c474654eda Mon Sep 17 00:00:00 2001 From: Rob Maierle Date: Mon, 6 Apr 2026 13:23:29 -0400 Subject: [PATCH 2/8] MOD: Update MEMOIR capitalization --- include/databento/publishers.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/databento/publishers.hpp b/include/databento/publishers.hpp index 10ca10a..c44850b 100644 --- a/include/databento/publishers.hpp +++ b/include/databento/publishers.hpp @@ -145,7 +145,7 @@ enum class Dataset : std::uint16_t { XcisBbo = 13, // NYSE National Trades XcisTrades = 14, - // MEMX Memoir Depth + // MEMX MEMOIR Depth MemxMemoir = 15, // MIAX Pearl Depth EprlDom = 16, @@ -231,7 +231,7 @@ enum class Publisher : std::uint16_t { XcisBboXcis = 13, // NYSE National Trades XcisTradesXcis = 14, - // MEMX Memoir Depth + // MEMX MEMOIR Depth MemxMemoirMemx = 15, // MIAX Pearl Depth EprlDomEprl = 16, From 34c9816a5de8738309521233aea569aad18fa884 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Mon, 6 Apr 2026 16:06:38 -0500 Subject: [PATCH 3/8] REF: Refactor `ts_out` handling in Python structs --- src/enums.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/enums.cpp b/src/enums.cpp index 8fd3148..d3ab534 100644 --- a/src/enums.cpp +++ b/src/enums.cpp @@ -114,7 +114,7 @@ const char* ToString(SlowReaderBehavior slow_reader_behavior) { return "warn"; } case SlowReaderBehavior::Skip: { - return "drop"; + return "skip"; } default: { return "Unknown"; From ada2f31c56613f584977555233c2bd91a652dbef Mon Sep 17 00:00:00 2001 From: Carter Green Date: Mon, 6 Apr 2026 16:03:08 -0500 Subject: [PATCH 4/8] ADD: Add `TryNextRecord` and `FillBuffer` to C++ --- CHANGELOG.md | 6 ++ include/databento/live_blocking.hpp | 22 ++++- src/live_blocking.cpp | 36 +++++-- tests/src/live_blocking_tests.cpp | 146 ++++++++++++++++++++++++++++ 4 files changed, 199 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7935fd3..8d78603 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.53.0 - TBD + +### Enhancements +- Added `TryNextRecord` and `FillBuffer` to `LiveBlocking` for more fine-grained + control around I/O + ## 0.52.0 - 2026-03-31 ### Enhancements diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index d8ae6ef..5af357c 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -81,6 +81,26 @@ class LiveBlocking { // // This method should only be called after `Start`. const Record* NextRecord(std::chrono::milliseconds timeout); + // Returns the next record from the internal buffer without performing any + // I/O. Returns `nullptr` if no complete record is buffered. The returned + // pointer is valid until the next call to `TryNextRecord`, `NextRecord`, + // or `FillBuffer`. + // + // This method should only be called after `Start`. + const Record* TryNextRecord(); + // Reads available data from the connection into the internal buffer using + // the heartbeat timeout. Returns the number of bytes read and the status. + // A `read_size` of 0 with `Status::Closed` indicates the connection was + // closed by the gateway. + // + // This method should only be called after `Start`. + IReadable::Result FillBuffer(); + // Reads available data from the connection into the internal buffer. + // Returns the number of bytes read and the status. A `read_size` of 0 with + // `Status::Closed` indicates the connection was closed by the gateway. + // + // This method should only be called after `Start`. + IReadable::Result FillBuffer(std::chrono::milliseconds timeout); // Stops the session with the gateway. Once stopped, the session cannot be // restarted. void Stop(); @@ -117,7 +137,7 @@ class LiveBlocking { void IncrementSubCounter(); void Subscribe(std::string_view sub_msg, const std::vector& symbols, bool use_snapshot); - IReadable::Result FillBuffer(std::chrono::milliseconds timeout); + const Record* ConsumeBufferedRecord(); RecordHeader* BufferRecordHeader(); std::chrono::milliseconds HeartbeatTimeout() const; void CheckHeartbeatTimeout() const; diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 85101f4..8f1e745 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -196,9 +196,8 @@ const databento::Record& LiveBlocking::NextRecord() { } const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds timeout) { - // need some unread_bytes - const auto unread_bytes = buffer_.ReadCapacity(); - if (unread_bytes == 0) { + // need at least a header to read the record size + if (buffer_.ReadCapacity() < sizeof(RecordHeader)) { const auto read_res = FillBuffer(timeout); if (read_res.status == Status::Timeout) { CheckHeartbeatTimeout(); @@ -208,7 +207,7 @@ const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds time throw LiveApiError{"Gateway closed the session"}; } } - // check length + // wait for the full record while (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) { const auto read_res = FillBuffer(timeout); if (read_res.status == Status::Timeout) { @@ -219,12 +218,17 @@ const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds time throw LiveApiError{"Gateway closed the session"}; } } - current_record_ = Record{BufferRecordHeader()}; - const auto bytes_to_consume = current_record_.Size(); - buffer_.ConsumeNoShift(bytes_to_consume); - current_record_ = DbnDecoder::DecodeRecordCompat( - version_, upgrade_policy_, send_ts_out_, &compat_buffer_, current_record_); - return ¤t_record_; + return ConsumeBufferedRecord(); +} + +const databento::Record* LiveBlocking::TryNextRecord() { + if (buffer_.ReadCapacity() < sizeof(RecordHeader)) { + return nullptr; + } + if (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) { + return nullptr; + } + return ConsumeBufferedRecord(); } void LiveBlocking::Stop() { connection_.Close(); } @@ -447,6 +451,10 @@ void LiveBlocking::IncrementSubCounter() { } } +databento::IReadable::Result LiveBlocking::FillBuffer() { + return FillBuffer(HeartbeatTimeout()); +} + databento::IReadable::Result LiveBlocking::FillBuffer( std::chrono::milliseconds timeout) { buffer_.Shift(); @@ -459,6 +467,14 @@ databento::IReadable::Result LiveBlocking::FillBuffer( return read_res; } +const databento::Record* LiveBlocking::ConsumeBufferedRecord() { + current_record_ = Record{BufferRecordHeader()}; + buffer_.ConsumeNoShift(current_record_.Size()); + current_record_ = DbnDecoder::DecodeRecordCompat( + version_, upgrade_policy_, send_ts_out_, &compat_buffer_, current_record_); + return ¤t_record_; +} + databento::RecordHeader* LiveBlocking::BufferRecordHeader() { return reinterpret_cast(buffer_.ReadBegin()); } diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index fabcfc8..e2d1cfe 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -15,6 +15,7 @@ #include "databento/datetime.hpp" #include "databento/enums.hpp" // Schema, SType #include "databento/exceptions.hpp" +#include "databento/ireadable.hpp" #include "databento/live.hpp" #include "databento/live_blocking.hpp" #include "databento/live_subscription.hpp" @@ -765,4 +766,149 @@ TEST_F(LiveBlockingTests, TestHeartbeatTimeoutOnNextRecordWithTimeout) { EXPECT_TRUE(got_timeout_exception) << "Expected heartbeat timeout exception"; } +TEST_F(LiveBlockingTests, TestTryNextRecordEmptyBuffer) { + constexpr auto kTsOut = false; + const mock::MockLsgServer mock_server{ + dataset::kXnasItch, kTsOut, [](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + std::this_thread::sleep_for(std::chrono::milliseconds{200}); + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + // Buffer is empty, no I/O should be performed + const auto* rec = target.TryNextRecord(); + EXPECT_EQ(rec, nullptr); +} + +TEST_F(LiveBlockingTests, TestTryNextRecordAfterFillBuffer) { + constexpr auto kTsOut = false; + constexpr OhlcvMsg kRec{DummyHeader(RType::Ohlcv1M), 1, 2, 3, 4, 5}; + const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut, + [kRec](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + self.SendRecord(kRec); + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + const auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000}); + ASSERT_EQ(fill_res.status, IReadable::Status::Ok); + ASSERT_GT(fill_res.read_size, 0); + const auto* rec = target.TryNextRecord(); + ASSERT_NE(rec, nullptr); + ASSERT_TRUE(rec->Holds()); + EXPECT_EQ(rec->Get(), kRec); + // Buffer drained + EXPECT_EQ(target.TryNextRecord(), nullptr); +} + +TEST_F(LiveBlockingTests, TestFillBufferReturnsClosed) { + constexpr auto kTsOut = false; + const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut, + [](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + self.Close(); + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + const auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000}); + EXPECT_EQ(fill_res.status, IReadable::Status::Closed); + EXPECT_EQ(fill_res.read_size, 0); +} + +TEST_F(LiveBlockingTests, TestTryNextRecordPollLoop) { + constexpr auto kTsOut = false; + constexpr auto kRecCount = 5; + constexpr OhlcvMsg kRec{DummyHeader(RType::Ohlcv1M), 1, 2, 3, 4, 5}; + const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut, + [kRec](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + for (size_t i = 0; i < kRecCount; ++i) { + self.SendRecord(kRec); + } + self.Close(); + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + int record_count = 0; + while (true) { + while (const auto* rec = target.TryNextRecord()) { + ASSERT_TRUE(rec->Holds()); + EXPECT_EQ(rec->Get(), kRec); + ++record_count; + } + const auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000}); + if (fill_res.status == IReadable::Status::Closed) { + break; + } + } + EXPECT_EQ(record_count, kRecCount); +} + +TEST_F(LiveBlockingTests, TestTryNextRecordPartialRecord) { + constexpr auto kTsOut = false; + constexpr MboMsg kRec{DummyHeader(RType::Mbo), + 1, + 2, + 3, + {}, + 4, + Action::Add, + Side::Bid, + UnixNanos{}, + TimeDeltaNanos{}, + 100}; + + bool send_remaining{}; + std::mutex send_remaining_mutex; + std::condition_variable send_remaining_cv; + const mock::MockLsgServer mock_server{ + dataset::kGlbxMdp3, kTsOut, + [kRec, &send_remaining, &send_remaining_mutex, + &send_remaining_cv](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + self.SplitSendRecord(kRec, send_remaining, send_remaining_mutex, + send_remaining_cv); + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kGlbxMdp3) + .SetSendTsOut(kTsOut) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + // Read partial record (just header) + auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000}); + ASSERT_EQ(fill_res.status, IReadable::Status::Ok); + // Record is incomplete + EXPECT_EQ(target.TryNextRecord(), nullptr); + // Signal server to send remaining bytes + { + const std::lock_guard lock{send_remaining_mutex}; + send_remaining = true; + send_remaining_cv.notify_one(); + } + // Read the rest + fill_res = target.FillBuffer(std::chrono::milliseconds{1000}); + ASSERT_EQ(fill_res.status, IReadable::Status::Ok); + const auto* rec = target.TryNextRecord(); + ASSERT_NE(rec, nullptr); + ASSERT_TRUE(rec->Holds()); + EXPECT_EQ(rec->Get(), kRec); +} + } // namespace databento::tests From d8fda10af6b5bef02f74f15088fcc2f51bac4aa1 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Mon, 6 Apr 2026 16:04:40 -0500 Subject: [PATCH 5/8] ADD: Add connection and auth timeouts for Live --- CHANGELOG.md | 3 + include/databento/detail/live_connection.hpp | 2 + include/databento/detail/tcp_client.hpp | 1 + include/databento/live.hpp | 4 + include/databento/live_blocking.hpp | 20 ++++- include/databento/live_threaded.hpp | 9 +- src/detail/live_connection.cpp | 4 + src/detail/tcp_client.cpp | 86 +++++++++++++++++--- src/live.cpp | 17 +++- src/live_blocking.cpp | 61 ++++++++++---- src/live_threaded.cpp | 17 +++- tests/src/live_blocking_tests.cpp | 57 +++++++++++-- 12 files changed, 232 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d78603..55f98c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ ### Enhancements - Added `TryNextRecord` and `FillBuffer` to `LiveBlocking` for more fine-grained control around I/O +- Added `TimeoutConf` struct and `SetTimeoutConf()` builder method for configuring connect + and auth timeouts on the Live client (defaults to 10s and 30s) +- Added `SessionId()` and `Timeouts()` getters to `LiveBlocking` and `LiveThreaded` ## 0.52.0 - 2026-03-31 diff --git a/include/databento/detail/live_connection.hpp b/include/databento/detail/live_connection.hpp index ae389fd..1093cfe 100644 --- a/include/databento/detail/live_connection.hpp +++ b/include/databento/detail/live_connection.hpp @@ -25,6 +25,8 @@ class LiveConnection : IWritable { public: LiveConnection(ILogReceiver* log_receiver, const std::string& gateway, std::uint16_t port); + LiveConnection(ILogReceiver* log_receiver, const std::string& gateway, + std::uint16_t port, TcpClient::RetryConf retry_conf); void WriteAll(std::string_view str); void WriteAll(const std::byte* buffer, std::size_t size); diff --git a/include/databento/detail/tcp_client.hpp b/include/databento/detail/tcp_client.hpp index 13a9989..8c45a0b 100644 --- a/include/databento/detail/tcp_client.hpp +++ b/include/databento/detail/tcp_client.hpp @@ -20,6 +20,7 @@ class TcpClient { struct RetryConf { std::uint32_t max_attempts{1}; std::chrono::seconds max_wait{std::chrono::minutes{1}}; + std::chrono::seconds connect_timeout{std::chrono::seconds{10}}; }; TcpClient(ILogReceiver* log_receiver, const std::string& gateway, std::uint16_t port); diff --git a/include/databento/live.hpp b/include/databento/live.hpp index d9f31ed..2abcd19 100644 --- a/include/databento/live.hpp +++ b/include/databento/live.hpp @@ -57,6 +57,9 @@ class LiveBuilder { LiveBuilder& SetCompression(Compression compression); // Sets the behavior of the gateway when the client falls behind real time. LiveBuilder& SetSlowReaderBehavior(SlowReaderBehavior slow_reader_behavior); + // Sets the timeouts for connecting and authenticating with the gateway. + // Defaults to 10 seconds for connect and 30 seconds for auth. + LiveBuilder& SetTimeoutConf(TimeoutConf timeout_conf); /* * Build a live client instance @@ -85,5 +88,6 @@ class LiveBuilder { std::string user_agent_ext_; Compression compression_{Compression::None}; std::optional slow_reader_behavior_{}; + TimeoutConf timeout_conf_{}; }; } // namespace databento diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index 5af357c..9282f44 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -21,6 +21,13 @@ namespace databento { // Forward declaration class ILogReceiver; class LiveBuilder; + +// Timeouts for the Live client's connection and authentication phases. +struct TimeoutConf { + std::chrono::seconds connect{10}; + std::chrono::seconds auth{30}; +}; + class LiveThreaded; // A client for interfacing with Databento's real-time and intraday replay @@ -48,6 +55,8 @@ class LiveBlocking { std::optional SlowReaderBehavior() const { return slow_reader_behavior_; } + const databento::TimeoutConf& TimeoutConf() const { return timeout_conf_; } + std::uint64_t SessionId() const { return session_id_; } const std::vector& Subscriptions() const { return subscriptions_; } std::vector& Subscriptions() { return subscriptions_; } @@ -119,21 +128,23 @@ class LiveBlocking { std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_reader_behavior); + std::optional slow_reader_behavior, + databento::TimeoutConf timeout_conf); LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_reader_behavior); + std::optional slow_reader_behavior, + databento::TimeoutConf timeout_conf); std::string DetermineGateway() const; std::uint64_t Authenticate(); - std::string DecodeChallenge(); + std::string DecodeChallenge(std::chrono::milliseconds timeout); std::string GenerateCramReply(std::string_view challenge_key); std::string EncodeAuthReq(std::string_view auth); - std::uint64_t DecodeAuthResp(); + std::uint64_t DecodeAuthResp(std::chrono::milliseconds timeout); void IncrementSubCounter(); void Subscribe(std::string_view sub_msg, const std::vector& symbols, bool use_snapshot); @@ -156,6 +167,7 @@ class LiveBlocking { const std::optional heartbeat_interval_; const databento::Compression compression_; const std::optional slow_reader_behavior_; + const databento::TimeoutConf timeout_conf_; detail::LiveConnection connection_; std::uint32_t sub_counter_{}; std::vector subscriptions_; diff --git a/include/databento/live_threaded.hpp b/include/databento/live_threaded.hpp index e427597..6c97c0f 100644 --- a/include/databento/live_threaded.hpp +++ b/include/databento/live_threaded.hpp @@ -12,6 +12,7 @@ #include "databento/datetime.hpp" // UnixNanos #include "databento/detail/scoped_thread.hpp" // ScopedThread #include "databento/enums.hpp" // Schema, SType +#include "databento/live_blocking.hpp" // TimeoutConf #include "databento/live_subscription.hpp" #include "databento/timeseries.hpp" // MetadataCallback, RecordCallback @@ -56,6 +57,8 @@ class LiveThreaded { std::optional HeartbeatInterval() const; databento::Compression Compression() const; std::optional SlowReaderBehavior() const; + const databento::TimeoutConf& TimeoutConf() const; + std::uint64_t SessionId() const; const std::vector& Subscriptions() const; std::vector& Subscriptions(); @@ -111,14 +114,16 @@ class LiveThreaded { std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_reader_behavior); + std::optional slow_reader_behavior, + databento::TimeoutConf timeout_conf); LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_reader_behavior); + std::optional slow_reader_behavior, + databento::TimeoutConf timeout_conf); // unique_ptr to be movable std::unique_ptr impl_; diff --git a/src/detail/live_connection.cpp b/src/detail/live_connection.cpp index dd5a057..31537b6 100644 --- a/src/detail/live_connection.cpp +++ b/src/detail/live_connection.cpp @@ -10,6 +10,10 @@ LiveConnection::LiveConnection(ILogReceiver* log_receiver, const std::string& ga std::uint16_t port) : client_{log_receiver, gateway, port} {} +LiveConnection::LiveConnection(ILogReceiver* log_receiver, const std::string& gateway, + std::uint16_t port, TcpClient::RetryConf retry_conf) + : client_{log_receiver, gateway, port, retry_conf} {} + void LiveConnection::WriteAll(std::string_view str) { client_.WriteAll(str); } void LiveConnection::WriteAll(const std::byte* buffer, std::size_t size) { diff --git a/src/detail/tcp_client.cpp b/src/detail/tcp_client.cpp index e1941fa..1fc6eb0 100644 --- a/src/detail/tcp_client.cpp +++ b/src/detail/tcp_client.cpp @@ -3,11 +3,12 @@ #ifdef _WIN32 #include // closesocket, recv, send, socket #else +#include // fcntl, F_GETFL, F_SETFL, O_NONBLOCK #include // addrinfo, gai_strerror, getaddrinfo, freeaddrinfo #include // htons, IPPROTO_TCP #include // pollfd -#include // AF_INET, connect, recv, send, sockaddr, sockaddr_in, socket, SOCK_STREAM -#include // close, ssize_t +#include // AF_INET, connect, recv, send, sockaddr, sockaddr_in, socket, SOCK_STREAM, getsockopt, SO_ERROR, SOL_SOCKET +#include // close, ssize_t #include // errno #endif @@ -31,6 +32,55 @@ int GetErrNo() { return errno; #endif } + +int Poll(::pollfd* fds, std::uint32_t nfds, int timeout_ms) { +#ifdef _WIN32 + return ::WSAPoll(fds, nfds, timeout_ms); +#else + return ::poll(fds, static_cast<::nfds_t>(nfds), timeout_ms); +#endif +} + +#ifdef _WIN32 +constexpr int kConnectInProgress = WSAEWOULDBLOCK; +#else +constexpr int kConnectInProgress = EINPROGRESS; +#endif + +// Saves the current blocking state, sets non-blocking, and returns a RAII guard +// that restores the original state on destruction. +struct BlockingGuard { + databento::detail::Socket fd; +#ifdef _WIN32 + // No state to save on Windows +#else + int original_flags; +#endif + + explicit BlockingGuard(databento::detail::Socket fd) : fd{fd} { +#ifdef _WIN32 + unsigned long mode = 1; + ::ioctlsocket(fd, FIONBIO, &mode); +#else + original_flags = ::fcntl(fd, F_GETFL, 0); + ::fcntl(fd, F_SETFL, original_flags | O_NONBLOCK); +#endif + } + + ~BlockingGuard() { +#ifdef _WIN32 + unsigned long mode = 0; + ::ioctlsocket(fd, FIONBIO, &mode); +#else + ::fcntl(fd, F_SETFL, original_flags); +#endif + } + + BlockingGuard(const BlockingGuard&) = delete; + BlockingGuard& operator=(const BlockingGuard&) = delete; + BlockingGuard(BlockingGuard&&) = delete; + BlockingGuard& operator=(BlockingGuard&&) = delete; +}; } // namespace TcpClient::TcpClient(ILogReceiver* log_receiver, const std::string& gateway, @@ -83,12 +133,7 @@ databento::IReadable::Result TcpClient::ReadSome(std::byte* buffer, // having no timeout const auto timeout_ms = timeout.count() ? static_cast(timeout.count()) : -1; while (true) { - const int poll_status = -#ifdef _WIN32 - ::WSAPoll(&fds, 1, timeout_ms); -#else - ::poll(&fds, 1, timeout_ms); -#endif + const int poll_status = Poll(&fds, 1, timeout_ms); if (poll_status > 0) { return ReadSome(buffer, max_size); } @@ -130,13 +175,34 @@ databento::detail::ScopedFd TcpClient::InitSocket(ILogReceiver* log_receiver, } std::unique_ptr res{out, &::freeaddrinfo}; const auto max_attempts = std::max(retry_conf.max_attempts, 1); + const auto timeout_ms = static_cast( + std::chrono::duration_cast(retry_conf.connect_timeout) + .count()); std::chrono::seconds backoff{1}; for (std::uint32_t attempt = 0; attempt < max_attempts; ++attempt) { - if (::connect(scoped_fd.Get(), res->ai_addr, res->ai_addrlen) == 0) { + BlockingGuard guard{scoped_fd.Get()}; + + const int connect_ret = ::connect(scoped_fd.Get(), res->ai_addr, res->ai_addrlen); + bool connected = (connect_ret == 0); + if (!connected && ::GetErrNo() == kConnectInProgress) { + pollfd pfd{scoped_fd.Get(), POLLOUT, {}}; + const int poll_ret = Poll(&pfd, 1, timeout_ms); + if (poll_ret > 0) { + int so_error = 0; + socklen_t len = sizeof(so_error); + ::getsockopt(scoped_fd.Get(), SOL_SOCKET, SO_ERROR, &so_error, &len); + connected = (so_error == 0); + if (!connected) { + errno = so_error; + } + } + } + + if (connected) { break; } else if (attempt + 1 == max_attempts) { std::ostringstream err_msg; - err_msg << "Socket failed to connect after " << max_attempts << " attempts"; + err_msg << "Socket failed to connect after " << max_attempts << " attempt(s)"; throw TcpError{::GetErrNo(), err_msg.str()}; } std::ostringstream log_msg; diff --git a/src/live.cpp b/src/live.cpp index cfd8515..7e00174 100644 --- a/src/live.cpp +++ b/src/live.cpp @@ -90,6 +90,11 @@ LiveBuilder& LiveBuilder::SetSlowReaderBehavior( return *this; } +LiveBuilder& LiveBuilder::SetTimeoutConf(TimeoutConf timeout_conf) { + timeout_conf_ = timeout_conf; + return *this; +} + databento::LiveBlocking LiveBuilder::BuildBlocking() { Validate(); if (gateway_.empty()) { @@ -97,14 +102,16 @@ databento::LiveBlocking LiveBuilder::BuildBlocking() { dataset_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_, slow_reader_behavior_}; + compression_, slow_reader_behavior_, + timeout_conf_}; } return databento::LiveBlocking{log_receiver_, key_, dataset_, gateway_, port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_, slow_reader_behavior_}; + compression_, slow_reader_behavior_, + timeout_conf_}; } databento::LiveThreaded LiveBuilder::BuildThreaded() { @@ -114,14 +121,16 @@ databento::LiveThreaded LiveBuilder::BuildThreaded() { dataset_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_, slow_reader_behavior_}; + compression_, slow_reader_behavior_, + timeout_conf_}; } return databento::LiveThreaded{log_receiver_, key_, dataset_, gateway_, port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, buffer_size_, user_agent_ext_, - compression_, slow_reader_behavior_}; + compression_, slow_reader_behavior_, + timeout_conf_}; } void LiveBuilder::Validate() { diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 8f1e745..9602d10 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -36,8 +36,8 @@ LiveBlocking::LiveBlocking( VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_reader_behavior) - + std::optional slow_reader_behavior, + databento::TimeoutConf timeout_conf) : log_receiver_{log_receiver}, key_{std::move(key)}, dataset_{std::move(dataset)}, @@ -49,7 +49,8 @@ LiveBlocking::LiveBlocking( heartbeat_interval_{heartbeat_interval}, compression_{compression}, slow_reader_behavior_{slow_reader_behavior}, - connection_{log_receiver_, gateway_, port_}, + timeout_conf_{timeout_conf}, + connection_{log_receiver_, gateway_, port_, {{}, {}, timeout_conf_.connect}}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} @@ -59,7 +60,8 @@ LiveBlocking::LiveBlocking( VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_reader_behavior) + std::optional slow_reader_behavior, + databento::TimeoutConf timeout_conf) : log_receiver_{log_receiver}, key_{std::move(key)}, dataset_{std::move(dataset)}, @@ -71,7 +73,8 @@ LiveBlocking::LiveBlocking( heartbeat_interval_{heartbeat_interval}, compression_{compression}, slow_reader_behavior_{slow_reader_behavior}, - connection_{log_receiver_, gateway_, port_}, + timeout_conf_{timeout_conf}, + connection_{log_receiver_, gateway_, port_, {{}, {}, timeout_conf_.connect}}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} @@ -239,7 +242,8 @@ void LiveBlocking::Reconnect() { log_msg << "Reconnecting to " << gateway_ << ':' << port_; log_receiver_->Receive(LogLevel::Info, log_msg.str()); } - connection_ = detail::LiveConnection{log_receiver_, gateway_, port_}; + connection_ = detail::LiveConnection{ + log_receiver_, gateway_, port_, {{}, {}, timeout_conf_.connect}}; buffer_.Clear(); sub_counter_ = 0; session_id_ = this->Authenticate(); @@ -262,10 +266,14 @@ void LiveBlocking::Resubscribe() { } } -std::string LiveBlocking::DecodeChallenge() { +std::string LiveBlocking::DecodeChallenge(std::chrono::milliseconds timeout) { static constexpr auto kMethodName = "LiveBlocking::DecodeChallenge"; - const auto read_size = - connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size; + const auto result = + connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout); + if (result.status == Status::Timeout) { + throw TcpError{0, "Authentication timed out waiting for challenge"}; + } + const auto read_size = result.read_size; if (read_size == 0) { throw LiveApiError{"Gateway closed socket during authentication"}; } @@ -288,8 +296,12 @@ std::string LiveBlocking::DecodeChallenge() { : response.find('\n', find_start); while (next_nl_pos == std::string::npos) { // read more - buffer_.Fill( - connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size); + const auto loop_result = + connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout); + if (loop_result.status == Status::Timeout) { + throw TcpError{0, "Authentication timed out waiting for challenge"}; + } + buffer_.Fill(loop_result.read_size); if (buffer_.ReadCapacity() == 0) { throw LiveApiError{"Gateway closed socket during authentication"}; } @@ -326,7 +338,17 @@ std::string LiveBlocking::DetermineGateway() const { std::uint64_t LiveBlocking::Authenticate() { static constexpr auto kMethodName = "LiveBlocking::Authenticate"; - const std::string challenge_key = DecodeChallenge() + '|' + key_; + const auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration_cast( + timeout_conf_.auth); + auto remaining = [&deadline]() { + const auto now = std::chrono::steady_clock::now(); + if (now >= deadline) { + throw TcpError{0, "Authentication timed out"}; + } + return std::chrono::duration_cast(deadline - now); + }; + const std::string challenge_key = DecodeChallenge(remaining()) + '|' + key_; const std::string auth = GenerateCramReply(challenge_key); const std::string req = EncodeAuthReq(auth); @@ -336,7 +358,7 @@ std::uint64_t LiveBlocking::Authenticate() { log_receiver_->Receive(LogLevel::Debug, log_ss.str()); } connection_.WriteAll(req); - const std::uint64_t session_id = DecodeAuthResp(); + const std::uint64_t session_id = DecodeAuthResp(remaining()); if (log_receiver_->ShouldLog(LogLevel::Info)) { std::ostringstream log_ss; @@ -372,19 +394,22 @@ std::string LiveBlocking::EncodeAuthReq(std::string_view auth) { return req_stream.str(); } -std::uint64_t LiveBlocking::DecodeAuthResp() { +std::uint64_t LiveBlocking::DecodeAuthResp(std::chrono::milliseconds timeout) { // handle split packet read const std::byte* newline_ptr; buffer_.Clear(); do { - const auto read_size = - connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size; - if (read_size == 0) { + const auto result = + connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout); + if (result.status == Status::Timeout) { + throw TcpError{0, "Authentication timed out waiting for auth response"}; + } + if (result.read_size == 0) { throw LiveApiError{ "Unexpected end of message received from server after replying to " "CRAM"}; } - buffer_.Fill(read_size); + buffer_.Fill(result.read_size); newline_ptr = std::find(buffer_.ReadBegin(), buffer_.ReadEnd(), static_cast('\n')); } while (newline_ptr == buffer_.ReadEnd()); diff --git a/src/live_threaded.cpp b/src/live_threaded.cpp index 0ec15d0..a9f20cb 100644 --- a/src/live_threaded.cpp +++ b/src/live_threaded.cpp @@ -62,11 +62,12 @@ LiveThreaded::LiveThreaded( VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_reader_behavior) + std::optional slow_reader_behavior, + databento::TimeoutConf timeout_conf) : impl_{std::make_unique(log_receiver, std::move(key), std::move(dataset), send_ts_out, upgrade_policy, heartbeat_interval, buffer_size, std::move(user_agent_ext), compression, - slow_reader_behavior)} {} + slow_reader_behavior, timeout_conf)} {} LiveThreaded::LiveThreaded( ILogReceiver* log_receiver, std::string key, std::string dataset, @@ -74,11 +75,13 @@ LiveThreaded::LiveThreaded( VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, std::size_t buffer_size, std::string user_agent_ext, databento::Compression compression, - std::optional slow_reader_behavior) + std::optional slow_reader_behavior, + databento::TimeoutConf timeout_conf) : impl_{std::make_unique( log_receiver, std::move(key), std::move(dataset), std::move(gateway), port, send_ts_out, upgrade_policy, heartbeat_interval, buffer_size, - std::move(user_agent_ext), compression, slow_reader_behavior)} {} + std::move(user_agent_ext), compression, slow_reader_behavior, timeout_conf)} { +} const std::string& LiveThreaded::Key() const { return impl_->blocking.Key(); } @@ -106,6 +109,12 @@ std::optional LiveThreaded::SlowReaderBehavior() return impl_->blocking.SlowReaderBehavior(); } +const databento::TimeoutConf& LiveThreaded::TimeoutConf() const { + return impl_->blocking.TimeoutConf(); +} + +std::uint64_t LiveThreaded::SessionId() const { return impl_->blocking.SessionId(); } + const std::vector& LiveThreaded::Subscriptions() const { return impl_->blocking.Subscriptions(); } diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index e2d1cfe..b36c402 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -1,3 +1,4 @@ +#include #include #include // SHA256_DIGEST_LENGTH @@ -25,6 +26,7 @@ #include "databento/with_ts_out.hpp" #include "mock/mock_log_receiver.hpp" #include "mock/mock_lsg_server.hpp" // MockLsgServer +#include "mock/mock_tcp_server.hpp" // MockTcpServer namespace databento::tests { class LiveBlockingTests : public testing::Test { @@ -787,17 +789,30 @@ TEST_F(LiveBlockingTests, TestTryNextRecordEmptyBuffer) { TEST_F(LiveBlockingTests, TestTryNextRecordAfterFillBuffer) { constexpr auto kTsOut = false; constexpr OhlcvMsg kRec{DummyHeader(RType::Ohlcv1M), 1, 2, 3, 4, 5}; - const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut, - [kRec](mock::MockLsgServer& self) { - self.Accept(); - self.Authenticate(); - self.SendRecord(kRec); - }}; + bool sent = false; + std::mutex sent_mutex; + std::condition_variable sent_cv; + const mock::MockLsgServer mock_server{ + dataset::kXnasItch, kTsOut, + [kRec, &sent, &sent_mutex, &sent_cv](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + self.SendRecord(kRec); + { + const std::lock_guard lock{sent_mutex}; + sent = true; + sent_cv.notify_one(); + } + }}; LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) .SetSendTsOut(kTsOut) .SetAddress(kLocalhost, mock_server.Port()) .BuildBlocking(); + { + std::unique_lock lock{sent_mutex}; + sent_cv.wait(lock, [&sent] { return sent; }); + } const auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000}); ASSERT_EQ(fill_res.status, IReadable::Status::Ok); ASSERT_GT(fill_res.read_size, 0); @@ -832,7 +847,7 @@ TEST_F(LiveBlockingTests, TestTryNextRecordPollLoop) { constexpr auto kRecCount = 5; constexpr OhlcvMsg kRec{DummyHeader(RType::Ohlcv1M), 1, 2, 3, 4, 5}; const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut, - [kRec](mock::MockLsgServer& self) { + [kRec, kRecCount](mock::MockLsgServer& self) { self.Accept(); self.Authenticate(); for (size_t i = 0; i < kRecCount; ++i) { @@ -911,4 +926,32 @@ TEST_F(LiveBlockingTests, TestTryNextRecordPartialRecord) { EXPECT_EQ(rec->Get(), kRec); } +TEST_F(LiveBlockingTests, TestConnectTimeout) { + const auto connect = [this] { + builder_.SetDataset(dataset::kGlbxMdp3) + .SetAddress("192.0.2.1", 13000) + .SetTimeoutConf({std::chrono::seconds{1}, std::chrono::seconds{30}}) + .BuildBlocking(); + }; + const auto matcher = + testing::ThrowsMessage(testing::HasSubstr("failed to connect")); + EXPECT_THAT(connect, matcher); +} + +TEST_F(LiveBlockingTests, TestAuthTimeout) { + const mock::MockTcpServer mock_server{[](mock::MockTcpServer& self) { + self.Accept(); + std::this_thread::sleep_for(std::chrono::seconds{3}); + }}; + const auto connect = [this, &mock_server] { + builder_.SetDataset(dataset::kGlbxMdp3) + .SetAddress(kLocalhost, mock_server.Port()) + .SetTimeoutConf({std::chrono::seconds{10}, std::chrono::seconds{1}}) + .BuildBlocking(); + }; + const auto matcher = + testing::ThrowsMessage(testing::HasSubstr("Authentication timed out")); + EXPECT_THAT(connect, matcher); +} + } // namespace databento::tests From da99187efc813d4806fcebb7579c0bce4a8e5eab Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 7 Apr 2026 15:50:57 -0500 Subject: [PATCH 6/8] VER: Release C++ client 0.53.0 --- CHANGELOG.md | 2 +- CMakeLists.txt | 2 +- pkg/PKGBUILD | 2 +- tests/src/live_blocking_tests.cpp | 43 ++++++++++++++++++++++++++----- 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 55f98c7..0134d06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.53.0 - TBD +## 0.53.0 - 2026-04-07 ### Enhancements - Added `TryNextRecord` and `FillBuffer` to `LiveBlocking` for more fine-grained diff --git a/CMakeLists.txt b/CMakeLists.txt index 0647dc6..d58e9c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.2) project( databento - VERSION 0.52.0 + VERSION 0.53.0 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index f062726..77e380e 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.52.0 +pkgver=0.53.0 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index b36c402..019e81f 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -340,7 +340,9 @@ TEST_F(LiveBlockingTests, TestNextRecordTimeout) { std::mutex receive_mutex; std::condition_variable receive_cv; const mock::MockLsgServer mock_server{ - dataset::kXnasItch, kTsOut, [&](mock::MockLsgServer& self) { + dataset::kXnasItch, kTsOut, + [kRec, &sent_first_msg, &send_mutex, &send_cv, &received_first_msg, + &receive_mutex, &receive_cv](mock::MockLsgServer& self) { self.Accept(); self.Authenticate(); self.SendRecord(kRec); @@ -397,7 +399,9 @@ TEST_F(LiveBlockingTests, TestNextRecordTimeoutWithZstdCompression) { std::mutex receive_mutex; std::condition_variable receive_cv; const mock::MockLsgServer mock_server{ - dataset::kXnasItch, kTsOut, Compression::Zstd, [&](mock::MockLsgServer& self) { + dataset::kXnasItch, kTsOut, Compression::Zstd, + [kRec, &sent_first_msg, &send_mutex, &send_cv, &received_first_msg, + &receive_mutex, &receive_cv](mock::MockLsgServer& self) { self.Accept(); self.Authenticate(); self.StartCompressed(); @@ -789,14 +793,23 @@ TEST_F(LiveBlockingTests, TestTryNextRecordEmptyBuffer) { TEST_F(LiveBlockingTests, TestTryNextRecordAfterFillBuffer) { constexpr auto kTsOut = false; constexpr OhlcvMsg kRec{DummyHeader(RType::Ohlcv1M), 1, 2, 3, 4, 5}; - bool sent = false; + bool client_ready{}; + std::mutex client_ready_mutex; + std::condition_variable client_ready_cv; + bool sent{}; std::mutex sent_mutex; std::condition_variable sent_cv; const mock::MockLsgServer mock_server{ dataset::kXnasItch, kTsOut, - [kRec, &sent, &sent_mutex, &sent_cv](mock::MockLsgServer& self) { + [kRec, &client_ready, &client_ready_mutex, &client_ready_cv, &sent, &sent_mutex, + &sent_cv](mock::MockLsgServer& self) { self.Accept(); self.Authenticate(); + { + // wait for client to finish auth to prevent TCP coalescing + std::unique_lock lock{client_ready_mutex}; + client_ready_cv.wait(lock, [&client_ready] { return client_ready; }); + } self.SendRecord(kRec); { const std::lock_guard lock{sent_mutex}; @@ -809,6 +822,11 @@ TEST_F(LiveBlockingTests, TestTryNextRecordAfterFillBuffer) { .SetSendTsOut(kTsOut) .SetAddress(kLocalhost, mock_server.Port()) .BuildBlocking(); + { + const std::lock_guard lock{client_ready_mutex}; + client_ready = true; + client_ready_cv.notify_one(); + } { std::unique_lock lock{sent_mutex}; sent_cv.wait(lock, [&sent] { return sent; }); @@ -889,15 +907,23 @@ TEST_F(LiveBlockingTests, TestTryNextRecordPartialRecord) { TimeDeltaNanos{}, 100}; + bool client_ready{}; + std::mutex client_ready_mutex; + std::condition_variable client_ready_cv; bool send_remaining{}; std::mutex send_remaining_mutex; std::condition_variable send_remaining_cv; const mock::MockLsgServer mock_server{ dataset::kGlbxMdp3, kTsOut, - [kRec, &send_remaining, &send_remaining_mutex, - &send_remaining_cv](mock::MockLsgServer& self) { + [kRec, &client_ready, &client_ready_mutex, &client_ready_cv, &send_remaining, + &send_remaining_mutex, &send_remaining_cv](mock::MockLsgServer& self) { self.Accept(); self.Authenticate(); + { + // wait for client to finish auth to prevent TCP coalescing + std::unique_lock lock{client_ready_mutex}; + client_ready_cv.wait(lock, [&client_ready] { return client_ready; }); + } self.SplitSendRecord(kRec, send_remaining, send_remaining_mutex, send_remaining_cv); }}; @@ -906,6 +932,11 @@ TEST_F(LiveBlockingTests, TestTryNextRecordPartialRecord) { .SetSendTsOut(kTsOut) .SetAddress(kLocalhost, mock_server.Port()) .BuildBlocking(); + { + const std::lock_guard lock{client_ready_mutex}; + client_ready = true; + client_ready_cv.notify_one(); + } // Read partial record (just header) auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000}); ASSERT_EQ(fill_res.status, IReadable::Status::Ok); From 326c92f160084ea25ba7705d61b6f04d53104a7d Mon Sep 17 00:00:00 2001 From: Rob Maierle Date: Tue, 7 Apr 2026 17:29:33 -0400 Subject: [PATCH 7/8] MOD: Update CFE description --- include/databento/publishers.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/databento/publishers.hpp b/include/databento/publishers.hpp index c44850b..a886178 100644 --- a/include/databento/publishers.hpp +++ b/include/databento/publishers.hpp @@ -195,7 +195,7 @@ enum class Dataset : std::uint16_t { XeurEobi = 38, // European Energy Exchange EOBI XeeeEobi = 39, - // Cboe Futures Exchange PITCH + // CFE Depth XcbfPitch = 40, // Blue Ocean ATS MEMOIR Depth OceaMemoir = 41, @@ -411,9 +411,9 @@ enum class Publisher : std::uint16_t { XeurEobiXoff = 103, // European Energy Exchange EOBI - Off-Market Trades XeeeEobiXoff = 104, - // Cboe Futures Exchange + // Cboe Futures Exchange (CFE) XcbfPitchXcbf = 105, - // Cboe Futures Exchange - Off-Market Trades + // Cboe Futures Exchange (CFE) - Off-Market Trades XcbfPitchXoff = 106, // Blue Ocean ATS MEMOIR OceaMemoirOcea = 107, From 6ad5d286c2f4df93311284bbe6682ef7e6a5d6bb Mon Sep 17 00:00:00 2001 From: Carter Green Date: Wed, 8 Apr 2026 08:12:50 -0500 Subject: [PATCH 8/8] FIX: Fix Windows C++ build --- CHANGELOG.md | 2 +- src/detail/tcp_client.cpp | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0134d06..e33f045 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.53.0 - 2026-04-07 +## 0.53.0 - 2026-04-08 ### Enhancements - Added `TryNextRecord` and `FillBuffer` to `LiveBlocking` for more fine-grained diff --git a/src/detail/tcp_client.cpp b/src/detail/tcp_client.cpp index 1fc6eb0..f0aacb9 100644 --- a/src/detail/tcp_client.cpp +++ b/src/detail/tcp_client.cpp @@ -41,6 +41,16 @@ int Poll(::pollfd* fds, std::uint32_t nfds, int timeout_ms) { #endif } +int GetSockOpt(databento::detail::Socket fd, int level, int optname, int* optval) { +#ifdef _WIN32 + int len = sizeof(*optval); + return ::getsockopt(fd, level, optname, reinterpret_cast(optval), &len); +#else + socklen_t len = sizeof(*optval); + return ::getsockopt(fd, level, optname, optval, &len); +#endif +} + #ifdef _WIN32 constexpr int kConnectInProgress = WSAEWOULDBLOCK; #else @@ -189,8 +199,7 @@ databento::detail::ScopedFd TcpClient::InitSocket(ILogReceiver* log_receiver, const int poll_ret = Poll(&pfd, 1, timeout_ms); if (poll_ret > 0) { int so_error = 0; - socklen_t len = sizeof(so_error); - ::getsockopt(scoped_fd.Get(), SOL_SOCKET, SO_ERROR, &so_error, &len); + GetSockOpt(scoped_fd.Get(), SOL_SOCKET, SO_ERROR, &so_error); connected = (so_error == 0); if (!connected) { errno = so_error;