feat: introduce a v2 createProducer API to carry error message when fail#579
Draft
BewareMyPower wants to merge 2 commits into
Draft
feat: introduce a v2 createProducer API to carry error message when fail#579BewareMyPower wants to merge 2 commits into
BewareMyPower wants to merge 2 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a new Error { Result result; std::string message; } type and wires it through internal async/Future-based flows so producer creation (and several related operations like lookup/connect/schema) can carry an error message on failures, alongside a new createProducerAsyncV2 callback API that returns std::variant<Error, Producer>.
Changes:
- Add public
pulsar::Errortype (+ stream operator) and update multiple internalFuture/Promisepaths fromResulttoError. - Introduce
Client::createProducerAsyncV2and updateClientImpl::createProducerAsyncto use a V2 callback that can carry error messages. - Update connection/lookup/retry infrastructure and unit tests to consume
Error-based futures/callbacks.
Reviewed changes
Copilot reviewed 38 out of 38 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/SchemaTest.cc | Updates producer-creation test to handle V2 callback and Error future results. |
| tests/RetryableOperationCacheTest.cc | Migrates retryable operation cache tests from Result futures to Error futures. |
| tests/MultiTopicsConsumerTest.cc | Adjusts mocked connection close calls to pass Error. |
| tests/MockClientImpl.h | Updates mock producer creation to interpret variant<Error, Producer>. |
| tests/LookupServiceTest.cc | Migrates lookup/schema tests to Future<Error, ...> and updated listeners. |
| tests/ConnectionTest.cc | Updates mock close signature to accept Error. |
| tests/ClientTest.cc | Adapts timeout assertions to use .result from Error. |
| lib/RetryableOperationCache.h | Changes cache runner to operate on Future/Promise<Error, T>. |
| lib/RetryableOperation.h | Converts retry logic to reason over Error.result and propagate Error. |
| lib/RetryableLookupService.h | Adds conversion helpers between Future<Result,T> and Future<Error,T> and updates cache keys. |
| lib/ProducerImplBase.h | Updates producer-created future interface to Future<Error, ...>. |
| lib/ProducerImpl.h | Updates handler signatures and internal promises to use Error. |
| lib/ProducerImpl.cc | Propagates Error through producer creation/reconnect flow and logging. |
| lib/PendingRequest.h | Changes pending request failure path to store Error. |
| lib/PartitionedProducerImpl.h | Updates partitioned producer created promise/future and callbacks to use Error. |
| lib/PartitionedProducerImpl.cc | Uses Error for partition producer creation completion and error propagation. |
| lib/MultiTopicsConsumerImpl.cc | Updates partition-metadata listeners to consume Error. |
| lib/LookupService.h | Updates lookup service virtual methods to return Future<Error, ...> for partition/schema. |
| lib/LookupDataResult.h | Updates LookupDataResultPromise to Promise<Error, ...>. |
| lib/HTTPLookupService.h | Migrates HTTP lookup promises and request helpers to return Error. |
| lib/HTTPLookupService.cc | Builds richer error messages for HTTP failures and schema JSON parsing. |
| lib/HandlerBase.h | Updates connectionOpened contract to Future<Error, bool>. |
| lib/HandlerBase.cc | Adapts reconnection logic to use error.result. |
| lib/ConsumerImplBase.h | Updates default connectionOpened implementation to Future<Error, bool>. |
| lib/ConsumerImpl.h | Updates overridden connectionOpened signature to Future<Error, bool>. |
| lib/ConsumerImpl.cc | Adapts connection, unsubscribe/close/ack/seek flows to use Error-based request futures. |
| lib/ConnectionPool.h | Updates pool connection futures to return Error. |
| lib/ConnectionPool.cc | Adapts connection creation/close to propagate Error details. |
| lib/ClientImpl.h | Switches producer-creation callback type to V2 and updates helper signatures. |
| lib/ClientImpl.cc | Implements V2 producer creation flow using variant<Error, Producer>. |
| lib/ClientConnectionAdaptor.h | Updates server-error handling to close connections with Error (including message when available). |
| lib/ClientConnection.h | Migrates connect/request futures and close() to use Error. |
| lib/ClientConnection.cc | Propagates broker/server error messages via Error into pending requests and connection lifecycle. |
| lib/Client.cc | Adds createProducerAsyncV2 and bridges legacy callback API from V2 variant results. |
| lib/BinaryProtoLookupService.h | Updates partition/schema futures and internal callbacks to use Error. |
| lib/BinaryProtoLookupService.cc | Adapts lookup/schema request flows to propagate Error through futures/listeners. |
| include/pulsar/Result.h | Introduces public Error type and stream operator. |
| include/pulsar/Client.h | Adds V2 callback type and API; deprecates legacy createProducer APIs. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } catch (const std::runtime_error& e) { | ||
| LOG_ERROR("Failed to create producer: " << e.what()); | ||
| callback(ResultConnectError, {}); | ||
| callback(error); |
Comment on lines
275
to
+280
| if (existingProducer) { | ||
| auto producer = existingProducer.value().lock(); | ||
| LOG_ERROR("Unexpected existing producer at the same address: " | ||
| << address << ", producer: " << (producer ? producer->getProducerName() : "(null)")); | ||
| callback(ResultUnknownError, {}); | ||
| callback(Error{ResultUnknownError, | ||
| "Unexpected existing producer for name " + producer->getProducerName()}); |
Comment on lines
+114
to
+116
| [[deprecated("use createProducerAsyncV2")]] Result createProducer(const std::string& topic, | ||
| const ProducerConfiguration& conf, | ||
| Producer& producer); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.