diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index f52879e8..0c67411f 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -197,6 +197,11 @@ class PULSAR_PUBLIC Message { */ const std::string& getSchemaVersion() const; + /** + * Set the schema version of the message. + */ + void setSchemaVersion(const std::string& schemaVersion); + /** * Get the producer name which produced this message. * diff --git a/lib/Message.cc b/lib/Message.cc index 9505565b..df6cff9a 100644 --- a/lib/Message.cc +++ b/lib/Message.cc @@ -209,6 +209,12 @@ const std::string& Message::getSchemaVersion() const { return impl_->getSchemaVersion(); } +void Message::setSchemaVersion(const std::string& schemaVersion) { + if (impl_) { + impl_->metadata.set_schema_version(schemaVersion); + } +} + uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; } uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; } diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index bbcf4f07..7f50cf12 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -18,6 +18,8 @@ */ #pragma once +#include + #include "LookupDataResult.h" #include "LookupService.h" #include "NamespaceName.h" @@ -64,7 +66,7 @@ class RetryableLookupService : public LookupService { Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override { return namespaceLookupCache_->run( - "get-topics-of-namespace-" + nsName->toString(), + "get-topics-of-namespace-" + nsName->toString() + "-" + std::to_string(mode), [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); } diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc index 02c4ce32..cca04602 100644 --- a/lib/c/c_Message.cc +++ b/lib/c/c_Message.cc @@ -141,6 +141,10 @@ int pulsar_message_has_schema_version(pulsar_message_t *message) { return message->message.hasSchemaVersion(); } +void pulsar_message_set_schema_version(pulsar_message_t *message, const char *schemaVersion) { + message->message.setSchemaVersion(schemaVersion ? schemaVersion : ""); +} + const char *pulsar_message_get_producer_name(pulsar_message_t *message) { return message->message.getProducerName().c_str(); }