diff --git a/docs/configure/consumer.md b/docs/configure/consumer.md index 8e6f9ce91..a07b8827f 100644 --- a/docs/configure/consumer.md +++ b/docs/configure/consumer.md @@ -82,7 +82,9 @@ client.Start(); `CurrentReceived` fires once on stream initialization—`Start()` drives the `/sample` long-poll, not periodic `/current` polling. For periodic snapshots, call `GetCurrent()` directly on a timer; for the streaming case, subscribe to `SampleReceived`. -The client handles reconnects on the consumer's behalf; the `ConnectionError` event fires on transport failures and the client backs off and retries. Parsing or dispatch failures surface through `InternalError`. +The client also exposes the post-probe device model. `DeviceReceived` fires once per parsed device for every `/probe` response, carrying the fully wired [`IDevice`](/api/MTConnect.Devices.IDevice) instance with its DataItems' `Container` and `Device` back-pointers set. The `Devices` property returns a read-only snapshot of every device the most recent probe yielded, keyed by UUID; entries are replaced wholesale on each probe and devices the agent no longer advertises are evicted at the start of the next probe. Both surfaces were added per [issue #176](https://github.com/TrakHound/MTConnect.NET/issues/176). + +The client handles reconnects on the consumer's behalf; the `ConnectionError` event fires on transport failures and the client backs off and retries. Parsing or dispatch failures surface through `InternalError`. A `DeviceReceived` handler that throws is isolated: the exception is routed through `InternalError` and the cache fill plus the rest of the per-device fan-out continue normally. ## MQTT—the cppagent-parity broker / relay tree diff --git a/docs/examples/client-http.md b/docs/examples/client-http.md index bfd1ebb21..9f1ddc152 100644 --- a/docs/examples/client-http.md +++ b/docs/examples/client-http.md @@ -50,7 +50,35 @@ client.CurrentReceived += (s, response) => { /* iterate response.Streams */ }; client.Start(); ``` -`MTConnectHttpClient` wraps the agent's HTTP endpoints (`/probe`, `/current`, `/sample`, `/asset`) and exposes them as a long-polling subscription with event hooks. The `Interval` property is the consumer-side sample interval — the client requests `/sample?interval=100` and receives each sequence batch through `CurrentReceived` (or `SampleReceived` if you wire that hook up instead). +`MTConnectHttpClient` wraps the agent's HTTP endpoints (`/probe`, `/current`, `/sample`, `/asset`) and exposes them as a long-polling subscription with event hooks. The `Interval` property is the consumer-side sample interval—the client requests `/sample?interval=100` and receives each sequence batch through `CurrentReceived` (or `SampleReceived` if you wire that hook up instead). + +### Subscribing to the wired device model + +The client maintains a wired snapshot of every device it sees from `/probe` responses, keyed by UUID. Subscribe to `DeviceReceived` to react per-device while the snapshot is filling, and read the `Devices` accessor at any point to consult the post-probe model: + +```csharp +client.DeviceReceived += (s, device) => +{ + Console.WriteLine($"Device Received : {device.Uuid} : {device.Name}"); + foreach (var dataItem in device.GetDataItems()) + { + // Container + Device back-pointers are wired in by the parser. + Console.WriteLine($" {dataItem.Id} : {dataItem.Type} : container={dataItem.Container?.Id}"); + } +}; + +client.Start(); + +// At any point after the first probe completes: +foreach (var (uuid, device) in client.Devices) +{ + Console.WriteLine($"Cached device : {uuid} : {device.Name}"); +} +``` + +`DeviceReceived` fires once per parsed device for every Probe response, carrying the fully wired `IDevice` instance—the same one the `Devices` snapshot accessor returns for that UUID—with the agent's `InstanceId` stamped on each DataItem. A handler that throws is isolated by the client: the exception is forwarded through `InternalError` and the cache fill continues. `Devices` returns a fresh read-only snapshot on every read; cache the reference if you read repeatedly between probes. Devices that disappear between probes are evicted at the start of the next probe, so the snapshot never carries entries the agent no longer advertises. + +The `Devices` accessor and the `DeviceReceived` fan-out were added per [issue #176](https://github.com/TrakHound/MTConnect.NET/issues/176) (approved 2026-06-01 by @PatrickRitchie). ### Per-observation validation diff --git a/libraries/MTConnect.NET-HTTP/Clients/MTConnectHttpClient.cs b/libraries/MTConnect.NET-HTTP/Clients/MTConnectHttpClient.cs index f3ec94e84..13a029cc9 100644 --- a/libraries/MTConnect.NET-HTTP/Clients/MTConnectHttpClient.cs +++ b/libraries/MTConnect.NET-HTTP/Clients/MTConnectHttpClient.cs @@ -11,6 +11,7 @@ using MTConnect.Streams; using System; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -179,12 +180,66 @@ private void Init() /// public bool UseStreaming { get; set; } + /// + /// A snapshot of the device model received from the most recent Probe response, + /// keyed by device UUID. The dictionary is empty until the first probe succeeds. + /// Each call returns a fresh snapshot reflecting the most recently completed probe; + /// previously returned snapshots are unaffected by subsequent probes, and devices + /// that disappear between probes are evicted at the start of the next probe so the + /// snapshot never carries entries the agent no longer advertises. Each + /// 's DataItems carry the fully wired + /// and + /// back-pointers set during parsing, so consumers can walk the component ancestry + /// of any DataItem without re-parsing the document. + /// + /// + /// + /// The dictionary returned by each call is a fresh allocation independent of the + /// cache, wrapped in a + /// so consumers cannot mutate the snapshot through a downcast. The + /// values within are shared references to cached instances that the client replaces + /// wholesale on each probe. The accessor acquires the client's internal lock, so + /// callers may enumerate the snapshot without synchronizing against the worker thread + /// that processes subsequent probes. + /// + /// + /// Each access allocates a fresh dictionary. Cache the returned reference if you + /// read repeatedly between probes. + /// + /// + /// Empty probe responses (where the agent answers with no devices) do not clear the + /// cache; the previous snapshot stays presented until the next non-empty probe. + /// + /// + public IReadOnlyDictionary Devices + { + get + { + lock (_lock) + { + return new ReadOnlyDictionary( + new Dictionary(_devices)); + } + } + } + #region "Events" /// - /// Raised when a Device is received + /// Raised once per parsed device for every Probe response the client receives, + /// carrying the fully wired instance—the same instance the + /// snapshot accessor would return for that UUID—with its + /// DataItems' and + /// back-pointers set, and the agent's InstanceId stamped on each DataItem. /// + /// + /// Handlers fire in document order while the cache is still being populated. + /// Subscribe to to receive notification after the full + /// probe response has been processed. A handler that throws is isolated by the + /// client: the exception is forwarded through and the + /// cache fill plus subsequent fan-out continue normally. + /// public event EventHandler DeviceReceived; /// @@ -828,14 +883,15 @@ private void ProcessProbeDocument(IDevicesResponseDocument document) { if (document != null && !document.Devices.IsNullOrEmpty()) { - // Clear Cached DataItems and Components + // Clear cached DataItems, Components, and the device snapshot so devices + // the agent no longer advertises are evicted before the new probe is loaded. lock (_lock) { _cachedComponents.Clear(); _cachedDataItems.Clear(); + _devices.Clear(); } - var outputDevices = new List(); foreach (var device in document.Devices) { var outputDevice = ProcessDevice(document.Header, device); @@ -846,11 +902,12 @@ private void ProcessProbeDocument(IDevicesResponseDocument document) _devices.Remove(outputDevice.Uuid); _devices.Add(outputDevice.Uuid, outputDevice); } - } - foreach (var outputDevice in outputDevices) - { - DeviceReceived?.Invoke(this, outputDevice); + // Fire per-device inside the populate loop so the cache and event stay in lockstep. + // Isolate subscriber exceptions per delegate so one bad handler cannot abort the + // populate loop, suppress ProbeReceived, or short-circuit later subscribers in the + // invocation list; route each fault through InternalError instead. + RaiseDeviceReceived(outputDevice); } // Raise ProbeReceived Event @@ -858,6 +915,35 @@ private void ProcessProbeDocument(IDevicesResponseDocument document) } } + // Iterate the invocation list so one throwing subscriber cannot short-circuit the + // multicast and starve later subscribers. Each fault is forwarded through + // InternalError; if InternalError itself faults, swallow that secondary fault so the + // populate loop and remaining DeviceReceived subscribers still get every device. + private void RaiseDeviceReceived(IDevice device) + { + var handler = DeviceReceived; + if (handler == null) return; + + foreach (var subscriber in handler.GetInvocationList()) + { + try + { + ((EventHandler)subscriber).Invoke(this, device); + } + catch (Exception ex) + { + try + { + InternalError?.Invoke(this, ex); + } + catch + { + // A faulting InternalError handler must not break DeviceReceived fan-out. + } + } + } + } + private void ProcessCurrentDocument(IStreamsResponseDocument document, CancellationToken cancel) { _lastResponse = UnixDateTime.Now; @@ -1092,7 +1178,8 @@ private IComponentStream ProcessComponentStream(IMTConnectStreamsHeader header, outputComponentStream.NativeName = inputComponentStream.NativeName; outputComponentStream.Uuid = inputComponentStream.Uuid; - if (inputComponentStream.ComponentType == Agent.TypeId || inputComponentStream.ComponentType == Devices.Device.TypeId) + // Fully-qualified to disambiguate from the Devices property below; the namespace using above is shadowed inside this class. + if (inputComponentStream.ComponentType == Agent.TypeId || inputComponentStream.ComponentType == MTConnect.Devices.Device.TypeId) { outputComponentStream.Component = GetCachedDevice(deviceUuid); } @@ -1148,7 +1235,8 @@ private async void CheckAssetChanged(IEnumerable observations, Can { if (observations != null && observations.Count() > 0) { - var assetsChanged = observations.Where(o => o.Type.ToUnderscoreUpper() == Devices.DataItems.AssetChangedDataItem.TypeId); + // Fully-qualified to disambiguate from the Devices property below; the namespace using above is shadowed inside this class. + var assetsChanged = observations.Where(o => o.Type.ToUnderscoreUpper() == MTConnect.Devices.DataItems.AssetChangedDataItem.TypeId); if (assetsChanged != null) { foreach (var assetChanged in assetsChanged) diff --git a/tests/MTConnect.NET-HTTP-Tests/Clients/HttpClientDeviceModel.cs b/tests/MTConnect.NET-HTTP-Tests/Clients/HttpClientDeviceModel.cs new file mode 100644 index 000000000..1024c084c --- /dev/null +++ b/tests/MTConnect.NET-HTTP-Tests/Clients/HttpClientDeviceModel.cs @@ -0,0 +1,410 @@ +// Copyright (c) 2026 TrakHound Inc., All Rights Reserved. +// TrakHound Inc. licenses this file to you under the MIT license. + +using MTConnect.Clients; +using MTConnect.Devices; +using NUnit.Framework; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; + +namespace MTConnect.Tests.Http.Clients +{ + /// + /// Drives the full MTConnectHttpClient (the streaming client, not the + /// single-shot probe client) against the real embedded MTConnectHttpServer + /// started by AgentRunner, pinning the two surfaces issue #176 added: + /// the public Devices snapshot accessor (which exposes the post-probe + /// device cache the client already maintains internally) and the + /// DeviceReceived event (which historically built an empty list and + /// iterated it, so it never fired in the field). Both invariants are + /// exercised end to end through Start()/Stop(), so the probe round trip + /// and the ProcessProbeDocument hand-off both run. + /// + [TestFixture] + public class HttpClientDeviceModel : HttpClientFixture + { + // Generous CI-safe bounds. The streaming client raises DeviceReceived + // and populates the snapshot from the first probe, which AgentRunner + // guarantees is answerable before the fixture returns from Start(). + private const int ProbeWaitTimeoutMs = 15000; + private const int ProbeWaitPollMs = 50; + + /// Pins the behaviour expressed by the test name: devices accessor is empty before probe. + [Test] + public void DevicesAccessorIsEmptyBeforeProbe() + { + var client = new MTConnectHttpClient(Hostname, Port); + + Assert.That(client.Devices, Is.Not.Null, "Devices accessor returned null"); + Assert.That(client.Devices.Count, Is.EqualTo(0), "Devices accessor was non-empty before any probe ran"); + } + + /// Pins the behaviour expressed by the test name: devices accessor is populated after probe. + [Test] + public void DevicesAccessorIsPopulatedAfterProbe() + { + var client = new MTConnectHttpClient(Hostname, Port); + + using var probeSignal = new ManualResetEventSlim(false); + client.ProbeReceived += (_, _) => probeSignal.Set(); + + try + { + client.Start(); + + Assert.That(probeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "ProbeReceived did not fire within the timeout"); + + // The accessor returns a snapshot, so capture once and assert + // against the captured dictionary. + var snapshot = client.Devices; + + Assert.That(snapshot, Is.Not.Null, "Devices accessor returned null after probe"); + Assert.That(snapshot.Count, Is.EqualTo(ExpectedDocumentEntryCount), "Devices accessor did not surface every probed device"); + Assert.That(snapshot.Values.Any(d => d.Name == DeviceName), Is.True, $"Devices accessor did not surface the {DeviceName} device"); + } + finally + { + client.Stop(); + } + } + + /// Pins the behaviour expressed by the test name: devices accessor returns snapshot not live view. + [Test] + public void DevicesAccessorReturnsSnapshotNotLiveView() + { + var client = new MTConnectHttpClient(Hostname, Port); + + using var probeSignal = new ManualResetEventSlim(false); + client.ProbeReceived += (_, _) => probeSignal.Set(); + + try + { + client.Start(); + + Assert.That(probeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "ProbeReceived did not fire within the timeout"); + + var snapshot = client.Devices; + + // The accessor must hand back an independent snapshot, not a + // reference to the live cache — mutating the returned + // dictionary must not be possible, and successive reads must + // not alias one another. + Assert.That(snapshot, Is.InstanceOf>(), "Devices accessor did not return an IReadOnlyDictionary"); + Assert.That(ReferenceEquals(snapshot, client.Devices), Is.False, "Devices accessor handed back an aliased instance instead of a snapshot"); + } + finally + { + client.Stop(); + } + } + + /// Pins the behaviour expressed by the test name: device received fires once per device on first probe. + [Test] + public void DeviceReceivedFiresOncePerDeviceOnFirstProbe() + { + var client = new MTConnectHttpClient(Hostname, Port); + + // Capture both the count and the carried IDevice instances so we + // can assert the wired model is delivered, not a placeholder. + var receivedDevices = new List(); + var receivedLock = new object(); + client.DeviceReceived += (_, device) => + { + lock (receivedLock) { receivedDevices.Add(device); } + }; + + using var probeSignal = new ManualResetEventSlim(false); + client.ProbeReceived += (_, _) => probeSignal.Set(); + + try + { + client.Start(); + + Assert.That(probeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "ProbeReceived did not fire within the timeout"); + + // The event is raised from inside ProcessProbeDocument, which + // runs on the worker thread; ProbeReceived fires at the end of + // that same method, so once it has been observed, every + // DeviceReceived invocation for this probe has already + // happened. + List snapshot; + lock (receivedLock) { snapshot = new List(receivedDevices); } + + Assert.That(snapshot.Count, Is.EqualTo(ExpectedDocumentEntryCount), "DeviceReceived did not fire once per probed device"); + Assert.That(snapshot.All(d => d != null), Is.True, "DeviceReceived carried a null device"); + Assert.That(snapshot.Any(d => d.Name == DeviceName), Is.True, $"DeviceReceived did not deliver the {DeviceName} device"); + } + finally + { + client.Stop(); + } + } + + /// Pins the behaviour expressed by the test name: device received carries wired data item back pointers. + [Test] + public void DeviceReceivedCarriesWiredDataItemBackPointers() + { + var client = new MTConnectHttpClient(Hostname, Port); + + IDevice? targetDevice = null; + var targetLock = new object(); + client.DeviceReceived += (_, device) => + { + if (device?.Name == DeviceName) + { + lock (targetLock) { targetDevice = device; } + } + }; + + using var probeSignal = new ManualResetEventSlim(false); + client.ProbeReceived += (_, _) => probeSignal.Set(); + + try + { + client.Start(); + + Assert.That(probeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "ProbeReceived did not fire within the timeout"); + + IDevice? captured; + lock (targetLock) { captured = targetDevice; } + + Assert.That(captured, Is.Not.Null, $"DeviceReceived did not deliver the {DeviceName} device"); + + // Walk the DataItem tree and assert the back-pointers the + // issue calls out are wired through to the caller — the whole + // point of surfacing the cached IDevice is access to this + // ancestry without re-parsing. + var dataItems = captured!.GetDataItems().ToList(); + Assert.That(dataItems, Is.Not.Empty, $"Device {DeviceName} carried no DataItems"); + + foreach (var dataItem in dataItems) + { + Assert.That(dataItem.Device, Is.Not.Null, $"DataItem {dataItem.Id} lost its Device back-pointer"); + Assert.That(dataItem.Container, Is.Not.Null, $"DataItem {dataItem.Id} lost its Container back-pointer"); + } + } + finally + { + client.Stop(); + } + } + + /// Pins the behaviour expressed by the test name: device received fires on every subsequent probe, not just the first. + [Test] + public void DeviceReceivedFiresOnEverySubsequentProbe() + { + var client = new MTConnectHttpClient(Hostname, Port); + + using var firstProbeSignal = new ManualResetEventSlim(false); + using var secondProbeSignal = new ManualResetEventSlim(false); + var probeCount = 0; + client.ProbeReceived += (_, _) => + { + var n = Interlocked.Increment(ref probeCount); + if (n == 1) firstProbeSignal.Set(); + else if (n == 2) secondProbeSignal.Set(); + }; + + var deviceFireCount = 0; + client.DeviceReceived += (_, _) => Interlocked.Increment(ref deviceFireCount); + + try + { + client.Start(); + Assert.That(firstProbeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "First ProbeReceived did not fire within the timeout"); + + var firstFireCount = Volatile.Read(ref deviceFireCount); + Assert.That(firstFireCount, Is.EqualTo(ExpectedDocumentEntryCount), "DeviceReceived did not fire once per device on the first probe"); + } + finally + { + client.Stop(); + } + + // Restart the client to force a second probe round-trip through the same + // agent; the event must fire again for every device in the second probe, + // not stay quiet. + try + { + client.Start(); + Assert.That(secondProbeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "Second ProbeReceived did not fire within the timeout"); + + var totalFireCount = Volatile.Read(ref deviceFireCount); + Assert.That(totalFireCount, Is.EqualTo(ExpectedDocumentEntryCount * 2), "DeviceReceived did not re-fire on the second probe"); + } + finally + { + client.Stop(); + } + } + + /// Pins the behaviour expressed by the test name: devices accessor reflects the latest probe with stale entries evicted. + [Test] + public void DevicesAccessorReflectsLatestProbeWithStaleEntriesEvicted() + { + var client = new MTConnectHttpClient(Hostname, Port); + + using var firstProbeSignal = new ManualResetEventSlim(false); + using var secondProbeSignal = new ManualResetEventSlim(false); + var probeCount = 0; + client.ProbeReceived += (_, _) => + { + var n = Interlocked.Increment(ref probeCount); + if (n == 1) firstProbeSignal.Set(); + else if (n == 2) secondProbeSignal.Set(); + }; + + try + { + client.Start(); + Assert.That(firstProbeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "First ProbeReceived did not fire within the timeout"); + } + finally + { + client.Stop(); + } + + try + { + client.Start(); + Assert.That(secondProbeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "Second ProbeReceived did not fire within the timeout"); + + // The cache is cleared at the start of every probe, so the post-second-probe + // snapshot count must equal the agent's device count, not double it. This + // pins the eviction-on-reprobe contract surfaced by the §19 review. + var snapshot = client.Devices; + Assert.That(snapshot.Count, Is.EqualTo(ExpectedDocumentEntryCount), "Devices accessor accumulated devices across probes instead of evicting stale entries"); + } + finally + { + client.Stop(); + } + } + + /// Pins the behaviour expressed by the test name: device received handler throwing does not break cache population. + [Test] + public void DeviceReceivedHandlerThrowingDoesNotBreakCachePopulation() + { + var client = new MTConnectHttpClient(Hostname, Port); + + var internalErrorCount = 0; + client.InternalError += (_, _) => Interlocked.Increment(ref internalErrorCount); + + // First subscriber throws on every fan-out; second subscriber records every + // device it sees. With exception-isolation in place, the cache must still + // fully populate, the second subscriber must still see every device, and the + // throw must surface through InternalError. + client.DeviceReceived += (_, _) => throw new InvalidOperationException("intentional test fault"); + + var receivedDevices = new List(); + var receivedLock = new object(); + client.DeviceReceived += (_, device) => + { + lock (receivedLock) { receivedDevices.Add(device); } + }; + + using var probeSignal = new ManualResetEventSlim(false); + client.ProbeReceived += (_, _) => probeSignal.Set(); + + try + { + client.Start(); + + Assert.That(probeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "ProbeReceived did not fire within the timeout"); + + List snapshot; + lock (receivedLock) { snapshot = new List(receivedDevices); } + + Assert.That(snapshot.Count, Is.EqualTo(ExpectedDocumentEntryCount), "Throwing handler suppressed downstream subscribers"); + Assert.That(client.Devices.Count, Is.EqualTo(ExpectedDocumentEntryCount), "Throwing handler corrupted the cache fill"); + Assert.That(Volatile.Read(ref internalErrorCount), Is.GreaterThanOrEqualTo(ExpectedDocumentEntryCount), "Throwing handler did not surface through InternalError"); + } + finally + { + client.Stop(); + } + } + + /// Pins the behaviour expressed by the test name: device received fires for all subscribers when one throws. + [Test] + public void DeviceReceivedFiresForAllSubscribersWhenOneThrows() + { + // Wire two subscribers: a throwing one FIRST, a recording one SECOND. + // The recording subscriber must still receive every device because + // RaiseDeviceReceived iterates the invocation list with per-delegate + // try/catch, not a single try/catch over the multicast. + var client = new MTConnectHttpClient(Hostname, Port); + + client.DeviceReceived += (_, _) => throw new InvalidOperationException("first handler throws"); + + var recorded = new List(); + var recordedLock = new object(); + client.DeviceReceived += (_, device) => + { + lock (recordedLock) { recorded.Add(device); } + }; + + using var probeSignal = new ManualResetEventSlim(false); + client.ProbeReceived += (_, _) => probeSignal.Set(); + + try + { + client.Start(); + + Assert.That(probeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "ProbeReceived did not fire within the timeout"); + + List snapshot; + lock (recordedLock) { snapshot = new List(recorded); } + + Assert.That(snapshot.Count, Is.EqualTo(ExpectedDocumentEntryCount), + "subscribers after a throwing one must still be invoked"); + } + finally + { + client.Stop(); + } + } + + /// Pins the behaviour expressed by the test name: internal error handler throwing does not break device received fan out. + [Test] + public void InternalErrorHandlerThrowingDoesNotBreakDeviceReceivedFanOut() + { + // The fix routes swallowed DeviceReceived faults through InternalError. + // If InternalError ITSELF throws, that secondary fault must also be + // swallowed so the rest of the DeviceReceived fan-out continues. + var client = new MTConnectHttpClient(Hostname, Port); + + client.InternalError += (_, _) => throw new InvalidOperationException("InternalError throws"); + client.DeviceReceived += (_, _) => throw new InvalidOperationException("first DeviceReceived throws"); + + var recorded = new List(); + var recordedLock = new object(); + client.DeviceReceived += (_, device) => + { + lock (recordedLock) { recorded.Add(device); } + }; + + using var probeSignal = new ManualResetEventSlim(false); + client.ProbeReceived += (_, _) => probeSignal.Set(); + + try + { + client.Start(); + + Assert.That(probeSignal.Wait(ProbeWaitTimeoutMs), Is.True, "ProbeReceived did not fire within the timeout"); + + List snapshot; + lock (recordedLock) { snapshot = new List(recorded); } + + Assert.That(snapshot.Count, Is.EqualTo(ExpectedDocumentEntryCount), + "InternalError throwing must not break the DeviceReceived fan-out"); + } + finally + { + client.Stop(); + } + } + } +}