Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const ParameterClient = require('./lib/parameter_client.js');
const errors = require('./lib/errors.js');
const ParameterWatcher = require('./lib/parameter_watcher.js');
const ParameterEventHandler = require('./lib/parameter_event_handler.js');
const waitForMessage = require('./lib/wait_for_message.js');
const MessageIntrospector = require('./lib/message_introspector.js');
const MessageInfo = require('./lib/message_info.js');
const ObservableSubscription = require('./lib/observable_subscription.js');
Expand Down Expand Up @@ -466,6 +467,26 @@ let rcl = {
node.spinOnce(timeout);
},

/**
* Wait for a single message on a topic.
*
* Creates a temporary subscription, waits for the first message to arrive,
* and returns it. The temporary subscription is always cleaned up, even on
* timeout or error. The node must be spinning before calling this function.
*
* This is the rclnodejs equivalent of rclpy's `wait_for_message`.
*
* @param {function|string|object} typeClass - The ROS message type class.
* @param {Node} node - The node to create the temporary subscription on.
* @param {string} topic - The topic name to listen on.
* @param {object} [options] - Options.
* @param {number} [options.timeout] - Timeout in milliseconds. If omitted, waits indefinitely.
* @param {object} [options.qos] - QoS profile for the subscription.
* @returns {Promise<object>} - Resolves with the received message.
* @throws {Error} If timeout expires before a message arrives.
*/
waitForMessage: waitForMessage,

/**
* Shutdown an RCL environment identified by a context. The shutdown process will
* destroy all nodes and related resources in the context. If no context is
Expand Down
111 changes: 111 additions & 0 deletions lib/wait_for_message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2026, The Robot Web Tools Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

const { TimeoutError } = require('./errors.js');

/**
* Wait for a single message on a topic.
*
* Creates a temporary subscription, waits for the first message to arrive,
* and returns it. The temporary subscription is always cleaned up, even on
* timeout or error. The node must be spinning before calling this function.
*
* This is the rclnodejs equivalent of rclpy's `wait_for_message`.
*
* @param {function|string|object} typeClass - The ROS message type class.
* @param {Node} node - The node to create the temporary subscription on.
* @param {string} topic - The topic name to listen on.
* @param {object} [options] - Options.
* @param {number} [options.timeout] - Timeout in milliseconds. If omitted, waits indefinitely.
* @param {object} [options.qos] - QoS profile for the subscription.
* @returns {Promise<object>} - Resolves with the received message.
* @throws {Error} If timeout expires before a message arrives.
*
* @example
* node.spin();
* const msg = await waitForMessage(
* 'std_msgs/msg/String',
* node,
* '/my_topic',
* { timeout: 5000 }
* );
* console.log('Received:', msg.data);
*/
function waitForMessage(typeClass, node, topic, options = {}) {
return new Promise((resolve, reject) => {
let subscription = null;
let timer = null;
let settled = false;

const cleanup = () => {
if (timer) {
clearTimeout(timer);
timer = null;
}
if (subscription) {
try {
node.destroySubscription(subscription);
} catch {
// Subscription may already be destroyed if node is shutting down
}
subscription = null;
}
};

const settle = (err, msg) => {
if (settled) return;
settled = true;
cleanup();
if (err) {
reject(err);
} else {
resolve(msg);
}
};

try {
const subOptions = {};
if (options.qos) {
subOptions.qos = options.qos;
}

subscription = node.createSubscription(
typeClass,
topic,
subOptions,
(msg) => {
settle(null, msg);
}
);

if (options.timeout != null && options.timeout >= 0) {
timer = setTimeout(() => {
settle(
new TimeoutError(
`waitForMessage timed out after ${options.timeout}ms on topic '${topic}'`,
{ entityType: 'topic', entityName: topic }
)
);
}, options.timeout);
Comment on lines +94 to +102
}
} catch (err) {
cleanup();
reject(err);
}
});
}

module.exports = waitForMessage;
180 changes: 180 additions & 0 deletions test/test-wait-for-message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright (c) 2026, The Robot Web Tools Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

const assert = require('assert');
const rclnodejs = require('../index.js');

