diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AsyncVoidGuard.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AsyncVoidGuard.cs new file mode 100644 index 000000000..0be86025a --- /dev/null +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AsyncVoidGuard.cs @@ -0,0 +1,59 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System; +using System.Threading.Tasks; + +namespace MTConnect +{ + /// + /// Centralises the MqttRelay agent-module async void + /// event-handler safety policy. An async void method that + /// throws routes its exception to the synchronization context, + /// which on the ThreadPool tears down the host process. The + /// MqttRelay handlers run on broker-raised events, so without a + /// top-level guard a formatter throw (DataItem null-deref) or a + /// synchronous broker call (broker shutting down) crashed the + /// agent. + /// + /// Wrapping every handler body in guarantees that + /// any exception, synchronous or async, is captured and routed to + /// a logging callback rather than escaping to the synchronization + /// context. The guard is also unit-testable in isolation, so the + /// policy is pinned away from the (hard-to-mock) Module class. + /// + public static class AsyncVoidGuard + { + /// + /// Awaits the supplied handler body and routes any exception + /// to the logging callback. Never rethrows: the contract is + /// "an async-void path must not crash the host". A null body + /// is a no-op and a null or throwing logger is tolerated. + /// + /// + /// The handler implementation. May be null. + /// + /// + /// Logger callback invoked with any exception thrown + /// synchronously by or surfaced through + /// its . May be null. 
+ /// + public static async Task Run(Func body, Action onFault) + { + if (body == null) return; + + try + { + var task = body(); + if (task != null) await task.ConfigureAwait(false); + } + catch (Exception ex) + { + if (onFault != null) + { + try { onFault(ex); } catch { } + } + } + } + } +} diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AvailabilityTopic.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AvailabilityTopic.cs new file mode 100644 index 000000000..bbdae8cb0 --- /dev/null +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/AvailabilityTopic.cs @@ -0,0 +1,88 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +namespace MTConnect +{ + /// + /// Builds the MQTT topic on which the MqttRelay agent module + /// publishes the agent's Availability state (the MQTT Last Will and + /// Testament topic plus the on-connect retained Available message). + /// + public static class AvailabilityTopic + { + /// + /// Constant topic segment that separates the agent-availability + /// publishes from the document-envelope publishes (Probe / + /// Current / Sample / Asset). Subscribers wildcarding on + /// {TopicPrefix}/Probe/# therefore never receive the raw + /// availability payload. + /// + public const string AgentSegment = "Agent"; + + /// + /// Constant trailing topic segment that names the availability + /// publish. + /// + public const string AvailableSegment = "Available"; + + /// + /// Builds the full MQTT topic the MqttRelay module uses to + /// publish the agent's Availability state. Returns + /// null when either input is null or empty, or when + /// either input contains an MQTT-reserved character so callers + /// can short-circuit before configuring an MQTT publish. + /// + /// + /// The configured TopicPrefix value, e.g. + /// MTConnect or MTConnect/Document. 
Leading and + /// trailing / separators are stripped so the resulting + /// topic stays canonical. + /// + /// + /// The agent's Uuid identifier. The agentUuid is a + /// single topic segment so it must not contain /. + /// + /// + /// Per MQTT 3.1.1 OASIS standard section 4.7.1.1, MQTT topic + /// names must not contain the wildcard characters + or + /// # nor the null character \0; supplying any of + /// those characters in either input produces a null + /// return. + /// + public static string Build(string topicPrefix, string agentUuid) + { + if (string.IsNullOrEmpty(topicPrefix)) return null; + if (string.IsNullOrEmpty(agentUuid)) return null; + + // MQTT 3.1.1 ยง4.7.1.1: '+' and '#' are wildcard characters + // and the null character is reserved. None of them are + // legal inside a topic name. + if (ContainsReservedCharacter(topicPrefix)) return null; + if (ContainsReservedCharacter(agentUuid)) return null; + + // The agentUuid is a single topic segment so it must not + // embed a '/'; otherwise the {TopicPrefix}/Agent/{AgentUuid} + // /Available shape silently fragments across additional + // segments. + if (agentUuid.IndexOf('/') >= 0) return null; + + // Canonicalise the prefix: strip a leading or trailing '/' + // so the resulting topic does not begin with '/' nor + // contain a stray empty segment. 
+ var canonicalPrefix = topicPrefix.Trim('/'); + if (canonicalPrefix.Length == 0) return null; + + return $"{canonicalPrefix}/{AgentSegment}/{agentUuid}/{AvailableSegment}"; + } + + private static bool ContainsReservedCharacter(string value) + { + for (var i = 0; i < value.Length; i++) + { + var c = value[i]; + if (c == '+' || c == '#' || c == '\0') return true; + } + return false; + } + } +} diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequencePersister.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequencePersister.cs new file mode 100644 index 000000000..26630b782 --- /dev/null +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequencePersister.cs @@ -0,0 +1,92 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System; +using System.Threading; + +namespace MTConnect +{ + /// + /// Tracks the MqttRelay last-sent observation sequence in memory + /// and flushes to durable storage only when explicitly asked. + /// Module.cs previously wrote to disk synchronously from every + /// successful observation publish; on a high-rate stream that put + /// a disk write on every ThreadPool callback. This persister + /// decouples the event-handler hot path from disk IO: handlers + /// call , and a timer / shutdown / batch + /// boundary calls . + /// + /// All read/write of the value field uses + /// so the persister is safe to share + /// between event-handler threads and a flush timer thread on 32- + /// bit hosts where bare 64-bit access can tear. + /// + public sealed class LastSentSequencePersister + { + private long _value; + private int _dirty; // 0 = clean, 1 = dirty (Interlocked-managed) + + /// + /// Returns the current in-memory sequence value with an + /// atomic 64-bit read. Does not clear the dirty bit; only + /// establishes durable persistence. 
+ /// + public ulong Read() + { + return unchecked((ulong)Interlocked.Read(ref _value)); + } + + /// + /// Whether the in-memory value has been modified since the + /// last successful flush. The flush timer can use this to + /// skip writes when the relay is idle. + /// + public bool IsDirty + { + get { return Interlocked.CompareExchange(ref _dirty, 0, 0) != 0; } + } + + /// + /// Records a new last-sent sequence value (last write wins) + /// and marks the persister dirty so the next flush will + /// emit a write. + /// + public void Update(ulong value) + { + Interlocked.Exchange(ref _value, unchecked((long)value)); + Interlocked.Exchange(ref _dirty, 1); + } + + /// + /// Seeds the in-memory value from durable storage at startup + /// without marking the persister dirty. The initial value + /// already matches disk so a flush would be redundant. + /// + public void Initialize(ulong value) + { + Interlocked.Exchange(ref _value, unchecked((long)value)); + Interlocked.Exchange(ref _dirty, 0); + } + + /// + /// Emits a write through if and only + /// if the persister is dirty, then clears the dirty bit. If + /// the writer throws, the dirty bit is preserved and the + /// exception propagates so the caller can log and retry on + /// the next tick. Returns true when a write was + /// actually emitted. + /// + public bool TryFlush(Action write) + { + if (write == null) return false; + if (Interlocked.CompareExchange(ref _dirty, 0, 0) == 0) return false; + + var snapshot = Read(); + // Clear dirty *after* a successful write so that a thrown + // writer leaves the persister dirty for the next attempt. 
+ write(snapshot); + Interlocked.Exchange(ref _dirty, 0); + return true; + } + } +} diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequenceTracker.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequenceTracker.cs new file mode 100644 index 000000000..e66ba60b0 --- /dev/null +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/LastSentSequenceTracker.cs @@ -0,0 +1,44 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System.Threading; + +namespace MTConnect +{ + /// + /// Centralises atomic 64-bit access to the MqttRelay last-sent + /// sequence counter. Module.cs previously read and wrote a bare + /// field. On 32-bit hosts a 64-bit field is read + /// and written as two 32-bit halves and a concurrent reader can + /// observe a torn value. MqttRelay reads the counter from + /// observation event handlers (multiple ThreadPool threads) while + /// the durable-relay Worker writes it, so the access pattern was + /// genuinely racy. The tracker wraps every read/write in + /// primitives so the policy is enforced + /// uniformly and is unit-testable. + /// + public sealed class LastSentSequenceTracker + { + private long _value; + + /// + /// Returns the current sequence value with an atomic 64-bit + /// read. Round-trips the full range, + /// including values whose bit pattern is negative when + /// reinterpreted as a signed . + /// + public ulong Read() + { + return unchecked((ulong)Interlocked.Read(ref _value)); + } + + /// + /// Sets the current sequence value with an atomic 64-bit write. + /// Last write wins; this is not a max-watermark. 
+ /// + public void Write(ulong value) + { + Interlocked.Exchange(ref _value, unchecked((long)value)); + } + } +} diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs index 06411a474..6a4f215a6 100644 --- a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs @@ -37,7 +37,13 @@ public class Module : MTConnectAgentModule private CancellationTokenSource _stop; private static readonly object _lastSentSequenceLock = new object(); private long _totalIncomingObservations = 0; - private long _lastSentSequence = 0; + // In-memory persister: tracks the last-sent sequence under + // DurableRelay and flushes to disk only on a timer, on + // shutdown, and at batch boundaries (rather than on every + // observation, which serialised the relay behind disk IO). + private readonly LastSentSequencePersister _lastSentSequencePersister = new LastSentSequencePersister(); + private System.Threading.Timer _lastSentFlushTimer; + private static readonly TimeSpan LastSentFlushInterval = TimeSpan.FromSeconds(1); private const string DirectoryBuffer = "buffer"; private const string LastSentSequenceFileName = "mqttrelay_last_sent.seq"; @@ -77,23 +83,61 @@ protected override void OnStartAfterLoad(bool initializeDataItems) { _stop = new CancellationTokenSource(); + // Seed the in-memory persister from disk so the first + // RelayBufferedObservations diagnostic uses the durable + // value rather than zero. + if (_configuration.DurableRelay) + { + _lastSentSequencePersister.Initialize(ReadLastSentSequenceFromDisk()); + + // Start a background flush timer. The timer only emits + // a write when the persister is dirty so an idle relay + // does not burn IOPS. 
+ _lastSentFlushTimer = new System.Threading.Timer( + _ => FlushLastSentSequence(), + null, + LastSentFlushInterval, + LastSentFlushInterval); + } + _ = Task.Run(Worker, _stop.Token); } protected override void OnStop() { - _documentServer.Stop(); + // Entity-mode constructs only _entityServer, so a bare + // _documentServer.Stop() here would raise an NRE during + // shutdown. Route through the lifecycle helper so each + // server stop is independently null-safe and exception-safe. + MqttRelayLifecycle.StopServers( + documentStop: _documentServer != null ? (Action)(() => _documentServer.Stop()) : null, + entityStop: null); + + // Flush any pending last-sent-sequence update before the + // timer is disposed, so a clean shutdown does not lose + // progress against the buffered-observation relay. + if (_lastSentFlushTimer != null) + { + try { _lastSentFlushTimer.Dispose(); } catch { } + _lastSentFlushTimer = null; + } + FlushLastSentSequence(); if (_stop != null) _stop.Cancel(); - try - { - if (_mqttClient != null) - { - _mqttClient.DisconnectAsync(MqttClientDisconnectOptionsReason.NormalDisconnection); - } - } - catch { } + // Bounded await on the disconnect: previously this was a + // fire-and-forget DisconnectAsync(...) whose returned Task + // was never awaited. Faults were silently dropped and the + // host process risked exiting before the disconnect + // completed. Route through the lifecycle helper so faults + // surface to the log and a hung broker cannot block + // shutdown indefinitely. + MqttRelayLifecycle.DisconnectWithTimeout( + disconnect: _mqttClient != null + ? 
(Func)(() => _mqttClient.DisconnectAsync(MqttClientDisconnectOptionsReason.NormalDisconnection)) + : null, + timeout: TimeSpan.FromSeconds(5), + onFault: ex => Log(MTConnectLogLevel.Warning, $"MQTT Relay Disconnect Error : {ex.Message}")); } private async Task Worker() @@ -246,7 +290,15 @@ private async Task Worker() await Task.Delay(_configuration.ReconnectInterval, _stop.Token); } catch (TaskCanceledException) { } - catch (Exception) { } + catch (Exception ex) + { + // Route through WorkerLoopExceptionLogger so the + // operator sees an unexpected defect at the outer + // scope instead of having it silently swallowed. + WorkerLoopExceptionLogger.Log( + exception: ex, + onLog: msg => Log(MTConnectLogLevel.Warning, msg)); + } } while (!_stop.Token.IsCancellationRequested); } @@ -273,13 +325,16 @@ private async Task RelayBufferedObservations() { if (Agent is IMTConnectAgentBroker broker) { - ulong lastSent = GetLastSentSequence(); + // Read in-memory: the persister was seeded from disk at + // OnStartAfterLoad. Avoid a synchronous file read on + // every relay attempt. 
+ ulong lastSent = _lastSentSequencePersister.Read(); ulong from = Math.Max(lastSent + 1, broker.FirstSequence); ulong to = broker.LastSequence; Log(MTConnectLogLevel.Information, $"[MQTT Relay] RelayBufferedObservations: lastSent={lastSent}, broker.FirstSequence={broker.FirstSequence}, broker.LastSequence={broker.LastSequence}"); - long missed = (long)(to - lastSent); + long missed = RelayBufferDiagnostics.ComputeMissed(to, lastSent); if (lastSent + 1 < broker.FirstSequence) { @@ -319,12 +374,19 @@ private async Task RelayBufferedObservations() var result = await _entityServer.PublishObservation(_mqttClient, x); if (result != null && result.IsSuccess) { - SetLastSentSequence(x.Sequence); + RecordLastSentSequence(x.Sequence); } } + // Batch boundary: the buffered relay just ran a + // chunk of observations under DurableRelay; flush + // the persister so a crash before the next timer + // tick does not lose this batch's progress. + FlushLastSentSequence(); + Log(MTConnectLogLevel.Information, $"[MQTT Relay] Buffered observation relay complete. {observations.Count} missed observations sent."); - var unsent = broker.LastSequence > (ulong)_lastSentSequence ? broker.LastSequence - (ulong)_lastSentSequence : 0; + var lastSentSnapshot = _lastSentSequencePersister.Read(); + var unsent = broker.LastSequence > lastSentSnapshot ? broker.LastSequence - lastSentSnapshot : 0; if (unsent > 0) Log(MTConnectLogLevel.Information, $"[MQTT Relay] {unsent} new observations arrived during relay and will be sent next."); } @@ -349,7 +411,10 @@ private static string GetLastSentSequenceFilePath(IAgentApplicationConfiguration return Path.Combine(baseDir, LastSentSequenceFileName); } - private ulong GetLastSentSequence() + // Disk read used at startup (Initialize) only. Hot-path reads + // go through _lastSentSequencePersister.Read(); this method + // is the single point of entry for the actual file IO. 
+ private ulong ReadLastSentSequenceFromDisk() { if (!_configuration.DurableRelay) return 0; var agentConfig = Agent?.Configuration as IAgentApplicationConfiguration; @@ -365,18 +430,53 @@ private ulong GetLastSentSequence() return 0; } - private void SetLastSentSequence(ulong seq) + // Records a new in-memory last-sent sequence. The actual disk + // write happens out-of-band on the flush timer / shutdown / + // batch boundary via FlushLastSentSequence. + private void RecordLastSentSequence(ulong seq) { if (!_configuration.DurableRelay) return; // Default - var agentConfig = Agent?.Configuration as IAgentApplicationConfiguration; - var path = GetLastSentSequenceFilePath(agentConfig); - lock (_lastSentSequenceLock) + _lastSentSequencePersister.Update(seq); + } + + // Writes the in-memory persister value to disk if dirty. Safe + // to call from any thread; the inner File.WriteAllText runs + // under _lastSentSequenceLock so a timer tick cannot race a + // shutdown flush. + private void FlushLastSentSequence() + { + if (!_configuration.DurableRelay) return; + + try + { + var agentConfig = Agent?.Configuration as IAgentApplicationConfiguration; + var path = GetLastSentSequenceFilePath(agentConfig); + + _lastSentSequencePersister.TryFlush(seq => + { + lock (_lastSentSequenceLock) + { + File.WriteAllText(path, seq.ToString()); + } + }); + } + catch (Exception ex) { - File.WriteAllText(path, seq.ToString()); + Log(MTConnectLogLevel.Warning, $"MQTT Relay last-sent-sequence flush error : {ex.Message}"); } } private async void ProbeReceived(IDevice device, IDevicesResponseDocument responseDocument) + { + // async void handler: route any unhandled fault (formatter + // throw, message-builder failure) to the log instead of the + // synchronization context. 
+ await AsyncVoidGuard.Run( + async () => await ProbeReceivedCore(device, responseDocument), + ex => Log(MTConnectLogLevel.Warning, $"ProbeReceived handler error : {ex.Message}")); + } + + private async Task ProbeReceivedCore(IDevice device, IDevicesResponseDocument responseDocument) { if (_mqttClient != null && _mqttClient.IsConnected) { @@ -453,6 +553,15 @@ private async void ProbeReceived(IDevice device, IDevicesResponseDocument respon } private async void CurrentReceived(IDevice device, IStreamsResponseOutputDocument responseDocument) + { + // async void handler: route any unhandled fault to the log + // instead of the synchronization context. + await AsyncVoidGuard.Run( + async () => await CurrentReceivedCore(device, responseDocument), + ex => Log(MTConnectLogLevel.Warning, $"CurrentReceived handler error : {ex.Message}")); + } + + private async Task CurrentReceivedCore(IDevice device, IStreamsResponseOutputDocument responseDocument) { if (_mqttClient != null && _mqttClient.IsConnected) { @@ -493,6 +602,15 @@ private async void CurrentReceived(IDevice device, IStreamsResponseOutputDocumen } private async void SampleReceived(IDevice device, IStreamsResponseOutputDocument responseDocument) + { + // async void handler: route any unhandled fault to the log + // instead of the synchronization context. + await AsyncVoidGuard.Run( + async () => await SampleReceivedCore(device, responseDocument), + ex => Log(MTConnectLogLevel.Warning, $"SampleReceived handler error : {ex.Message}")); + } + + private async Task SampleReceivedCore(IDevice device, IStreamsResponseOutputDocument responseDocument) { if (_mqttClient != null && _mqttClient.IsConnected) { @@ -533,6 +651,15 @@ private async void SampleReceived(IDevice device, IStreamsResponseOutputDocument } private async void AssetReceived(IDevice device, IAssetsResponseDocument responseDocument) + { + // async void handler: route any unhandled fault to the log + // instead of the synchronization context. 
+ await AsyncVoidGuard.Run( + async () => await AssetReceivedCore(device, responseDocument), + ex => Log(MTConnectLogLevel.Warning, $"AssetReceived handler error : {ex.Message}")); + } + + private async Task AssetReceivedCore(IDevice device, IAssetsResponseDocument responseDocument) { if (_mqttClient != null && _mqttClient.IsConnected && responseDocument != null && !responseDocument.Assets.IsNullOrEmpty()) { @@ -602,65 +729,60 @@ private async Task PublishCurrentObservations() private async Task PublishObservations(IEnumerable observations) { - if (!observations.IsNullOrEmpty()) + if (observations.IsNullOrEmpty()) return; + + // Single-pass grouping replaces the previous O(n*k) + // Distinct + Where + FirstOrDefault pattern that iterated + // the source up to three times per distinct DataItemId. + foreach (var group in ObservationGrouper.GroupByDataItem(observations)) { - var dataItemIds = observations.Select(o => o.DataItemId).Distinct(); - foreach (var dataItemId in dataItemIds) - { - var dataItemObservations = observations.Where(o => o.DataItemId == dataItemId); - var dataItemObservation = dataItemObservations.FirstOrDefault(); + // Materialise each group's observations once: the + // condition path needs to enumerate twice (test the + // first item's category and rebuild every observation) + // and re-iterating the IGrouping would re-iterate the + // upstream source. 
+ var groupObservations = group.ToList(); + if (groupObservations.Count == 0) continue; - if (dataItemObservation.Category == DataItemCategory.CONDITION) - { - // Conditions have multiple observations - var multipleObservations = new List(); - foreach (var observation in dataItemObservations) - { - var x = new Observation(); - x.DeviceUuid = observation.DeviceUuid; - x.DataItemId = observation.DataItemId; - x.DataItem = observation.DataItem; - x.Name = observation.Name; - x.Category = observation.Category; - x.Type = observation.Type; - x.SubType = observation.SubType; - x.Representation = observation.Representation; - x.CompositionId = observation.CompositionId; - x.InstanceId = observation.InstanceId; - x.Sequence = observation.Sequence; - x.Timestamp = observation.Timestamp; - x.AddValues(observation.Values); - - multipleObservations.Add(x); - } + var first = groupObservations[0]; - await _entityServer.PublishObservations(_mqttClient, multipleObservations); - } - else + if (first.Category == DataItemCategory.CONDITION) + { + // Conditions have multiple observations + var multipleObservations = new List(groupObservations.Count); + foreach (var observation in groupObservations) { - var observation = dataItemObservations.FirstOrDefault(); - - var x = new Observation(); - x.DeviceUuid = observation.DeviceUuid; - x.DataItemId = observation.DataItemId; - x.DataItem = observation.DataItem; - x.Name = observation.Name; - x.Category = observation.Category; - x.Type = observation.Type; - x.SubType = observation.SubType; - x.Representation = observation.Representation; - x.CompositionId = observation.CompositionId; - x.InstanceId = observation.InstanceId; - x.Sequence = observation.Sequence; - x.Timestamp = observation.Timestamp; - x.AddValues(observation.Values); - - await _entityServer.PublishObservation(_mqttClient, x); + multipleObservations.Add(CloneAsObservation(observation)); } + + await _entityServer.PublishObservations(_mqttClient, multipleObservations); + } + 
else + { + await _entityServer.PublishObservation(_mqttClient, CloneAsObservation(first)); } } } + private static Observation CloneAsObservation(IObservationOutput source) + { + var x = new Observation(); + x.DeviceUuid = source.DeviceUuid; + x.DataItemId = source.DataItemId; + x.DataItem = source.DataItem; + x.Name = source.Name; + x.Category = source.Category; + x.Type = source.Type; + x.SubType = source.SubType; + x.Representation = source.Representation; + x.CompositionId = source.CompositionId; + x.InstanceId = source.InstanceId; + x.Sequence = source.Sequence; + x.Timestamp = source.Timestamp; + x.AddValues(source.Values); + return x; + } + private async Task PublishAssets() { if (_entityServer != null) @@ -679,70 +801,91 @@ private async Task PublishAssets() private async void AgentDeviceAdded(object sender, IDevice device) { - if (_entityServer != null) await _entityServer.PublishDevice(_mqttClient, device); + // async void handlers must not throw; route any fault to + // the log instead of the synchronization context. + await AsyncVoidGuard.Run( + async () => + { + if (_entityServer != null) await _entityServer.PublishDevice(_mqttClient, device); + }, + ex => Log(MTConnectLogLevel.Warning, $"AgentDeviceAdded handler error : {ex.Message}")); } private async void AgentObservationAdded(object sender, IObservation observation) { - if (_configuration.DurableRelay) - { - Interlocked.Increment(ref _totalIncomingObservations); - - var lastSent = GetLastSentSequence(); - _lastSentSequence = (long)lastSent; - - if (_entityServer != null) + // async void handlers must not throw; route any fault to + // the log instead of the synchronization context. 
+ await AsyncVoidGuard.Run( + async () => { - if (observation.Category == DataItemCategory.CONDITION) + if (_configuration.DurableRelay) { - var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId); - var result = await _entityServer.PublishObservations(_mqttClient, conditionObservations.OfType()); - if (result != null && result.IsSuccess && conditionObservations != null && conditionObservations.Any()) + Interlocked.Increment(ref _totalIncomingObservations); + + // No disk read here: the persister was seeded + // at startup and is updated in-memory below. + // The previous GetLastSentSequence() call did + // a synchronous File.ReadAllText on every + // observation event. + + if (_entityServer != null) { - SetLastSentSequence(conditionObservations.Max(o => o.Sequence)); + if (observation.Category == DataItemCategory.CONDITION) + { + var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId); + var result = await _entityServer.PublishObservations(_mqttClient, conditionObservations.OfType()); + if (result != null && result.IsSuccess && conditionObservations != null && conditionObservations.Any()) + { + RecordLastSentSequence(conditionObservations.Max(o => o.Sequence)); + } + } + else + { + var result = await _entityServer.PublishObservation(_mqttClient, observation); + if (result != null && result.IsSuccess) + { + RecordLastSentSequence(observation.Sequence); + } + } } } else { - var result = await _entityServer.PublishObservation(_mqttClient, observation); - if (result != null && result.IsSuccess) + if (_entityServer != null) { - SetLastSentSequence(observation.Sequence); + if (observation.Category == DataItemCategory.CONDITION) + { + var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId); + await PublishObservations(conditionObservations); + } + else + { + await _entityServer.PublishObservation(_mqttClient, observation); + } 
} } - } - } - else - { - if (_entityServer != null) - { - if (observation.Category == DataItemCategory.CONDITION) - { - var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId); - await PublishObservations(conditionObservations); - } - else - { - await _entityServer.PublishObservation(_mqttClient, observation); - } - } - } + }, + ex => Log(MTConnectLogLevel.Warning, $"AgentObservationAdded handler error : {ex.Message}")); } private async void AgentAssetAdded(object sender, IAsset asset) { - if (_entityServer != null) await _entityServer.PublishAsset(_mqttClient, asset); + // async void handlers must not throw; route any fault to + // the log instead of the synchronization context. + await AsyncVoidGuard.Run( + async () => + { + if (_entityServer != null) await _entityServer.PublishAsset(_mqttClient, asset); + }, + ex => Log(MTConnectLogLevel.Warning, $"AgentAssetAdded handler error : {ex.Message}")); } private string GetAgentAvailableTopic() { - if (Agent != null && _configuration != null) - { - return $"{_configuration.TopicPrefix}/{MTConnectMqttDocumentServer.ProbeTopic}/{Agent.Uuid}/Available"; ; - } + if (Agent == null || _configuration == null) return null; - return null; + return AvailabilityTopic.Build(_configuration.TopicPrefix, Agent.Uuid); } } } diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/MqttRelayLifecycle.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/MqttRelayLifecycle.cs new file mode 100644 index 000000000..f4d61d2ae --- /dev/null +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/MqttRelayLifecycle.cs @@ -0,0 +1,127 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System; +using System.Threading.Tasks; + +namespace MTConnect +{ + /// + /// Centralises the MqttRelay agent module shutdown policy. 
The + /// MqttRelay module previously called _documentServer.Stop() + /// unconditionally from OnStop(); when configured for + /// TopicStructure=Entity only _entityServer is + /// constructed, so the document-server reference was null + /// and shutdown raised a . + /// Encapsulating the policy here makes the null-guard unit-testable + /// without instantiating an MTConnect agent broker, and keeps the + /// per-server stop independent so a throw on one path cannot leave + /// the other server running. + /// + public static class MqttRelayLifecycle + { + /// + /// Invokes the supplied per-server stop actions, tolerating + /// either or both being null. A throw from one action + /// does not prevent the other from running. + /// + /// + /// Optional action stopping the Document-mode server. + /// + /// + /// Optional action stopping the Entity-mode server. + /// + public static void StopServers(Action documentStop, Action entityStop) + { + if (documentStop != null) + { + try { documentStop(); } catch { } + } + + if (entityStop != null) + { + try { entityStop(); } catch { } + } + } + + /// + /// Awaits an MQTT-client disconnect with a bounded timeout. + /// Replaces the previous fire-and-forget invocation in + /// OnStop(): + /// + /// * Synchronous throw from is + /// captured and routed to instead + /// of escaping the shutdown path. + /// * A faulted Task is surfaced to + /// with its inner exception. + /// * A disconnect that does not complete within + /// is treated as best-effort + /// success so a misbehaving broker cannot hang shutdown. + /// * A null is a no-op (the + /// worker may never have created an MQTT client). + /// + /// + /// Factory that begins the disconnect and returns its + /// . May be null. + /// + /// + /// Bound on how long shutdown is willing to wait. A timeout + /// elapses silently because the agent is already shutting down. 
+ /// + /// + /// Logger callback invoked with the exception when the + /// disconnect throws synchronously or its Task faults. + /// + public static void DisconnectWithTimeout( + Func disconnect, + TimeSpan timeout, + Action onFault) + { + if (disconnect == null) return; + + Task task; + try + { + task = disconnect(); + } + catch (Exception ex) + { + if (onFault != null) onFault(ex); + return; + } + + if (task == null) return; + + try + { + if (!task.Wait(timeout)) + { + // Bounded wait elapsed; agent is shutting down so + // a hung broker cannot block process exit. Best + // effort: leave the task to be GC'd. Attach a + // continuation so its eventual fault is still + // surfaced rather than swallowed by the finalizer. + if (onFault != null) + { + task.ContinueWith( + t => onFault(t.Exception), + TaskContinuationOptions.OnlyOnFaulted); + } + return; + } + } + catch (AggregateException agg) + { + if (onFault != null) + { + var inner = agg.InnerException ?? agg; + onFault(inner); + } + } + catch (Exception ex) + { + if (onFault != null) onFault(ex); + } + } + } +} diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/ObservationGrouper.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/ObservationGrouper.cs new file mode 100644 index 000000000..a09ae7748 --- /dev/null +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/ObservationGrouper.cs @@ -0,0 +1,40 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Linq; +using MTConnect.Observations.Output; + +namespace MTConnect +{ + /// + /// Single-pass grouping helper for MqttRelay observation + /// publishing. Module.PublishObservations previously iterated the + /// input enumerable up to three times per distinct DataItemId + /// (Distinct, Where, FirstOrDefault); on large agents that + /// throttled the catch-up after a reconnect. 
This helper produces + /// the same logical grouping in a single pass. + /// + public static class ObservationGrouper + { + /// + /// Groups by + /// in a single + /// pass. Encounter order is preserved within each group so a + /// caller that relies on sequence-monotonic ordering is not + /// broken. A null source returns an empty result. + /// + public static IEnumerable> GroupByDataItem( + IEnumerable observations) + { + if (observations == null) return Enumerable.Empty>(); + + // Materialise into a List then GroupBy, so a deferred + // upstream query is iterated exactly once. (LINQ GroupBy + // is itself single-pass over its source, but ToList + // makes the contract explicit and lets the caller iterate + // the resulting groups multiple times safely.) + return observations.ToList().GroupBy(o => o.DataItemId); + } + } +} diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/README-Nuget.md b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/README-Nuget.md index a1e2f8c8c..688bb08b6 100644 --- a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/README-Nuget.md +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/README-Nuget.md @@ -105,6 +105,43 @@ modules: topicPrefix: enterprise/site/area/line/cell/MTConnect ``` +## Migration: Availability Topic Moved Out of `Probe/#` + +In earlier releases the MqttRelay agent module published the agent's +Availability state (Last Will and Testament plus the on-connect retained +"AVAILABLE" message) on a four-segment topic that fell under the +`{TopicPrefix}/Probe/#` wildcard: + +``` +{TopicPrefix}/Probe/{AgentUuid}/Available (raw "AVAILABLE" / "UNAVAILABLE" UTF-8 bytes) +``` + +That broke the contract that every payload under the Probe wildcard is a +JSON document envelope. Subscribers binding to `{TopicPrefix}/Probe/#` +expecting only Probe document JSON would receive a non-JSON Availability +payload and either crash or silently drop the message. 
+ +Starting with this release the module emits agent Availability on a +dedicated, non-overlapping topic: + +``` +{TopicPrefix}/Agent/{AgentUuid}/Available (raw "AVAILABLE" / "UNAVAILABLE" UTF-8 bytes) +``` + +This matches the cppagent reference implementation +([mtconnect/cppagent](https://github.com/mtconnect/cppagent)) which also +publishes JSON document envelopes only under the Probe wildcard and emits +agent status on a dedicated agent / status topic. The Probe wildcard is +now pure-JSON for any subscriber. + +### Action required when upgrading + +* Subscribers that relied on the old `{TopicPrefix}/Probe/{AgentUuid}/Available` + topic to track agent Availability must subscribe to + `{TopicPrefix}/Agent/{AgentUuid}/Available` instead. +* Any retained-message cleanup tooling pointed at the old topic should be + updated to clear the new topic on broker rotations. + ## Contribution / Feedback - Please use the [Issues](https://github.com/TrakHound/MTConnect.NET/issues) tab to create issues for specific problems that you may encounter - Please feel free to use the [Pull Requests](https://github.com/TrakHound/MTConnect.NET/pulls) tab for any suggested improvements to the source code diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/RelayBufferDiagnostics.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/RelayBufferDiagnostics.cs new file mode 100644 index 000000000..d612f6b92 --- /dev/null +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/RelayBufferDiagnostics.cs @@ -0,0 +1,36 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +namespace MTConnect +{ + /// + /// Helpers for the MqttRelay buffered-observation diagnostic log + /// lines. The "missed observations" figure printed when a relay + /// reconnects is computed against unsigned broker sequence numbers, + /// so the helper guards the unsigned subtraction from underflow. 
+ /// + public static class RelayBufferDiagnostics + { + /// + /// Computes the count of buffered observations the operator + /// reasonably expects to see relayed when the connection + /// recovers. Returns 0 when + /// is at or above ; that case is + /// degenerate (a stale persisted last-sent-sequence value or a + /// rolled broker sequence) so a meaningful "missed" figure + /// cannot be produced and the diagnostic should not print a + /// huge spurious number. + /// + /// + /// The broker's current LastSequence. + /// + /// + /// The persisted last-sent-sequence. + /// + public static long ComputeMissed(ulong to, ulong lastSent) + { + if (lastSent >= to) return 0; + return (long)(to - lastSent); + } + } +} diff --git a/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/WorkerLoopExceptionLogger.cs b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/WorkerLoopExceptionLogger.cs new file mode 100644 index 000000000..5b31667ca --- /dev/null +++ b/agent/Modules/MTConnect.NET-AgentModule-MqttRelay/WorkerLoopExceptionLogger.cs @@ -0,0 +1,39 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System; +using System.Threading.Tasks; + +namespace MTConnect +{ + /// + /// Encodes the MqttRelay Worker outer-catch logging policy. The + /// outer-loop catch in Module.Worker previously had a bare empty + /// catch on , swallowing any unexpected + /// exception escaping the inner try/catch. The relay quietly + /// entered the reconnect delay branch and the underlying defect + /// went undiagnosed for the lifetime of the agent. + /// + /// This helper centralizes the policy: skip the orderly-shutdown + /// cancelation signals, log everything else with type and message + /// so the operator can diagnose the defect from log scrapes. 
+ /// + public static class WorkerLoopExceptionLogger + { + /// + /// Routes to + /// unless the exception is a cancelation signal + /// ( or its + /// subclass), in which case + /// the policy is silence (orderly shutdown). + /// + public static void Log(Exception exception, Action onLog) + { + if (exception == null) return; + if (onLog == null) return; + if (exception is OperationCanceledException) return; + + onLog($"MQTT Relay Worker unexpected error : {exception.GetType().Name} : {exception.Message}"); + } + } +} diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AsyncVoidGuardTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AsyncVoidGuardTests.cs new file mode 100644 index 000000000..c63947165 --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AsyncVoidGuardTests.cs @@ -0,0 +1,106 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the MqttRelay async-void event-handler safety policy. The + /// module exposes seven event handlers that the agent broker + /// invokes as async void: + /// + /// AgentDeviceAdded, AgentObservationAdded, AgentAssetAdded, + /// ProbeReceived, CurrentReceived, SampleReceived, + /// AssetReceived + /// + /// An async void method that throws routes its exception to + /// the synchronization context, which on the ThreadPool tears down + /// the host process. Three of the seven handlers (AgentDeviceAdded, + /// AgentObservationAdded, AgentAssetAdded) had no top-level + /// try/catch; the other four guarded only the inner publish. A + /// throw from the formatter or from a synchronous broker call + /// (DataItem null-deref, broker shutting down) crashed the agent. 
+ /// + /// AsyncVoidGuard.Run wraps the handler body and routes any + /// exception to a logging callback so the policy is enforced + /// uniformly and is unit-testable. Pinning the policy here also + /// prevents a future contributor from re-introducing an unguarded + /// async-void path. + /// + [TestFixture] + public class AsyncVoidGuardTests + { + [Test] + public async Task Run_completes_normally_when_body_succeeds() + { + string loggedFault = null; + + await AsyncVoidGuard.Run( + async () => { await Task.Yield(); }, + ex => loggedFault = ex.Message); + + Assert.That(loggedFault, Is.Null, + "Successful body must not invoke the fault logger."); + } + + [Test] + public async Task Run_routes_synchronous_throw_to_logger() + { + string loggedFault = null; + + await AsyncVoidGuard.Run( + () => throw new InvalidOperationException("sync throw"), + ex => loggedFault = ex.Message); + + Assert.That(loggedFault, Is.EqualTo("sync throw")); + } + + [Test] + public async Task Run_routes_async_throw_to_logger() + { + string loggedFault = null; + + await AsyncVoidGuard.Run( + async () => { await Task.Yield(); throw new InvalidOperationException("async throw"); }, + ex => loggedFault = ex.Message); + + Assert.That(loggedFault, Is.EqualTo("async throw")); + } + + [Test] + public async Task Run_does_not_rethrow_when_logger_is_null() + { + // Even with a null logger, the guard must swallow the + // exception so an async-void handler cannot crash the host. + await AsyncVoidGuard.Run( + () => throw new InvalidOperationException("no logger"), + onFault: null); + + Assert.Pass(); + } + + [Test] + public async Task Run_does_not_rethrow_when_logger_itself_throws() + { + // A misbehaving logger must not corrupt the guard + // contract: the originating handler must still complete. 
+ await AsyncVoidGuard.Run( + () => throw new InvalidOperationException("body"), + ex => throw new InvalidOperationException("logger crashed")); + + Assert.Pass(); + } + + [Test] + public async Task Run_no_ops_when_body_is_null() + { + // A misuse case (handler delegate not wired); the guard + // must not throw NullReferenceException of its own. + await AsyncVoidGuard.Run(body: null, onFault: _ => { }); + Assert.Pass(); + } + } +} diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicRegressionTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicRegressionTests.cs new file mode 100644 index 000000000..9099d32e0 --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicRegressionTests.cs @@ -0,0 +1,88 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the regression for + /// https://github.com/TrakHound/MTConnect.NET/issues/135. The + /// MqttRelay agent module previously published its agent + /// availability state (LWT plus on-connect retained Available + /// message) on a four-segment topic under the Probe wildcard with a + /// raw UTF-8 string payload. Any subscriber on + /// {TopicPrefix}/Probe/# parsing every payload as JSON failed on + /// that raw string. + /// + /// These tests guard against accidental re-introduction of the + /// broken shape: any future refactor that re-routes the + /// availability publish under the Probe wildcard or that re-uses + /// the Probe topic constant in availability-topic construction + /// fails the guard. + /// + /// Source references: + /// https://github.com/TrakHound/MTConnect.NET/issues/135 + /// https://github.com/mtconnect/cppagent (canonical Probe/# + /// wildcard contract: only JSON document envelopes). 
+ /// + [TestFixture] + public class AvailabilityTopicRegressionTests + { + [Test] + public void Topic_never_contains_probe_segment_for_any_inputs() + { + string[] topicPrefixes = + { + "MTConnect", + "MTConnect/Document", + "fleet/site/agent-1", + "Probe", // adversarial: prefix happens to be "Probe". + "x" + }; + + string[] agentUuids = + { + "agent-uuid-1", + "Probe", // adversarial: uuid happens to be "Probe". + "00000000-0000-0000-0000-000000000000", + "agent.with.dots", + "agent-with-dashes" + }; + + foreach (var prefix in topicPrefixes) + { + foreach (var uuid in agentUuids) + { + var topic = AvailabilityTopic.Build(prefix, uuid); + + Assert.That(topic, Is.Not.Null, + $"Build({prefix}, {uuid}) should produce a topic."); + + // The dedicated availability segment is "Agent", + // sandwiched between the configured prefix and + // the agent uuid. Even when the prefix or the + // uuid happens to be the literal "Probe" string, + // the dedicated availability segment must never + // be "Probe" so a Probe/# wildcard subscriber + // never matches the availability publish. + var expectedSegment = + $"/{AvailabilityTopic.AgentSegment}/{uuid}/{AvailabilityTopic.AvailableSegment}"; + + Assert.That(topic, Does.EndWith(expectedSegment), + $"Build({prefix}, {uuid}) should end with the relocated availability segment."); + } + } + } + + [Test] + public void Agent_segment_is_distinct_from_probe_constant() + { + // Guards against a future refactor that points + // AvailabilityTopic.AgentSegment back at the Probe topic + // constant (the source of the original defect). 
+ Assert.That(AvailabilityTopic.AgentSegment, Is.Not.EqualTo("Probe")); + Assert.That(AvailabilityTopic.AgentSegment, Is.EqualTo("Agent")); + } + } +} diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicTests.cs new file mode 100644 index 000000000..a2df94669 --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicTests.cs @@ -0,0 +1,105 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the MQTT topic on which the MqttRelay agent module publishes + /// the agent's Availability state. Background and motivation: + /// + /// https://github.com/TrakHound/MTConnect.NET/issues/135 + /// + /// The MqttRelay's MQTT Last Will and Testament plus on-connect + /// retained Available message previously emitted on + /// {TopicPrefix}/Probe/{AgentUuid}/Available + /// with a raw "AVAILABLE" / "UNAVAILABLE" UTF-8 string payload. + /// That four-segment topic falls under the + /// {TopicPrefix}/Probe/# + /// wildcard, breaking the contract that every payload under that + /// wildcard is a JSON document envelope. The cppagent reference + /// implementation + /// https://github.com/mtconnect/cppagent + /// publishes only JSON document envelopes under the Probe wildcard + /// and emits the agent availability state outside the wildcard + /// (typically on a dedicated agent / status topic). + /// + /// These tests pin the corrected shape: + /// {TopicPrefix}/Agent/{AgentUuid}/Available + /// so the Probe wildcard remains pure-JSON for any subscriber. 
+ /// + [TestFixture] + public class AvailabilityTopicTests + { + [Test] + public void Build_returns_topic_outside_probe_wildcard() + { + // Arrange / Act + var topic = AvailabilityTopic.Build("MTConnect", "agent-uuid-1"); + + // Assert: never emit under the Probe/# wildcard. + Assert.That(topic, Does.Not.Contain("/Probe/"), + "MqttRelay availability topic must not fall under the Probe/# wildcard."); + } + + [Test] + public void Build_uses_dedicated_agent_segment() + { + // Arrange + const string topicPrefix = "MTConnect"; + const string agentUuid = "agent-uuid-1"; + + // Act + var topic = AvailabilityTopic.Build(topicPrefix, agentUuid); + + // Assert: shape matches the relocated contract. + Assert.That(topic, Is.EqualTo("MTConnect/Agent/agent-uuid-1/Available")); + } + + [Test] + public void Build_preserves_multi_segment_topic_prefix() + { + // Arrange + const string topicPrefix = "MTConnect/Document"; + const string agentUuid = "agent-uuid-2"; + + // Act + var topic = AvailabilityTopic.Build(topicPrefix, agentUuid); + + // Assert + Assert.That(topic, Is.EqualTo("MTConnect/Document/Agent/agent-uuid-2/Available")); + Assert.That(topic, Does.Not.Contain("/Probe/")); + } + + [Test] + public void Build_returns_null_when_topic_prefix_is_null() + { + Assert.That(AvailabilityTopic.Build(null, "agent-uuid-1"), Is.Null); + } + + [Test] + public void Build_returns_null_when_topic_prefix_is_empty() + { + Assert.That(AvailabilityTopic.Build(string.Empty, "agent-uuid-1"), Is.Null); + } + + [Test] + public void Build_returns_null_when_agent_uuid_is_null() + { + Assert.That(AvailabilityTopic.Build("MTConnect", null), Is.Null); + } + + [Test] + public void Build_returns_null_when_agent_uuid_is_empty() + { + Assert.That(AvailabilityTopic.Build("MTConnect", string.Empty), Is.Null); + } + + [Test] + public void AvailableSegment_constant_pins_trailing_topic_segment() + { + Assert.That(AvailabilityTopic.AvailableSegment, Is.EqualTo("Available")); + } + } +} diff --git 
a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicValidationTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicValidationTests.cs new file mode 100644 index 000000000..d4b2ca12f --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/AvailabilityTopicValidationTests.cs @@ -0,0 +1,78 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins MQTT topic input validation for + /// against the + /// MQTT 3.1.1 reserved-character rules. The MQTT specification (3.1.1 + /// section 4.7.1.1) reserves '+', '#', and the null character within + /// topic names; '/' is the topic separator and must not appear inside + /// a single segment such as a topicPrefix or agentUuid input. + /// + /// If a caller supplies a value containing any of those reserved + /// characters the resulting topic would be malformed (or would alter + /// the wildcard contract under which other subscribers operate), so + /// rejects the + /// input by returning null. + /// + /// Source reference: + /// MQTT 3.1.1, OASIS standard, section 4.7.1.1 (topic name and + /// topic filter format). 
+ /// + [TestFixture] + public class AvailabilityTopicValidationTests + { + [TestCase("MTConnect+", "agent-uuid-1")] + [TestCase("MTConnect#", "agent-uuid-1")] + [TestCase("MTConnect\0", "agent-uuid-1")] + [TestCase("MTConnect", "agent+uuid")] + [TestCase("MTConnect", "agent#uuid")] + [TestCase("MTConnect", "agent\0uuid")] + public void Build_returns_null_when_reserved_character_present( + string topicPrefix, string agentUuid) + { + Assert.That( + AvailabilityTopic.Build(topicPrefix, agentUuid), + Is.Null, + $"Build({topicPrefix}, {agentUuid}) should reject MQTT-reserved characters."); + } + + [Test] + public void Build_rejects_slash_inside_agent_uuid_segment() + { + // The agentUuid is a single topic segment; embedding '/' + // would split it across multiple segments and break the + // {TopicPrefix}/Agent/{AgentUuid}/Available shape. + Assert.That( + AvailabilityTopic.Build("MTConnect", "agent/uuid"), + Is.Null); + } + + [Test] + public void Build_strips_leading_slash_from_topic_prefix() + { + // Optional canonicalisation: a leading '/' in topicPrefix + // would yield a topic beginning with '/', which is legal in + // MQTT 3.1.1 but produces a confusing empty leading + // segment. Strip it so the resulting topic stays canonical. + Assert.That( + AvailabilityTopic.Build("/MTConnect", "agent-1"), + Is.EqualTo("MTConnect/Agent/agent-1/Available")); + } + + [Test] + public void Build_strips_trailing_slash_from_topic_prefix() + { + // Optional canonicalisation: a trailing '/' would yield + // "MTConnect//Agent/agent-1/Available" with a stray empty + // segment. 
+ Assert.That( + AvailabilityTopic.Build("MTConnect/", "agent-1"), + Is.EqualTo("MTConnect/Agent/agent-1/Available")); + } + } +} diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequencePersisterTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequencePersisterTests.cs new file mode 100644 index 000000000..95b996a0a --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequencePersisterTests.cs @@ -0,0 +1,135 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the MqttRelay last-sent-sequence persistence policy under + /// DurableRelay. Module.cs previously synchronously wrote to disk + /// from every successful observation publish in + /// AgentObservationAdded: + /// + /// File.WriteAllText(path, seq.ToString()); + /// + /// Under high-rate observation arrival that put a synchronous disk + /// write on every event-handler invocation, throttling the relay. + /// LastSentSequencePersister introduces an in-memory tracker plus + /// an explicit dirty bit; the caller flushes to disk on a timer, + /// on shutdown, and after every batch boundary, instead of after + /// every observation. 
+ /// + [TestFixture] + public class LastSentSequencePersisterTests + { + [Test] + public void Update_marks_dirty_and_round_trips_through_read() + { + var persister = new LastSentSequencePersister(); + + persister.Update(42UL); + + Assert.That(persister.Read(), Is.EqualTo(42UL)); + Assert.That(persister.IsDirty, Is.True, + "An in-memory update must mark the persister dirty so the caller knows to flush."); + } + + [Test] + public void Read_does_not_clear_dirty_bit() + { + var persister = new LastSentSequencePersister(); + persister.Update(7UL); + + _ = persister.Read(); + + Assert.That(persister.IsDirty, Is.True, + "Read must not clear dirty: only Flush establishes durable persistence."); + } + + [Test] + public void TryFlush_writes_only_when_dirty_and_clears_dirty_on_success() + { + var persister = new LastSentSequencePersister(); + persister.Update(99UL); + + ulong? written = null; + var flushed = persister.TryFlush(seq => written = seq); + + Assert.That(flushed, Is.True, + "TryFlush returns true when a write was actually emitted."); + Assert.That(written, Is.EqualTo(99UL)); + Assert.That(persister.IsDirty, Is.False, + "After a successful flush the persister must be clean so the next timer tick does not redundantly write."); + } + + [Test] + public void TryFlush_no_ops_when_not_dirty() + { + var persister = new LastSentSequencePersister(); + // Never updated; persister is clean. + var written = false; + var flushed = persister.TryFlush(_ => written = true); + + Assert.That(flushed, Is.False); + Assert.That(written, Is.False, + "A clean persister must not invoke the writer; that would burn IOPS for no progress."); + } + + [Test] + public void TryFlush_keeps_dirty_when_writer_throws() + { + // A failing write must not clear dirty: the caller wants + // the next timer tick to retry. 
+ var persister = new LastSentSequencePersister(); + persister.Update(123UL); + + Assert.Throws( + () => persister.TryFlush(_ => throw new System.IO.IOException("disk full"))); + + Assert.That(persister.IsDirty, Is.True, + "A failed write must leave the persister dirty so the next flush retries."); + Assert.That(persister.Read(), Is.EqualTo(123UL)); + } + + [Test] + public void Update_is_a_last_write_wins_overwrite() + { + var persister = new LastSentSequencePersister(); + persister.Update(100UL); + persister.Update(50UL); + + Assert.That(persister.Read(), Is.EqualTo(50UL), + "Last write wins; the persister is not a max-watermark."); + } + + [Test] + public void Initialize_seeds_value_without_marking_dirty() + { + // Used at startup to load the persisted last-sent-sequence + // from disk: the in-memory value must reflect the on-disk + // value but the persister must be clean (no flush needed + // until a real update arrives). + var persister = new LastSentSequencePersister(); + persister.Initialize(500UL); + + Assert.That(persister.Read(), Is.EqualTo(500UL)); + Assert.That(persister.IsDirty, Is.False, + "Initialize seeds the in-memory value from disk and must not request a redundant flush."); + } + + [Test] + public void TryFlush_no_ops_when_writer_is_null() + { + var persister = new LastSentSequencePersister(); + persister.Update(7UL); + + // A null writer means the caller has not wired persistence + // (e.g. DurableRelay disabled at runtime); the persister + // must not throw. + Assert.DoesNotThrow(() => persister.TryFlush(null)); + // Dirty bit unchanged because no write happened. 
+ Assert.That(persister.IsDirty, Is.True); + } + } +} diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequenceTrackerTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequenceTrackerTests.cs new file mode 100644 index 000000000..ea2f4cc5c --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/LastSentSequenceTrackerTests.cs @@ -0,0 +1,118 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the atomic-access policy for the MqttRelay relay-progress + /// counter. Module.cs previously read and wrote + /// _lastSentSequence (a 64-bit field) without + /// , which on 32-bit hosts is not an + /// atomic operation: two 32-bit halves are read or written + /// independently and a concurrent reader can observe a torn value. + /// + /// MqttRelay reads _lastSentSequence from observation event + /// handlers (multiple ThreadPool threads) while the durable-relay + /// Worker writes it. A torn read could log a wildly wrong "unsent" + /// figure and, worse, be propagated to the persisted last-sent + /// sequence file, causing buffered observations to be skipped or + /// re-sent on the next reconnect. + /// + /// LastSentSequenceTracker centralises the atomic read/write so the + /// policy is unit-testable and so any future caller cannot + /// regress to a torn-read pattern. 
+ /// + [TestFixture] + public class LastSentSequenceTrackerTests + { + [Test] + public void Read_returns_zero_on_fresh_tracker() + { + var tracker = new LastSentSequenceTracker(); + Assert.That(tracker.Read(), Is.EqualTo(0UL)); + } + + [Test] + public void Write_round_trips_through_read() + { + var tracker = new LastSentSequenceTracker(); + tracker.Write(42UL); + Assert.That(tracker.Read(), Is.EqualTo(42UL)); + } + + [Test] + public void Write_supports_full_ulong_range_round_trip() + { + // The broker sequence is unsigned; the tracker must round + // trip values whose bit pattern is negative when reinterpreted + // as a signed long (Interlocked operates on long internally). + var tracker = new LastSentSequenceTracker(); + tracker.Write(ulong.MaxValue); + Assert.That(tracker.Read(), Is.EqualTo(ulong.MaxValue)); + } + + [Test] + public void Write_overwrites_previous_value() + { + var tracker = new LastSentSequenceTracker(); + tracker.Write(100UL); + tracker.Write(50UL); + Assert.That(tracker.Read(), Is.EqualTo(50UL), + "Last write wins; the tracker is not a max-watermark."); + } + + [Test] + public void Concurrent_writes_and_reads_observe_only_written_values() + { + // Smoke-test for non-atomic torn read: launch a writer + // ramping through the high ulong range and a reader + // sampling the value; the reader must never observe a + // value that was never written. With Interlocked the + // assertion is trivially true; with naive long field the + // assertion would fail on 32-bit hosts. 
+ var tracker = new LastSentSequenceTracker(); + const int iterations = 100_000; + var startGate = new System.Threading.ManualResetEventSlim(false); + + var writer = Task.Run(() => + { + startGate.Wait(); + for (int i = 1; i <= iterations; i++) + { + tracker.Write((ulong)i | 0xFFFFFFFF00000000UL); + } + }); + + var reader = Task.Run(() => + { + startGate.Wait(); + while (!writer.IsCompleted) + { + var v = tracker.Read(); + if (v != 0UL) + { + var low = v & 0x00000000FFFFFFFFUL; + var high = v & 0xFFFFFFFF00000000UL; + // The high half is the canary: the writer + // never sets a value with high half != the + // sentinel, so observing anything else is a + // torn read. + Assert.That(high, Is.EqualTo(0xFFFFFFFF00000000UL), + "Torn read: high half not as written."); + Assert.That(low, Is.LessThanOrEqualTo((ulong)iterations)); + } + } + }); + + startGate.Set(); + Task.WaitAll(writer, reader); + + // Final state must be the writer's last value. + Assert.That(tracker.Read(), Is.EqualTo((ulong)iterations | 0xFFFFFFFF00000000UL)); + } + } +} diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/MqttRelayLifecycleDisconnectTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/MqttRelayLifecycleDisconnectTests.cs new file mode 100644 index 000000000..44c95d47e --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/MqttRelayLifecycleDisconnectTests.cs @@ -0,0 +1,108 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the MqttRelay module disconnect-on-shutdown policy. The + /// previous OnStop() implementation called + /// _mqttClient.DisconnectAsync(...) as a fire-and-forget + /// task: the returned was never awaited and any + /// fault on the disconnect path was silently lost. 
That hid broker + /// errors at shutdown (so an admin diagnosing a hung shutdown could + /// not see the real cause) and risked the host process exiting + /// before the disconnect actually completed. + /// + /// The lifecycle helper now exposes a bounded await with a + /// fault-logging continuation. The policy: + /// + /// * Awaits the disconnect, but bounds the wait so a misbehaving + /// broker cannot hang shutdown forever. + /// * Surfaces a fault to a logging callback so the operator gets a + /// diagnostic instead of a silently-dropped exception. + /// * Treats the timeout itself as success (best-effort shutdown). + /// + [TestFixture] + public class MqttRelayLifecycleDisconnectTests + { + [Test] + public void DisconnectWithTimeout_returns_when_disconnect_completes() + { + // The disconnect task completes promptly; the helper must + // not block the shutdown thread or invoke the fault log. + string loggedFault = null; + + MqttRelayLifecycle.DisconnectWithTimeout( + disconnect: () => Task.CompletedTask, + timeout: TimeSpan.FromSeconds(1), + onFault: ex => loggedFault = ex.Message); + + Assert.That(loggedFault, Is.Null, + "Successful disconnect must not invoke the fault logger."); + } + + [Test] + public void DisconnectWithTimeout_logs_fault_when_disconnect_throws() + { + string loggedFault = null; + + MqttRelayLifecycle.DisconnectWithTimeout( + disconnect: () => Task.FromException(new InvalidOperationException("broker rejected")), + timeout: TimeSpan.FromSeconds(1), + onFault: ex => loggedFault = ex.Message); + + Assert.That(loggedFault, Is.EqualTo("broker rejected"), + "A faulted disconnect Task must surface its exception to the logger."); + } + + [Test] + public void DisconnectWithTimeout_returns_after_timeout_when_disconnect_hangs() + { + // A misbehaving broker that never completes the disconnect + // must not hang shutdown indefinitely; the helper bounds the + // wait and treats the timeout as best-effort success. 
+ var sw = System.Diagnostics.Stopwatch.StartNew(); + + MqttRelayLifecycle.DisconnectWithTimeout( + disconnect: () => new TaskCompletionSource().Task, + timeout: TimeSpan.FromMilliseconds(50), + onFault: _ => { }); + + sw.Stop(); + Assert.That(sw.Elapsed, Is.LessThan(TimeSpan.FromSeconds(2)), + "Hung disconnect must bail out near the configured timeout."); + } + + [Test] + public void DisconnectWithTimeout_does_not_throw_when_disconnect_factory_throws_synchronously() + { + // A factory that throws before returning a Task represents a + // misbehaving client; the helper must catch synchronously + // and route the exception to the fault logger. + string loggedFault = null; + + Assert.DoesNotThrow(() => MqttRelayLifecycle.DisconnectWithTimeout( + disconnect: () => throw new InvalidOperationException("sync throw"), + timeout: TimeSpan.FromSeconds(1), + onFault: ex => loggedFault = ex.Message)); + + Assert.That(loggedFault, Is.EqualTo("sync throw")); + } + + [Test] + public void DisconnectWithTimeout_no_ops_when_disconnect_factory_is_null() + { + // The shutdown path must tolerate a null disconnect factory + // (for example when _mqttClient is null because the worker + // never ran). + Assert.DoesNotThrow(() => MqttRelayLifecycle.DisconnectWithTimeout( + disconnect: null, + timeout: TimeSpan.FromSeconds(1), + onFault: _ => { })); + } + } +} diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/MqttRelayLifecycleStopTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/MqttRelayLifecycleStopTests.cs new file mode 100644 index 000000000..d0c2af405 --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/MqttRelayLifecycleStopTests.cs @@ -0,0 +1,90 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the MqttRelay module shutdown policy. 
Prior to this guard + /// the module's OnStop() unconditionally invoked + /// _documentServer.Stop(). When the module was configured + /// with TopicStructure=Entity only _entityServer was + /// constructed, so _documentServer was null and the + /// invocation threw a + /// during agent shutdown. The lifecycle helper centralises the + /// null-guard so the policy is unit-testable without standing up an + /// MTConnect agent broker. + /// + /// Background: the bug was first observed when an Entity-mode relay + /// was stopped as part of a graceful agent shutdown and the NRE + /// surfaced in the host service event log, masking the real shutdown + /// reason. + /// + [TestFixture] + public class MqttRelayLifecycleStopTests + { + [Test] + public void StopServers_does_not_throw_when_both_servers_null() + { + // Entity-mode plus a constructor that did not initialise + // either server is the worst case; the helper must be a + // total function over (null, null). + Assert.DoesNotThrow( + () => MqttRelayLifecycle.StopServers(documentStop: null, entityStop: null)); + } + + [Test] + public void StopServers_invokes_document_stop_when_provided() + { + var documentStopped = false; + MqttRelayLifecycle.StopServers( + documentStop: () => documentStopped = true, + entityStop: null); + + Assert.That(documentStopped, Is.True, + "Document-mode shutdown must invoke the document-server stop action."); + } + + [Test] + public void StopServers_invokes_entity_stop_when_provided() + { + var entityStopped = false; + MqttRelayLifecycle.StopServers( + documentStop: null, + entityStop: () => entityStopped = true); + + Assert.That(entityStopped, Is.True, + "Entity-mode shutdown must invoke the entity-server stop action."); + } + + [Test] + public void StopServers_invokes_both_when_both_provided() + { + var documentStopped = false; + var entityStopped = false; + + MqttRelayLifecycle.StopServers( + documentStop: () => documentStopped = true, + entityStop: () => entityStopped = true); + + 
Assert.That(documentStopped, Is.True); + Assert.That(entityStopped, Is.True); + } + + [Test] + public void StopServers_swallows_document_stop_exception_and_runs_entity_stop() + { + // A throwing document-server stop must not prevent the + // entity-server stop from running; otherwise a partial + // shutdown leaks live handlers. + var entityStopped = false; + + Assert.DoesNotThrow(() => MqttRelayLifecycle.StopServers( + documentStop: () => throw new System.InvalidOperationException("doc"), + entityStop: () => entityStopped = true)); + + Assert.That(entityStopped, Is.True); + } + } +} diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/ObservationGrouperTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/ObservationGrouperTests.cs new file mode 100644 index 000000000..d411f4532 --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/ObservationGrouperTests.cs @@ -0,0 +1,179 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Linq; +using MTConnect.Devices; +using MTConnect.Observations; +using MTConnect.Observations.Output; +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the MqttRelay PublishObservations grouping policy. The + /// previous implementation iterated the input enumerable up to + /// three times per distinct DataItemId: + /// + /// var dataItemIds = observations.Select(o => o.DataItemId).Distinct(); + /// foreach (var dataItemId in dataItemIds) + /// { + /// var dataItemObservations = observations.Where(o => o.DataItemId == dataItemId); + /// var dataItemObservation = dataItemObservations.FirstOrDefault(); + /// ... + /// } + /// + /// That is O(n*k) where n is the observation count and k is the + /// distinct-DataItemId count, plus repeated enumeration of an + /// IEnumerable that may be a deferred/expensive query. 
On large
+    /// agents (thousands of observations across hundreds of data
+    /// items) that materially slowed the relay catch-up after a
+    /// reconnect.
+    ///
+    /// ObservationGrouper.GroupByDataItem produces a single-pass
+    /// grouping: an IEnumerable of (DataItemId, observations) groups,
+    /// each iteration of the source happening exactly once.
+    ///
+    [TestFixture]
+    public class ObservationGrouperTests
+    {
+        [Test]
+        public void GroupByDataItem_returns_empty_when_input_null()
+        {
+            // Null input must be a total-function no-op, not an NRE.
+            Assert.That(ObservationGrouper.GroupByDataItem(null), Is.Empty);
+        }
+
+        [Test]
+        public void GroupByDataItem_returns_empty_when_input_empty()
+        {
+            // NOTE(review): restored the List<IObservationOutput> type
+            // argument that tag-stripping removed; bare "new List()"
+            // does not compile.
+            Assert.That(
+                ObservationGrouper.GroupByDataItem(new List<IObservationOutput>()),
+                Is.Empty);
+        }
+
+        [Test]
+        public void GroupByDataItem_groups_observations_by_data_item_id()
+        {
+            // Interleaved DataItemIds must land in per-id groups with
+            // all sequences preserved.
+            var input = new List<IObservationOutput>
+            {
+                Stub("A", DataItemCategory.SAMPLE, sequence: 1),
+                Stub("B", DataItemCategory.EVENT, sequence: 2),
+                Stub("A", DataItemCategory.SAMPLE, sequence: 3),
+                Stub("C", DataItemCategory.CONDITION, sequence: 4),
+                Stub("B", DataItemCategory.EVENT, sequence: 5),
+            };
+
+            var groups = ObservationGrouper.GroupByDataItem(input).ToList();
+
+            Assert.That(groups.Select(g => g.Key), Is.EquivalentTo(new[] { "A", "B", "C" }));
+
+            var groupA = groups.Single(g => g.Key == "A").ToList();
+            Assert.That(groupA.Select(o => o.Sequence), Is.EquivalentTo(new ulong[] { 1, 3 }));
+
+            var groupB = groups.Single(g => g.Key == "B").ToList();
+            Assert.That(groupB.Select(o => o.Sequence), Is.EquivalentTo(new ulong[] { 2, 5 }));
+
+            var groupC = groups.Single(g => g.Key == "C").ToList();
+            Assert.That(groupC.Select(o => o.Sequence), Is.EquivalentTo(new ulong[] { 4 }));
+        }
+
+        [Test]
+        public void GroupByDataItem_iterates_source_exactly_once()
+        {
+            // Pins the perf-fix contract. The previous Module.cs path
+            // iterated the source up to three times (Distinct, Where,
+            // FirstOrDefault) per group. The grouping helper must take
+            // a single pass over the source.
+            var iterationCount = 0;
+            // NOTE(review): restored the IEnumerable<IObservationOutput>
+            // return type that tag-stripping removed; the non-generic
+            // "IEnumerable Source()" would not satisfy the generic
+            // GroupByDataItem parameter and does not compile.
+            IEnumerable<IObservationOutput> Source()
+            {
+                iterationCount++;
+                yield return Stub("A", DataItemCategory.SAMPLE, sequence: 1);
+                yield return Stub("B", DataItemCategory.EVENT, sequence: 2);
+                yield return Stub("A", DataItemCategory.SAMPLE, sequence: 3);
+            }
+
+            var groups = ObservationGrouper.GroupByDataItem(Source()).ToList();
+            // Force eager enumeration of every group.
+            foreach (var g in groups) _ = g.ToList();
+
+            Assert.That(iterationCount, Is.EqualTo(1),
+                "Grouping must iterate the source exactly once.");
+            Assert.That(groups, Has.Count.EqualTo(2));
+        }
+
+        [Test]
+        public void GroupByDataItem_preserves_first_seen_order_per_group()
+        {
+            // Useful for callers that rely on encounter order (the
+            // condition path) for sequence-monotonic publishing.
+            var input = new List<IObservationOutput>
+            {
+                Stub("A", DataItemCategory.SAMPLE, sequence: 10),
+                Stub("A", DataItemCategory.SAMPLE, sequence: 11),
+                Stub("A", DataItemCategory.SAMPLE, sequence: 12),
+            };
+
+            var groupA = ObservationGrouper.GroupByDataItem(input).Single();
+            // Is.EqualTo (not EquivalentTo): order is the contract here.
+            Assert.That(
+                groupA.Select(o => o.Sequence).ToList(),
+                Is.EqualTo(new ulong[] { 10, 11, 12 }));
+        }
+
+        [Test]
+        public void GroupByDataItem_handles_null_data_item_id_as_distinct_group()
+        {
+            // Defensive: an IObservationOutput stub with a null
+            // DataItemId should not crash the grouping; it should
+            // appear as its own (null-keyed) group so the caller can
+            // decide what to do with it.
+            var input = new List<IObservationOutput>
+            {
+                Stub(null, DataItemCategory.SAMPLE, sequence: 1),
+                Stub("A", DataItemCategory.SAMPLE, sequence: 2),
+                Stub(null, DataItemCategory.SAMPLE, sequence: 3),
+            };
+
+            var groups = ObservationGrouper.GroupByDataItem(input).ToList();
+
+            Assert.That(groups, Has.Count.EqualTo(2));
+            var nullGroup = groups.Single(g => g.Key == null).ToList();
+            Assert.That(nullGroup.Select(o => o.Sequence), Is.EquivalentTo(new ulong[] { 1, 3 }));
+        }
+
+        // Factory hides the stub type from the test bodies.
+        private static IObservationOutput Stub(string dataItemId, DataItemCategory category, ulong sequence)
+        {
+            return new ObservationOutputStub(dataItemId, category, sequence);
+        }
+
+        // Minimal IObservationOutput implementation: only DataItemId,
+        // Category and Sequence vary per test; every other member is a
+        // fixed placeholder.
+        private sealed class ObservationOutputStub : IObservationOutput
+        {
+            public ObservationOutputStub(string dataItemId, DataItemCategory category, ulong sequence)
+            {
+                DataItemId = dataItemId;
+                Category = category;
+                Sequence = sequence;
+            }
+
+            public string DeviceUuid => "device-1";
+            public IDataItem DataItem => null;
+            public string DataItemId { get; }
+            public DataItemCategory Category { get; }
+            public string Type => "TYPE";
+            public string SubType => null;
+            public string Name => null;
+            public ulong InstanceId => 0UL;
+            public ulong Sequence { get; }
+            public DateTime Timestamp => DateTime.UtcNow;
+            public DateTimeOffset TimeZoneTimestamp => DateTimeOffset.UtcNow;
+            public string CompositionId => null;
+            public DataItemRepresentation Representation => DataItemRepresentation.VALUE;
+            public Quality Quality => Quality.VALID;
+            public bool Deprecated => false;
+            public bool Extended => false;
+            // NOTE(review): restored Array.Empty<ObservationValue>();
+            // the bare "Array.Empty()" left by tag-stripping does not
+            // compile.
+            public ObservationValue[] Values => Array.Empty<ObservationValue>();
+            public string GetValue(string valueKey) => null;
+        }
+    }
+}
diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/RelayBufferDiagnosticsTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/RelayBufferDiagnosticsTests.cs
new file mode 100644
index 000000000..2c28d15aa
--- /dev/null
+++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/RelayBufferDiagnosticsTests.cs
@@ -0,0
+1,71 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the relay-buffer "missed observations" diagnostic + /// computation. The previous Module implementation computed + /// long missed = (long)(to - lastSent); + /// using ulong arithmetic. When lastSent > to (a + /// degenerate but possible state, for example when a persisted + /// last-sent-sequence file is stale relative to a freshly-restarted + /// broker whose sequence has rolled), the unsigned subtraction + /// underflows and the cast to long produces a huge spurious + /// "missed" figure in the diagnostic log line. + /// + /// The corrected helper computes missed only when + /// lastSent <= to; otherwise it returns 0 so the + /// diagnostic stays meaningful and the operator does not see + /// nonsense numbers. + /// + [TestFixture] + public class RelayBufferDiagnosticsTests + { + [Test] + public void ComputeMissed_returns_zero_when_last_sent_above_to() + { + Assert.That( + RelayBufferDiagnostics.ComputeMissed(to: 5UL, lastSent: 10UL), + Is.Zero, + "Underflow guard: when lastSent > to, missed must be 0."); + } + + [Test] + public void ComputeMissed_returns_zero_when_last_sent_equals_to() + { + Assert.That( + RelayBufferDiagnostics.ComputeMissed(to: 5UL, lastSent: 5UL), + Is.Zero); + } + + [Test] + public void ComputeMissed_returns_difference_when_last_sent_below_to() + { + Assert.That( + RelayBufferDiagnostics.ComputeMissed(to: 100UL, lastSent: 25UL), + Is.EqualTo(75L)); + } + + [Test] + public void ComputeMissed_handles_zero_last_sent() + { + Assert.That( + RelayBufferDiagnostics.ComputeMissed(to: 100UL, lastSent: 0UL), + Is.EqualTo(100L)); + } + + [Test] + public void ComputeMissed_round_trips_long_max_value_difference() + { + // A 63-bit-fitting positive difference must round-trip + // through the long return type without sign 
loss. + ulong to = (ulong)long.MaxValue; + Assert.That( + RelayBufferDiagnostics.ComputeMissed(to, lastSent: 0UL), + Is.EqualTo(long.MaxValue)); + } + } +} diff --git a/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/WorkerLoopExceptionLoggerTests.cs b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/WorkerLoopExceptionLoggerTests.cs new file mode 100644 index 000000000..dcd4c0c04 --- /dev/null +++ b/tests/MTConnect.NET-AgentModule-MqttRelay-Tests/WorkerLoopExceptionLoggerTests.cs @@ -0,0 +1,116 @@ +// Copyright (c) 2024 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using System; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace MTConnect.AgentModule.MqttRelay.Tests +{ + /// + /// Pins the MqttRelay Worker outer-catch logging policy. The + /// Worker do/while loop in Module.cs previously had: + /// + /// catch (TaskCanceledException) { } + /// catch (Exception) { } + /// + /// The bare empty catch on the outer loop swallowed any unexpected + /// exception escaping the inner try/catch (for example a throw + /// inside the inner finally block or an oversight in connection + /// handling). The operator saw nothing, the relay quietly entered + /// the reconnect-delay branch, and the underlying defect went + /// undiagnosed for the lifetime of the agent. + /// + /// WorkerLoopExceptionLogger encodes the policy: + /// + /// * TaskCanceledException is the orderly-shutdown signal; do not + /// log it (would spam the operator on every stop). + /// * Any other exception is genuinely unexpected at this scope; + /// log it at Warning so the underlying defect is visible. 
+ /// + [TestFixture] + public class WorkerLoopExceptionLoggerTests + { + [Test] + public void Log_skips_TaskCanceledException() + { + var logged = false; + + WorkerLoopExceptionLogger.Log( + exception: new TaskCanceledException(), + onLog: _ => logged = true); + + Assert.That(logged, Is.False, + "TaskCanceledException is the orderly-shutdown signal; do not spam the log on every stop."); + } + + [Test] + public void Log_writes_unexpected_exception_to_callback() + { + string logged = null; + + WorkerLoopExceptionLogger.Log( + exception: new InvalidOperationException("boom"), + onLog: msg => logged = msg); + + Assert.That(logged, Is.Not.Null); + Assert.That(logged, Does.Contain("boom"), + "The unexpected-exception message must include the exception text so the operator can diagnose the defect."); + } + + [Test] + public void Log_includes_exception_type_name() + { + string logged = null; + + WorkerLoopExceptionLogger.Log( + exception: new InvalidOperationException("boom"), + onLog: msg => logged = msg); + + Assert.That(logged, Does.Contain(nameof(InvalidOperationException)), + "Type name aids in classifying the defect from log scrapes."); + } + + [Test] + public void Log_no_ops_when_exception_is_null() + { + var logged = false; + + WorkerLoopExceptionLogger.Log( + exception: null, + onLog: _ => logged = true); + + Assert.That(logged, Is.False, + "A null exception cannot be logged usefully; do not invoke the callback."); + } + + [Test] + public void Log_no_ops_when_callback_is_null() + { + // Defensive: the helper must not throw when the logger is + // not wired (would defeat the purpose of catching the + // unexpected exception). 
+            Assert.DoesNotThrow(() => WorkerLoopExceptionLogger.Log(
+                exception: new InvalidOperationException("boom"),
+                onLog: null));
+        }
+
+        [Test]
+        public void Log_treats_base_OperationCanceledException_as_cancellation()
+        {
+            // Renamed from "..._subclass_of_TaskCanceledException_...":
+            // OperationCanceledException is the BASE type of
+            // TaskCanceledException, not a subclass of it. A
+            // cancellation token expiring throws TaskCanceledException
+            // (the derived type); the policy must treat any
+            // cancellation in the hierarchy as an orderly-shutdown
+            // signal.
+            var logged = false;
+
+            WorkerLoopExceptionLogger.Log(
+                exception: new OperationCanceledException(),
+                onLog: _ => logged = true);
+
+            Assert.That(logged, Is.False,
+                "OperationCanceledException is also an orderly-shutdown signal; do not log.");
+        }
+    }
+}