diff --git a/include/pulsar/AutoClusterFailover.h b/include/pulsar/AutoClusterFailover.h new file mode 100644 index 00000000..40c27ecd --- /dev/null +++ b/include/pulsar/AutoClusterFailover.h @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_ +#define PULSAR_AUTO_CLUSTER_FAILOVER_H_ + +#include + +#include + +namespace pulsar { + +class Client; +class AutoClusterFailoverImpl; + +class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider { + public: + struct Config { + ServiceInfo primary; + std::vector secondary; + std::chrono::milliseconds checkInterval{30000}; // 30 seconds + std::chrono::milliseconds failoverDelay{30000}; // 30 seconds + std::chrono::milliseconds switchBackDelay{60000}; // 60 seconds + }; + + /** + * Builder helps create an AutoClusterFailover configuration. + * + * Example: + * ServiceInfo primary{...}; + * std::vector secondaries{...}; + * AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries) + * .withCheckInterval(std::chrono::seconds(30)) + * .withFailoverDelay(std::chrono::seconds(30)) + * .withSwitchBackDelay(std::chrono::seconds(60)) + * .build(); + * + * Notes: + * - primary: the preferred cluster to use when available. + * - secondary: ordered list of fallback clusters. + * - checkInterval: frequency of health probes. + * - failoverDelay: how long the current cluster must be unreachable before switching. + * - switchBackDelay: how long the primary must remain healthy before switching back. + */ + class Builder { + public: + Builder(ServiceInfo primary, std::vector secondary) { + config_.primary = std::move(primary); + config_.secondary = std::move(secondary); + } + + // Set how frequently probes run against the active cluster(s). + Builder& withCheckInterval(std::chrono::milliseconds interval) { + config_.checkInterval = interval; + return *this; + } + + // Set how long the current cluster must be unreachable before attempting failover. + Builder& withFailoverDelay(std::chrono::milliseconds delay) { + config_.failoverDelay = delay; + return *this; + } + + // Set how long the primary must remain healthy before switching back from a secondary. + Builder& withSwitchBackDelay(std::chrono::milliseconds delay) { + config_.switchBackDelay = delay; + return *this; + } + + AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); } + + private: + Config config_; + }; + + explicit AutoClusterFailover(Config&& config); + + ~AutoClusterFailover() final; + + ServiceInfo initialServiceInfo() final; + + void initialize(std::function onServiceInfoUpdate) final; + + private: + std::shared_ptr impl_; +}; + +} // namespace pulsar + +#endif diff --git a/include/pulsar/ServiceInfo.h b/include/pulsar/ServiceInfo.h index 1f63ce38..23d3b6d5 100644 --- a/include/pulsar/ServiceInfo.h +++ b/include/pulsar/ServiceInfo.h @@ -32,6 +32,7 @@ namespace pulsar { */ class PULSAR_PUBLIC ServiceInfo final { public: + ServiceInfo() = default; // only for storing in containers like std::vector, not for public use ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication = AuthFactory::Disabled(), std::optional tlsTrustCertsFilePath = std::nullopt); diff --git a/lib/AutoClusterFailover.cc b/lib/AutoClusterFailover.cc new file mode 100644 index 00000000..13f2405e --- /dev/null +++ b/lib/AutoClusterFailover.cc @@ -0,0 +1,413 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "ServiceURI.h" +#include "Url.h" +#include "lib/LogUtils.h" + +#ifdef USE_ASIO +#include +#include +#include +#include +#include +#include +#else +#include +#include +#include +#include +#include +#include +#endif + +#include "AsioDefines.h" + +DECLARE_LOG_OBJECT() + +namespace pulsar { + +class AutoClusterFailoverImpl : public std::enable_shared_from_this { + public: + AutoClusterFailoverImpl(AutoClusterFailover::Config&& config) + : config_(std::move(config)), currentIndex_(0) { + clusters_.reserve(1 + config_.secondary.size()); + clusters_.emplace_back(config_.primary); + for (const auto& info : config_.secondary) { + clusters_.emplace_back(info); + } + } + + ~AutoClusterFailoverImpl() { + using namespace std::chrono_literals; + if (timer_) { + timer_->cancel(); + } + workGuard_.reset(); + if (future_.valid()) { + if (auto result = future_.wait_for(3s); result != std::future_status::ready) { + LOG_WARN("AutoClusterFailoverImpl is not stopped within 3 seconds, force stop it"); + ioContext_.stop(); + if (auto result = future_.wait_for(1s); result != std::future_status::ready) { + LOG_ERROR("Failed to stop AutoClusterFailoverImpl within 1 seconds after force stop"); + } + } + } + } + + auto primary() const noexcept { return config_.primary; } + + void initialize(std::function&& onServiceInfoUpdate) { + onServiceInfoUpdate_ = std::move(onServiceInfoUpdate); + workGuard_.emplace(ASIO::make_work_guard(ioContext_)); + timer_.emplace(ioContext_); + + auto weakSelf = weak_from_this(); + ASIO::post(ioContext_, [weakSelf] { + auto self = weakSelf.lock(); + if (self) { + self->onServiceInfoUpdate_(self->current()); + self->scheduleFailoverCheck(); + } + }); + + future_ = std::async(std::launch::async, [this] { ioContext_.run(); }); + } + + private: + static constexpr std::chrono::milliseconds probeTimeout_{30000}; + using CompletionCallback = std::function; + using ProbeCallback = std::function; + + struct ProbeContext { + ASIO::ip::tcp::resolver resolver; + ASIO::ip::tcp::socket socket; + ASIO::steady_timer timer; + ProbeCallback callback; + bool done{false}; + std::string hostUrl; + + ProbeContext(ASIO::io_context& ioContext, std::string hostUrl, ProbeCallback callback) + : resolver(ioContext), + socket(ioContext), + timer(ioContext), + callback(std::move(callback)), + hostUrl(std::move(hostUrl)) {} + }; + + AutoClusterFailover::Config config_; + std::vector clusters_; + size_t currentIndex_; + std::optional failedSince_; + std::optional recoveredSince_; + std::future future_; + ASIO::io_context ioContext_; + std::function onServiceInfoUpdate_; + + std::optional> workGuard_; + std::optional timer_; + + const ServiceInfo& current() const noexcept { return clusters_[currentIndex_]; } + + void scheduleFailoverCheck() { + if (!timer_) { + return; + } + timer_->expires_after(config_.checkInterval); + auto weakSelf = weak_from_this(); + timer_->async_wait([this, weakSelf](ASIO_ERROR error) { + if (error) { + LOG_INFO("Failover check timer is cancelled or failed: " << error.message()); + return; + } + auto self = weakSelf.lock(); + if (!self) { + LOG_INFO("AutoClusterFailoverImpl is destroyed, skip failover check"); + return; + } + executeFailoverCheck(); + }); + } + + void executeFailoverCheck() { + auto done = [weakSelf = weak_from_this()] { + if (auto self = weakSelf.lock()) { + self->scheduleFailoverCheck(); + } + }; + + if (currentIndex_ == 0) { + checkAndFailoverToSecondaryAsync(std::move(done)); + } else { + checkSecondaryAndPrimaryAsync(std::move(done)); + } + } + + static void completeProbe(const std::shared_ptr& context, bool success, + const ASIO_ERROR& error = ASIO_SUCCESS) { + if (context->done) { + return; + } + + context->done = true; + ASIO_ERROR ignored; + context->resolver.cancel(); + context->socket.close(ignored); + context->timer.cancel(ignored); + + if (error && error != ASIO::error::operation_aborted) { + LOG_DEBUG("Probe error for " << context->hostUrl << ": " << error.message()); + } + + auto callback = std::move(context->callback); + callback(success); + } + + void probeHostAsync(const std::string& hostUrl, ProbeCallback callback) { + Url parsedUrl; + if (!Url::parse(hostUrl, parsedUrl)) { + LOG_WARN("Failed to parse service URL for probing: " << hostUrl); + callback(false); + return; + } + + auto context = std::make_shared(ioContext_, hostUrl, std::move(callback)); + context->timer.expires_after(probeTimeout_); + context->timer.async_wait([context](const ASIO_ERROR& error) { + if (!error) { + completeProbe(context, false, ASIO::error::timed_out); + } + }); + + context->resolver.async_resolve( + parsedUrl.host(), std::to_string(parsedUrl.port()), + [context](const ASIO_ERROR& error, const ASIO::ip::tcp::resolver::results_type& endpoints) { + if (error) { + completeProbe(context, false, error); + return; + } + + ASIO::async_connect( + context->socket, endpoints, + [context](const ASIO_ERROR& connectError, const ASIO::ip::tcp::endpoint&) { + completeProbe(context, !connectError, connectError); + }); + }); + } + + void probeHostsAsync(const std::shared_ptr>& hosts, size_t index, + ProbeCallback callback) { + if (index >= hosts->size()) { + callback(false); + return; + } + + auto hostUrl = (*hosts)[index]; + probeHostAsync(hostUrl, [this, hosts, index, callback = std::move(callback)](bool available) mutable { + if (available) { + callback(true); + return; + } + probeHostsAsync(hosts, index + 1, std::move(callback)); + }); + } + + void probeAvailableAsync(const ServiceInfo& serviceInfo, ProbeCallback callback) { + try { + ServiceURI serviceUri{serviceInfo.serviceUrl()}; + auto hosts = std::make_shared>(serviceUri.getServiceHosts()); + if (hosts->empty()) { + callback(false); + return; + } + probeHostsAsync(hosts, 0, std::move(callback)); + } catch (const std::exception& e) { + LOG_WARN("Failed to probe service URL " << serviceInfo.serviceUrl() << ": " << e.what()); + callback(false); + } + } + + void switchTo(size_t index) { + if (currentIndex_ == index) { + return; + } + + LOG_INFO("Switch service URL from " << current().serviceUrl() << " to " + << clusters_[index].serviceUrl()); + currentIndex_ = index; + failedSince_.reset(); + recoveredSince_.reset(); + onServiceInfoUpdate_(current()); + } + + void probeSecondaryFrom(size_t index, CompletionCallback done) { + if (index >= clusters_.size()) { + done(); + return; + } + + auto weakSelf = weak_from_this(); + probeAvailableAsync(clusters_[index], + [weakSelf, index, done = std::move(done)](bool available) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (available) { + self->switchTo(index); + done(); + return; + } + + self->probeSecondaryFrom(index + 1, std::move(done)); + }); + } + + void checkAndFailoverToSecondaryAsync(CompletionCallback done) { + auto weakSelf = weak_from_this(); + probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + const auto now = std::chrono::steady_clock::now(); + if (primaryAvailable) { + self->failedSince_.reset(); + done(); + return; + } + + if (!self->failedSince_) { + self->failedSince_ = now; + done(); + return; + } + + if (now - *self->failedSince_ < self->config_.failoverDelay) { + done(); + return; + } + + self->probeSecondaryFrom(1, std::move(done)); + }); + } + + void checkSwitchBackToPrimaryAsync(CompletionCallback done, std::optional primaryAvailableHint) { + const auto now = std::chrono::steady_clock::now(); + auto handlePrimaryAvailable = [weakSelf = weak_from_this(), now, + done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (!primaryAvailable) { + self->recoveredSince_.reset(); + done(); + return; + } + + if (!self->recoveredSince_) { + self->recoveredSince_ = now; + done(); + return; + } + + if (now - *self->recoveredSince_ >= self->config_.switchBackDelay) { + self->switchTo(0); + } + done(); + }; + + if (primaryAvailableHint.has_value()) { + handlePrimaryAvailable(*primaryAvailableHint); + return; + } + + probeAvailableAsync(config_.primary, std::move(handlePrimaryAvailable)); + } + + void checkSecondaryAndPrimaryAsync(CompletionCallback done) { + auto weakSelf = weak_from_this(); + probeAvailableAsync(current(), [weakSelf, done = std::move(done)](bool secondaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + const auto now = std::chrono::steady_clock::now(); + if (secondaryAvailable) { + self->failedSince_.reset(); + self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); + return; + } + + if (!self->failedSince_) { + self->failedSince_ = now; + self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); + return; + } + + if (now - *self->failedSince_ < self->config_.failoverDelay) { + self->checkSwitchBackToPrimaryAsync(std::move(done), std::nullopt); + return; + } + + self->probeAvailableAsync(self->config_.primary, + [weakSelf, done = std::move(done)](bool primaryAvailable) mutable { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (primaryAvailable) { + self->switchTo(0); + done(); + return; + } + + self->checkSwitchBackToPrimaryAsync(std::move(done), false); + }); + }); + } +}; + +AutoClusterFailover::AutoClusterFailover(Config&& config) + : impl_(std::make_shared(std::move(config))) {} + +AutoClusterFailover::~AutoClusterFailover() {} + +ServiceInfo AutoClusterFailover::initialServiceInfo() { return impl_->primary(); } + +void AutoClusterFailover::initialize(std::function onServiceInfoUpdate) { + impl_->initialize(std::move(onServiceInfoUpdate)); +} + +} // namespace pulsar diff --git a/tests/ServiceInfoProviderTest.cc b/tests/ServiceInfoProviderTest.cc index 82f5f6f7..95521179 100644 --- a/tests/ServiceInfoProviderTest.cc +++ b/tests/ServiceInfoProviderTest.cc @@ -17,16 +17,20 @@ * under the License. */ #include +#include #include #include +#include #include #include #include #include +#include #include "PulsarFriend.h" #include "WaitUtils.h" +#include "lib/AsioDefines.h" #include "lib/LogUtils.h" DECLARE_LOG_OBJECT() @@ -34,6 +38,113 @@ DECLARE_LOG_OBJECT() using namespace pulsar; using namespace std::chrono_literals; +namespace { + +class ProbeTcpServer { + public: + ProbeTcpServer() { start(); } + + ~ProbeTcpServer() { stop(); } + + void start() { + if (running_) { + return; + } + + auto ioContext = std::unique_ptr(new ASIO::io_context); + auto acceptor = std::unique_ptr(new ASIO::ip::tcp::acceptor(*ioContext)); + ASIO::ip::tcp::endpoint endpoint{ASIO::ip::tcp::v4(), static_cast(port_)}; + acceptor->open(endpoint.protocol()); + acceptor->set_option(ASIO::ip::tcp::acceptor::reuse_address(true)); + acceptor->bind(endpoint); + acceptor->listen(); + + port_ = acceptor->local_endpoint().port(); + ioContext_ = std::move(ioContext); + acceptor_ = std::move(acceptor); + running_ = true; + + scheduleAccept(); + serverThread_ = std::thread([this] { ioContext_->run(); }); + } + + void stop() { + if (!running_.exchange(false)) { + return; + } + + ASIO::post(*ioContext_, [this] { + ASIO_ERROR ignored; + if (acceptor_ && acceptor_->is_open()) { + acceptor_->close(ignored); + } + }); + + if (serverThread_.joinable()) { + serverThread_.join(); + } + + acceptor_.reset(); + ioContext_.reset(); + } + + std::string getServiceUrl() const { return "pulsar://127.0.0.1:" + std::to_string(port_); } + + private: + void scheduleAccept() { + if (!running_ || !acceptor_ || !acceptor_->is_open()) { + return; + } + + auto socket = std::make_shared(*ioContext_); + acceptor_->async_accept(*socket, [this, socket](const ASIO_ERROR &error) { + if (!error) { + ASIO_ERROR ignored; + socket->close(ignored); + } + + if (running_ && acceptor_ && acceptor_->is_open()) { + scheduleAccept(); + } + }); + } + + int port_{0}; + std::atomic_bool running_{false}; + std::unique_ptr ioContext_; + std::unique_ptr acceptor_; + std::thread serverThread_; +}; + +class ServiceUrlObserver { + public: + void onUpdate(const ServiceInfo &serviceInfo) { + std::lock_guard lock(mutex_); + serviceUrls_.emplace_back(serviceInfo.serviceUrl()); + } + + size_t size() const { + std::lock_guard lock(mutex_); + return serviceUrls_.size(); + } + + std::string last() const { + std::lock_guard lock(mutex_); + return serviceUrls_.empty() ? std::string() : serviceUrls_.back(); + } + + std::vector snapshot() const { + std::lock_guard lock(mutex_); + return serviceUrls_; + } + + private: + mutable std::mutex mutex_; + std::vector serviceUrls_; +}; + +} // namespace + class ServiceInfoHolder { public: ServiceInfoHolder(ServiceInfo info) : serviceInfo_(std::move(info)) {} @@ -93,6 +204,83 @@ class TestServiceInfoProvider : public ServiceInfoProvider { mutable std::mutex mutex_; }; +TEST(AutoClusterFailoverTest, testFailoverToFirstAvailableSecondaryAfterDelay) { + try { + ProbeTcpServer availableSecondary; + ProbeTcpServer unavailableSecondary; + const auto primaryUrl = unavailableSecondary.getServiceUrl(); + unavailableSecondary.stop(); + + ProbeTcpServer skippedSecondary; + const auto skippedSecondaryUrl = skippedSecondary.getServiceUrl(); + skippedSecondary.stop(); + + const auto availableSecondaryUrl = availableSecondary.getServiceUrl(); + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(skippedSecondaryUrl), + ServiceInfo(availableSecondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverDelay(120ms) + .withSwitchBackDelay(120ms) + .build(); + + ASSERT_EQ(provider.initialServiceInfo().serviceUrl(), primaryUrl); + + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_TRUE(waitUntil(1s, [&observer] { return observer.size() >= 1; })); + ASSERT_EQ(observer.last(), primaryUrl); + ASSERT_FALSE(waitUntil( + 80ms, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); + ASSERT_TRUE(waitUntil( + 2s, [&observer, &availableSecondaryUrl] { return observer.last() == availableSecondaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 2u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], availableSecondaryUrl); + } catch (const ASIO_SYSTEM_ERROR &e) { + GTEST_SKIP() << "Cannot bind local probe server in this environment: " << e.what(); + } +} + +TEST(AutoClusterFailoverTest, testSwitchBackToPrimaryAfterRecoveryDelay) { + try { + ProbeTcpServer primary; + const auto primaryUrl = primary.getServiceUrl(); + primary.stop(); + + ProbeTcpServer secondary; + const auto secondaryUrl = secondary.getServiceUrl(); + + ServiceUrlObserver observer; + AutoClusterFailover provider = + AutoClusterFailover::Builder(ServiceInfo(primaryUrl), {ServiceInfo(secondaryUrl)}) + .withCheckInterval(20ms) + .withFailoverDelay(80ms) + .withSwitchBackDelay(120ms) + .build(); + + provider.initialize([&observer](const ServiceInfo &serviceInfo) { observer.onUpdate(serviceInfo); }); + + ASSERT_TRUE(waitUntil(2s, [&observer, &secondaryUrl] { return observer.last() == secondaryUrl; })); + + primary.start(); + + ASSERT_FALSE(waitUntil(80ms, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); + ASSERT_TRUE(waitUntil(2s, [&observer, &primaryUrl] { return observer.last() == primaryUrl; })); + + const auto updates = observer.snapshot(); + ASSERT_EQ(updates.size(), 3u); + ASSERT_EQ(updates[0], primaryUrl); + ASSERT_EQ(updates[1], secondaryUrl); + ASSERT_EQ(updates[2], primaryUrl); + } catch (const ASIO_SYSTEM_ERROR &e) { + GTEST_SKIP() << "Cannot bind local probe server in this environment: " << e.what(); + } +} + TEST(ServiceInfoProviderTest, testSwitchCluster) { extern std::string getToken(); // from tests/AuthTokenTest.cc // Access "private/auth" namespace in cluster 1