Skip to content

feat: introduce a v2 createProducer API to carry error message when fail#579

Draft
BewareMyPower wants to merge 2 commits into
apache:mainfrom
BewareMyPower:bewaremypower/v2-create-producer
Draft

feat: introduce a v2 createProducer API to carry error message when fail#579
BewareMyPower wants to merge 2 commits into
apache:mainfrom
BewareMyPower:bewaremypower/v2-create-producer

Conversation

@BewareMyPower
Copy link
Copy Markdown
Contributor

No description provided.

@BewareMyPower BewareMyPower requested a review from Copilot May 22, 2026 09:51
@BewareMyPower BewareMyPower self-assigned this May 22, 2026
@BewareMyPower BewareMyPower added this to the 4.2.0 milestone May 22, 2026
@BewareMyPower BewareMyPower marked this pull request as draft May 22, 2026 10:00
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new Error { Result result; std::string message; } type and wires it through internal async/Future-based flows so producer creation (and several related operations like lookup/connect/schema) can carry an error message on failures, alongside a new createProducerAsyncV2 callback API that returns std::variant<Error, Producer>.

Changes:

  • Add public pulsar::Error type (+ stream operator) and update multiple internal Future/Promise paths from Result to Error.
  • Introduce Client::createProducerAsyncV2 and update ClientImpl::createProducerAsync to use a V2 callback that can carry error messages.
  • Update connection/lookup/retry infrastructure and unit tests to consume Error-based futures/callbacks.

Reviewed changes

Copilot reviewed 38 out of 38 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
tests/SchemaTest.cc Updates producer-creation test to handle V2 callback and Error future results.
tests/RetryableOperationCacheTest.cc Migrates retryable operation cache tests from Result futures to Error futures.
tests/MultiTopicsConsumerTest.cc Adjusts mocked connection close calls to pass Error.
tests/MockClientImpl.h Updates mock producer creation to interpret variant<Error, Producer>.
tests/LookupServiceTest.cc Migrates lookup/schema tests to Future<Error, ...> and updated listeners.
tests/ConnectionTest.cc Updates mock close signature to accept Error.
tests/ClientTest.cc Adapts timeout assertions to use .result from Error.
lib/RetryableOperationCache.h Changes cache runner to operate on Future/Promise<Error, T>.
lib/RetryableOperation.h Converts retry logic to reason over Error.result and propagate Error.
lib/RetryableLookupService.h Adds conversion helpers between Future<Result,T> and Future<Error,T> and updates cache keys.
lib/ProducerImplBase.h Updates producer-created future interface to Future<Error, ...>.
lib/ProducerImpl.h Updates handler signatures and internal promises to use Error.
lib/ProducerImpl.cc Propagates Error through producer creation/reconnect flow and logging.
lib/PendingRequest.h Changes pending request failure path to store Error.
lib/PartitionedProducerImpl.h Updates partitioned producer created promise/future and callbacks to use Error.
lib/PartitionedProducerImpl.cc Uses Error for partition producer creation completion and error propagation.
lib/MultiTopicsConsumerImpl.cc Updates partition-metadata listeners to consume Error.
lib/LookupService.h Updates lookup service virtual methods to return Future<Error, ...> for partition/schema.
lib/LookupDataResult.h Updates LookupDataResultPromise to Promise<Error, ...>.
lib/HTTPLookupService.h Migrates HTTP lookup promises and request helpers to return Error.
lib/HTTPLookupService.cc Builds richer error messages for HTTP failures and schema JSON parsing.
lib/HandlerBase.h Updates connectionOpened contract to Future<Error, bool>.
lib/HandlerBase.cc Adapts reconnection logic to use error.result.
lib/ConsumerImplBase.h Updates default connectionOpened implementation to Future<Error, bool>.
lib/ConsumerImpl.h Updates overridden connectionOpened signature to Future<Error, bool>.
lib/ConsumerImpl.cc Adapts connection, unsubscribe/close/ack/seek flows to use Error-based request futures.
lib/ConnectionPool.h Updates pool connection futures to return Error.
lib/ConnectionPool.cc Adapts connection creation/close to propagate Error details.
lib/ClientImpl.h Switches producer-creation callback type to V2 and updates helper signatures.
lib/ClientImpl.cc Implements V2 producer creation flow using variant<Error, Producer>.
lib/ClientConnectionAdaptor.h Updates server-error handling to close connections with Error (including message when available).
lib/ClientConnection.h Migrates connect/request futures and close() to use Error.
lib/ClientConnection.cc Propagates broker/server error messages via Error into pending requests and connection lifecycle.
lib/Client.cc Adds createProducerAsyncV2 and bridges legacy callback API from V2 variant results.
lib/BinaryProtoLookupService.h Updates partition/schema futures and internal callbacks to use Error.
lib/BinaryProtoLookupService.cc Adapts lookup/schema request flows to propagate Error through futures/listeners.
include/pulsar/Result.h Introduces public Error type and stream operator.
include/pulsar/Client.h Adds V2 callback type and API; deprecates legacy createProducer APIs.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread lib/ClientImpl.cc
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create producer: " << e.what());
callback(ResultConnectError, {});
callback(error);
Comment thread lib/ClientImpl.cc
Comment on lines 275 to +280
if (existingProducer) {
auto producer = existingProducer.value().lock();
LOG_ERROR("Unexpected existing producer at the same address: "
<< address << ", producer: " << (producer ? producer->getProducerName() : "(null)"));
callback(ResultUnknownError, {});
callback(Error{ResultUnknownError,
"Unexpected existing producer for name " + producer->getProducerName()});
Comment thread include/pulsar/Client.h
Comment on lines +114 to +116
[[deprecated("use createProducerAsyncV2")]] Result createProducer(const std::string& topic,
const ProducerConfiguration& conf,
Producer& producer);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants