Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
20ede1c
docs(testing): seed issue-135 writeup skeleton
ottobolyos Apr 25, 2026
644b2e5
docs(testing): scope mqtt-relay availability topic defect
ottobolyos Apr 25, 2026
7951780
refactor(agent-module): extract availability topic helper in MqttRelay
ottobolyos Apr 25, 2026
0769b25
test(agent-module): add red tests for MqttRelay availability topic
ottobolyos Apr 25, 2026
308829b
fix(agent-module): correct availability topic emission in MqttRelay
ottobolyos Apr 25, 2026
a7d1677
test(agent-module): pin MqttRelay availability topic regression
ottobolyos Apr 25, 2026
6b5abba
docs(testing): document issue-135 phases 02 through 06
ottobolyos Apr 25, 2026
420a768
docs(testing): scrub internal path reference from phase 04 writeup
ottobolyos Apr 25, 2026
af89c33
chore(docs): remove internal planning leak from committed tree
ottobolyos Apr 27, 2026
2e3abe6
test(agent-module): pin MqttRelay AvailabilityTopic input validation
ottobolyos Apr 27, 2026
49e021f
fix(agent-module): validate MqttRelay AvailabilityTopic inputs
ottobolyos Apr 27, 2026
4d253fa
test(agent-module): pin MqttRelay missed-observation underflow guard
ottobolyos Apr 27, 2026
e60c4be
fix(agent-module): guard MqttRelay missed-observation underflow
ottobolyos Apr 27, 2026
9a6605c
docs(agent-module): document MqttRelay availability topic move
ottobolyos Apr 27, 2026
d62a4e3
test(agent-module): pin MqttRelay shutdown null-guard policy
ottobolyos Apr 27, 2026
a839b41
fix(agent-module): guard MqttRelay OnStop NRE in Entity mode
ottobolyos Apr 27, 2026
e110a09
test(agent-module): pin MqttRelay shutdown disconnect-await policy
ottobolyos Apr 27, 2026
6251d52
fix(agent-module): bound MqttRelay shutdown disconnect await
ottobolyos Apr 27, 2026
1eeaacf
test(agent-module): pin MqttRelay last-sent-sequence atomic policy
ottobolyos Apr 27, 2026
4b5a860
fix(agent-module): atomic 64-bit access for MqttRelay last-sent count
ottobolyos Apr 27, 2026
89561da
test(agent-module): pin MqttRelay async-void handler safety policy
ottobolyos Apr 27, 2026
369d7dc
fix(agent-module): wrap MqttRelay async-void handlers with fault log
ottobolyos Apr 27, 2026
efd006c
test(agent-module): pin MqttRelay Worker outer-catch logging policy
ottobolyos Apr 27, 2026
9b8c054
test(agent-module): stabilise LastSentSequenceTracker concurrent smoke
ottobolyos Apr 27, 2026
c96f5fd
fix(agent-module): log unexpected MqttRelay Worker faults
ottobolyos Apr 27, 2026
8350cc8
test(agent-module): pin MqttRelay observation grouping single-pass
ottobolyos Apr 27, 2026
d9fd5c8
fix(agent-module): single-pass observation grouping in MqttRelay
ottobolyos Apr 27, 2026
ce0fd18
test(agent-module): pin MqttRelay last-sent-sequence persister policy
ottobolyos Apr 27, 2026
a1f722e
fix(agent-module): in-memory MqttRelay last-sent-seq with timed flush
ottobolyos Apr 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
// TrakHound Inc. licenses this file to you under the MIT license.

using System;
using System.Threading.Tasks;

namespace MTConnect
{
/// <summary>
/// Centralises the MqttRelay agent-module <c>async void</c>
/// event-handler safety policy. An <c>async void</c> method that
/// throws routes its exception to the synchronization context,
/// which on the ThreadPool tears down the host process. The
/// MqttRelay handlers run on broker-raised events, so without a
/// top-level guard a formatter throw (DataItem null-deref) or a
/// synchronous broker call (broker shutting down) crashed the
/// agent.
///
/// Wrapping every handler body in <see cref="Run"/> guarantees that
/// any exception, synchronous or async, is captured and routed to
/// a logging callback rather than escaping to the synchronization
/// context. The guard is also unit-testable in isolation, so the
/// policy is pinned away from the (hard-to-mock) Module class.
/// </summary>
public static class AsyncVoidGuard
{
/// <summary>
/// Awaits the supplied handler body and routes any exception
/// to the logging callback. Never rethrows: the contract is
/// "an async-void path must not crash the host". A null body
/// is a no-op and a null or throwing logger is tolerated.
/// </summary>
/// <param name="body">
/// The handler implementation. May be <c>null</c>.
/// </param>
/// <param name="onFault">
/// Logger callback invoked with any exception thrown
/// synchronously by <paramref name="body"/> or surfaced through
/// its <see cref="Task"/>. May be <c>null</c>.
/// </param>
public static async Task Run(Func<Task> body, Action<Exception> onFault)
{
if (body == null) return;

try
{
var task = body();
if (task != null) await task.ConfigureAwait(false);
}
catch (Exception ex)
{
if (onFault != null)
{
try { onFault(ex); } catch { }
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
// TrakHound Inc. licenses this file to you under the MIT license.

namespace MTConnect
{
/// <summary>
/// Builds the MQTT topic on which the MqttRelay agent module
/// publishes the agent's Availability state (the MQTT Last Will and
/// Testament topic plus the on-connect retained Available message).
/// </summary>
public static class AvailabilityTopic
{
/// <summary>
/// Constant topic segment that separates the agent-availability
/// publishes from the document-envelope publishes (Probe /
/// Current / Sample / Asset). Subscribers wildcarding on
/// <c>{TopicPrefix}/Probe/#</c> therefore never receive the raw
/// availability payload.
/// </summary>
public const string AgentSegment = "Agent";

/// <summary>
/// Constant trailing topic segment that names the availability
/// publish.
/// </summary>
public const string AvailableSegment = "Available";

/// <summary>
/// Builds the full MQTT topic the MqttRelay module uses to
/// publish the agent's Availability state. Returns
/// <c>null</c> when either input is null or empty, or when
/// either input contains an MQTT-reserved character so callers
/// can short-circuit before configuring an MQTT publish.
/// </summary>
/// <param name="topicPrefix">
/// The configured <c>TopicPrefix</c> value, e.g.
/// <c>MTConnect</c> or <c>MTConnect/Document</c>. Leading and
/// trailing <c>/</c> separators are stripped so the resulting
/// topic stays canonical.
/// </param>
/// <param name="agentUuid">
/// The agent's <c>Uuid</c> identifier. The agentUuid is a
/// single topic segment so it must not contain <c>/</c>.
/// </param>
/// <remarks>
/// Per MQTT 3.1.1 OASIS standard section 4.7.1.1, MQTT topic
/// names must not contain the wildcard characters <c>+</c> or
/// <c>#</c> nor the null character <c>\0</c>; supplying any of
/// those characters in either input produces a <c>null</c>
/// return.
/// </remarks>
public static string Build(string topicPrefix, string agentUuid)
{
if (string.IsNullOrEmpty(topicPrefix)) return null;
if (string.IsNullOrEmpty(agentUuid)) return null;

// MQTT 3.1.1 §4.7.1.1: '+' and '#' are wildcard characters
// and the null character is reserved. None of them are
// legal inside a topic name.
if (ContainsReservedCharacter(topicPrefix)) return null;
if (ContainsReservedCharacter(agentUuid)) return null;

// The agentUuid is a single topic segment so it must not
// embed a '/'; otherwise the {TopicPrefix}/Agent/{AgentUuid}
// /Available shape silently fragments across additional
// segments.
if (agentUuid.IndexOf('/') >= 0) return null;

// Canonicalise the prefix: strip a leading or trailing '/'
// so the resulting topic does not begin with '/' nor
// contain a stray empty segment.
var canonicalPrefix = topicPrefix.Trim('/');
if (canonicalPrefix.Length == 0) return null;

return $"{canonicalPrefix}/{AgentSegment}/{agentUuid}/{AvailableSegment}";
}

private static bool ContainsReservedCharacter(string value)
{
for (var i = 0; i < value.Length; i++)
{
var c = value[i];
if (c == '+' || c == '#' || c == '\0') return true;
}
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
// TrakHound Inc. licenses this file to you under the MIT license.

using System;
using System.Threading;

namespace MTConnect
{
/// <summary>
/// Tracks the MqttRelay last-sent observation sequence in memory
/// and flushes to durable storage only when explicitly asked.
/// Module.cs previously wrote to disk synchronously from every
/// successful observation publish; on a high-rate stream that put
/// a disk write on every ThreadPool callback. This persister
/// decouples the event-handler hot path from disk IO: handlers
/// call <see cref="Update"/>, and a timer / shutdown / batch
/// boundary calls <see cref="TryFlush"/>.
///
/// All read/write of the value field uses
/// <see cref="Interlocked"/> so the persister is safe to share
/// between event-handler threads and a flush timer thread on 32-
/// bit hosts where bare 64-bit access can tear.
/// </summary>
public sealed class LastSentSequencePersister
{
private long _value;
private int _dirty; // 0 = clean, 1 = dirty (Interlocked-managed)

/// <summary>
/// Returns the current in-memory sequence value with an
/// atomic 64-bit read. Does not clear the dirty bit; only
/// <see cref="TryFlush"/> establishes durable persistence.
/// </summary>
public ulong Read()
{
return unchecked((ulong)Interlocked.Read(ref _value));
}

/// <summary>
/// Whether the in-memory value has been modified since the
/// last successful flush. The flush timer can use this to
/// skip writes when the relay is idle.
/// </summary>
public bool IsDirty
{
get { return Interlocked.CompareExchange(ref _dirty, 0, 0) != 0; }
}

/// <summary>
/// Records a new last-sent sequence value (last write wins)
/// and marks the persister dirty so the next flush will
/// emit a write.
/// </summary>
public void Update(ulong value)
{
Interlocked.Exchange(ref _value, unchecked((long)value));
Interlocked.Exchange(ref _dirty, 1);
}

/// <summary>
/// Seeds the in-memory value from durable storage at startup
/// without marking the persister dirty. The initial value
/// already matches disk so a flush would be redundant.
/// </summary>
public void Initialize(ulong value)
{
Interlocked.Exchange(ref _value, unchecked((long)value));
Interlocked.Exchange(ref _dirty, 0);
}

/// <summary>
/// Emits a write through <paramref name="write"/> if and only
/// if the persister is dirty, then clears the dirty bit. If
/// the writer throws, the dirty bit is preserved and the
/// exception propagates so the caller can log and retry on
/// the next tick. Returns <c>true</c> when a write was
/// actually emitted.
/// </summary>
public bool TryFlush(Action<ulong> write)
{
if (write == null) return false;
if (Interlocked.CompareExchange(ref _dirty, 0, 0) == 0) return false;

var snapshot = Read();
// Clear dirty *after* a successful write so that a thrown
// writer leaves the persister dirty for the next attempt.
write(snapshot);
Interlocked.Exchange(ref _dirty, 0);
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
// TrakHound Inc. licenses this file to you under the MIT license.

using System.Threading;

namespace MTConnect
{
/// <summary>
/// Centralises atomic 64-bit access to the MqttRelay last-sent
/// sequence counter. Module.cs previously read and wrote a bare
/// <see cref="long"/> field. On 32-bit hosts a 64-bit field is read
/// and written as two 32-bit halves and a concurrent reader can
/// observe a torn value. MqttRelay reads the counter from
/// observation event handlers (multiple ThreadPool threads) while
/// the durable-relay Worker writes it, so the access pattern was
/// genuinely racy. The tracker wraps every read/write in
/// <see cref="Interlocked"/> primitives so the policy is enforced
/// uniformly and is unit-testable.
/// </summary>
public sealed class LastSentSequenceTracker
{
private long _value;

/// <summary>
/// Returns the current sequence value with an atomic 64-bit
/// read. Round-trips the full <see cref="ulong"/> range,
/// including values whose bit pattern is negative when
/// reinterpreted as a signed <see cref="long"/>.
/// </summary>
public ulong Read()
{
return unchecked((ulong)Interlocked.Read(ref _value));
}

/// <summary>
/// Sets the current sequence value with an atomic 64-bit write.
/// Last write wins; this is not a max-watermark.
/// </summary>
public void Write(ulong value)
{
Interlocked.Exchange(ref _value, unchecked((long)value));
}
}
}
Loading