Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <memory>
#include <string>
#include <variant>

namespace pulsar {
typedef std::function<void(Result, Producer)> CreateProducerCallback;
Expand All @@ -45,6 +46,8 @@ typedef std::function<void(Result, TableView)> TableViewCallback;
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
typedef std::function<void(Result)> CloseCallback;

using CreateProducerV2Callback = std::function<void(std::variant<Error, Producer>)>;

class ClientImpl;
class PulsarFriend;
class PulsarWrapper;
Expand Down Expand Up @@ -108,7 +111,9 @@ class PULSAR_PUBLIC Client {
* @return ResultOk if the producer has been successfully created
* @return ResultError if there was an error
*/
Result createProducer(const std::string& topic, const ProducerConfiguration& conf, Producer& producer);
[[deprecated("use createProducerAsyncV2")]] Result createProducer(const std::string& topic,
const ProducerConfiguration& conf,
Producer& producer);
Comment on lines +114 to +116

/**
* Asynchronously create a producer with the default ProducerConfiguration for publishing on a specific
Expand All @@ -118,7 +123,8 @@ class PULSAR_PUBLIC Client {
* @param callback the callback that is triggered when the producer is created successfully or not
* @param callback Callback function that is invoked when the operation is completed
*/
void createProducerAsync(const std::string& topic, const CreateProducerCallback& callback);
[[deprecated("use createProducerAsyncV2")]] void createProducerAsync(
const std::string& topic, const CreateProducerCallback& callback);

/**
* Asynchronously create a producer with the customized ProducerConfiguration for publishing on a specific
Expand All @@ -127,8 +133,11 @@ class PULSAR_PUBLIC Client {
* @param topic the name of the topic where to produce
* @param conf the customized ProducerConfiguration
*/
void createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
const CreateProducerCallback& callback);
[[deprecated("use createProducerAsyncV2")]] void createProducerAsync(
const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback);

void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf,
CreateProducerV2Callback callback);

/**
* Subscribe to a given topic and subscription combination with the default ConsumerConfiguration
Expand Down
16 changes: 16 additions & 0 deletions include/pulsar/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include <cstdint>
#include <iosfwd>
#include <ostream>
#include <string>

namespace pulsar {

Expand Down Expand Up @@ -101,6 +103,20 @@ enum Result : int8_t
PULSAR_PUBLIC const char* strResult(Result result);

PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result);

struct PULSAR_PUBLIC Error {
Result result;
std::string message;
};

inline std::ostream& operator<<(std::ostream& os, const Error& error) {
os << error.result;
if (!error.message.empty()) {
os << " " << error.message;
}
return os;
}

} // namespace pulsar

#endif /* ERROR_HPP_ */
72 changes: 35 additions & 37 deletions lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho

// NOTE: we can use move capture for topic since C++14
cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address, authoritative,
redirectCount](Result result,
redirectCount](const auto& error,
const ClientConnectionWeakPtr& weakCnx) {
if (result != ResultOk) {
promise->setFailed(result);
if (error.result != ResultOk) {
promise->setFailed(error.result);
return;
}
auto cnx = weakCnx.lock();
Expand All @@ -62,10 +62,10 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
auto lookupPromise = std::make_shared<LookupDataResultPromise>();
cnx->newTopicLookup(topic, authoritative, listenerName_, newRequestId(), lookupPromise);
lookupPromise->getFuture().addListener([this, cnx, promise, topic, address, redirectCount](
Result result, const LookupDataResultPtr& data) {
if (result != ResultOk || !data) {
LOG_ERROR("Lookup failed for " << topic << ", result " << result);
promise->setFailed(result);
const Error& error, const LookupDataResultPtr& data) {
if (error.result != ResultOk || !data) {
LOG_ERROR("Lookup failed for " << topic << ", result " << error);
promise->setFailed(error.result);
return;
}

Expand Down Expand Up @@ -96,15 +96,11 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
return promise->getFuture();
}

/*
* @param topicName topic to get number of partitions.
*
*/
Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(
Future<Error, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(
const TopicNamePtr& topicName) {
LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
if (!topicName) {
promise->setFailed(ResultInvalidTopicName);
promise->setFailed(Error{ResultInvalidTopicName, ""});
return promise->getFuture();
}
std::string lookupName = topicName->toString();
Expand All @@ -115,16 +111,17 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
return promise->getFuture();
}

void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName, Result result,
void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName,
const Error& error,
const ClientConnectionWeakPtr& clientCnx,
const LookupDataResultPromisePtr& promise) {
if (result != ResultOk) {
promise->setFailed(result);
if (error.result != ResultOk) {
promise->setFailed(error);
return;
}
auto conn = clientCnx.lock();
if (!conn) {
promise->setFailed(ResultConnectError);
promise->setFailed(Error{ResultConnectError, ""});
return;
}
LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
Expand All @@ -135,7 +132,7 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
std::placeholders::_2, clientCnx, promise));
}

void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Result result,
void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, const Error& error,
const LookupDataResultPtr& data,
const ClientConnectionWeakPtr& clientCnx,
const LookupDataResultPromisePtr& promise) {
Expand All @@ -144,8 +141,8 @@ void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string&
<< data->getBrokerUrl());
promise->setValue(data);
} else {
LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ", result " << result);
promise->setFailed(result);
LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ", result " << error);
promise->setFailed(error);
}
}

