Skip to content
Merged
4 changes: 3 additions & 1 deletion docs/configure/consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ client.Start();

`CurrentReceived` fires once on stream initialization—`Start()` drives the `/sample` long-poll, not periodic `/current` polling. For periodic snapshots, call `GetCurrent()` directly on a timer; for the streaming case, subscribe to `SampleReceived`.

The client handles reconnects on the consumer's behalf; the `ConnectionError` event fires on transport failures and the client backs off and retries. Parsing or dispatch failures surface through `InternalError`.
The client also exposes the post-probe device model. `DeviceReceived` fires once per parsed device for every `/probe` response, carrying the fully wired [`IDevice`](/api/MTConnect.Devices.IDevice) instance with its DataItems' `Container` and `Device` back-pointers set. The `Devices` property returns a read-only snapshot of every device the most recent probe yielded, keyed by UUID; entries are replaced wholesale on each probe and devices the agent no longer advertises are evicted at the start of the next probe. Both surfaces were added per [issue #176](https://github.com/TrakHound/MTConnect.NET/issues/176).

The client handles reconnects on the consumer's behalf; the `ConnectionError` event fires on transport failures and the client backs off and retries. Parsing or dispatch failures surface through `InternalError`. A `DeviceReceived` handler that throws is isolated: the exception is routed through `InternalError` and the cache fill plus the rest of the per-device fan-out continue normally.

## MQTT—the cppagent-parity broker / relay tree

Expand Down
30 changes: 29 additions & 1 deletion docs/examples/client-http.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,35 @@ client.CurrentReceived += (s, response) => { /* iterate response.Streams */ };
client.Start();
```

`MTConnectHttpClient` wraps the agent's HTTP endpoints (`/probe`, `/current`, `/sample`, `/asset`) and exposes them as a long-polling subscription with event hooks. The `Interval` property is the consumer-side sample interval — the client requests `/sample?interval=100` and receives each sequence batch through `CurrentReceived` (or `SampleReceived` if you wire that hook up instead).
`MTConnectHttpClient` wraps the agent's HTTP endpoints (`/probe`, `/current`, `/sample`, `/asset`) and exposes them as a long-polling subscription with event hooks. The `Interval` property is the consumer-side sample interval—the client requests `/sample?interval=100` and receives each sequence batch through `CurrentReceived` (or `SampleReceived` if you wire that hook up instead).

### Subscribing to the wired device model

The client maintains a wired snapshot of every device it sees from `/probe` responses, keyed by UUID. Subscribe to `DeviceReceived` to react per-device while the snapshot is filling, and read the `Devices` accessor at any point to consult the post-probe model:

```csharp
client.DeviceReceived += (s, device) =>
{
Console.WriteLine($"Device Received : {device.Uuid} : {device.Name}");
foreach (var dataItem in device.GetDataItems())
{
// Container + Device back-pointers are wired in by the parser.
Console.WriteLine($" {dataItem.Id} : {dataItem.Type} : container={dataItem.Container?.Id}");
}
};

client.Start();

