Search before reporting
Read release policy
User environment
Version: 4.0.8 via Helm chart pulsar-4.4.0 (but the bug is also present in current master)
Related: #25119 (different bug, same area — phase 2 __compaction cursor)
Issue Description
Summary
We found this bug because we update topic policies frequently in our Pulsar cluster and noticed thousands of backlogged events accumulating in __change_events. Because compaction is triggered automatically when change events are added, we hit the following bug continuously:
Compaction phase 2 fails every time with a ConnectException because the broker disconnects the compaction consumer as part of processing its own seek request, causing channelInactive to fire on the client and kill the in-flight seek future before the broker sends the success response. Since AbstractTwoPhaseCompactor.phaseTwoSeekThenLoop has no retry logic for transient seek failures, every compaction attempt aborts. The __compaction subscription backlog grows without bound.
Affected Topics
Any topic with compaction enabled. Most visibly affects __change_events system topics because SystemTopic.isCompactionEnabled() hardcodes true and the effective compactionThreshold is 0 bytes, so compaction is triggered on any non-zero backlog. Frequent topic-level policy writes (each write appends a message to __change_events) cause compaction to be triggered and fail in a continuous loop.
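The effective trigger condition therefore reduces to "any backlog at all". A minimal sketch of that predicate (hypothetical helper, not the broker's actual code — the real check in the broker compares an estimated backlog size against the configured compactionThreshold):

```java
// Simplified model of the automatic-compaction trigger. For system topics,
// compactionEnabled is hardcoded to true (SystemTopic.isCompactionEnabled())
// and the effective threshold is 0 bytes, so any non-empty backlog qualifies.
final class CompactionTrigger {
    static boolean shouldCompact(boolean compactionEnabled,
                                 long backlogBytes,
                                 long thresholdBytes) {
        return compactionEnabled && backlogBytes > thresholdBytes;
    }
}
```

With thresholdBytes = 0, every topic-policy write (one appended message) immediately re-triggers compaction, which then fails, leaving the backlog in place for the next trigger.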
Expected Behavior
Compaction phase 2 seeks the __compaction reader back to the start of the compacted range and reads forward, producing a new compacted ledger.
Actual Behavior
Compaction fails on every attempt and the __compaction subscription backlog grows indefinitely.
Error messages
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.p.compaction.AbstractTwoPhaseCompactor - Commencing phase two of compaction for
persistent://my-tenant/my-namespace/__change_events-partition-2, from 1218818:0:2:-1 to 1330087:5:2:-1, compacting 12 keys to ledger
1341961
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.p.client.impl.ConsumerImpl -
[persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Seeking subscription to the message 1218818:0:2:-1
[pulsar-io-3-4] INFO o.a.p.broker.service.Consumer - Disconnecting consumer:
Consumer{subscription=PulsarCompactorSubscription{topic=persistent://my-tenant/my-namespace/__change_events-partition-2,
name=__compaction}, consumerId=27241, consumerName=6QQBD, address=[id: 0x6da7169d, L:/10.10.x.x:6650 - R:/10.10.x.x:53634] [SR:10.10.x.x,
state:Connected]}
[pulsar-io-3-4] INFO o.a.p.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer
Consumer{subscription=PulsarCompactorSubscription{topic=persistent://my-tenant/my-namespace/__change_events-partition-2,
name=__compaction}, ...}
[pulsar-io-3-4] INFO o.a.p.broker.service.persistent.PersistentSubscription -
[persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Successfully disconnected consumers from subscription,
proceeding with cursor reset
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.b.mledger.impl.ManagedCursorImpl -
[my-tenant/my-namespace/persistent/__change_events-partition-2] Initiate reset readPosition from 1330087:6 to 1218818:0 (ackSet is null)
on cursor __compaction
[pulsar-io-3-6] INFO o.a.p.client.impl.ConnectionHandler - [persistent://my-tenant/my-namespace/__change_events-partition-2]
[__compaction] Closed connection [id: 0xb3fcdb76, L:/10.10.x.x:53634 ! R:/10.10.x.x:6650] -- Will try again in 0.1 s, hostUrl: null
[pulsar-io-3-6] ERROR o.a.p.client.impl.ConsumerImpl - [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction]
Failed to reset subscription: Disconnected from server at /10.10.x.x:6650
[broker-client-shared-internal-executor-5-1] WARN o.a.p.broker.service.persistent.PersistentTopic -
[persistent://my-tenant/my-namespace/__change_events-partition-2] Compaction failure.
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Failed to seek the
subscription __compaction of the topic persistent://my-tenant/my-namespace/__change_events-partition-2 to the message 1218818:0:2:-1
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
at org.apache.pulsar.client.impl.ConsumerImpl.failSeek(ConsumerImpl.java:2631)
~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8]
at org.apache.pulsar.client.impl.ConsumerImpl.lambda$seekAsyncInternal$60(ConsumerImpl.java:2603)
~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8]
at org.apache.pulsar.client.impl.ClientCnx.channelInactive(ClientCnx.java:342)
~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Failed to seek the subscription __compaction of the topic
persistent://my-tenant/my-namespace/__change_events-partition-2 to the message 1218818:0:2:-1
Disconnected from server at /10.10.x.x:6650
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.b.mledger.impl.ManagedCursorImpl -
[my-tenant/my-namespace/persistent/__change_events-partition-2] reset readPosition to 1218818:0 (ackSet is null) before current read
readPosition 1330087:6 on cursor __compaction
[BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.p.broker.service.ServerCnx - [/10.10.x.x:53634]
[persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Reset subscription to message id 1218818:0 (ackSet is
null)
Reproducing the issue
- Create a namespace with frequent topic-level policy writes so that the __change_events backlog grows. For example, calling setMaxUnackedMessagesPerConsumer per topic on every consumer restart; this is the real-world use case that triggered these findings.
- Observe compaction triggering automatically (threshold = 0 bytes for system topics) or trigger manually:
pulsar-admin topics compact persistent://my-tenant/my-namespace/__change_events
- Observe compaction failure in broker logs
Additional information
Root Cause
The sequence in PersistentSubscription.resetCursorInternal (PersistentSubscription.java:916) is:
1. Disconnect active consumers via dispatcher.disconnectActiveConsumers(true).
2. Reset the managed cursor position via ManagedCursorImpl.internalResetCursor.
3. Send commandSender.sendSuccessResponse(requestId) back in ServerCnx.handleSeek's thenRun.
Step 1 closes the consumer's Netty channel server-side, which fires channelInactive on the client (ClientCnx.java:328). channelInactive immediately fails all entries in pendingRequests with ConnectException (ClientCnx.java:341–344), including the seek request that triggered the reset. By the time the server sends the success response in step 3, the client has already failed the seek future and aborted compaction.
The disconnect in step 1 is necessary for correctness — the dispatcher may have messages in flight to the consumer that would be inconsistent with the new cursor position — so the server-side ordering cannot simply be reversed. The fix belongs on the compactor side.
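The client-side half of the race can be reduced to a self-contained sketch (hypothetical names, not the actual ClientCnx code): a pending-request map whose entries are all failed when the channel goes inactive, so the server's later success response finds an already-failed future.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Minimal model of the race: the seek is registered as a pending request; the
// broker disconnects the consumer while processing that same seek, and the
// channelInactive handler fails every in-flight future with a
// ConnectException-style error before the success response can arrive.
final class SeekRaceSketch {
    final Map<Long, CompletableFuture<Void>> pendingRequests = new ConcurrentHashMap<>();

    CompletableFuture<Void> sendSeek(long requestId) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        pendingRequests.put(requestId, f);
        return f;
    }

    // Mirrors ClientCnx.channelInactive: fail everything in flight.
    void channelInactive() {
        pendingRequests.values().forEach(f ->
                f.completeExceptionally(new RuntimeException("Disconnected from server")));
        pendingRequests.clear();
    }

    // Mirrors the broker's success response arriving after the disconnect:
    // the pending entry is gone, so the success is silently dropped.
    void handleSuccess(long requestId) {
        CompletableFuture<Void> f = pendingRequests.remove(requestId);
        if (f != null) {
            f.complete(null);
        }
    }
}
```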
AbstractTwoPhaseCompactor.phaseTwoSeekThenLoop has no retry logic — a failed seekAsync propagates directly to whenComplete and aborts:
```java
reader.seekAsync(from).thenCompose((v) -> {
    // phase two loop: read forward and write the compacted entries
}).whenComplete((res, exception) -> {
    if (exception != null) {
        // any seek failure, transient or not, aborts the whole compaction
        deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
            promise.completeExceptionally(exception); // no retry
        });
    }
});
```
Potential Fix
The seek is idempotent and the ConnectException is always transient in this context — the broker disconnects the consumer as part of processing the seek, so by the time the consumer reconnects the cursor is already at the correct position. Retrying seekAsync with a short delay allows the consumer to reconnect, at which point the seek succeeds immediately. But I leave that up to the implementors.
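One possible shape for such a fix, as a sketch only (hypothetical helper names, not an existing Pulsar API beyond CompletableFuture): retry the idempotent seek a bounded number of times with a short delay, giving the consumer time to reconnect after the broker-initiated disconnect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical retry helper: retries an idempotent async operation (here, the
// phase-two seek) up to maxRetries times, waiting delayMs between attempts so
// a disconnected consumer can reconnect before the next try.
final class SeekRetry {
    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> op,
                                              int maxRetries, long delayMs,
                                              ScheduledExecutorService scheduler) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        attempt(op, maxRetries, delayMs, scheduler, promise);
        return promise;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op, int retriesLeft,
                                    long delayMs, ScheduledExecutorService scheduler,
                                    CompletableFuture<T> promise) {
        op.get().whenComplete((res, ex) -> {
            if (ex == null) {
                promise.complete(res);
            } else if (retriesLeft > 0) {
                // Transient failure (e.g. the ConnectException from the
                // broker-side disconnect): wait, then retry the seek.
                scheduler.schedule(
                        () -> attempt(op, retriesLeft - 1, delayMs, scheduler, promise),
                        delayMs, TimeUnit.MILLISECONDS);
            } else {
                promise.completeExceptionally(ex);
            }
        });
    }
}
```

In phaseTwoSeekThenLoop this would wrap the seek, e.g. withRetry(() -> reader.seekAsync(from), 3, 100, scheduler). A production version would want to retry only on connection-type exceptions and cap the total retry budget.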
Are you willing to submit a PR?