describe('waitForMessage tests', function () {
this.timeout(60 * 1000);

let node;

before(function () {
return rclnodejs.init();
});

after(function () {
rclnodejs.shutdown();
});

beforeEach(function () {
node = rclnodejs.createNode('wait_for_message_test_node');
node.spin();
});

afterEach(function () {
node.stop();
node.destroy();
});

it('should receive a message', async function () {
const publisher = node.createPublisher(
'std_msgs/msg/String',
'wfm_test_topic_1'
);

// Publish after a short delay
setTimeout(() => {
publisher.publish('hello waitForMessage');
}, 200);

const msg = await rclnodejs.waitForMessage(
'std_msgs/msg/String',
node,
'wfm_test_topic_1',
{ timeout: 5000 }
);

assert.strictEqual(msg.data, 'hello waitForMessage');
});

it('should timeout when no message arrives', async function () {
await assert.rejects(
() =>
rclnodejs.waitForMessage(
'std_msgs/msg/String',
node,
'wfm_nonexistent_topic',
{ timeout: 500 }
),
(error) => {
assert.strictEqual(error.name, 'TimeoutError');
return true;
}
);
});

it('should receive only the first message', async function () {
const publisher = node.createPublisher(
'std_msgs/msg/String',
'wfm_test_topic_2'
);

setTimeout(() => {
publisher.publish('first');
publisher.publish('second');
publisher.publish('third');
}, 200);

const msg = await rclnodejs.waitForMessage(
'std_msgs/msg/String',
node,
'wfm_test_topic_2',
{ timeout: 5000 }
);

assert.strictEqual(msg.data, 'first');
});

it('should work with different message types', async function () {
const publisher = node.createPublisher(
'std_msgs/msg/Int32',
'wfm_test_topic_3'
);

setTimeout(() => {
publisher.publish({ data: 42 });
}, 200);

const msg = await rclnodejs.waitForMessage(
'std_msgs/msg/Int32',
node,
'wfm_test_topic_3',
{ timeout: 5000 }
);

assert.strictEqual(msg.data, 42);
});

it('should wait indefinitely when no timeout is specified', async function () {
const publisher = node.createPublisher(
'std_msgs/msg/String',
'wfm_test_topic_4'
);

// Publish after a delay — should still be caught without timeout
setTimeout(() => {
publisher.publish('delayed message');
}, 500);

const msg = await rclnodejs.waitForMessage(
'std_msgs/msg/String',
node,
'wfm_test_topic_4'
);

assert.strictEqual(msg.data, 'delayed message');
});

it('should clean up subscription after receiving', async function () {
const publisher = node.createPublisher(
'std_msgs/msg/String',
'wfm_test_topic_5'
);

const subCountBefore = node._subscriptions.length;

setTimeout(() => {
publisher.publish('cleanup test');
}, 200);

await rclnodejs.waitForMessage(
'std_msgs/msg/String',
node,
'wfm_test_topic_5',
{ timeout: 5000 }
);

// Subscription should be cleaned up
assert.strictEqual(node._subscriptions.length, subCountBefore);
});

it('should clean up subscription on timeout', async function () {
const subCountBefore = node._subscriptions.length;

await assert.rejects(() =>
rclnodejs.waitForMessage(
'std_msgs/msg/String',
node,
'wfm_timeout_cleanup_topic',
{ timeout: 300 }
)
);

// Subscription should be cleaned up even on timeout
assert.strictEqual(node._subscriptions.length, subCountBefore);
});
});
32 changes: 32 additions & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,38 @@ declare module 'rclnodejs' {
* @deprecated since 0.18.0, Use Node.spinOnce(timeout)*/
function spinOnce(node: Node, timeout?: number): void;

/**
* Options for waitForMessage.
*/
interface WaitForMessageOptions {
/** Timeout in milliseconds. If omitted, waits indefinitely. */
timeout?: number;
/** QoS profile for the temporary subscription. */
qos?: QoS;
}

/**
* Wait for a single message on a topic.
*
* Creates a temporary subscription, waits for the first message to arrive,
* and returns it. The node must be spinning before calling this function.
*
* This is the rclnodejs equivalent of rclpy's `wait_for_message`.
*
* @param typeClass - The ROS message type class.
* @param node - The node to create the temporary subscription on.
* @param topic - The topic name to listen on.
* @param options - Options including timeout and QoS.
* @returns Resolves with the received message.
* @throws Error if timeout expires before a message arrives.
*/
function waitForMessage<T extends TypeClass<MessageTypeClassName>>(
typeClass: T,
node: Node,
topic: string,
options?: WaitForMessageOptions
): Promise<MessageType<T>>;

/**
* Stop all activity, destroy all nodes and node components.
*
Expand Down
Loading