Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const errors = require('./lib/errors.js');
const ParameterWatcher = require('./lib/parameter_watcher.js');
const ParameterEventHandler = require('./lib/parameter_event_handler.js');
const MessageIntrospector = require('./lib/message_introspector.js');
const MessageInfo = require('./lib/message_info.js');
const ObservableSubscription = require('./lib/observable_subscription.js');
const { spawn } = require('child_process');
const {
Expand Down Expand Up @@ -247,6 +248,9 @@ let rcl = {
/** {@link ParameterEventHandler} class */
ParameterEventHandler: ParameterEventHandler,

/** {@link MessageInfo} class */
MessageInfo: MessageInfo,

/** {@link ObservableSubscription} class */
ObservableSubscription: ObservableSubscription,

Expand Down
94 changes: 94 additions & 0 deletions lib/message_info.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2026, The Robot Web Tools Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

/**
* @class MessageInfo
*
* Contains metadata about a received message, including timestamps,
* sequence numbers, and the publisher's globally unique identifier (GID).
*
* This is the rclnodejs equivalent of rclpy's MessageInfo.
* It is passed as the second argument to subscription callbacks when the
* callback accepts two parameters.
*
* @example
* node.createSubscription(
* 'std_msgs/msg/String',
* 'topic',
* (msg, messageInfo) => {
* console.log('Source timestamp:', messageInfo.sourceTimestamp);
* console.log('Received at:', messageInfo.receivedTimestamp);
* console.log('Publisher GID:', messageInfo.publisherGid);
* }
* );
*/
class MessageInfo {
/**
* Create a MessageInfo from a raw info object returned by the native layer.
*
* @param {object} rawInfo - Raw message info from rclTakeWithInfo
* @hideconstructor
*/
constructor(rawInfo) {
/**
* The timestamp when the message was published (nanoseconds since epoch).
* @type {bigint}
*/
this.sourceTimestamp = rawInfo.source_timestamp;

/**
* The timestamp when the message was received by the subscription (nanoseconds since epoch).
* @type {bigint}
*/
this.receivedTimestamp = rawInfo.received_timestamp;

/**
* The publication sequence number assigned by the publisher.
* @type {bigint}
*/
this.publicationSequenceNumber = rawInfo.publication_sequence_number;

/**
* The reception sequence number assigned by the subscriber.
* @type {bigint}
*/
this.receptionSequenceNumber = rawInfo.reception_sequence_number;

/**
* The globally unique identifier (GID) of the publisher.
* A Buffer containing the raw GID bytes.
* @type {Buffer}
*/
this.publisherGid = rawInfo.publisher_gid;
}

/**
* Convert to a plain object representation.
*
* @returns {object} Plain object with all metadata fields
*/
toPlainObject() {
return {
sourceTimestamp: this.sourceTimestamp,
receivedTimestamp: this.receivedTimestamp,
publicationSequenceNumber: this.publicationSequenceNumber,
receptionSequenceNumber: this.receptionSequenceNumber,
publisherGid: this.publisherGid,
};
}
}

module.exports = MessageInfo;
20 changes: 17 additions & 3 deletions lib/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const Rates = require('./rate.js');
const Service = require('./service.js');
const Subscription = require('./subscription.js');
const ObservableSubscription = require('./observable_subscription.js');
const MessageInfo = require('./message_info.js');
const TimeSource = require('./time_source.js');
const Timer = require('./timer.js');
const TypeDescriptionService = require('./type_description_service.js');
Expand Down Expand Up @@ -271,9 +272,22 @@ class Node extends rclnodejs.ShadowNode {
this._runWithMessageType(
subscription.typeClass,
(message, deserialize) => {
let success = rclnodejs.rclTake(subscription.handle, message);
if (success) {
subscription.processResponse(deserialize());
if (subscription.wantsMessageInfo) {
let rawInfo = rclnodejs.rclTakeWithInfo(
subscription.handle,
message
);
if (rawInfo) {
subscription.processResponse(
deserialize(),
new MessageInfo(rawInfo)
);
}
} else {
let success = rclnodejs.rclTake(subscription.handle, message);
if (success) {
subscription.processResponse(deserialize());
}
}
}
);
Expand Down
25 changes: 22 additions & 3 deletions lib/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,44 @@ class Subscription extends Entity {
this._isRaw = options.isRaw || false;
this._serializationMode = options.serializationMode || 'default';
this._node = node;
this._wantsMessageInfo = callback.length >= 2;

if (node && eventCallbacks) {
this._events = eventCallbacks.createEventHandlers(this.handle);
node._events.push(...this._events);
}
}

processResponse(msg) {
/**
* Whether this subscription's callback wants MessageInfo as a second argument.
* Determined by callback.length >= 2.
* @type {boolean}
* @readonly
*/
get wantsMessageInfo() {
return this._wantsMessageInfo;
}

processResponse(msg, messageInfo) {
debug(`Message of topic ${this._topic} received.`);
if (this._isRaw) {
this._callback(msg);
if (this._wantsMessageInfo && messageInfo) {
this._callback(msg, messageInfo);
} else {
this._callback(msg);
}
} else {
let message = msg.toPlainObject(this.typedArrayEnabled);

if (this._serializationMode !== 'default') {
message = applySerializationMode(message, this._serializationMode);
}

this._callback(message);
if (this._wantsMessageInfo && messageInfo) {
this._callback(message, messageInfo);
} else {
this._callback(message);
}
}
}

Expand Down
49 changes: 49 additions & 0 deletions src/rcl_subscription_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <rcl/error_handling.h>
#include <rcl/rcl.h>
#include <rmw/types.h>

#include <cstdio>
#include <memory>
Expand Down Expand Up @@ -53,6 +54,53 @@ Napi::Value RclTake(const Napi::CallbackInfo& info) {
return env.Undefined();
}

Napi::Value RclTakeWithInfo(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();

RclHandle* subscription_handle =
RclHandle::Unwrap(info[0].As<Napi::Object>());
rcl_subscription_t* subscription =
reinterpret_cast<rcl_subscription_t*>(subscription_handle->ptr());
void* msg_taken = info[1].As<Napi::Buffer<char>>().Data();

rmw_message_info_t message_info = rmw_get_zero_initialized_message_info();
rcl_ret_t ret = rcl_take(subscription, msg_taken, &message_info, nullptr);

if (ret != RCL_RET_OK && ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
std::string error_string = rcl_get_error_string().str;
rcl_reset_error();
Napi::Error::New(env, error_string).ThrowAsJavaScriptException();
return env.Undefined();
}

if (ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
return env.Undefined();
}

// Build JS object with message info fields
Napi::Object js_info = Napi::Object::New(env);
js_info.Set("source_timestamp",
Napi::BigInt::New(env, message_info.source_timestamp));
js_info.Set("received_timestamp",
Napi::BigInt::New(env, message_info.received_timestamp));
js_info.Set(
"publication_sequence_number",
Napi::BigInt::New(
env, static_cast<int64_t>(message_info.publication_sequence_number)));
js_info.Set(
"reception_sequence_number",
Napi::BigInt::New(
env, static_cast<int64_t>(message_info.reception_sequence_number)));

// Publisher GID as Buffer
auto gid_buf =
Napi::Buffer<uint8_t>::Copy(env, message_info.publisher_gid.data,
sizeof(message_info.publisher_gid.data));
js_info.Set("publisher_gid", gid_buf);

return js_info;
}

Napi::Value CreateSubscription(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();

Expand Down Expand Up @@ -422,6 +470,7 @@ Napi::Value GetPublisherCount(const Napi::CallbackInfo& info) {

Napi::Object InitSubscriptionBindings(Napi::Env env, Napi::Object exports) {
exports.Set("rclTake", Napi::Function::New(env, RclTake));
exports.Set("rclTakeWithInfo", Napi::Function::New(env, RclTakeWithInfo));
exports.Set("createSubscription",
Napi::Function::New(env, CreateSubscription));
exports.Set("rclTakeRaw", Napi::Function::New(env, RclTakeRaw));
Expand Down
Loading
Loading