Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@

#include <memory>
#include <string>
#include <variant>

namespace pulsar {
typedef std::function<void(Result, Producer)> CreateProducerCallback;
typedef std::function<void(std::variant<Producer, Error>)> CreateProducerCallbackV2;
typedef std::function<void(Result, Consumer)> SubscribeCallback;
typedef std::function<void(std::variant<Consumer, Error>)> SubscribeCallbackV2;
typedef std::function<void(Result, Reader)> ReaderCallback;
typedef std::function<void(std::variant<Reader, Error>)> ReaderCallbackV2;
typedef std::function<void(Result, TableView)> TableViewCallback;
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
typedef std::function<void(Result)> CloseCallback;
Expand Down Expand Up @@ -108,7 +112,9 @@ class PULSAR_PUBLIC Client {
* @return ResultOk if the producer has been successfully created
* @return ResultError if there was an error
*/
Result createProducer(const std::string& topic, const ProducerConfiguration& conf, Producer& producer);
[[deprecated("use createProducerV2 instead")]] Result createProducer(const std::string& topic,
const ProducerConfiguration& conf,
Producer& producer);

/**
* Asynchronously create a producer with the default ProducerConfiguration for publishing on a specific
Expand All @@ -118,7 +124,18 @@ class PULSAR_PUBLIC Client {
* @param callback the callback that is triggered when the producer is created successfully or not
* @param callback Callback function that is invoked when the operation is completed
*/
void createProducerAsync(const std::string& topic, const CreateProducerCallback& callback);
[[deprecated("use createProducerAsyncV2 instead")]] void createProducerAsync(
const std::string& topic, const CreateProducerCallback& callback);

std::variant<Producer, Error> createProducerV2(const std::string& topic);

std::variant<Producer, Error> createProducerV2(const std::string& topic,
const ProducerConfiguration& conf);

void createProducerAsyncV2(const std::string& topic, const CreateProducerCallbackV2& callback);

void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf,
const CreateProducerCallbackV2& callback);

/**
* Asynchronously create a producer with the customized ProducerConfiguration for publishing on a specific
Expand Down Expand Up @@ -151,6 +168,11 @@ class PULSAR_PUBLIC Client {
Result subscribe(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, Consumer& consumer);

std::variant<Consumer, Error> subscribeV2(const std::string& topic, const std::string& subscriptionName);

std::variant<Consumer, Error> subscribeV2(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf);

/**
* Asynchronously subscribe to a given topic and subscription combination with the default
* ConsumerConfiguration
Expand All @@ -163,6 +185,9 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const SubscribeCallback& callback);

void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
const SubscribeCallbackV2& callback);

/**
* Asynchronously subscribe to a given topic and subscription combination with the customized
* ConsumerConfiguration
Expand All @@ -176,6 +201,9 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback);

void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback);

/**
* Subscribe to multiple topics under the same namespace.
*
Expand All @@ -197,6 +225,13 @@ class PULSAR_PUBLIC Client {
Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, Consumer& consumer);

std::variant<Consumer, Error> subscribeV2(const std::vector<std::string>& topics,
const std::string& subscriptionName);

std::variant<Consumer, Error> subscribeV2(const std::vector<std::string>& topics,
const std::string& subscriptionName,
const ConsumerConfiguration& conf);

/**
* Asynchronously subscribe to a list of topics and subscription combination using the default
ConsumerConfiguration
Expand All @@ -210,6 +245,9 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const SubscribeCallback& callback);

void subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
const SubscribeCallbackV2& callback);

/**
* Asynchronously subscribe to a list of topics and subscription combination using the customized
* ConsumerConfiguration
Expand All @@ -223,6 +261,9 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback);

void subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback);

/**
* Subscribe to multiple topics, which match given regexPattern, under the same namespace.
*/
Expand Down Expand Up @@ -291,6 +332,9 @@ class PULSAR_PUBLIC Client {
Result createReader(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, Reader& reader);

std::variant<Reader, Error> createReaderV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf);

/**
* Asynchronously create a topic reader with the customized ReaderConfiguration for reading messages from
* the specified topic.
Expand Down Expand Up @@ -320,6 +364,9 @@ class PULSAR_PUBLIC Client {
void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, const ReaderCallback& callback);

void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, const ReaderCallbackV2& callback);

/**
* Create a table view with given {@code TableViewConfiguration} for specified topic.
*
Expand Down
14 changes: 14 additions & 0 deletions include/pulsar/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include <cstdint>
#include <iosfwd>
#include <string>
#include <utility>

namespace pulsar {

Expand Down Expand Up @@ -101,6 +103,18 @@ enum Result : int8_t
PULSAR_PUBLIC const char* strResult(Result result);

PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result);

struct Error {
Error() = default;
Error(Result result) : result(result) {}
Error(Result result, std::string message) : result(result), message(std::move(message)) {}

operator Result() const { return result; }

Result result = ResultOk;
std::string message;
};

} // namespace pulsar

#endif /* ERROR_HPP_ */
27 changes: 13 additions & 14 deletions lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
-> LookupResultFuture {
LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic
<< ", redirect count: " << redirectCount);
auto promise = std::make_shared<Promise<Result, LookupResult>>();
auto promise = std::make_shared<Promise<Error, LookupResult>>();
if (maxLookupRedirects_ > 0 && redirectCount > maxLookupRedirects_) {
LOG_ERROR("Too many lookup request redirects on topic " << topic << ", configured limit is "
<< maxLookupRedirects_);
Expand All @@ -62,7 +62,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
auto lookupPromise = std::make_shared<LookupDataResultPromise>();
cnx->newTopicLookup(topic, authoritative, listenerName_, newRequestId(), lookupPromise);
lookupPromise->getFuture().addListener([this, cnx, promise, topic, address, redirectCount](
Result result, const LookupDataResultPtr& data) {
Error result, const LookupDataResultPtr& data) {
if (result != ResultOk || !data) {
LOG_ERROR("Lookup failed for " << topic << ", result " << result);
promise->setFailed(result);
Expand All @@ -74,7 +74,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
if (data->isRedirect()) {
LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress);
findBroker(responseBrokerAddress, data->isAuthoritative(), topic, redirectCount + 1)
.addListener([promise](Result result, const LookupResult& value) {
.addListener([promise](Error result, const LookupResult& value) {
if (result == ResultOk) {
promise->setValue(value);
} else {
Expand All @@ -100,7 +100,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
* @param topicName topic to get number of partitions.
*
*/
Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(
Future<Error, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(
const TopicNamePtr& topicName) {
LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
if (!topicName) {
Expand Down Expand Up @@ -135,7 +135,7 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
std::placeholders::_2, clientCnx, promise));
}

void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Result result,
void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Error result,
const LookupDataResultPtr& data,
const ClientConnectionWeakPtr& clientCnx,
const LookupDataResultPromisePtr& promise) {
Expand All @@ -154,9 +154,9 @@ uint64_t BinaryProtoLookupService::newRequestId() {
return ++requestIdGenerator_;
}

Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
Future<Error, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) {
NamespaceTopicsPromisePtr promise = std::make_shared<Promise<Result, NamespaceTopicsPtr>>();
NamespaceTopicsPromisePtr promise = std::make_shared<Promise<Error, NamespaceTopicsPtr>>();
if (!nsName) {
promise->setFailed(ResultInvalidTopicName);
return promise->getFuture();
Expand All @@ -168,9 +168,9 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
return promise->getFuture();
}

Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName,
const std::string& version) {
GetSchemaPromisePtr promise = std::make_shared<Promise<Result, SchemaInfo>>();
Future<Error, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName,
const std::string& version) {
GetSchemaPromisePtr promise = std::make_shared<Promise<Error, SchemaInfo>>();

if (!topicName) {
promise->setFailed(ResultInvalidTopicName);
Expand All @@ -197,7 +197,7 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName
<< " version: " << version);

conn->newGetSchema(topicName, version, requestId)
.addListener([promise](Result result, const SchemaInfo& schemaInfo) {
.addListener([promise](Error result, const SchemaInfo& schemaInfo) {
if (result != ResultOk) {
promise->setFailed(result);
return;
Expand Down Expand Up @@ -228,11 +228,10 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
std::placeholders::_1, std::placeholders::_2, promise));
}

void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result,
const NamespaceTopicsPtr& topicsPtr,
void BinaryProtoLookupService::getTopicsOfNamespaceListener(Error result, const NamespaceTopicsPtr& topicsPtr,
const NamespaceTopicsPromisePtr& promise) {
if (result != ResultOk) {
promise->setFailed(ResultLookupError);
promise->setFailed(Error{ResultLookupError, result.message});
return;
}

Expand Down
14 changes: 7 additions & 7 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
class ConnectionPool;
class LookupDataResult;
class ServiceNameResolver;
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>;
using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Error, NamespaceTopicsPtr>>;
using GetSchemaPromisePtr = std::shared_ptr<Promise<Error, SchemaInfo>>;

class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
Expand All @@ -48,12 +48,12 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {

LookupResultFuture getBroker(const TopicName& topicName) override;

Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;
Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
Future<Error, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;

Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;
Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;

ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }

Expand All @@ -75,7 +75,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
const ClientConnectionWeakPtr& clientCnx,
const LookupDataResultPromisePtr& promise);

void handlePartitionMetadataLookup(const std::string& topicName, Result result,
void handlePartitionMetadataLookup(const std::string& topicName, Error result,
const LookupDataResultPtr& data,
const ClientConnectionWeakPtr& clientCnx,
const LookupDataResultPromisePtr& promise);
Expand All @@ -87,7 +87,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
void sendGetSchemaRequest(const std::string& topicName, const std::string& version, Result result,
const ClientConnectionWeakPtr& clientCnx, const GetSchemaPromisePtr& promise);

void getTopicsOfNamespaceListener(Result result, const NamespaceTopicsPtr& topicsPtr,
void getTopicsOfNamespaceListener(Error result, const NamespaceTopicsPtr& topicsPtr,
const NamespaceTopicsPromisePtr& promise);

uint64_t newRequestId();
Expand Down
Loading
Loading