From e59266dbbc858c1fdaa029dabf8598e0d73d6e43 Mon Sep 17 00:00:00 2001 From: Minggang Wang Date: Tue, 17 Mar 2026 15:26:38 +0800 Subject: [PATCH 1/3] Expose MessageInfo metadata on subscription callbacks --- index.js | 4 + lib/message_info.js | 94 +++++++++++++++ lib/node.js | 28 ++++- lib/subscription.js | 25 +++- src/rcl_subscription_bindings.cpp | 49 ++++++++ test/test-message-info.js | 192 ++++++++++++++++++++++++++++++ types/index.d.ts | 1 + types/message_info.d.ts | 72 +++++++++++ types/subscription.d.ts | 9 +- 9 files changed, 465 insertions(+), 9 deletions(-) create mode 100644 lib/message_info.js create mode 100644 test/test-message-info.js create mode 100644 types/message_info.d.ts diff --git a/index.js b/index.js index ff9f23ee..118ceaed 100644 --- a/index.js +++ b/index.js @@ -65,6 +65,7 @@ const errors = require('./lib/errors.js'); const ParameterWatcher = require('./lib/parameter_watcher.js'); const ParameterEventHandler = require('./lib/parameter_event_handler.js'); const MessageIntrospector = require('./lib/message_introspector.js'); +const MessageInfo = require('./lib/message_info.js'); const ObservableSubscription = require('./lib/observable_subscription.js'); const { spawn } = require('child_process'); const { @@ -247,6 +248,9 @@ let rcl = { /** {@link ParameterEventHandler} class */ ParameterEventHandler: ParameterEventHandler, + /** {@link MessageInfo} class */ + MessageInfo: MessageInfo, + /** {@link ObservableSubscription} class */ ObservableSubscription: ObservableSubscription, diff --git a/lib/message_info.js b/lib/message_info.js new file mode 100644 index 00000000..3cd9afa3 --- /dev/null +++ b/lib/message_info.js @@ -0,0 +1,94 @@ +// Copyright (c) 2026, The Robot Web Tools Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +/** + * @class MessageInfo + * + * Contains metadata about a received message, including timestamps, + * sequence numbers, and the publisher's globally unique identifier (GID). + * + * This is the rclnodejs equivalent of rclpy's MessageInfo. + * It is passed as the second argument to subscription callbacks when the + * callback accepts two parameters. + * + * @example + * node.createSubscription( + * 'std_msgs/msg/String', + * 'topic', + * (msg, messageInfo) => { + * console.log('Source timestamp:', messageInfo.sourceTimestamp); + * console.log('Received at:', messageInfo.receivedTimestamp); + * console.log('Publisher GID:', messageInfo.publisherGid); + * } + * ); + */ +class MessageInfo { + /** + * Create a MessageInfo from a raw info object returned by the native layer. + * + * @param {object} rawInfo - Raw message info from rclTakeWithInfo + * @hideconstructor + */ + constructor(rawInfo) { + /** + * The timestamp when the message was published (nanoseconds since epoch). + * @type {bigint} + */ + this.sourceTimestamp = rawInfo.source_timestamp; + + /** + * The timestamp when the message was received by the subscription (nanoseconds since epoch). + * @type {bigint} + */ + this.receivedTimestamp = rawInfo.received_timestamp; + + /** + * The publication sequence number assigned by the publisher. + * @type {bigint} + */ + this.publicationSequenceNumber = rawInfo.publication_sequence_number; + + /** + * The reception sequence number assigned by the subscriber. + * @type {bigint} + */ + this.receptionSequenceNumber = rawInfo.reception_sequence_number; + + /** + * The globally unique identifier (GID) of the publisher. + * A Buffer containing the raw GID bytes. + * @type {Buffer} + */ + this.publisherGid = rawInfo.publisher_gid; + } + + /** + * Convert to a plain object representation. + * + * @returns {object} Plain object with all metadata fields + */ + toPlainObject() { + return { + sourceTimestamp: this.sourceTimestamp, + receivedTimestamp: this.receivedTimestamp, + publicationSequenceNumber: this.publicationSequenceNumber, + receptionSequenceNumber: this.receptionSequenceNumber, + publisherGid: this.publisherGid, + }; + } +} + +module.exports = MessageInfo; diff --git a/lib/node.js b/lib/node.js index 0bc2c89a..d2db640a 100644 --- a/lib/node.js +++ b/lib/node.js @@ -47,6 +47,7 @@ const Rates = require('./rate.js'); const Service = require('./service.js'); const Subscription = require('./subscription.js'); const ObservableSubscription = require('./observable_subscription.js'); +const MessageInfo = require('./message_info.js'); const TimeSource = require('./time_source.js'); const Timer = require('./timer.js'); const TypeDescriptionService = require('./type_description_service.js'); @@ -263,7 +264,13 @@ class Node extends rclnodejs.ShadowNode { if (subscription.isRaw) { let rawMessage = rclnodejs.rclTakeRaw(subscription.handle); if (rawMessage) { - subscription.processResponse(rawMessage); + if (subscription.wantsMessageInfo) { + // Re-take with info for raw subscriptions that want metadata + // Note: raw take doesn't support info, pass null + subscription.processResponse(rawMessage, null); + } else { + subscription.processResponse(rawMessage); + } } continue; } @@ -271,9 +278,22 @@ class Node extends rclnodejs.ShadowNode { this._runWithMessageType( subscription.typeClass, (message, deserialize) => { - let success = rclnodejs.rclTake(subscription.handle, message); - if (success) { - subscription.processResponse(deserialize()); + if (subscription.wantsMessageInfo) { + let rawInfo = rclnodejs.rclTakeWithInfo( + subscription.handle, + message + ); + if (rawInfo) { + subscription.processResponse( + deserialize(), + new MessageInfo(rawInfo) + ); + } + } else { + let success = rclnodejs.rclTake(subscription.handle, message); + if (success) { + subscription.processResponse(deserialize()); + } } } ); diff --git a/lib/subscription.js b/lib/subscription.js index 4a6b0d5f..38724779 100644 --- a/lib/subscription.js +++ b/lib/subscription.js @@ -45,6 +45,7 @@ class Subscription extends Entity { this._isRaw = options.isRaw || false; this._serializationMode = options.serializationMode || 'default'; this._node = node; + this._wantsMessageInfo = callback.length >= 2; if (node && eventCallbacks) { this._events = eventCallbacks.createEventHandlers(this.handle); @@ -52,10 +53,24 @@ class Subscription extends Entity { } } - processResponse(msg) { + /** + * Whether this subscription's callback wants MessageInfo as a second argument. + * Determined by callback.length >= 2. + * @type {boolean} + * @readonly + */ + get wantsMessageInfo() { + return this._wantsMessageInfo; + } + + processResponse(msg, messageInfo) { debug(`Message of topic ${this._topic} received.`); if (this._isRaw) { - this._callback(msg); + if (this._wantsMessageInfo && messageInfo) { + this._callback(msg, messageInfo); + } else { + this._callback(msg); + } } else { let message = msg.toPlainObject(this.typedArrayEnabled); @@ -63,7 +78,11 @@ class Subscription extends Entity { message = applySerializationMode(message, this._serializationMode); } - this._callback(message); + if (this._wantsMessageInfo && messageInfo) { + this._callback(message, messageInfo); + } else { + this._callback(message); + } } } diff --git a/src/rcl_subscription_bindings.cpp b/src/rcl_subscription_bindings.cpp index ea0de901..c7bdffd9 100644 --- a/src/rcl_subscription_bindings.cpp +++ b/src/rcl_subscription_bindings.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -53,6 +54,53 @@ Napi::Value RclTake(const Napi::CallbackInfo& info) { return env.Undefined(); } +Napi::Value RclTakeWithInfo(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + + RclHandle* subscription_handle = + RclHandle::Unwrap(info[0].As()); + rcl_subscription_t* subscription = + reinterpret_cast(subscription_handle->ptr()); + void* msg_taken = info[1].As>().Data(); + + rmw_message_info_t message_info = rmw_get_zero_initialized_message_info(); + rcl_ret_t ret = rcl_take(subscription, msg_taken, &message_info, nullptr); + + if (ret != RCL_RET_OK && ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) { + std::string error_string = rcl_get_error_string().str; + rcl_reset_error(); + Napi::Error::New(env, error_string).ThrowAsJavaScriptException(); + return env.Undefined(); + } + + if (ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED) { + return env.Undefined(); + } + + // Build JS object with message info fields + Napi::Object js_info = Napi::Object::New(env); + js_info.Set("source_timestamp", + Napi::BigInt::New(env, message_info.source_timestamp)); + js_info.Set("received_timestamp", + Napi::BigInt::New(env, message_info.received_timestamp)); + js_info.Set( + "publication_sequence_number", + Napi::BigInt::New( + env, static_cast(message_info.publication_sequence_number))); + js_info.Set( + "reception_sequence_number", + Napi::BigInt::New( + env, static_cast(message_info.reception_sequence_number))); + + // Publisher GID as Buffer + auto gid_buf = + Napi::Buffer::Copy(env, message_info.publisher_gid.data, + sizeof(message_info.publisher_gid.data)); + js_info.Set("publisher_gid", gid_buf); + + return js_info; +} + Napi::Value CreateSubscription(const Napi::CallbackInfo& info) { Napi::Env env = info.Env(); @@ -422,6 +470,7 @@ Napi::Value GetPublisherCount(const Napi::CallbackInfo& info) { Napi::Object InitSubscriptionBindings(Napi::Env env, Napi::Object exports) { exports.Set("rclTake", Napi::Function::New(env, RclTake)); + exports.Set("rclTakeWithInfo", Napi::Function::New(env, RclTakeWithInfo)); exports.Set("createSubscription", Napi::Function::New(env, CreateSubscription)); exports.Set("rclTakeRaw", Napi::Function::New(env, RclTakeRaw)); diff --git a/test/test-message-info.js b/test/test-message-info.js new file mode 100644 index 00000000..abd989c6 --- /dev/null +++ b/test/test-message-info.js @@ -0,0 +1,192 @@ +// Copyright (c) 2026, The Robot Web Tools Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const assert = require('assert'); +const rclnodejs = require('../index.js'); + +describe('MessageInfo tests', function () { + this.timeout(60 * 1000); + + let node; + + before(function () { + return rclnodejs.init(); + }); + + after(function () { + rclnodejs.shutdown(); + }); + + beforeEach(function () { + node = rclnodejs.createNode('message_info_test_node'); + }); + + afterEach(function () { + node.destroy(); + }); + + it('should receive MessageInfo when callback has 2 parameters', function (done) { + const publisher = node.createPublisher( + 'std_msgs/msg/String', + 'mi_test_topic_1' + ); + + node.createSubscription( + 'std_msgs/msg/String', + 'mi_test_topic_1', + (msg, messageInfo) => { + assert.ok(messageInfo, 'messageInfo should be provided'); + assert.ok( + typeof messageInfo.sourceTimestamp === 'bigint', + 'sourceTimestamp should be a bigint' + ); + assert.ok( + typeof messageInfo.receivedTimestamp === 'bigint', + 'receivedTimestamp should be a bigint' + ); + assert.ok( + typeof messageInfo.publicationSequenceNumber === 'bigint', + 'publicationSequenceNumber should be a bigint' + ); + assert.ok( + typeof messageInfo.receptionSequenceNumber === 'bigint', + 'receptionSequenceNumber should be a bigint' + ); + assert.ok( + Buffer.isBuffer(messageInfo.publisherGid), + 'publisherGid should be a Buffer' + ); + assert.ok( + messageInfo.publisherGid.length > 0, + 'publisherGid should not be empty' + ); + done(); + } + ); + + rclnodejs.spin(node); + + setTimeout(() => { + publisher.publish('Hello MessageInfo'); + }, 200); + }); + + it('should NOT receive MessageInfo when callback has 1 parameter', function (done) { + const publisher = node.createPublisher( + 'std_msgs/msg/String', + 'mi_test_topic_2' + ); + + node.createSubscription('std_msgs/msg/String', 'mi_test_topic_2', (msg) => { + assert.strictEqual(typeof msg, 'object'); + assert.strictEqual(msg.data, 'Hello no info'); + // msg should be the message, not MessageInfo + assert.ok(!msg.sourceTimestamp, 'should not have sourceTimestamp'); + done(); + }); + + rclnodejs.spin(node); + + setTimeout(() => { + publisher.publish('Hello no info'); + }, 200); + }); + + it('sourceTimestamp should be a positive value', function (done) { + const publisher = node.createPublisher( + 'std_msgs/msg/String', + 'mi_test_topic_3' + ); + + node.createSubscription( + 'std_msgs/msg/String', + 'mi_test_topic_3', + (msg, messageInfo) => { + assert.ok( + messageInfo.sourceTimestamp > 0n, + 'sourceTimestamp should be positive' + ); + assert.ok( + messageInfo.receivedTimestamp > 0n, + 'receivedTimestamp should be positive' + ); + done(); + } + ); + + rclnodejs.spin(node); + + setTimeout(() => { + publisher.publish('Timestamp test'); + }, 200); + }); + + it('receivedTimestamp should be >= sourceTimestamp', function (done) { + const publisher = node.createPublisher( + 'std_msgs/msg/String', + 'mi_test_topic_4' + ); + + node.createSubscription( + 'std_msgs/msg/String', + 'mi_test_topic_4', + (msg, messageInfo) => { + assert.ok( + messageInfo.receivedTimestamp >= messageInfo.sourceTimestamp, + 'receivedTimestamp should be >= sourceTimestamp' + ); + done(); + } + ); + + rclnodejs.spin(node); + + setTimeout(() => { + publisher.publish('Ordering test'); + }, 200); + }); + + it('toPlainObject should return all fields', function (done) { + const publisher = node.createPublisher( + 'std_msgs/msg/String', + 'mi_test_topic_5' + ); + + node.createSubscription( + 'std_msgs/msg/String', + 'mi_test_topic_5', + (msg, messageInfo) => { + const plain = messageInfo.toPlainObject(); + assert.ok(plain.sourceTimestamp !== undefined); + assert.ok(plain.receivedTimestamp !== undefined); + assert.ok(plain.publicationSequenceNumber !== undefined); + assert.ok(plain.receptionSequenceNumber !== undefined); + assert.ok(plain.publisherGid !== undefined); + done(); + } + ); + + rclnodejs.spin(node); + + setTimeout(() => { + publisher.publish('Plain object test'); + }, 200); + }); + + it('MessageInfo class should be exported', function () { + assert.ok(rclnodejs.MessageInfo); + }); +}); diff --git a/types/index.d.ts b/types/index.d.ts index 5dd91a72..c0dd56a7 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -3,6 +3,7 @@ /// /// /// +/// import { ChildProcess } from 'child_process'; diff --git a/types/message_info.d.ts b/types/message_info.d.ts new file mode 100644 index 00000000..05243662 --- /dev/null +++ b/types/message_info.d.ts @@ -0,0 +1,72 @@ +// Copyright (c) 2026, The Robot Web Tools Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +declare module 'rclnodejs' { + /** + * Contains metadata about a received message, including timestamps, + * sequence numbers, and the publisher's globally unique identifier (GID). + * + * Passed as the second argument to subscription callbacks when the + * callback accepts two parameters. + * + * @example + * ```typescript + * node.createSubscription( + * 'std_msgs/msg/String', + * 'topic', + * (msg: rclnodejs.std_msgs.msg.String, info: MessageInfo) => { + * console.log('Source timestamp:', info.sourceTimestamp); + * } + * ); + * ``` + */ + class MessageInfo { + /** + * The timestamp when the message was published (nanoseconds since epoch). + */ + readonly sourceTimestamp: bigint; + + /** + * The timestamp when the message was received by the subscription (nanoseconds since epoch). + */ + readonly receivedTimestamp: bigint; + + /** + * The publication sequence number assigned by the publisher. + */ + readonly publicationSequenceNumber: bigint; + + /** + * The reception sequence number assigned by the subscriber. + */ + readonly receptionSequenceNumber: bigint; + + /** + * The globally unique identifier (GID) of the publisher. + * A Buffer containing the raw GID bytes. + */ + readonly publisherGid: Buffer; + + /** + * Convert to a plain object representation. + */ + toPlainObject(): { + sourceTimestamp: bigint; + receivedTimestamp: bigint; + publicationSequenceNumber: bigint; + receptionSequenceNumber: bigint; + publisherGid: Buffer; + }; + } +} diff --git a/types/subscription.d.ts b/types/subscription.d.ts index 42a43af1..74246918 100644 --- a/types/subscription.d.ts +++ b/types/subscription.d.ts @@ -1,8 +1,11 @@ declare module 'rclnodejs' { /** * A callback for receiving published messages. + * If the callback accepts two parameters, the second will be a MessageInfo + * containing metadata about the received message. * * @param message - The published message. + * @param messageInfo - Optional metadata about the message (timestamps, publisher GID, etc). * * @remarks * See {@link Node#createSubscription | Node.createSubscription} @@ -13,7 +16,8 @@ declare module 'rclnodejs' { */ type SubscriptionCallback> = // * @param message - The published message - (message: MessageType) => void; + | ((message: MessageType) => void) + | ((message: MessageType, messageInfo: MessageInfo) => void); /** * A callback for receiving published raw messages. @@ -29,7 +33,8 @@ declare module 'rclnodejs' { */ type SubscriptionWithRawMessageCallback = // * @param message - The published raw message - (message: Buffer) => void; + | ((message: Buffer) => void) + | ((message: Buffer, messageInfo: MessageInfo) => void); /** * A ROS Subscription for published messages on a topic. From 8f68fb865bd260363fd7dc1a8acfd5572004b8e9 Mon Sep 17 00:00:00 2001 From: Minggang Wang Date: Tue, 17 Mar 2026 15:44:23 +0800 Subject: [PATCH 2/3] Address comments --- lib/node.js | 8 +------- types/subscription.d.ts | 9 +++++---- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/lib/node.js b/lib/node.js index d2db640a..7f7f4011 100644 --- a/lib/node.js +++ b/lib/node.js @@ -264,13 +264,7 @@ class Node extends rclnodejs.ShadowNode { if (subscription.isRaw) { let rawMessage = rclnodejs.rclTakeRaw(subscription.handle); if (rawMessage) { - if (subscription.wantsMessageInfo) { - // Re-take with info for raw subscriptions that want metadata - // Note: raw take doesn't support info, pass null - subscription.processResponse(rawMessage, null); - } else { - subscription.processResponse(rawMessage); - } + subscription.processResponse(rawMessage); } continue; } diff --git a/types/subscription.d.ts b/types/subscription.d.ts index 74246918..5fdf7b82 100644 --- a/types/subscription.d.ts +++ b/types/subscription.d.ts @@ -16,13 +16,15 @@ declare module 'rclnodejs' { */ type SubscriptionCallback> = // * @param message - The published message - | ((message: MessageType) => void) - | ((message: MessageType, messageInfo: MessageInfo) => void); + (message: MessageType, messageInfo?: MessageInfo) => void; /** * A callback for receiving published raw messages. + * If the callback accepts a second parameter, it will receive a MessageInfo + * containing metadata about the received message. * * @param message - The published message. + * @param messageInfo - Optional metadata about the message. * * @remarks * See {@link Node#createSubscription | Node.createSubscription} @@ -33,8 +35,7 @@ declare module 'rclnodejs' { */ type SubscriptionWithRawMessageCallback = // * @param message - The published raw message - | ((message: Buffer) => void) - | ((message: Buffer, messageInfo: MessageInfo) => void); + (message: Buffer, messageInfo?: MessageInfo) => void; /** * A ROS Subscription for published messages on a topic. From 2c745faf1d05fb0fad5b3c3c6a6493a9e2bcd31a Mon Sep 17 00:00:00 2001 From: Minggang Wang Date: Tue, 17 Mar 2026 16:16:01 +0800 Subject: [PATCH 3/3] Fix ts test failure --- test/types/index.test-d.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/types/index.test-d.ts b/test/types/index.test-d.ts index 7147b811..f125b5c2 100644 --- a/test/types/index.test-d.ts +++ b/test/types/index.test-d.ts @@ -235,7 +235,10 @@ subscription = node.createSubscription( } ); -const rawMessageCallback = (message: Buffer) => { +const rawMessageCallback = ( + message: Buffer, + _messageInfo?: rclnodejs.MessageInfo +) => { const receivedRawMessage = message; }; expectType(rawMessageCallback);