Skip to content

Commit d781707

Browse files
authored
feat: add async last message id support (#284)
1 parent 2704dd7 commit d781707

3 files changed

Lines changed: 32 additions & 1 deletion

File tree

pulsar/asyncio.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,15 @@ async def close(self) -> None:
399399
self._consumer.close_async(functools.partial(_set_future, future, value=None))
400400
await future
401401

402+
async def get_last_message_id(self) -> _pulsar.MessageId:
403+
"""
404+
Asynchronously get the last message id.
405+
"""
406+
future = asyncio.get_running_loop().create_future()
407+
self._consumer.get_last_message_id_async(functools.partial(_set_future, future))
408+
id = await future
409+
return id
410+
402411
def redeliver_unacknowledged_messages(self):
403412
"""
404413
Redelivers all the unacknowledged messages. In failover mode, the

src/consumer.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ MessageId Consumer_get_last_message_id(Consumer& consumer) {
106106
return msgId;
107107
}
108108

109+
void Consumer_get_last_message_id_async(Consumer& consumer, GetLastMessageIdCallback callback) {
110+
py::gil_scoped_release release;
111+
consumer.getLastMessageIdAsync(callback);
112+
}
113+
109114
void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
110115
py::gil_scoped_release release;
111116
consumer.receiveAsync(callback);
@@ -194,7 +199,8 @@ void export_consumer(py::module_& m) {
194199
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
195200
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
196201
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync)
197-
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
202+
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
203+
.def("get_last_message_id_async", &Consumer_get_last_message_id_async)
198204
.def("close_async", &Consumer_closeAsync)
199205
.def("unsubscribe_async", &Consumer_unsubscribeAsync)
200206
.def("seek_async", &Consumer_seekAsync)

tests/asyncio_test.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
)
3434

3535
import pulsar # pylint: disable=import-error
36+
import _pulsar # pylint: disable=import-error
3637
from pulsar.asyncio import ( # pylint: disable=import-error
3738
Client,
3839
Consumer,
@@ -267,6 +268,21 @@ async def verify_receive(consumer: Consumer):
267268
await verify_receive(consumer)
268269
await consumer.close()
269270

271+
async def test_consumer_get_last_message_id(self):
272+
topic = f'asyncio-test-get-last-message-id-{time.time()}'
273+
sub = 'sub'
274+
consumer = await self._client.subscribe(topic, sub,
275+
consumer_type=pulsar.ConsumerType.Shared)
276+
producer = await self._client.create_producer(topic)
277+
for i in range(5):
278+
msg = f'msg-{i}'.encode()
279+
await producer.send(msg)
280+
last_msg_id = await consumer.get_last_message_id()
281+
assert isinstance(last_msg_id, _pulsar.MessageId)
282+
assert last_msg_id.entry_id() == i
283+
await consumer.acknowledge(last_msg_id)
284+
await consumer.close()
285+
270286
async def test_async_dead_letter_policy(self):
271287
topic = f'asyncio-test-dlq-{time.time()}'
272288
dlq_topic = 'dlq-' + topic

0 commit comments

Comments
 (0)