// At any point after the first probe completes:
foreach (var (uuid, device) in client.Devices)
{
Console.WriteLine($"Cached device : {uuid} : {device.Name}");
}
```

`DeviceReceived` fires once per parsed device for every Probe response, carrying the fully wired `IDevice` instance—the same one the `Devices` snapshot accessor returns for that UUID—with the agent's `InstanceId` stamped on each DataItem. A handler that throws is isolated by the client: the exception is forwarded through `InternalError` and the cache fill continues. `Devices` returns a fresh read-only snapshot on every read; cache the reference if you read repeatedly between probes. Devices that disappear between probes are evicted at the start of the next probe, so the snapshot never carries entries the agent no longer advertises.

The `Devices` accessor and the `DeviceReceived` fan-out were added per [issue #176](https://github.com/TrakHound/MTConnect.NET/issues/176) (approved 2026-06-01 by @PatrickRitchie).

### Per-observation validation

Expand Down
106 changes: 97 additions & 9 deletions libraries/MTConnect.NET-HTTP/Clients/MTConnectHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using MTConnect.Streams;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -179,12 +180,66 @@ private void Init()
/// </summary>
public bool UseStreaming { get; set; }

/// <summary>
/// A snapshot of the device model received from the most recent Probe response,
/// keyed by device UUID. The dictionary is empty until the first probe succeeds.
/// Each call returns a fresh snapshot reflecting the most recently completed probe;
/// previously returned snapshots are unaffected by subsequent probes, and devices
/// that disappear between probes are evicted at the start of the next probe so the
/// snapshot never carries entries the agent no longer advertises. Each
/// <see cref="IDevice"/>'s DataItems carry the fully wired
/// <see cref="IDataItem.Container"/> and <see cref="IDataItem.Device"/>
/// back-pointers set during parsing, so consumers can walk the component ancestry
/// of any DataItem without re-parsing the document.
/// </summary>
/// <remarks>
/// <para>
/// The dictionary returned by each call is a fresh allocation independent of the
/// cache, wrapped in a <see cref="ReadOnlyDictionary{TKey, TValue}"/>
/// so consumers cannot mutate the snapshot through a downcast. The <see cref="IDevice"/>
/// values within are shared references to cached instances that the client replaces
/// wholesale on each probe. The accessor acquires the client's internal lock, so
/// callers may enumerate the snapshot without synchronizing against the worker thread
/// that processes subsequent probes.
/// </para>
/// <para>
/// Each access allocates a fresh dictionary. Cache the returned reference if you
/// read repeatedly between probes.
/// </para>
/// <para>
/// Empty probe responses (where the agent answers with no devices) do not clear the
/// cache; the previous snapshot stays presented until the next non-empty probe.
/// </para>
/// </remarks>
public IReadOnlyDictionary<string, IDevice> Devices
{
get
{
lock (_lock)
{
return new ReadOnlyDictionary<string, IDevice>(
new Dictionary<string, IDevice>(_devices));
}
}
}


#region "Events"

/// <summary>
/// Raised when a Device is received
/// Raised once per parsed device for every Probe response the client receives,
/// carrying the fully wired <see cref="IDevice"/> instance—the same instance the
/// <see cref="Devices"/> snapshot accessor would return for that UUID—with its
/// DataItems' <see cref="IDataItem.Container"/> and <see cref="IDataItem.Device"/>
/// back-pointers set, and the agent's <c>InstanceId</c> stamped on each DataItem.
/// </summary>
/// <remarks>
/// Handlers fire in document order while the cache is still being populated.
/// Subscribe to <see cref="ProbeReceived"/> to receive notification after the full
/// probe response has been processed. A handler that throws is isolated by the
/// client: the exception is forwarded through <see cref="InternalError"/> and the
/// cache fill plus subsequent fan-out continue normally.
/// </remarks>
public event EventHandler<IDevice> DeviceReceived;

/// <summary>
Expand Down Expand Up @@ -828,14 +883,15 @@ private void ProcessProbeDocument(IDevicesResponseDocument document)
{
if (document != null && !document.Devices.IsNullOrEmpty())
{
// Clear Cached DataItems and Components
// Clear cached DataItems, Components, and the device snapshot so devices
// the agent no longer advertises are evicted before the new probe is loaded.
lock (_lock)
{
_cachedComponents.Clear();
_cachedDataItems.Clear();
_devices.Clear();
}

var outputDevices = new List<IDevice>();
foreach (var device in document.Devices)
{
var outputDevice = ProcessDevice(document.Header, device);
Expand All @@ -846,18 +902,48 @@ private void ProcessProbeDocument(IDevicesResponseDocument document)
_devices.Remove(outputDevice.Uuid);
_devices.Add(outputDevice.Uuid, outputDevice);
}
}

foreach (var outputDevice in outputDevices)
{
DeviceReceived?.Invoke(this, outputDevice);
// Fire per-device inside the populate loop so the cache and event stay in lockstep.
// Isolate subscriber exceptions per delegate so one bad handler cannot abort the
// populate loop, suppress ProbeReceived, or short-circuit later subscribers in the
// invocation list; route each fault through InternalError instead.
RaiseDeviceReceived(outputDevice);
}

// Raise ProbeReceived Event
ProbeReceived?.Invoke(this, document);
}
}

// Iterate the invocation list so one throwing subscriber cannot short-circuit the
// multicast and starve later subscribers. Each fault is forwarded through
// InternalError; if InternalError itself faults, swallow that secondary fault so the
// populate loop and remaining DeviceReceived subscribers still get every device.
private void RaiseDeviceReceived(IDevice device)
{
var handler = DeviceReceived;
if (handler == null) return;

foreach (var subscriber in handler.GetInvocationList())
{
try
{
((EventHandler<IDevice>)subscriber).Invoke(this, device);
}
catch (Exception ex)
{
try
{
InternalError?.Invoke(this, ex);
}
catch
{
// A faulting InternalError handler must not break DeviceReceived fan-out.
}
}
}
}

private void ProcessCurrentDocument(IStreamsResponseDocument document, CancellationToken cancel)
{
_lastResponse = UnixDateTime.Now;
Expand Down Expand Up @@ -1092,7 +1178,8 @@ private IComponentStream ProcessComponentStream(IMTConnectStreamsHeader header,
outputComponentStream.NativeName = inputComponentStream.NativeName;
outputComponentStream.Uuid = inputComponentStream.Uuid;

if (inputComponentStream.ComponentType == Agent.TypeId || inputComponentStream.ComponentType == Devices.Device.TypeId)
// Fully-qualified to disambiguate from the Devices property below; the namespace using above is shadowed inside this class.
if (inputComponentStream.ComponentType == Agent.TypeId || inputComponentStream.ComponentType == MTConnect.Devices.Device.TypeId)
{
outputComponentStream.Component = GetCachedDevice(deviceUuid);
}
Expand Down Expand Up @@ -1148,7 +1235,8 @@ private async void CheckAssetChanged(IEnumerable<IObservation> observations, Can
{
if (observations != null && observations.Count() > 0)
{
var assetsChanged = observations.Where(o => o.Type.ToUnderscoreUpper() == Devices.DataItems.AssetChangedDataItem.TypeId);
// Fully-qualified to disambiguate from the Devices property below; the namespace using above is shadowed inside this class.
var assetsChanged = observations.Where(o => o.Type.ToUnderscoreUpper() == MTConnect.Devices.DataItems.AssetChangedDataItem.TypeId);
if (assetsChanged != null)
{
foreach (var assetChanged in assetsChanged)
Expand Down
Loading
Loading