diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AsyncVoidGuard.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AsyncVoidGuard.cs
new file mode 100644
index 000000000..0be86025a
--- /dev/null
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AsyncVoidGuard.cs
@@ -0,0 +1,59 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using System;
+using System.Threading.Tasks;
+
+namespace MTConnect
+{
+ /// <summary>
+ /// Centralises the MqttRelay agent-module async void
+ /// event-handler safety policy. An async void method that
+ /// throws routes its exception to the synchronization context,
+ /// which on the ThreadPool tears down the host process. The
+ /// MqttRelay handlers run on broker-raised events, so without a
+ /// top-level guard a formatter throw (DataItem null-deref) or a
+ /// synchronous broker call (broker shutting down) crashed the
+ /// agent.
+ ///
+ /// Wrapping every handler body in <see cref="Run"/> guarantees that
+ /// any exception, synchronous or async, is captured and routed to
+ /// a logging callback rather than escaping to the synchronization
+ /// context. The guard is also unit-testable in isolation, so the
+ /// policy is pinned away from the (hard-to-mock) Module class.
+ /// </summary>
+ public static class AsyncVoidGuard
+ {
+ /// <summary>
+ /// Awaits the supplied handler body and routes any exception
+ /// to the logging callback. Never rethrows: the contract is
+ /// "an async-void path must not crash the host". A null body
+ /// is a no-op and a null or throwing logger is tolerated.
+ /// </summary>
+ /// <param name="body">
+ /// The handler implementation. May be null.
+ /// </param>
+ /// <param name="onFault">
+ /// Logger callback invoked with any exception thrown
+ /// synchronously by <paramref name="body"/> or surfaced through
+ /// its <see cref="Task"/>. May be null.
+ /// </param>
+ public static async Task Run(Func<Task> body, Action<Exception> onFault)
+ {
+ if (body == null) return;
+
+ try
+ {
+ var task = body();
+ if (task != null) await task.ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ if (onFault != null)
+ {
+ try { onFault(ex); } catch { }
+ }
+ }
+ }
+ }
+}
diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AvailabilityTopic.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AvailabilityTopic.cs
new file mode 100644
index 000000000..bbdae8cb0
--- /dev/null
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AvailabilityTopic.cs
@@ -0,0 +1,88 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+namespace MTConnect
+{
+ ///
+ /// Builds the MQTT topic on which the MqttRelay agent module
+ /// publishes the agent's Availability state (the MQTT Last Will and
+ /// Testament topic plus the on-connect retained Available message).
+ ///
+ public static class AvailabilityTopic
+ {
+ ///
+ /// Constant topic segment that separates the agent-availability
+ /// publishes from the document-envelope publishes (Probe /
+ /// Current / Sample / Asset). Subscribers wildcarding on
+ /// {TopicPrefix}/Probe/# therefore never receive the raw
+ /// availability payload.
+ ///
+ public const string AgentSegment = "Agent";
+
+ ///
+ /// Constant trailing topic segment that names the availability
+ /// publish.
+ ///
+ public const string AvailableSegment = "Available";
+
+ ///
+ /// Builds the full MQTT topic the MqttRelay module uses to
+ /// publish the agent's Availability state. Returns
+ /// null when either input is null or empty, or when
+ /// either input contains an MQTT-reserved character so callers
+ /// can short-circuit before configuring an MQTT publish.
+ ///
+ ///
+ /// The configured TopicPrefix value, e.g.
+ /// MTConnect or MTConnect/Document. Leading and
+ /// trailing / separators are stripped so the resulting
+ /// topic stays canonical.
+ ///
+ ///
+ /// The agent's Uuid identifier. The agentUuid is a
+ /// single topic segment so it must not contain /.
+ ///
+ ///
+ /// Per MQTT 3.1.1 OASIS standard section 4.7.1.1, MQTT topic
+ /// names must not contain the wildcard characters + or
+ /// # nor the null character \0; supplying any of
+ /// those characters in either input produces a null
+ /// return.
+ ///
+ public static string Build(string topicPrefix, string agentUuid)
+ {
+ if (string.IsNullOrEmpty(topicPrefix)) return null;
+ if (string.IsNullOrEmpty(agentUuid)) return null;
+
+ // MQTT 3.1.1 §4.7.1.1: '+' and '#' are wildcard characters
+ // and the null character is reserved. None of them are
+ // legal inside a topic name.
+ if (ContainsReservedCharacter(topicPrefix)) return null;
+ if (ContainsReservedCharacter(agentUuid)) return null;
+
+ // The agentUuid is a single topic segment so it must not
+ // embed a '/'; otherwise the {TopicPrefix}/Agent/{AgentUuid}
+ // /Available shape silently fragments across additional
+ // segments.
+ if (agentUuid.IndexOf('/') >= 0) return null;
+
+ // Canonicalise the prefix: strip a leading or trailing '/'
+ // so the resulting topic does not begin with '/' nor
+ // contain a stray empty segment.
+ var canonicalPrefix = topicPrefix.Trim('/');
+ if (canonicalPrefix.Length == 0) return null;
+
+ return $"{canonicalPrefix}/{AgentSegment}/{agentUuid}/{AvailableSegment}";
+ }
+
+ private static bool ContainsReservedCharacter(string value)
+ {
+ for (var i = 0; i < value.Length; i++)
+ {
+ var c = value[i];
+ if (c == '+' || c == '#' || c == '\0') return true;
+ }
+ return false;
+ }
+ }
+}
diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequencePersister.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequencePersister.cs
new file mode 100644
index 000000000..26630b782
--- /dev/null
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequencePersister.cs
@@ -0,0 +1,92 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using System;
+using System.Threading;
+
+namespace MTConnect
+{
+ ///
+ /// Tracks the MqttRelay last-sent observation sequence in memory
+ /// and flushes to durable storage only when explicitly asked.
+ /// Module.cs previously wrote to disk synchronously from every
+ /// successful observation publish; on a high-rate stream that put
+ /// a disk write on every ThreadPool callback. This persister
+ /// decouples the event-handler hot path from disk IO: handlers
+ /// call <see cref="Update(ulong)"/>, and a timer / shutdown / batch
+ /// boundary calls <see cref="TryFlush"/>.
+ ///
+ /// All read/write of the value field uses
+ /// so the persister is safe to share
+ /// between event-handler threads and a flush timer thread on 32-
+ /// bit hosts where bare 64-bit access can tear.
+ ///
+ public sealed class LastSentSequencePersister
+ {
+ private long _value;
+ private int _dirty; // 0 = clean, 1 = dirty (Interlocked-managed)
+
+ ///
+ /// Returns the current in-memory sequence value with an
+ /// atomic 64-bit read. Does not clear the dirty bit; only
+ /// <see cref="TryFlush"/> establishes durable persistence.
+ ///
+ public ulong Read()
+ {
+ return unchecked((ulong)Interlocked.Read(ref _value));
+ }
+
+ ///
+ /// Whether the in-memory value has been modified since the
+ /// last successful flush. The flush timer can use this to
+ /// skip writes when the relay is idle.
+ ///
+ public bool IsDirty
+ {
+ get { return Interlocked.CompareExchange(ref _dirty, 0, 0) != 0; }
+ }
+
+ ///
+ /// Records a new last-sent sequence value (last write wins)
+ /// and marks the persister dirty so the next flush will
+ /// emit a write.
+ ///
+ public void Update(ulong value)
+ {
+ Interlocked.Exchange(ref _value, unchecked((long)value));
+ Interlocked.Exchange(ref _dirty, 1);
+ }
+
+ ///
+ /// Seeds the in-memory value from durable storage at startup
+ /// without marking the persister dirty. The initial value
+ /// already matches disk so a flush would be redundant.
+ ///
+ public void Initialize(ulong value)
+ {
+ Interlocked.Exchange(ref _value, unchecked((long)value));
+ Interlocked.Exchange(ref _dirty, 0);
+ }
+
+ ///
+ /// Emits a write through <paramref name="write"/> if and only
+ /// if the persister is dirty, then clears the dirty bit. If
+ /// the writer throws, the dirty bit is preserved and the
+ /// exception propagates so the caller can log and retry on
+ /// the next tick. Returns true when a write was
+ /// actually emitted.
+ ///
+ public bool TryFlush(Action<ulong> write)
+ {
+ if (write == null) return false;
+ if (Interlocked.CompareExchange(ref _dirty, 0, 0) == 0) return false;
+
+ var snapshot = Read();
+ // Clear dirty *after* a successful write so that a thrown
+ // writer leaves the persister dirty for the next attempt.
+ write(snapshot);
+ Interlocked.Exchange(ref _dirty, 0);
+ return true;
+ }
+ }
+}
diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequenceTracker.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequenceTracker.cs
new file mode 100644
index 000000000..e66ba60b0
--- /dev/null
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequenceTracker.cs
@@ -0,0 +1,44 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using System.Threading;
+
+namespace MTConnect
+{
+ ///
+ /// Centralises atomic 64-bit access to the MqttRelay last-sent
+ /// sequence counter. Module.cs previously read and wrote a bare
+ /// field. On 32-bit hosts a 64-bit field is read
+ /// and written as two 32-bit halves and a concurrent reader can
+ /// observe a torn value. MqttRelay reads the counter from
+ /// observation event handlers (multiple ThreadPool threads) while
+ /// the durable-relay Worker writes it, so the access pattern was
+ /// genuinely racy. The tracker wraps every read/write in
+ /// <see cref="Interlocked"/> primitives so the policy is enforced
+ /// uniformly and is unit-testable.
+ ///
+ public sealed class LastSentSequenceTracker
+ {
+ private long _value;
+
+ ///
+ /// Returns the current sequence value with an atomic 64-bit
+ /// read. Round-trips the full <see cref="ulong"/> range,
+ /// including values whose bit pattern is negative when
+ /// reinterpreted as a signed <see cref="long"/>.
+ ///
+ public ulong Read()
+ {
+ return unchecked((ulong)Interlocked.Read(ref _value));
+ }
+
+ ///
+ /// Sets the current sequence value with an atomic 64-bit write.
+ /// Last write wins; this is not a max-watermark.
+ ///
+ public void Write(ulong value)
+ {
+ Interlocked.Exchange(ref _value, unchecked((long)value));
+ }
+ }
+}
diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs
index 06411a474..6a4f215a6 100644
--- a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs
@@ -37,7 +37,13 @@ public class Module : MTConnectAgentModule
private CancellationTokenSource _stop;
private static readonly object _lastSentSequenceLock = new object();
private long _totalIncomingObservations = 0;
- private long _lastSentSequence = 0;
+ // In-memory persister: tracks the last-sent sequence under
+ // DurableRelay and flushes to disk only on a timer, on
+ // shutdown, and at batch boundaries (rather than on every
+ // observation, which serialised the relay behind disk IO).
+ private readonly LastSentSequencePersister _lastSentSequencePersister = new LastSentSequencePersister();
+ private System.Threading.Timer _lastSentFlushTimer;
+ private static readonly TimeSpan LastSentFlushInterval = TimeSpan.FromSeconds(1);
private const string DirectoryBuffer = "buffer";
private const string LastSentSequenceFileName = "mqttrelay_last_sent.seq";
@@ -77,23 +83,61 @@ protected override void OnStartAfterLoad(bool initializeDataItems)
{
_stop = new CancellationTokenSource();
+ // Seed the in-memory persister from disk so the first
+ // RelayBufferedObservations diagnostic uses the durable
+ // value rather than zero.
+ if (_configuration.DurableRelay)
+ {
+ _lastSentSequencePersister.Initialize(ReadLastSentSequenceFromDisk());
+
+ // Start a background flush timer. The timer only emits
+ // a write when the persister is dirty so an idle relay
+ // does not burn IOPS.
+ _lastSentFlushTimer = new System.Threading.Timer(
+ _ => FlushLastSentSequence(),
+ null,
+ LastSentFlushInterval,
+ LastSentFlushInterval);
+ }
+
_ = Task.Run(Worker, _stop.Token);
}
protected override void OnStop()
{
- _documentServer.Stop();
+ // Entity-mode constructs only _entityServer, so a bare
+ // _documentServer.Stop() here would raise an NRE during
+ // shutdown. Route through the lifecycle helper so each
+ // server stop is independently null-safe and exception-safe.
+ MqttRelayLifecycle.StopServers(
+ documentStop: _documentServer != null ? (Action)(() => _documentServer.Stop()) : null,
+ entityStop: null);
+
+ // Flush any pending last-sent-sequence update before the
+ // timer is disposed, so a clean shutdown does not lose
+ // progress against the buffered-observation relay.
+ if (_lastSentFlushTimer != null)
+ {
+ try { _lastSentFlushTimer.Dispose(); } catch { }
+ _lastSentFlushTimer = null;
+ }
+ FlushLastSentSequence();
if (_stop != null) _stop.Cancel();
- try
- {
- if (_mqttClient != null)
- {
- _mqttClient.DisconnectAsync(MqttClientDisconnectOptionsReason.NormalDisconnection);
- }
- }
- catch { }
+ // Bounded await on the disconnect: previously this was a
+ // fire-and-forget DisconnectAsync(...) whose returned Task
+ // was never awaited. Faults were silently dropped and the
+ // host process risked exiting before the disconnect
+ // completed. Route through the lifecycle helper so faults
+ // surface to the log and a hung broker cannot block
+ // shutdown indefinitely.
+ MqttRelayLifecycle.DisconnectWithTimeout(
+ disconnect: _mqttClient != null
+ ? (Func<Task>)(() => _mqttClient.DisconnectAsync(MqttClientDisconnectOptionsReason.NormalDisconnection))
+ : null,
+ timeout: TimeSpan.FromSeconds(5),
+ onFault: ex => Log(MTConnectLogLevel.Warning, $"MQTT Relay Disconnect Error : {ex.Message}"));
}
private async Task Worker()
@@ -246,7 +290,15 @@ private async Task Worker()
await Task.Delay(_configuration.ReconnectInterval, _stop.Token);
}
catch (TaskCanceledException) { }
- catch (Exception) { }
+ catch (Exception ex)
+ {
+ // Route through WorkerLoopExceptionLogger so the
+ // operator sees an unexpected defect at the outer
+ // scope instead of having it silently swallowed.
+ WorkerLoopExceptionLogger.Log(
+ exception: ex,
+ onLog: msg => Log(MTConnectLogLevel.Warning, msg));
+ }
} while (!_stop.Token.IsCancellationRequested);
}
@@ -273,13 +325,16 @@ private async Task RelayBufferedObservations()
{
if (Agent is IMTConnectAgentBroker broker)
{
- ulong lastSent = GetLastSentSequence();
+ // Read in-memory: the persister was seeded from disk at
+ // OnStartAfterLoad. Avoid a synchronous file read on
+ // every relay attempt.
+ ulong lastSent = _lastSentSequencePersister.Read();
ulong from = Math.Max(lastSent + 1, broker.FirstSequence);
ulong to = broker.LastSequence;
Log(MTConnectLogLevel.Information, $"[MQTT Relay] RelayBufferedObservations: lastSent={lastSent}, broker.FirstSequence={broker.FirstSequence}, broker.LastSequence={broker.LastSequence}");
- long missed = (long)(to - lastSent);
+ long missed = RelayBufferDiagnostics.ComputeMissed(to, lastSent);
if (lastSent + 1 < broker.FirstSequence)
{
@@ -319,12 +374,19 @@ private async Task RelayBufferedObservations()
var result = await _entityServer.PublishObservation(_mqttClient, x);
if (result != null && result.IsSuccess)
{
- SetLastSentSequence(x.Sequence);
+ RecordLastSentSequence(x.Sequence);
}
}
+ // Batch boundary: the buffered relay just ran a
+ // chunk of observations under DurableRelay; flush
+ // the persister so a crash before the next timer
+ // tick does not lose this batch's progress.
+ FlushLastSentSequence();
+
Log(MTConnectLogLevel.Information, $"[MQTT Relay] Buffered observation relay complete. {observations.Count} missed observations sent.");
- var unsent = broker.LastSequence > (ulong)_lastSentSequence ? broker.LastSequence - (ulong)_lastSentSequence : 0;
+ var lastSentSnapshot = _lastSentSequencePersister.Read();
+ var unsent = broker.LastSequence > lastSentSnapshot ? broker.LastSequence - lastSentSnapshot : 0;
if (unsent > 0)
Log(MTConnectLogLevel.Information, $"[MQTT Relay] {unsent} new observations arrived during relay and will be sent next.");
}
@@ -349,7 +411,10 @@ private static string GetLastSentSequenceFilePath(IAgentApplicationConfiguration
return Path.Combine(baseDir, LastSentSequenceFileName);
}
- private ulong GetLastSentSequence()
+ // Disk read used at startup (Initialize) only. Hot-path reads
+ // go through _lastSentSequencePersister.Read(); this method
+ // is the single point of entry for the actual file IO.
+ private ulong ReadLastSentSequenceFromDisk()
{
if (!_configuration.DurableRelay) return 0;
var agentConfig = Agent?.Configuration as IAgentApplicationConfiguration;
@@ -365,18 +430,53 @@ private ulong GetLastSentSequence()
return 0;
}
- private void SetLastSentSequence(ulong seq)
+ // Records a new in-memory last-sent sequence. The actual disk
+ // write happens out-of-band on the flush timer / shutdown /
+ // batch boundary via FlushLastSentSequence.
+ private void RecordLastSentSequence(ulong seq)
{
if (!_configuration.DurableRelay) return; // Default
- var agentConfig = Agent?.Configuration as IAgentApplicationConfiguration;
- var path = GetLastSentSequenceFilePath(agentConfig);
- lock (_lastSentSequenceLock)
+ _lastSentSequencePersister.Update(seq);
+ }
+
+ // Writes the in-memory persister value to disk if dirty. Safe
+ // to call from any thread; the inner File.WriteAllText runs
+ // under _lastSentSequenceLock so a timer tick cannot race a
+ // shutdown flush.
+ private void FlushLastSentSequence()
+ {
+ if (!_configuration.DurableRelay) return;
+
+ try
+ {
+ var agentConfig = Agent?.Configuration as IAgentApplicationConfiguration;
+ var path = GetLastSentSequenceFilePath(agentConfig);
+
+ _lastSentSequencePersister.TryFlush(seq =>
+ {
+ lock (_lastSentSequenceLock)
+ {
+ File.WriteAllText(path, seq.ToString());
+ }
+ });
+ }
+ catch (Exception ex)
{
- File.WriteAllText(path, seq.ToString());
+ Log(MTConnectLogLevel.Warning, $"MQTT Relay last-sent-sequence flush error : {ex.Message}");
}
}
private async void ProbeReceived(IDevice device, IDevicesResponseDocument responseDocument)
+ {
+ // async void handler: route any unhandled fault (formatter
+ // throw, message-builder failure) to the log instead of the
+ // synchronization context.
+ await AsyncVoidGuard.Run(
+ async () => await ProbeReceivedCore(device, responseDocument),
+ ex => Log(MTConnectLogLevel.Warning, $"ProbeReceived handler error : {ex.Message}"));
+ }
+
+ private async Task ProbeReceivedCore(IDevice device, IDevicesResponseDocument responseDocument)
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
@@ -453,6 +553,15 @@ private async void ProbeReceived(IDevice device, IDevicesResponseDocument respon
}
private async void CurrentReceived(IDevice device, IStreamsResponseOutputDocument responseDocument)
+ {
+ // async void handler: route any unhandled fault to the log
+ // instead of the synchronization context.
+ await AsyncVoidGuard.Run(
+ async () => await CurrentReceivedCore(device, responseDocument),
+ ex => Log(MTConnectLogLevel.Warning, $"CurrentReceived handler error : {ex.Message}"));
+ }
+
+ private async Task CurrentReceivedCore(IDevice device, IStreamsResponseOutputDocument responseDocument)
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
@@ -493,6 +602,15 @@ private async void CurrentReceived(IDevice device, IStreamsResponseOutputDocumen
}
private async void SampleReceived(IDevice device, IStreamsResponseOutputDocument responseDocument)
+ {
+ // async void handler: route any unhandled fault to the log
+ // instead of the synchronization context.
+ await AsyncVoidGuard.Run(
+ async () => await SampleReceivedCore(device, responseDocument),
+ ex => Log(MTConnectLogLevel.Warning, $"SampleReceived handler error : {ex.Message}"));
+ }
+
+ private async Task SampleReceivedCore(IDevice device, IStreamsResponseOutputDocument responseDocument)
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
@@ -533,6 +651,15 @@ private async void SampleReceived(IDevice device, IStreamsResponseOutputDocument
}
private async void AssetReceived(IDevice device, IAssetsResponseDocument responseDocument)
+ {
+ // async void handler: route any unhandled fault to the log
+ // instead of the synchronization context.
+ await AsyncVoidGuard.Run(
+ async () => await AssetReceivedCore(device, responseDocument),
+ ex => Log(MTConnectLogLevel.Warning, $"AssetReceived handler error : {ex.Message}"));
+ }
+
+ private async Task AssetReceivedCore(IDevice device, IAssetsResponseDocument responseDocument)
{
if (_mqttClient != null && _mqttClient.IsConnected && responseDocument != null && !responseDocument.Assets.IsNullOrEmpty())
{
@@ -602,65 +729,60 @@ private async Task PublishCurrentObservations()
private async Task PublishObservations(IEnumerable<IObservationOutput> observations)
{
- if (!observations.IsNullOrEmpty())
+ if (observations.IsNullOrEmpty()) return;
+
+ // Single-pass grouping replaces the previous O(n*k)
+ // Distinct + Where + FirstOrDefault pattern that iterated
+ // the source up to three times per distinct DataItemId.
+ foreach (var group in ObservationGrouper.GroupByDataItem(observations))
{
- var dataItemIds = observations.Select(o => o.DataItemId).Distinct();
- foreach (var dataItemId in dataItemIds)
- {
- var dataItemObservations = observations.Where(o => o.DataItemId == dataItemId);
- var dataItemObservation = dataItemObservations.FirstOrDefault();
+ // Materialise each group's observations once: the
+ // condition path needs to enumerate twice (test the
+ // first item's category and rebuild every observation)
+ // and re-iterating the IGrouping would re-iterate the
+ // upstream source.
+ var groupObservations = group.ToList();
+ if (groupObservations.Count == 0) continue;
- if (dataItemObservation.Category == DataItemCategory.CONDITION)
- {
- // Conditions have multiple observations
- var multipleObservations = new List();
- foreach (var observation in dataItemObservations)
- {
- var x = new Observation();
- x.DeviceUuid = observation.DeviceUuid;
- x.DataItemId = observation.DataItemId;
- x.DataItem = observation.DataItem;
- x.Name = observation.Name;
- x.Category = observation.Category;
- x.Type = observation.Type;
- x.SubType = observation.SubType;
- x.Representation = observation.Representation;
- x.CompositionId = observation.CompositionId;
- x.InstanceId = observation.InstanceId;
- x.Sequence = observation.Sequence;
- x.Timestamp = observation.Timestamp;
- x.AddValues(observation.Values);
-
- multipleObservations.Add(x);
- }
+ var first = groupObservations[0];
- await _entityServer.PublishObservations(_mqttClient, multipleObservations);
- }
- else
+ if (first.Category == DataItemCategory.CONDITION)
+ {
+ // Conditions have multiple observations
+ var multipleObservations = new List<IObservation>(groupObservations.Count);
+ foreach (var observation in groupObservations)
{
- var observation = dataItemObservations.FirstOrDefault();
-
- var x = new Observation();
- x.DeviceUuid = observation.DeviceUuid;
- x.DataItemId = observation.DataItemId;
- x.DataItem = observation.DataItem;
- x.Name = observation.Name;
- x.Category = observation.Category;
- x.Type = observation.Type;
- x.SubType = observation.SubType;
- x.Representation = observation.Representation;
- x.CompositionId = observation.CompositionId;
- x.InstanceId = observation.InstanceId;
- x.Sequence = observation.Sequence;
- x.Timestamp = observation.Timestamp;
- x.AddValues(observation.Values);
-
- await _entityServer.PublishObservation(_mqttClient, x);
+ multipleObservations.Add(CloneAsObservation(observation));
}
+
+ await _entityServer.PublishObservations(_mqttClient, multipleObservations);
+ }
+ else
+ {
+ await _entityServer.PublishObservation(_mqttClient, CloneAsObservation(first));
}
}
}
+ private static Observation CloneAsObservation(IObservationOutput source)
+ {
+ var x = new Observation();
+ x.DeviceUuid = source.DeviceUuid;
+ x.DataItemId = source.DataItemId;
+ x.DataItem = source.DataItem;
+ x.Name = source.Name;
+ x.Category = source.Category;
+ x.Type = source.Type;
+ x.SubType = source.SubType;
+ x.Representation = source.Representation;
+ x.CompositionId = source.CompositionId;
+ x.InstanceId = source.InstanceId;
+ x.Sequence = source.Sequence;
+ x.Timestamp = source.Timestamp;
+ x.AddValues(source.Values);
+ return x;
+ }
+
private async Task PublishAssets()
{
if (_entityServer != null)
@@ -679,70 +801,91 @@ private async Task PublishAssets()
private async void AgentDeviceAdded(object sender, IDevice device)
{
- if (_entityServer != null) await _entityServer.PublishDevice(_mqttClient, device);
+ // async void handlers must not throw; route any fault to
+ // the log instead of the synchronization context.
+ await AsyncVoidGuard.Run(
+ async () =>
+ {
+ if (_entityServer != null) await _entityServer.PublishDevice(_mqttClient, device);
+ },
+ ex => Log(MTConnectLogLevel.Warning, $"AgentDeviceAdded handler error : {ex.Message}"));
}
private async void AgentObservationAdded(object sender, IObservation observation)
{
- if (_configuration.DurableRelay)
- {
- Interlocked.Increment(ref _totalIncomingObservations);
-
- var lastSent = GetLastSentSequence();
- _lastSentSequence = (long)lastSent;
-
- if (_entityServer != null)
+ // async void handlers must not throw; route any fault to
+ // the log instead of the synchronization context.
+ await AsyncVoidGuard.Run(
+ async () =>
{
- if (observation.Category == DataItemCategory.CONDITION)
+ if (_configuration.DurableRelay)
{
- var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId);
- var result = await _entityServer.PublishObservations(_mqttClient, conditionObservations.OfType());
- if (result != null && result.IsSuccess && conditionObservations != null && conditionObservations.Any())
+ Interlocked.Increment(ref _totalIncomingObservations);
+
+ // No disk read here: the persister was seeded
+ // at startup and is updated in-memory below.
+ // The previous GetLastSentSequence() call did
+ // a synchronous File.ReadAllText on every
+ // observation event.
+
+ if (_entityServer != null)
{
- SetLastSentSequence(conditionObservations.Max(o => o.Sequence));
+ if (observation.Category == DataItemCategory.CONDITION)
+ {
+ var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId);
+ var result = await _entityServer.PublishObservations(_mqttClient, conditionObservations.OfType<IObservation>());
+ if (result != null && result.IsSuccess && conditionObservations != null && conditionObservations.Any())
+ {
+ RecordLastSentSequence(conditionObservations.Max(o => o.Sequence));
+ }
+ }
+ else
+ {
+ var result = await _entityServer.PublishObservation(_mqttClient, observation);
+ if (result != null && result.IsSuccess)
+ {
+ RecordLastSentSequence(observation.Sequence);
+ }
+ }
}
}
else
{
- var result = await _entityServer.PublishObservation(_mqttClient, observation);
- if (result != null && result.IsSuccess)
+ if (_entityServer != null)
{
- SetLastSentSequence(observation.Sequence);
+ if (observation.Category == DataItemCategory.CONDITION)
+ {
+ var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId);
+ await PublishObservations(conditionObservations);
+ }
+ else
+ {
+ await _entityServer.PublishObservation(_mqttClient, observation);
+ }
}
}
- }
- }
- else
- {
- if (_entityServer != null)
- {
- if (observation.Category == DataItemCategory.CONDITION)
- {
- var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId);
- await PublishObservations(conditionObservations);
- }
- else
- {
- await _entityServer.PublishObservation(_mqttClient, observation);
- }
- }
- }
+ },
+ ex => Log(MTConnectLogLevel.Warning, $"AgentObservationAdded handler error : {ex.Message}"));
}
private async void AgentAssetAdded(object sender, IAsset asset)
{
- if (_entityServer != null) await _entityServer.PublishAsset(_mqttClient, asset);
+ // async void handlers must not throw; route any fault to
+ // the log instead of the synchronization context.
+ await AsyncVoidGuard.Run(
+ async () =>
+ {
+ if (_entityServer != null) await _entityServer.PublishAsset(_mqttClient, asset);
+ },
+ ex => Log(MTConnectLogLevel.Warning, $"AgentAssetAdded handler error : {ex.Message}"));
}
private string GetAgentAvailableTopic()
{
- if (Agent != null && _configuration != null)
- {
- return $"{_configuration.TopicPrefix}/{MTConnectMqttDocumentServer.ProbeTopic}/{Agent.Uuid}/Available"; ;
- }
+ if (Agent == null || _configuration == null) return null;
- return null;
+ return AvailabilityTopic.Build(_configuration.TopicPrefix, Agent.Uuid);
}
}
}
diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/MqttRelayLifecycle.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/MqttRelayLifecycle.cs
new file mode 100644
index 000000000..f4d61d2ae
--- /dev/null
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/MqttRelayLifecycle.cs
@@ -0,0 +1,127 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using System;
+using System.Threading.Tasks;
+
+namespace MTConnect
+{
+ ///
+ /// Centralises the MqttRelay agent module shutdown policy. The
+ /// MqttRelay module previously called _documentServer.Stop()
+ /// unconditionally from OnStop(); when configured for
+ /// TopicStructure=Entity only _entityServer is
+ /// constructed, so the document-server reference was null
+ /// and shutdown raised a .
+ /// Encapsulating the policy here makes the null-guard unit-testable
+ /// without instantiating an MTConnect agent broker, and keeps the
+ /// per-server stop independent so a throw on one path cannot leave
+ /// the other server running.
+ ///
+ public static class MqttRelayLifecycle
+ {
+ ///
+ /// Invokes the supplied per-server stop actions, tolerating
+ /// either or both being null. A throw from one action
+ /// does not prevent the other from running.
+ ///
+ ///
+ /// Optional action stopping the Document-mode server.
+ ///
+ ///
+ /// Optional action stopping the Entity-mode server.
+ ///
+ public static void StopServers(Action documentStop, Action entityStop)
+ {
+ if (documentStop != null)
+ {
+ try { documentStop(); } catch { }
+ }
+
+ if (entityStop != null)
+ {
+ try { entityStop(); } catch { }
+ }
+ }
+
+ ///
+ /// Awaits an MQTT-client disconnect with a bounded timeout.
+ /// Replaces the previous fire-and-forget invocation in
+ /// OnStop():
+ ///
+ /// * Synchronous throw from <paramref name="disconnect"/> is
+ /// captured and routed to <paramref name="onFault"/> instead
+ /// of escaping the shutdown path.
+ /// * A faulted Task is surfaced to <paramref name="onFault"/>
+ /// with its inner exception.
+ /// * A disconnect that does not complete within
+ /// <paramref name="timeout"/> is treated as best-effort
+ /// success so a misbehaving broker cannot hang shutdown.
+ /// * A null <paramref name="disconnect"/> is a no-op (the
+ /// worker may never have created an MQTT client).
+ ///
+ ///
+ /// Factory that begins the disconnect and returns its
+ /// . May be null.
+ ///
+ ///
+ /// Bound on how long shutdown is willing to wait. A timeout
+ /// elapses silently because the agent is already shutting down.
+ ///
+ ///
+ /// Logger callback invoked with the exception when the
+ /// disconnect throws synchronously or its Task faults.
+ ///
+ public static void DisconnectWithTimeout(
+ Func<Task> disconnect,
+ TimeSpan timeout,
+ Action<Exception> onFault)
+ {
+ if (disconnect == null) return;
+
+ Task task;
+ try
+ {
+ task = disconnect();
+ }
+ catch (Exception ex)
+ {
+ if (onFault != null) onFault(ex);
+ return;
+ }
+
+ if (task == null) return;
+
+ try
+ {
+ if (!task.Wait(timeout))
+ {
+ // Bounded wait elapsed; agent is shutting down so
+ // a hung broker cannot block process exit. Best
+ // effort: leave the task to be GC'd. Attach a
+ // continuation so its eventual fault is still
+ // surfaced rather than swallowed by the finalizer.
+ if (onFault != null)
+ {
+ task.ContinueWith(
+ t => onFault(t.Exception),
+ TaskContinuationOptions.OnlyOnFaulted);
+ }
+ return;
+ }
+ }
+ catch (AggregateException agg)
+ {
+ if (onFault != null)
+ {
+ var inner = agg.InnerException ?? agg;
+ onFault(inner);
+ }
+ }
+ catch (Exception ex)
+ {
+ if (onFault != null) onFault(ex);
+ }
+ }
+ }
+}
diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/ObservationGrouper.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/ObservationGrouper.cs
new file mode 100644
index 000000000..a09ae7748
--- /dev/null
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/ObservationGrouper.cs
@@ -0,0 +1,40 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using System.Collections.Generic;
+using System.Linq;
+using MTConnect.Observations.Output;
+
+namespace MTConnect
+{
+ ///
+ /// Single-pass grouping helper for MqttRelay observation
+ /// publishing. Module.PublishObservations previously iterated the
+ /// input enumerable up to three times per distinct DataItemId
+ /// (Distinct, Where, FirstOrDefault); on large agents that
+ /// throttled the catch-up after a reconnect. This helper produces
+ /// the same logical grouping in a single pass.
+ ///
+ public static class ObservationGrouper
+ {
+ /// <summary>
+ /// Groups <paramref name="observations"/> by
+ /// <see cref="IObservationOutput.DataItemId"/> in a single
+ /// pass. Encounter order is preserved within each group so a
+ /// caller that relies on sequence-monotonic ordering is not
+ /// broken. A null source returns an empty result.
+ /// </summary>
+ public static IEnumerable<IGrouping<string, IObservationOutput>> GroupByDataItem(
+ IEnumerable<IObservationOutput> observations)
+ {
+ if (observations == null) return Enumerable.Empty<IGrouping<string, IObservationOutput>>();
+
+ // Materialise into a List then GroupBy, so a deferred
+ // upstream query is iterated exactly once. (LINQ GroupBy
+ // is itself single-pass over its source, but ToList
+ // makes the contract explicit and lets the caller iterate
+ // the resulting groups multiple times safely.)
+ return observations.ToList().GroupBy(o => o.DataItemId);
+ }
+ }
+}
diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/README-Nuget.md b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/README-Nuget.md
index a1e2f8c8c..688bb08b6 100644
--- a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/README-Nuget.md
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/README-Nuget.md
@@ -105,6 +105,43 @@ modules:
topicPrefix: enterprise/site/area/line/cell/MTConnect
```
+## Migration: Availability Topic Moved Out of `Probe/#`
+
+In earlier releases the MqttRelay agent module published the agent's
+Availability state (Last Will and Testament plus the on-connect retained
+"AVAILABLE" message) on a four-segment topic that fell under the
+`{TopicPrefix}/Probe/#` wildcard:
+
+```
+{TopicPrefix}/Probe/{AgentUuid}/Available (raw "AVAILABLE" / "UNAVAILABLE" UTF-8 bytes)
+```
+
+That broke the contract that every payload under the Probe wildcard is a
+JSON document envelope. Subscribers binding to `{TopicPrefix}/Probe/#`
+expecting only Probe document JSON would receive a non-JSON Availability
+payload and either crash or silently drop the message.
+
+Starting with this release the module emits agent Availability on a
+dedicated, non-overlapping topic:
+
+```
+{TopicPrefix}/Agent/{AgentUuid}/Available (raw "AVAILABLE" / "UNAVAILABLE" UTF-8 bytes)
+```
+
+This matches the cppagent reference implementation
+([mtconnect/cppagent](https://github.com/mtconnect/cppagent)) which also
+publishes JSON document envelopes only under the Probe wildcard and emits
+agent status on a dedicated agent / status topic. The Probe wildcard is
+now pure-JSON for any subscriber.
+
+### Action required when upgrading
+
+* Subscribers that relied on the old `{TopicPrefix}/Probe/{AgentUuid}/Available`
+ topic to track agent Availability must subscribe to
+ `{TopicPrefix}/Agent/{AgentUuid}/Available` instead.
+* Any retained-message cleanup tooling pointed at the old topic should be
+ updated to clear the new topic on broker rotations.
+
## Contribution / Feedback
- Please use the [Issues](https://github.com/TrakHound/MTConnect.NET/issues) tab to create issues for specific problems that you may encounter
- Please feel free to use the [Pull Requests](https://github.com/TrakHound/MTConnect.NET/pulls) tab for any suggested improvements to the source code
diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/RelayBufferDiagnostics.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/RelayBufferDiagnostics.cs
new file mode 100644
index 000000000..d612f6b92
--- /dev/null
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/RelayBufferDiagnostics.cs
@@ -0,0 +1,36 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+namespace MTConnect
+{
+ ///
+ /// Helpers for the MqttRelay buffered-observation diagnostic log
+ /// lines. The "missed observations" figure printed when a relay
+ /// reconnects is computed against unsigned broker sequence numbers,
+ /// so the helper guards the unsigned subtraction from underflow.
+ ///
+ public static class RelayBufferDiagnostics
+ {
+ /// <summary>
+ /// Computes the count of buffered observations the operator
+ /// reasonably expects to see relayed when the connection
+ /// recovers. Returns 0 when <paramref name="lastSent"/>
+ /// is at or above <paramref name="to"/>; that case is
+ /// degenerate (a stale persisted last-sent-sequence value or a
+ /// rolled broker sequence) so a meaningful "missed" figure
+ /// cannot be produced and the diagnostic should not print a
+ /// huge spurious number.
+ /// </summary>
+ /// <param name="to">
+ /// The broker's current LastSequence.
+ /// </param>
+ /// <param name="lastSent">
+ /// The persisted last-sent-sequence.
+ /// </param>
+ public static long ComputeMissed(ulong to, ulong lastSent)
+ {
+ if (lastSent >= to) return 0;
+ return (long)(to - lastSent);
+ }
+ }
+}
diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/WorkerLoopExceptionLogger.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/WorkerLoopExceptionLogger.cs
new file mode 100644
index 000000000..5b31667ca
--- /dev/null
+++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/WorkerLoopExceptionLogger.cs
@@ -0,0 +1,39 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using System;
+using System.Threading.Tasks;
+
+namespace MTConnect
+{
+ ///
+ /// Encodes the MqttRelay Worker outer-catch logging policy. The
+ /// outer-loop catch in Module.Worker previously had a bare empty
+ /// catch on <see cref="Exception"/>, swallowing any unexpected
+ /// exception escaping the inner try/catch. The relay quietly
+ /// entered the reconnect delay branch and the underlying defect
+ /// went undiagnosed for the lifetime of the agent.
+ ///
+ /// This helper centralizes the policy: skip the orderly-shutdown
+ /// cancelation signals, log everything else with type and message
+ /// so the operator can diagnose the defect from log scrapes.
+ ///
+ public static class WorkerLoopExceptionLogger
+ {
+ /// <summary>
+ /// Routes <paramref name="exception"/> to <paramref name="onLog"/>
+ /// unless the exception is a cancelation signal
+ /// (<see cref="OperationCanceledException"/> or its
+ /// <see cref="TaskCanceledException"/> subclass), in which case
+ /// the policy is silence (orderly shutdown).
+ /// </summary>
+ public static void Log(Exception exception, Action<string> onLog)
+ {
+ if (exception == null) return;
+ if (onLog == null) return;
+ if (exception is OperationCanceledException) return;
+
+ onLog($"MQTT Relay Worker unexpected error : {exception.GetType().Name} : {exception.Message}");
+ }
+ }
+}
diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AsyncVoidGuardTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AsyncVoidGuardTests.cs
new file mode 100644
index 000000000..c63947165
--- /dev/null
+++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AsyncVoidGuardTests.cs
@@ -0,0 +1,106 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using System;
+using System.Threading.Tasks;
+using NUnit.Framework;
+
+namespace MTConnect.AgentModule.MqttRelay.Tests
+{
+ ///
+ /// Pins the MqttRelay async-void event-handler safety policy. The
+ /// module exposes seven event handlers that the agent broker
+ /// invokes as async void:
+ ///
+ /// AgentDeviceAdded, AgentObservationAdded, AgentAssetAdded,
+ /// ProbeReceived, CurrentReceived, SampleReceived,
+ /// AssetReceived
+ ///
+ /// An async void method that throws routes its exception to
+ /// the synchronization context, which on the ThreadPool tears down
+ /// the host process. Three of the seven handlers (AgentDeviceAdded,
+ /// AgentObservationAdded, AgentAssetAdded) had no top-level
+ /// try/catch; the other four guarded only the inner publish. A
+ /// throw from the formatter or from a synchronous broker call
+ /// (DataItem null-deref, broker shutting down) crashed the agent.
+ ///
+ /// AsyncVoidGuard.Run wraps the handler body and routes any
+ /// exception to a logging callback so the policy is enforced
+ /// uniformly and is unit-testable. Pinning the policy here also
+ /// prevents a future contributor from re-introducing an unguarded
+ /// async-void path.
+ ///
+ [TestFixture]
+ public class AsyncVoidGuardTests
+ {
+ [Test]
+ public async Task Run_completes_normally_when_body_succeeds()
+ {
+ string loggedFault = null;
+
+ await AsyncVoidGuard.Run(
+ async () => { await Task.Yield(); },
+ ex => loggedFault = ex.Message);
+
+ Assert.That(loggedFault, Is.Null,
+ "Successful body must not invoke the fault logger.");
+ }
+
+ [Test]
+ public async Task Run_routes_synchronous_throw_to_logger()
+ {
+ string loggedFault = null;
+
+ await AsyncVoidGuard.Run(
+ () => throw new InvalidOperationException("sync throw"),
+ ex => loggedFault = ex.Message);
+
+ Assert.That(loggedFault, Is.EqualTo("sync throw"));
+ }
+
+ [Test]
+ public async Task Run_routes_async_throw_to_logger()
+ {
+ string loggedFault = null;
+
+ await AsyncVoidGuard.Run(
+ async () => { await Task.Yield(); throw new InvalidOperationException("async throw"); },
+ ex => loggedFault = ex.Message);
+
+ Assert.That(loggedFault, Is.EqualTo("async throw"));
+ }
+
+ [Test]
+ public async Task Run_does_not_rethrow_when_logger_is_null()
+ {
+ // Even with a null logger, the guard must swallow the
+ // exception so an async-void handler cannot crash the host.
+ await AsyncVoidGuard.Run(
+ () => throw new InvalidOperationException("no logger"),
+ onFault: null);
+
+ Assert.Pass();
+ }
+
+ [Test]
+ public async Task Run_does_not_rethrow_when_logger_itself_throws()
+ {
+ // A misbehaving logger must not corrupt the guard
+ // contract: the originating handler must still complete.
+ await AsyncVoidGuard.Run(
+ () => throw new InvalidOperationException("body"),
+ ex => throw new InvalidOperationException("logger crashed"));
+
+ Assert.Pass();
+ }
+
+ [Test]
+ public async Task Run_no_ops_when_body_is_null()
+ {
+ // A misuse case (handler delegate not wired); the guard
+ // must not throw NullReferenceException of its own.
+ await AsyncVoidGuard.Run(body: null, onFault: _ => { });
+ Assert.Pass();
+ }
+ }
+}
diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicRegressionTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicRegressionTests.cs
new file mode 100644
index 000000000..9099d32e0
--- /dev/null
+++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicRegressionTests.cs
@@ -0,0 +1,88 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using NUnit.Framework;
+
+namespace MTConnect.AgentModule.MqttRelay.Tests
+{
+ ///
+ /// Pins the regression for
+ /// https://github.com/TrakHound/MTConnect.NET/issues/135. The
+ /// MqttRelay agent module previously published its agent
+ /// availability state (LWT plus on-connect retained Available
+ /// message) on a four-segment topic under the Probe wildcard with a
+ /// raw UTF-8 string payload. Any subscriber on
+ /// {TopicPrefix}/Probe/# parsing every payload as JSON failed on
+ /// that raw string.
+ ///
+ /// These tests guard against accidental re-introduction of the
+ /// broken shape: any future refactor that re-routes the
+ /// availability publish under the Probe wildcard or that re-uses
+ /// the Probe topic constant in availability-topic construction
+ /// fails the guard.
+ ///
+ /// Source references:
+ /// https://github.com/TrakHound/MTConnect.NET/issues/135
+ /// https://github.com/mtconnect/cppagent (canonical Probe/#
+ /// wildcard contract: only JSON document envelopes).
+ ///
+ [TestFixture]
+ public class AvailabilityTopicRegressionTests
+ {
+ [Test]
+ public void Topic_never_contains_probe_segment_for_any_inputs()
+ {
+ string[] topicPrefixes =
+ {
+ "MTConnect",
+ "MTConnect/Document",
+ "fleet/site/agent-1",
+ "Probe", // adversarial: prefix happens to be "Probe".
+ "x"
+ };
+
+ string[] agentUuids =
+ {
+ "agent-uuid-1",
+ "Probe", // adversarial: uuid happens to be "Probe".
+ "00000000-0000-0000-0000-000000000000",
+ "agent.with.dots",
+ "agent-with-dashes"
+ };
+
+ foreach (var prefix in topicPrefixes)
+ {
+ foreach (var uuid in agentUuids)
+ {
+ var topic = AvailabilityTopic.Build(prefix, uuid);
+
+ Assert.That(topic, Is.Not.Null,
+ $"Build({prefix}, {uuid}) should produce a topic.");
+
+ // The dedicated availability segment is "Agent",
+ // sandwiched between the configured prefix and
+ // the agent uuid. Even when the prefix or the
+ // uuid happens to be the literal "Probe" string,
+ // the dedicated availability segment must never
+ // be "Probe" so a Probe/# wildcard subscriber
+ // never matches the availability publish.
+ var expectedSegment =
+ $"/{AvailabilityTopic.AgentSegment}/{uuid}/{AvailabilityTopic.AvailableSegment}";
+
+ Assert.That(topic, Does.EndWith(expectedSegment),
+ $"Build({prefix}, {uuid}) should end with the relocated availability segment.");
+ }
+ }
+ }
+
+ [Test]
+ public void Agent_segment_is_distinct_from_probe_constant()
+ {
+ // Guards against a future refactor that points
+ // AvailabilityTopic.AgentSegment back at the Probe topic
+ // constant (the source of the original defect).
+ Assert.That(AvailabilityTopic.AgentSegment, Is.Not.EqualTo("Probe"));
+ Assert.That(AvailabilityTopic.AgentSegment, Is.EqualTo("Agent"));
+ }
+ }
+}
diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicTests.cs
new file mode 100644
index 000000000..a2df94669
--- /dev/null
+++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicTests.cs
@@ -0,0 +1,105 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using NUnit.Framework;
+
+namespace MTConnect.AgentModule.MqttRelay.Tests
+{
+ ///
+ /// Pins the MQTT topic on which the MqttRelay agent module publishes
+ /// the agent's Availability state. Background and motivation:
+ ///
+ /// https://github.com/TrakHound/MTConnect.NET/issues/135
+ ///
+ /// The MqttRelay's MQTT Last Will and Testament plus on-connect
+ /// retained Available message previously emitted on
+ /// {TopicPrefix}/Probe/{AgentUuid}/Available
+ /// with a raw "AVAILABLE" / "UNAVAILABLE" UTF-8 string payload.
+ /// That four-segment topic falls under the
+ /// {TopicPrefix}/Probe/#
+ /// wildcard, breaking the contract that every payload under that
+ /// wildcard is a JSON document envelope. The cppagent reference
+ /// implementation
+ /// https://github.com/mtconnect/cppagent
+ /// publishes only JSON document envelopes under the Probe wildcard
+ /// and emits the agent availability state outside the wildcard
+ /// (typically on a dedicated agent / status topic).
+ ///
+ /// These tests pin the corrected shape:
+ /// {TopicPrefix}/Agent/{AgentUuid}/Available
+ /// so the Probe wildcard remains pure-JSON for any subscriber.
+ ///
+ [TestFixture]
+ public class AvailabilityTopicTests
+ {
+ [Test]
+ public void Build_returns_topic_outside_probe_wildcard()
+ {
+ // Arrange / Act
+ var topic = AvailabilityTopic.Build("MTConnect", "agent-uuid-1");
+
+ // Assert: never emit under the Probe/# wildcard.
+ Assert.That(topic, Does.Not.Contain("/Probe/"),
+ "MqttRelay availability topic must not fall under the Probe/# wildcard.");
+ }
+
+ [Test]
+ public void Build_uses_dedicated_agent_segment()
+ {
+ // Arrange
+ const string topicPrefix = "MTConnect";
+ const string agentUuid = "agent-uuid-1";
+
+ // Act
+ var topic = AvailabilityTopic.Build(topicPrefix, agentUuid);
+
+ // Assert: shape matches the relocated contract.
+ Assert.That(topic, Is.EqualTo("MTConnect/Agent/agent-uuid-1/Available"));
+ }
+
+ [Test]
+ public void Build_preserves_multi_segment_topic_prefix()
+ {
+ // Arrange
+ const string topicPrefix = "MTConnect/Document";
+ const string agentUuid = "agent-uuid-2";
+
+ // Act
+ var topic = AvailabilityTopic.Build(topicPrefix, agentUuid);
+
+ // Assert
+ Assert.That(topic, Is.EqualTo("MTConnect/Document/Agent/agent-uuid-2/Available"));
+ Assert.That(topic, Does.Not.Contain("/Probe/"));
+ }
+
+ [Test]
+ public void Build_returns_null_when_topic_prefix_is_null()
+ {
+ Assert.That(AvailabilityTopic.Build(null, "agent-uuid-1"), Is.Null);
+ }
+
+ [Test]
+ public void Build_returns_null_when_topic_prefix_is_empty()
+ {
+ Assert.That(AvailabilityTopic.Build(string.Empty, "agent-uuid-1"), Is.Null);
+ }
+
+ [Test]
+ public void Build_returns_null_when_agent_uuid_is_null()
+ {
+ Assert.That(AvailabilityTopic.Build("MTConnect", null), Is.Null);
+ }
+
+ [Test]
+ public void Build_returns_null_when_agent_uuid_is_empty()
+ {
+ Assert.That(AvailabilityTopic.Build("MTConnect", string.Empty), Is.Null);
+ }
+
+ [Test]
+ public void AvailableSegment_constant_pins_trailing_topic_segment()
+ {
+ Assert.That(AvailabilityTopic.AvailableSegment, Is.EqualTo("Available"));
+ }
+ }
+}
diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicValidationTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicValidationTests.cs
new file mode 100644
index 000000000..d4b2ca12f
--- /dev/null
+++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicValidationTests.cs
@@ -0,0 +1,78 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using NUnit.Framework;
+
+namespace MTConnect.AgentModule.MqttRelay.Tests
+{
+ ///
+ /// Pins MQTT topic input validation for
+ /// against the
+ /// MQTT 3.1.1 reserved-character rules. The MQTT specification (3.1.1
+ /// section 4.7.1.1) reserves '+', '#', and the null character within
+ /// topic names; '/' is the topic separator and must not appear inside
+ /// a single segment such as a topicPrefix or agentUuid input.
+ ///
+ /// If a caller supplies a value containing any of those reserved
+ /// characters the resulting topic would be malformed (or would alter
+ /// the wildcard contract under which other subscribers operate), so
+ /// rejects the
+ /// input by returning null.
+ ///
+ /// Source reference:
+ /// MQTT 3.1.1, OASIS standard, section 4.7.1.1 (topic name and
+ /// topic filter format).
+ ///
+ [TestFixture]
+ public class AvailabilityTopicValidationTests
+ {
+ [TestCase("MTConnect+", "agent-uuid-1")]
+ [TestCase("MTConnect#", "agent-uuid-1")]
+ [TestCase("MTConnect\0", "agent-uuid-1")]
+ [TestCase("MTConnect", "agent+uuid")]
+ [TestCase("MTConnect", "agent#uuid")]
+ [TestCase("MTConnect", "agent\0uuid")]
+ public void Build_returns_null_when_reserved_character_present(
+ string topicPrefix, string agentUuid)
+ {
+ Assert.That(
+ AvailabilityTopic.Build(topicPrefix, agentUuid),
+ Is.Null,
+ $"Build({topicPrefix}, {agentUuid}) should reject MQTT-reserved characters.");
+ }
+
+ [Test]
+ public void Build_rejects_slash_inside_agent_uuid_segment()
+ {
+ // The agentUuid is a single topic segment; embedding '/'
+ // would split it across multiple segments and break the
+ // {TopicPrefix}/Agent/{AgentUuid}/Available shape.
+ Assert.That(
+ AvailabilityTopic.Build("MTConnect", "agent/uuid"),
+ Is.Null);
+ }
+
+ [Test]
+ public void Build_strips_leading_slash_from_topic_prefix()
+ {
+ // Optional canonicalisation: a leading '/' in topicPrefix
+ // would yield a topic beginning with '/', which is legal in
+ // MQTT 3.1.1 but produces a confusing empty leading
+ // segment. Strip it so the resulting topic stays canonical.
+ Assert.That(
+ AvailabilityTopic.Build("/MTConnect", "agent-1"),
+ Is.EqualTo("MTConnect/Agent/agent-1/Available"));
+ }
+
+ [Test]
+ public void Build_strips_trailing_slash_from_topic_prefix()
+ {
+ // Optional canonicalisation: a trailing '/' would yield
+ // "MTConnect//Agent/agent-1/Available" with a stray empty
+ // segment.
+ Assert.That(
+ AvailabilityTopic.Build("MTConnect/", "agent-1"),
+ Is.EqualTo("MTConnect/Agent/agent-1/Available"));
+ }
+ }
+}
diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequencePersisterTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequencePersisterTests.cs
new file mode 100644
index 000000000..95b996a0a
--- /dev/null
+++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequencePersisterTests.cs
@@ -0,0 +1,135 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using NUnit.Framework;
+
+namespace MTConnect.AgentModule.MqttRelay.Tests
+{
+ ///
+ /// Pins the MqttRelay last-sent-sequence persistence policy under
+ /// DurableRelay. Module.cs previously synchronously wrote to disk
+ /// from every successful observation publish in
+ /// AgentObservationAdded:
+ ///
+ /// File.WriteAllText(path, seq.ToString());
+ ///
+ /// Under high-rate observation arrival that put a synchronous disk
+ /// write on every event-handler invocation, throttling the relay.
+ /// LastSentSequencePersister introduces an in-memory tracker plus
+ /// an explicit dirty bit; the caller flushes to disk on a timer,
+ /// on shutdown, and after every batch boundary, instead of after
+ /// every observation.
+ ///
+ [TestFixture]
+ public class LastSentSequencePersisterTests
+ {
+ [Test]
+ public void Update_marks_dirty_and_round_trips_through_read()
+ {
+ var persister = new LastSentSequencePersister();
+
+ persister.Update(42UL);
+
+ Assert.That(persister.Read(), Is.EqualTo(42UL));
+ Assert.That(persister.IsDirty, Is.True,
+ "An in-memory update must mark the persister dirty so the caller knows to flush.");
+ }
+
+ [Test]
+ public void Read_does_not_clear_dirty_bit()
+ {
+ var persister = new LastSentSequencePersister();
+ persister.Update(7UL);
+
+ _ = persister.Read();
+
+ Assert.That(persister.IsDirty, Is.True,
+ "Read must not clear dirty: only Flush establishes durable persistence.");
+ }
+
+ [Test]
+ public void TryFlush_writes_only_when_dirty_and_clears_dirty_on_success()
+ {
+ var persister = new LastSentSequencePersister();
+ persister.Update(99UL);
+
+ ulong? written = null;
+ var flushed = persister.TryFlush(seq => written = seq);
+
+ Assert.That(flushed, Is.True,
+ "TryFlush returns true when a write was actually emitted.");
+ Assert.That(written, Is.EqualTo(99UL));
+ Assert.That(persister.IsDirty, Is.False,
+ "After a successful flush the persister must be clean so the next timer tick does not redundantly write.");
+ }
+
+ [Test]
+ public void TryFlush_no_ops_when_not_dirty()
+ {
+ var persister = new LastSentSequencePersister();
+ // Never updated; persister is clean.
+ var written = false;
+ var flushed = persister.TryFlush(_ => written = true);
+
+ Assert.That(flushed, Is.False);
+ Assert.That(written, Is.False,
+ "A clean persister must not invoke the writer; that would burn IOPS for no progress.");
+ }
+
+ [Test]
+ public void TryFlush_keeps_dirty_when_writer_throws()
+ {
+ // A failing write must not clear dirty: the caller wants
+ // the next timer tick to retry.
+ var persister = new LastSentSequencePersister();
+ persister.Update(123UL);
+
+ Assert.Throws<System.IO.IOException>(
+ () => persister.TryFlush(_ => throw new System.IO.IOException("disk full")));
+
+ Assert.That(persister.IsDirty, Is.True,
+ "A failed write must leave the persister dirty so the next flush retries.");
+ Assert.That(persister.Read(), Is.EqualTo(123UL));
+ }
+
+ [Test]
+ public void Update_is_a_last_write_wins_overwrite()
+ {
+ var persister = new LastSentSequencePersister();
+ persister.Update(100UL);
+ persister.Update(50UL);
+
+ Assert.That(persister.Read(), Is.EqualTo(50UL),
+ "Last write wins; the persister is not a max-watermark.");
+ }
+
+ [Test]
+ public void Initialize_seeds_value_without_marking_dirty()
+ {
+ // Used at startup to load the persisted last-sent-sequence
+ // from disk: the in-memory value must reflect the on-disk
+ // value but the persister must be clean (no flush needed
+ // until a real update arrives).
+ var persister = new LastSentSequencePersister();
+ persister.Initialize(500UL);
+
+ Assert.That(persister.Read(), Is.EqualTo(500UL));
+ Assert.That(persister.IsDirty, Is.False,
+ "Initialize seeds the in-memory value from disk and must not request a redundant flush.");
+ }
+
+ [Test]
+ public void TryFlush_no_ops_when_writer_is_null()
+ {
+ var persister = new LastSentSequencePersister();
+ persister.Update(7UL);
+
+ // A null writer means the caller has not wired persistence
+ // (e.g. DurableRelay disabled at runtime); the persister
+ // must not throw.
+ Assert.DoesNotThrow(() => persister.TryFlush(null));
+ // Dirty bit unchanged because no write happened.
+ Assert.That(persister.IsDirty, Is.True);
+ }
+ }
+}
diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequenceTrackerTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequenceTrackerTests.cs
new file mode 100644
index 000000000..ea2f4cc5c
--- /dev/null
+++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequenceTrackerTests.cs
@@ -0,0 +1,118 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+using NUnit.Framework;
+
+namespace MTConnect.AgentModule.MqttRelay.Tests
+{
+ ///
+ /// Pins the atomic-access policy for the MqttRelay relay-progress
+ /// counter. Module.cs previously read and wrote
+ /// _lastSentSequence (a 64-bit field) without
+ /// <see cref="System.Threading.Interlocked"/>, which on 32-bit hosts is not an
+ /// atomic operation: two 32-bit halves are read or written
+ /// independently and a concurrent reader can observe a torn value.
+ ///
+ /// MqttRelay reads _lastSentSequence from observation event
+ /// handlers (multiple ThreadPool threads) while the durable-relay
+ /// Worker writes it. A torn read could log a wildly wrong "unsent"
+ /// figure and, worse, be propagated to the persisted last-sent
+ /// sequence file, causing buffered observations to be skipped or
+ /// re-sent on the next reconnect.
+ ///
+ /// LastSentSequenceTracker centralises the atomic read/write so the
+ /// policy is unit-testable and so any future caller cannot
+ /// regress to a torn-read pattern.
+ ///
+ [TestFixture]
+ public class LastSentSequenceTrackerTests
+ {
+ [Test]
+ public void Read_returns_zero_on_fresh_tracker()
+ {
+ var tracker = new LastSentSequenceTracker();
+ Assert.That(tracker.Read(), Is.EqualTo(0UL));
+ }
+
+ [Test]
+ public void Write_round_trips_through_read()
+ {
+ var tracker = new LastSentSequenceTracker();
+ tracker.Write(42UL);
+ Assert.That(tracker.Read(), Is.EqualTo(42UL));
+ }
+
+ [Test]
+ public void Write_supports_full_ulong_range_round_trip()
+ {
+ // The broker sequence is unsigned; the tracker must round
+ // trip values whose bit pattern is negative when reinterpreted
+ // as a signed long (Interlocked operates on long internally).
+ var tracker = new LastSentSequenceTracker();
+ tracker.Write(ulong.MaxValue);
+ Assert.That(tracker.Read(), Is.EqualTo(ulong.MaxValue));
+ }
+
+ [Test]
+ public void Write_overwrites_previous_value()
+ {
+ var tracker = new LastSentSequenceTracker();
+ tracker.Write(100UL);
+ tracker.Write(50UL);
+ Assert.That(tracker.Read(), Is.EqualTo(50UL),
+ "Last write wins; the tracker is not a max-watermark.");
+ }
+
+ [Test]
+ public void Concurrent_writes_and_reads_observe_only_written_values()
+ {
+ // Smoke-test for non-atomic torn read: launch a writer
+ // ramping through the high ulong range and a reader
+ // sampling the value; the reader must never observe a
+ // value that was never written. With Interlocked the
+ // assertion is trivially true; with naive long field the
+ // assertion would fail on 32-bit hosts.
+ var tracker = new LastSentSequenceTracker();
+ const int iterations = 100_000;
+ var startGate = new System.Threading.ManualResetEventSlim(false);
+
+ var writer = Task.Run(() =>
+ {
+ startGate.Wait();
+ for (int i = 1; i <= iterations; i++)
+ {
+ tracker.Write((ulong)i | 0xFFFFFFFF00000000UL);
+ }
+ });
+
+ var reader = Task.Run(() =>
+ {
+ startGate.Wait();
+ while (!writer.IsCompleted)
+ {
+ var v = tracker.Read();
+ if (v != 0UL)
+ {
+ var low = v & 0x00000000FFFFFFFFUL;
+ var high = v & 0xFFFFFFFF00000000UL;
+ // The high half is the canary: the writer
+ // never sets a value with high half != the
+ // sentinel, so observing anything else is a
+ // torn read.
+ Assert.That(high, Is.EqualTo(0xFFFFFFFF00000000UL),
+ "Torn read: high half not as written.");
+ Assert.That(low, Is.LessThanOrEqualTo((ulong)iterations));
+ }
+ }
+ });
+
+ startGate.Set();
+ Task.WaitAll(writer, reader);
+
+ // Final state must be the writer's last value.
+ Assert.That(tracker.Read(), Is.EqualTo((ulong)iterations | 0xFFFFFFFF00000000UL));
+ }
+ }
+}
diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/MqttRelayLifecycleDisconnectTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/MqttRelayLifecycleDisconnectTests.cs
new file mode 100644
index 000000000..44c95d47e
--- /dev/null
+++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/MqttRelayLifecycleDisconnectTests.cs
@@ -0,0 +1,108 @@
+// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
+// TrakHound Inc. licenses this file to you under the MIT license.
+
+using System;
+using System.Threading.Tasks;
+using NUnit.Framework;
+
+namespace MTConnect.AgentModule.MqttRelay.Tests
+{
+ ///
+ /// Pins the MqttRelay module disconnect-on-shutdown policy. The
+ /// previous OnStop() implementation called
+ /// _mqttClient.DisconnectAsync(...) as a fire-and-forget
+ /// task: the returned was never awaited and any
+ /// fault on the disconnect path was silently lost. That hid broker
+ /// errors at shutdown (so an admin diagnosing a hung shutdown could
+ /// not see the real cause) and risked the host process exiting
+ /// before the disconnect actually completed.
+ ///
+ /// The lifecycle helper now exposes a bounded await with a
+ /// fault-logging continuation. The policy:
+ ///
+ /// * Awaits the disconnect, but bounds the wait so a misbehaving
+ /// broker cannot hang shutdown forever.
+ /// * Surfaces a fault to a logging callback so the operator gets a
+ /// diagnostic instead of a silently-dropped exception.
+ /// * Treats the timeout itself as success (best-effort shutdown).
+ ///
+ [TestFixture]
+ public class MqttRelayLifecycleDisconnectTests
+ {
+ [Test]
+ public void DisconnectWithTimeout_returns_when_disconnect_completes()
+ {
+ // The disconnect task completes promptly; the helper must
+ // not block the shutdown thread or invoke the fault log.
+ string loggedFault = null;
+
+ MqttRelayLifecycle.DisconnectWithTimeout(
+ disconnect: () => Task.CompletedTask,
+ timeout: TimeSpan.FromSeconds(1),
+ onFault: ex => loggedFault = ex.Message);
+
+ Assert.That(loggedFault, Is.Null,
+ "Successful disconnect must not invoke the fault logger.");
+ }
+
+ [Test]
+ public void DisconnectWithTimeout_logs_fault_when_disconnect_throws()
+ {
+ string loggedFault = null;
+
+ MqttRelayLifecycle.DisconnectWithTimeout(
+ disconnect: () => Task.FromException(new InvalidOperationException("broker rejected")),
+ timeout: TimeSpan.FromSeconds(1),
+ onFault: ex => loggedFault = ex.Message);
+
+ Assert.That(loggedFault, Is.EqualTo("broker rejected"),
+ "A faulted disconnect Task must surface its exception to the logger.");
+ }
+
+ [Test]
+ public void DisconnectWithTimeout_returns_after_timeout_when_disconnect_hangs()
+ {
+ // A misbehaving broker that never completes the disconnect
+ // must not hang shutdown indefinitely; the helper bounds the
+ // wait and treats the timeout as best-effort success.
+ var sw = System.Diagnostics.Stopwatch.StartNew();
+
+ MqttRelayLifecycle.DisconnectWithTimeout(
+ disconnect: () => new TaskCompletionSource