From de0fec6ce3844a53df7047e11cc23ed88cde433e Mon Sep 17 00:00:00 2001 From: Minggang Wang Date: Wed, 18 Mar 2026 10:28:36 +0800 Subject: [PATCH 1/2] Add waitForMessage utility for one-shot message reception --- index.js | 21 ++++ lib/wait_for_message.js | 108 ++++++++++++++++++++ test/test-wait-for-message.js | 179 ++++++++++++++++++++++++++++++++++ types/index.d.ts | 32 ++++++ 4 files changed, 340 insertions(+) create mode 100644 lib/wait_for_message.js create mode 100644 test/test-wait-for-message.js diff --git a/index.js b/index.js index 118ceaed..0e677b84 100644 --- a/index.js +++ b/index.js @@ -64,6 +64,7 @@ const ParameterClient = require('./lib/parameter_client.js'); const errors = require('./lib/errors.js'); const ParameterWatcher = require('./lib/parameter_watcher.js'); const ParameterEventHandler = require('./lib/parameter_event_handler.js'); +const waitForMessage = require('./lib/wait_for_message.js'); const MessageIntrospector = require('./lib/message_introspector.js'); const MessageInfo = require('./lib/message_info.js'); const ObservableSubscription = require('./lib/observable_subscription.js'); @@ -466,6 +467,26 @@ let rcl = { node.spinOnce(timeout); }, + /** + * Wait for a single message on a topic. + * + * Creates a temporary subscription, waits for the first message to arrive, + * and returns it. The temporary subscription is always cleaned up, even on + * timeout or error. The node must be spinning before calling this function. + * + * This is the rclnodejs equivalent of rclpy's `wait_for_message`. + * + * @param {function|string|object} typeClass - The ROS message type class. + * @param {Node} node - The node to create the temporary subscription on. + * @param {string} topic - The topic name to listen on. + * @param {object} [options] - Options. + * @param {number} [options.timeout] - Timeout in milliseconds. If omitted, waits indefinitely. + * @param {object} [options.qos] - QoS profile for the subscription. + * @returns {Promise} - Resolves with the received message. + * @throws {Error} If timeout expires before a message arrives. + */ + waitForMessage: waitForMessage, + /** * Shutdown an RCL environment identified by a context. The shutdown process will * destroy all nodes and related resources in the context. If no context is diff --git a/lib/wait_for_message.js b/lib/wait_for_message.js new file mode 100644 index 00000000..5919a113 --- /dev/null +++ b/lib/wait_for_message.js @@ -0,0 +1,108 @@ +// Copyright (c) 2026, The Robot Web Tools Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +/** + * Wait for a single message on a topic. + * + * Creates a temporary subscription, waits for the first message to arrive, + * and returns it. The temporary subscription is always cleaned up, even on + * timeout or error. The node must be spinning before calling this function. + * + * This is the rclnodejs equivalent of rclpy's `wait_for_message`. + * + * @param {function|string|object} typeClass - The ROS message type class. + * @param {Node} node - The node to create the temporary subscription on. + * @param {string} topic - The topic name to listen on. + * @param {object} [options] - Options. + * @param {number} [options.timeout] - Timeout in milliseconds. If omitted, waits indefinitely. + * @param {object} [options.qos] - QoS profile for the subscription. + * @returns {Promise} - Resolves with the received message. + * @throws {Error} If timeout expires before a message arrives. + * + * @example + * node.spin(); + * const msg = await waitForMessage( + * 'std_msgs/msg/String', + * node, + * '/my_topic', + * { timeout: 5000 } + * ); + * console.log('Received:', msg.data); + */ +function waitForMessage(typeClass, node, topic, options = {}) { + return new Promise((resolve, reject) => { + let subscription = null; + let timer = null; + let settled = false; + + const cleanup = () => { + if (timer) { + clearTimeout(timer); + timer = null; + } + if (subscription) { + try { + node.destroySubscription(subscription); + } catch (e) { + // Subscription may already be destroyed if node is shutting down + } + subscription = null; + } + }; + + const settle = (err, msg) => { + if (settled) return; + settled = true; + cleanup(); + if (err) { + reject(err); + } else { + resolve(msg); + } + }; + + try { + const subOptions = {}; + if (options.qos) { + subOptions.qos = options.qos; + } + + subscription = node.createSubscription( + typeClass, + topic, + subOptions, + (msg) => { + settle(null, msg); + } + ); + + if (options.timeout != null && options.timeout >= 0) { + timer = setTimeout(() => { + settle( + new Error( + `waitForMessage timed out after ${options.timeout}ms on topic '${topic}'` + ) + ); + }, options.timeout); + } + } catch (err) { + cleanup(); + reject(err); + } + }); +} + +module.exports = waitForMessage; diff --git a/test/test-wait-for-message.js b/test/test-wait-for-message.js new file mode 100644 index 00000000..6fd3a6a4 --- /dev/null +++ b/test/test-wait-for-message.js @@ -0,0 +1,179 @@ +// Copyright (c) 2026, The Robot Web Tools Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const assert = require('assert'); +const rclnodejs = require('../index.js'); + +describe('waitForMessage tests', function () { + this.timeout(60 * 1000); + + let node; + + before(function () { + return rclnodejs.init(); + }); + + after(function () { + rclnodejs.shutdown(); + }); + + beforeEach(function () { + node = rclnodejs.createNode('wait_for_message_test_node'); + node.spin(); + }); + + afterEach(function () { + node.stop(); + node.destroy(); + }); + + it('should receive a message', async function () { + const publisher = node.createPublisher( + 'std_msgs/msg/String', + 'wfm_test_topic_1' + ); + + // Publish after a short delay + setTimeout(() => { + publisher.publish('hello waitForMessage'); + }, 200); + + const msg = await rclnodejs.waitForMessage( + 'std_msgs/msg/String', + node, + 'wfm_test_topic_1', + { timeout: 5000 } + ); + + assert.strictEqual(msg.data, 'hello waitForMessage'); + }); + + it('should timeout when no message arrives', async function () { + await assert.rejects( + () => + rclnodejs.waitForMessage( + 'std_msgs/msg/String', + node, + 'wfm_nonexistent_topic', + { timeout: 500 } + ), + { message: /timed out/ } + ); + }); + + it('should receive only the first message', async function () { + const publisher = node.createPublisher( + 'std_msgs/msg/String', + 'wfm_test_topic_2' + ); + + let receiveCount = 0; + + setTimeout(() => { + publisher.publish('first'); + publisher.publish('second'); + publisher.publish('third'); + }, 200); + + const msg = await rclnodejs.waitForMessage( + 'std_msgs/msg/String', + node, + 'wfm_test_topic_2', + { timeout: 5000 } + ); + + assert.strictEqual(msg.data, 'first'); + }); + + it('should work with different message types', async function () { + const publisher = node.createPublisher( + 'std_msgs/msg/Int32', + 'wfm_test_topic_3' + ); + + setTimeout(() => { + publisher.publish({ data: 42 }); + }, 200); + + const msg = await rclnodejs.waitForMessage( + 'std_msgs/msg/Int32', + node, + 'wfm_test_topic_3', + { timeout: 5000 } + ); + + assert.strictEqual(msg.data, 42); + }); + + it('should wait indefinitely when no timeout is specified', async function () { + const publisher = node.createPublisher( + 'std_msgs/msg/String', + 'wfm_test_topic_4' + ); + + // Publish after a delay — should still be caught without timeout + setTimeout(() => { + publisher.publish('delayed message'); + }, 500); + + const msg = await rclnodejs.waitForMessage( + 'std_msgs/msg/String', + node, + 'wfm_test_topic_4' + ); + + assert.strictEqual(msg.data, 'delayed message'); + }); + + it('should clean up subscription after receiving', async function () { + const publisher = node.createPublisher( + 'std_msgs/msg/String', + 'wfm_test_topic_5' + ); + + const subCountBefore = node._subscriptions.length; + + setTimeout(() => { + publisher.publish('cleanup test'); + }, 200); + + await rclnodejs.waitForMessage( + 'std_msgs/msg/String', + node, + 'wfm_test_topic_5', + { timeout: 5000 } + ); + + // Subscription should be cleaned up + assert.strictEqual(node._subscriptions.length, subCountBefore); + }); + + it('should clean up subscription on timeout', async function () { + const subCountBefore = node._subscriptions.length; + + await assert.rejects(() => + rclnodejs.waitForMessage( + 'std_msgs/msg/String', + node, + 'wfm_timeout_cleanup_topic', + { timeout: 300 } + ) + ); + + // Subscription should be cleaned up even on timeout + assert.strictEqual(node._subscriptions.length, subCountBefore); + }); +}); diff --git a/types/index.d.ts b/types/index.d.ts index c0dd56a7..3cbd160b 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -87,6 +87,38 @@ declare module 'rclnodejs' { * @deprecated since 0.18.0, Use Node.spinOnce(timeout)*/ function spinOnce(node: Node, timeout?: number): void; + /** + * Options for waitForMessage. + */ + interface WaitForMessageOptions { + /** Timeout in milliseconds. If omitted, waits indefinitely. */ + timeout?: number; + /** QoS profile for the temporary subscription. */ + qos?: QoS; + } + + /** + * Wait for a single message on a topic. + * + * Creates a temporary subscription, waits for the first message to arrive, + * and returns it. The node must be spinning before calling this function. + * + * This is the rclnodejs equivalent of rclpy's `wait_for_message`. + * + * @param typeClass - The ROS message type class. + * @param node - The node to create the temporary subscription on. + * @param topic - The topic name to listen on. + * @param options - Options including timeout and QoS. + * @returns Resolves with the received message. + * @throws Error if timeout expires before a message arrives. + */ + function waitForMessage>( + typeClass: T, + node: Node, + topic: string, + options?: WaitForMessageOptions + ): Promise>; + /** * Stop all activity, destroy all nodes and node components. * From 11d2700b7420dfb53d019dcf8e468e71f4273ac0 Mon Sep 17 00:00:00 2001 From: Minggang Wang Date: Wed, 18 Mar 2026 10:43:53 +0800 Subject: [PATCH 2/2] Address comments --- lib/wait_for_message.js | 9 ++++++--- test/test-wait-for-message.js | 7 ++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/wait_for_message.js b/lib/wait_for_message.js index 5919a113..1fe14231 100644 --- a/lib/wait_for_message.js +++ b/lib/wait_for_message.js @@ -14,6 +14,8 @@ 'use strict'; +const { TimeoutError } = require('./errors.js'); + /** * Wait for a single message on a topic. * @@ -56,7 +58,7 @@ function waitForMessage(typeClass, node, topic, options = {}) { if (subscription) { try { node.destroySubscription(subscription); - } catch (e) { + } catch { // Subscription may already be destroyed if node is shutting down } subscription = null; @@ -92,8 +94,9 @@ function waitForMessage(typeClass, node, topic, options = {}) { if (options.timeout != null && options.timeout >= 0) { timer = setTimeout(() => { settle( - new Error( - `waitForMessage timed out after ${options.timeout}ms on topic '${topic}'` + new TimeoutError( + `waitForMessage timed out after ${options.timeout}ms on topic '${topic}'`, + { entityType: 'topic', entityName: topic } ) ); }, options.timeout); diff --git a/test/test-wait-for-message.js b/test/test-wait-for-message.js index 6fd3a6a4..26eaf4ce 100644 --- a/test/test-wait-for-message.js +++ b/test/test-wait-for-message.js @@ -70,7 +70,10 @@ describe('waitForMessage tests', function () { 'wfm_nonexistent_topic', { timeout: 500 } ), - { message: /timed out/ } + (error) => { + assert.strictEqual(error.name, 'TimeoutError'); + return true; + } ); }); @@ -80,8 +83,6 @@ describe('waitForMessage tests', function () { 'wfm_test_topic_2' ); - let receiveCount = 0; - setTimeout(() => { publisher.publish('first'); publisher.publish('second');