From 47ec85ebc2fb301ff4dac4965f64fb7d41e49414 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Sun, 8 Mar 2026 18:12:09 +0800 Subject: [PATCH 1/5] fix log after client close --- lib/ConsumerImpl.cc | 15 ++++++++++++--- lib/ProducerImpl.cc | 11 ++++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 757b6e84..6544ae60 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -186,7 +186,12 @@ ConsumerImpl::~ConsumerImpl() { // consumer seek, caused reconnection, if consumer close happened before connection ready, // then consumer will not send closeConsumer to Broker side, and caused a leak of consumer in // broker. - LOG_WARN(consumerStr_ << "Destroyed consumer which was not properly closed"); + auto client = client_.lock(); + if (client) { + LOG_WARN(consumerStr_ << "Destroyed consumer which was not properly closed"); + } else { + LOG_DEBUG(consumerStr_ << "Destroyed consumer which was not properly closed (client already destroyed)"); + } ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -194,9 +199,13 @@ ConsumerImpl::~ConsumerImpl() { cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER"); cnx->removeConsumer(consumerId_); - LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << consumerId_); + LOG_DEBUG(consumerStr_ << "Closed consumer for race condition: " << consumerId_); } else { - LOG_WARN(consumerStr_ << "Client is destroyed and cannot send the CloseConsumer command"); + if (client) { + LOG_WARN(consumerStr_ << "Client is destroyed and cannot send the CloseConsumer command"); + } else { + LOG_DEBUG(consumerStr_ << "Client is destroyed and cannot send the CloseConsumer command"); + } } } internalShutdown(); diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index c9a16e8f..5c2e0793 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -118,7 +118,12 @@ ProducerImpl::~ProducerImpl() { internalShutdown(); printStats(); if (state_ == Ready || state_ == Pending) { - LOG_WARN(producerStr_ << "Destroyed producer which was not properly closed"); + auto client = client_.lock(); + if (client) { + LOG_WARN(producerStr_ << "Destroyed producer which was not properly closed"); + } else { + LOG_DEBUG(producerStr_ << "Destroyed producer which was not properly closed (client already destroyed)"); + } } } @@ -753,10 +758,10 @@ void ProducerImpl::sendMessage(std::unique_ptr opSendMsg) { void ProducerImpl::printStats() { if (batchMessageContainer_) { - LOG_INFO("Producer - " << producerStr_ << ", [batchMessageContainer = " << *batchMessageContainer_ + LOG_DEBUG("Producer - " << producerStr_ << ", [batchMessageContainer = " << *batchMessageContainer_ << "]"); } else { - LOG_INFO("Producer - " << producerStr_ << ", [batching = off]"); + LOG_DEBUG("Producer - " << producerStr_ << ", [batching = off]"); } } From 5c4bb9c611a134670049ca645f954a5242e3b655 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Sun, 8 Mar 2026 18:50:09 +0800 Subject: [PATCH 2/5] format --- lib/ConsumerImpl.cc | 3 ++- lib/ProducerImpl.cc | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 6544ae60..4834d85f 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -190,7 +190,8 @@ ConsumerImpl::~ConsumerImpl() { if (client) { LOG_WARN(consumerStr_ << "Destroyed consumer which was not properly closed"); } else { - LOG_DEBUG(consumerStr_ << "Destroyed consumer which was not properly closed (client already destroyed)"); + LOG_DEBUG(consumerStr_ + << "Destroyed consumer which was not properly closed (client already destroyed)"); } ClientConnectionPtr cnx = getCnx().lock(); diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 5c2e0793..1b2acfce 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -122,7 +122,8 @@ ProducerImpl::~ProducerImpl() { if (client) { LOG_WARN(producerStr_ << "Destroyed producer which was not properly closed"); } else { - LOG_DEBUG(producerStr_ << "Destroyed producer which was not properly closed (client already destroyed)"); + LOG_DEBUG(producerStr_ + << "Destroyed producer which was not properly closed (client already destroyed)"); } } } @@ -759,7 +760,7 @@ void ProducerImpl::sendMessage(std::unique_ptr opSendMsg) { void ProducerImpl::printStats() { if (batchMessageContainer_) { LOG_DEBUG("Producer - " << producerStr_ << ", [batchMessageContainer = " << *batchMessageContainer_ - << "]"); + << "]"); } else { LOG_DEBUG("Producer - " << producerStr_ << ", [batching = off]"); } From e7562cb786a1354d6fcd1c72219b2079731af802 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 10 Mar 2026 08:34:21 +0800 Subject: [PATCH 3/5] no log --- lib/BatchMessageContainer.cc | 6 +----- lib/BatchMessageKeyBasedContainer.cc | 6 +----- lib/ConsumerImpl.cc | 12 ++++-------- lib/ProducerImpl.cc | 12 ++++-------- 4 files changed, 10 insertions(+), 26 deletions(-) diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc index cd7ddc85..360303bc 100644 --- a/lib/BatchMessageContainer.cc +++ b/lib/BatchMessageContainer.cc @@ -30,11 +30,7 @@ namespace pulsar { BatchMessageContainer::BatchMessageContainer(const ProducerImpl& producer) : BatchMessageContainerBase(producer) {} -BatchMessageContainer::~BatchMessageContainer() { - LOG_DEBUG(*this << " destructed"); - LOG_DEBUG("[numberOfBatchesSent = " << numberOfBatchesSent_ - << "] [averageBatchSize_ = " << averageBatchSize_ << "]"); -} +BatchMessageContainer::~BatchMessageContainer() {} bool BatchMessageContainer::add(const Message& msg, const SendCallback& callback) { LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]"); diff --git a/lib/BatchMessageKeyBasedContainer.cc b/lib/BatchMessageKeyBasedContainer.cc index 5b181843..dc035c2b 100644 --- a/lib/BatchMessageKeyBasedContainer.cc +++ b/lib/BatchMessageKeyBasedContainer.cc @@ -40,11 +40,7 @@ inline std::string getKey(const Message& msg) { BatchMessageKeyBasedContainer::BatchMessageKeyBasedContainer(const ProducerImpl& producer) : BatchMessageContainerBase(producer) {} -BatchMessageKeyBasedContainer::~BatchMessageKeyBasedContainer() { - LOG_DEBUG(*this << " destructed"); - LOG_INFO("[numberOfBatchesSent = " << numberOfBatchesSent_ - << "] [averageBatchSize_ = " << averageBatchSize_ << "]"); -} +BatchMessageKeyBasedContainer::~BatchMessageKeyBasedContainer() {} bool BatchMessageKeyBasedContainer::isFirstMessageToAdd(const Message& msg) const { auto it = batches_.find(getKey(msg)); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 4834d85f..5658c423 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -180,18 +180,14 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic } ConsumerImpl::~ConsumerImpl() { - LOG_DEBUG(consumerStr_ << "~ConsumerImpl"); + auto client = client_.lock(); if (state_ == Ready) { // this could happen at least in this condition: // consumer seek, caused reconnection, if consumer close happened before connection ready, // then consumer will not send closeConsumer to Broker side, and caused a leak of consumer in // broker. - auto client = client_.lock(); if (client) { LOG_WARN(consumerStr_ << "Destroyed consumer which was not properly closed"); - } else { - LOG_DEBUG(consumerStr_ - << "Destroyed consumer which was not properly closed (client already destroyed)"); } ClientConnectionPtr cnx = getCnx().lock(); @@ -200,12 +196,12 @@ ConsumerImpl::~ConsumerImpl() { cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER"); cnx->removeConsumer(consumerId_); - LOG_DEBUG(consumerStr_ << "Closed consumer for race condition: " << consumerId_); + if (client) { + LOG_DEBUG(consumerStr_ << "Closed consumer for race condition: " << consumerId_); + } } else { if (client) { LOG_WARN(consumerStr_ << "Client is destroyed and cannot send the CloseConsumer command"); - } else { - LOG_DEBUG(consumerStr_ << "Client is destroyed and cannot send the CloseConsumer command"); } } } diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 1b2acfce..13e4052f 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -114,16 +114,12 @@ ProducerImpl::ProducerImpl(const ClientImplPtr& client, const TopicName& topicNa } ProducerImpl::~ProducerImpl() { - LOG_DEBUG(producerStr_ << "~ProducerImpl"); + auto client = client_.lock(); internalShutdown(); - printStats(); - if (state_ == Ready || state_ == Pending) { - auto client = client_.lock(); - if (client) { + if (client) { + printStats(); + if (state_ == Ready || state_ == Pending) { LOG_WARN(producerStr_ << "Destroyed producer which was not properly closed"); - } else { - LOG_DEBUG(producerStr_ - << "Destroyed producer which was not properly closed (client already destroyed)"); } } } From d0639c92a5c1ce650ec3bbee4a13990a8f411f82 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 10 Mar 2026 09:36:22 +0800 Subject: [PATCH 4/5] no log --- lib/ConsumerImpl.cc | 7 ------- 1 file changed, 7 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 5658c423..803bb4fa 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -196,13 +196,6 @@ ConsumerImpl::~ConsumerImpl() { cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER"); cnx->removeConsumer(consumerId_); - if (client) { - LOG_DEBUG(consumerStr_ << "Closed consumer for race condition: " << consumerId_); - } - } else { - if (client) { - LOG_WARN(consumerStr_ << "Client is destroyed and cannot send the CloseConsumer command"); - } } } internalShutdown(); From 74b25c59175beb36ac7dc6cc121a2b498107ace9 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Wed, 11 Mar 2026 14:16:17 +0800 Subject: [PATCH 5/5] fix --- lib/ProducerImpl.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 13e4052f..2bb5ed78 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -117,7 +117,6 @@ ProducerImpl::~ProducerImpl() { auto client = client_.lock(); internalShutdown(); if (client) { - printStats(); if (state_ == Ready || state_ == Pending) { LOG_WARN(producerStr_ << "Destroyed producer which was not properly closed"); } @@ -755,10 +754,10 @@ void ProducerImpl::sendMessage(std::unique_ptr opSendMsg) { void ProducerImpl::printStats() { if (batchMessageContainer_) { - LOG_DEBUG("Producer - " << producerStr_ << ", [batchMessageContainer = " << *batchMessageContainer_ + LOG_INFO("Producer - " << producerStr_ << ", [batchMessageContainer = " << *batchMessageContainer_ << "]"); } else { - LOG_DEBUG("Producer - " << producerStr_ << ", [batching = off]"); + LOG_INFO("Producer - " << producerStr_ << ", [batching = off]"); } }