Expand All @@ -168,38 +165,39 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
return promise->getFuture();
}

Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName,
const std::string& version) {
GetSchemaPromisePtr promise = std::make_shared<Promise<Result, SchemaInfo>>();

Future<Error, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName,
const std::string& version) {
GetSchemaPromisePtr promise = std::make_shared<Promise<Error, SchemaInfo>>();
if (!topicName) {
promise->setFailed(ResultInvalidTopicName);
promise->setFailed(Error{ResultInvalidTopicName, ""});
return promise->getFuture();
}
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())

const auto topic = topicName->toString();
const auto address = serviceNameResolver_.resolveHost();
cnxPool_.getConnectionAsync(address, address)
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(),
version, std::placeholders::_1, std::placeholders::_2, promise));

return promise->getFuture();
}

void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, const std::string& version,
Result result, const ClientConnectionWeakPtr& clientCnx,
const Error& error,
const ClientConnectionWeakPtr& clientCnx,
const GetSchemaPromisePtr& promise) {
if (result != ResultOk) {
promise->setFailed(result);
if (error.result != ResultOk) {
promise->setFailed(error);
return;
}

ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName
<< " version: " << version);

conn->newGetSchema(topicName, version, requestId)
.addListener([promise](Result result, const SchemaInfo& schemaInfo) {
if (result != ResultOk) {
promise->setFailed(result);
.addListener([promise](const auto& error, const SchemaInfo& schemaInfo) {
if (error.result != ResultOk) {
promise->setFailed(error);
return;
}
promise->setValue(schemaInfo);
Expand All @@ -208,11 +206,11 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName

void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName,
CommandGetTopicsOfNamespace_Mode mode,
Result result,
const Error& error,
const ClientConnectionWeakPtr& clientCnx,
const NamespaceTopicsPromisePtr& promise) {
if (result != ResultOk) {
promise->setFailed(result);
if (error.result != ResultOk) {
promise->setFailed(error.result);
return;
}

Expand Down
14 changes: 7 additions & 7 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ConnectionPool;
class LookupDataResult;
class ServiceNameResolver;
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>;
using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
using GetSchemaPromisePtr = std::shared_ptr<Promise<Error, SchemaInfo>>;

class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
Expand All @@ -48,12 +48,12 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {

LookupResultFuture getBroker(const TopicName& topicName) override;

Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;
Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;

Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;
Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;

ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }

Expand All @@ -71,20 +71,20 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
std::string listenerName_;
const int32_t maxLookupRedirects_;

void sendPartitionMetadataLookupRequest(const std::string& topicName, Result result,
void sendPartitionMetadataLookupRequest(const std::string& topicName, const Error& error,
const ClientConnectionWeakPtr& clientCnx,
const LookupDataResultPromisePtr& promise);

void handlePartitionMetadataLookup(const std::string& topicName, Result result,
void handlePartitionMetadataLookup(const std::string& topicName, const Error& error,
const LookupDataResultPtr& data,
const ClientConnectionWeakPtr& clientCnx,
const LookupDataResultPromisePtr& promise);

void sendGetTopicsOfNamespaceRequest(const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode,
Result result, const ClientConnectionWeakPtr& clientCnx,
const Error& error, const ClientConnectionWeakPtr& clientCnx,
const NamespaceTopicsPromisePtr& promise);

void sendGetSchemaRequest(const std::string& topicName, const std::string& version, Result result,
void sendGetSchemaRequest(const std::string& topicName, const std::string& version, const Error& error,
const ClientConnectionWeakPtr& clientCnx, const GetSchemaPromisePtr& promise);

void getTopicsOfNamespaceListener(Result result, const NamespaceTopicsPtr& topicsPtr,
Expand Down
17 changes: 15 additions & 2 deletions lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,18 @@ void Client::createProducerAsync(const std::string& topic, const CreateProducerC

void Client::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
const CreateProducerCallback& callback) {
impl_->createProducerAsync(topic, conf, callback);
impl_->createProducerAsync(topic, conf, [callback](const auto& v) {
if (const auto* error = std::get_if<Error>(&v)) {
callback(error->result, Producer());
} else {
callback(ResultOk, std::get<Producer>(v));
}
});
}

void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf,
CreateProducerV2Callback callback) {
impl_->createProducerAsync(topic, conf, std::move(callback));
}

Result Client::subscribe(const std::string& topic, const std::string& subscriptionName, Consumer& consumer) {
Expand Down Expand Up @@ -199,7 +210,9 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers();
void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)> callback) {
impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
.addListener(std::move(callback));
.addListener([callback{std::move(callback)}](const Error& error, const SchemaInfo& schemaInfo) {
callback(error.result, schemaInfo);
});
}

ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }
Expand Down
Loading
Loading