11using System ;
2+ using System . Collections . Generic ;
3+ using System . Linq ;
24using System . Net ;
35using System . Threading ;
46using System . Threading . Tasks ;
@@ -118,18 +120,21 @@ public sealed class Watch : ConsulConfigurationClientTests
118120 private async Task ShouldCallReloadOnChangeTokenIfIndexForKeyHasUpdated ( )
119121 {
120122 var configChangedCompletion = new TaskCompletionSource < bool > ( ) ;
123+ var getKvTaskSource = new TaskCompletionSource < QueryResult < KVPair > > ( ) ;
121124 _kvMock
122125 . Setup ( kv => kv . Get ( "Test" , It . IsAny < QueryOptions > ( ) , _cancellationToken ) )
123- . ReturnsAsync (
124- new QueryResult < KVPair >
125- {
126- LastIndex = 1 ,
127- StatusCode = HttpStatusCode . OK
128- } ) ;
126+ . Returns ( getKvTaskSource . Task ) ;
127+
129128 ChangeToken . OnChange (
130129 ( ) => _consulConfigurationClient . Watch ( null ) ,
131130 ( ) => configChangedCompletion . SetResult ( true ) ) ;
132131
132+ getKvTaskSource . SetResult (
133+ new QueryResult < KVPair >
134+ {
135+ LastIndex = 1 ,
136+ StatusCode = HttpStatusCode . OK
137+ } ) ;
133138 bool completed = await configChangedCompletion . Task ;
134139
135140 completed . Should ( ) . BeTrue ( ) ;
@@ -141,10 +146,11 @@ private async Task ShouldInvokeExceptionActionWhenWatchThrowsException()
141146 Exception actualException = null ;
142147 var expectedException = new Exception ( ) ;
143148 var configChangedCompletion = new TaskCompletionSource < bool > ( ) ;
149+ var getKvTaskSource = new TaskCompletionSource < QueryResult < KVPair > > ( ) ;
144150
145151 _kvMock
146152 . Setup ( kv => kv . Get ( "Test" , It . IsAny < QueryOptions > ( ) , _cancellationToken ) )
147- . ThrowsAsync ( expectedException ) ;
153+ . Returns ( getKvTaskSource . Task ) ;
148154
149155 _consulConfigurationClient . Watch (
150156 exceptionContext =>
@@ -154,129 +160,147 @@ private async Task ShouldInvokeExceptionActionWhenWatchThrowsException()
154160 configChangedCompletion . SetResult ( true ) ;
155161 } ) ;
156162
163+ getKvTaskSource . SetException ( expectedException ) ;
157164 await configChangedCompletion . Task ;
158165
159166 actualException . Should ( ) . BeSameAs ( expectedException ) ;
160167 }
161168
162169 [ Fact ]
163- private void ShouldUseLongPollingToPollForChanges ( )
170+ private async Task ShouldUseLongPollingToPollForChanges ( )
164171 {
165- Func < Task < bool > > watching = ( ) => SimulateConfigChange ( 1 ) ;
172+ var kvTaskSources = Enumerable . Range ( 0 , 10 ) . Select ( i => new TaskCompletionSource < QueryResult < KVPair > > ( ) ) . ToArray ( ) ;
173+ var kvTaskQueue = new Queue < Task < QueryResult < KVPair > > > ( kvTaskSources . Select ( kts => kts . Task ) ) ;
174+
175+ _kvMock
176+ . Setup ( kv => kv . Get ( "Test" , It . IsAny < QueryOptions > ( ) , _cancellationToken ) )
177+ . Returns ( ( ) => kvTaskQueue . Dequeue ( ) ) ;
178+
179+ var watchCompletion = new TaskCompletionSource < bool > ( ) ;
180+ _consulConfigurationClient
181+ . Watch ( exceptionContext =>
182+ {
183+ watchCompletion . SetResult ( false ) ;
184+ throw exceptionContext . Exception ;
185+ } )
186+ . RegisterChangeCallback ( o => watchCompletion . SetResult ( true ) , new object ( ) ) ;
166187
167- watching . Should ( ) . NotThrow ( ) ;
188+ // The first 5 long polling calls return an unchanged last index
189+ foreach ( int i in Enumerable . Range ( 0 , 5 ) )
190+ {
191+ kvTaskSources [ i ] . SetResult (
192+ new QueryResult < KVPair >
193+ {
194+ LastIndex = 0 ,
195+ StatusCode = HttpStatusCode . OK
196+ } ) ;
197+ }
198+
199+ // The 6th call returns an updated index indicating that the config has changed
200+ kvTaskSources [ 5 ] . SetResult (
201+ new QueryResult < KVPair >
202+ {
203+ LastIndex = 1 ,
204+ StatusCode = HttpStatusCode . OK
205+ } ) ;
206+
207+ await watchCompletion . Task ;
208+
209+ Action verifying = ( ) => _kvMock
210+ . Verify ( kv => kv . Get ( "Test" , It . IsAny < QueryOptions > ( ) , _cancellationToken ) , Times . Exactly ( 6 ) ) ;
168211 }
169212
170213 [ Fact ]
171214 private async Task ShouldUseLongPollingWithLatestIndexFromGet ( )
172215 {
173- ulong lastWaitIndex = 0 ;
174- const ulong lastIndex = 1 ;
175- var completion = new TaskCompletionSource < bool > ( ) ;
216+ ulong ? watchWaitIndex = 0 ;
217+ var getKvTaskSource = new TaskCompletionSource < QueryResult < KVPair > > ( ) ;
218+ var watchKvTaskSource = new TaskCompletionSource < QueryResult < KVPair > > ( ) ;
219+ var kvTaskQueue = new Queue < Task < QueryResult < KVPair > > > (
220+ new List < Task < QueryResult < KVPair > > > { getKvTaskSource . Task , watchKvTaskSource . Task } ) ;
176221
177- // Get config once which should update the latest index
178222 _kvMock
179223 . Setup ( kv => kv . Get ( "Test" , It . IsAny < QueryOptions > ( ) , _cancellationToken ) )
180- . ReturnsAsync (
181- new QueryResult < KVPair >
224+ . Returns ( ( string key , QueryOptions options , CancellationToken cancellationToken ) =>
225+ {
226+ watchWaitIndex = options ? . WaitIndex ;
227+ return kvTaskQueue . Dequeue ( ) ;
228+ } ) ;
229+
230+ getKvTaskSource . SetResult (
231+ new QueryResult < KVPair >
182232 {
183233 LastIndex = 1 ,
184234 StatusCode = HttpStatusCode . OK
185235 } ) ;
186236
237+ // Get config once which should update the latest index
187238 await _consulConfigurationClient . GetConfig ( ) ;
188239
189- var result = new QueryResult < KVPair >
190- {
191- LastIndex = lastIndex + 1 ,
192- StatusCode = HttpStatusCode . OK
193- } ;
194- _kvMock
195- . Setup ( kv => kv . Get ( "Test" , It . IsAny < QueryOptions > ( ) , _cancellationToken ) )
196- . Callback (
197- ( string key , QueryOptions options , CancellationToken cancellationToken ) =>
198- {
199- lastWaitIndex = options . WaitIndex ;
200- } )
201- . ReturnsAsync ( result ) ;
202-
203- // Calling it a second time should invoke a long polling with the index from the last update
204- _consulConfigurationClient . Watch ( null ) . RegisterChangeCallback (
205- o => completion . SetResult ( true ) ,
206- new object ( ) ) ;
240+ // Calling it a second time should invoke a long polling with the index from the previous get call
241+ var watchCompletion = new TaskCompletionSource < bool > ( ) ;
242+ _consulConfigurationClient
243+ . Watch ( null )
244+ . RegisterChangeCallback ( o => watchCompletion . SetResult ( true ) , new object ( ) ) ;
207245
208- await completion . Task ;
246+ watchKvTaskSource . SetResult (
247+ new QueryResult < KVPair >
248+ {
249+ LastIndex = 2 ,
250+ StatusCode = HttpStatusCode . OK
251+ } ) ;
252+ await watchCompletion . Task ;
209253
210- lastWaitIndex . Should ( ) . Be ( lastIndex ) ;
254+ watchWaitIndex . Should ( ) . Be ( 1 ) ;
211255 }
212256
213257 [ Fact ]
214258 private async Task ShouldUseLongPollingWithWaitIndexFromPreviousWatch ( )
215259 {
216- ulong lastWaitIndex = 0 ;
217- const ulong lastIndex = 1 ;
218- var completion = new TaskCompletionSource < bool > ( ) ;
260+ ulong ? waitIndex = 0 ;
261+ var watchKvTaskSource1 = new TaskCompletionSource < QueryResult < KVPair > > ( ) ;
262+ var watchKvTaskSource2 = new TaskCompletionSource < QueryResult < KVPair > > ( ) ;
263+ var kvTaskQueue = new Queue < Task < QueryResult < KVPair > > > (
264+ new List < Task < QueryResult < KVPair > > > { watchKvTaskSource1 . Task , watchKvTaskSource2 . Task } ) ;
219265
220- // Simulate the first change in config which generates a new index
221- await SimulateConfigChange ( lastIndex ) ;
222-
223- var result = new QueryResult < KVPair >
224- {
225- LastIndex = lastIndex + 1 ,
226- StatusCode = HttpStatusCode . OK
227- } ;
228266 _kvMock
229267 . Setup ( kv => kv . Get ( "Test" , It . IsAny < QueryOptions > ( ) , _cancellationToken ) )
230- . Callback (
231- ( string key , QueryOptions options , CancellationToken cancellationToken ) =>
232- {
233- lastWaitIndex = options . WaitIndex ;
234- } )
235- . ReturnsAsync ( result ) ;
236-
237- // Calling it a second time should invoke a long polling with the index from the last update
238- _consulConfigurationClient . Watch ( null ) . RegisterChangeCallback (
239- o => completion . SetResult ( true ) ,
240- new object ( ) ) ;
241-
242- await completion . Task ;
243-
244- lastWaitIndex . Should ( ) . Be ( lastIndex ) ;
245- }
268+ . Returns ( ( string key , QueryOptions options , CancellationToken cancellationToken ) =>
269+ {
270+ waitIndex = options ? . WaitIndex ;
271+ return kvTaskQueue . Dequeue ( ) ;
272+ } ) ;
246273
247- private async Task < bool > SimulateConfigChange ( ulong lastIndex )
248- {
249- var configChangedCompletion = new TaskCompletionSource < bool > ( ) ;
274+ // The KV result initiated by the first watch returns with an updated index of 1
275+ var watchCompletion1 = new TaskCompletionSource < bool > ( ) ;
276+ _consulConfigurationClient
277+ . Watch ( null )
278+ . RegisterChangeCallback ( o => watchCompletion1 . SetResult ( true ) , new object ( ) ) ;
250279
251- // Initially setup mock with 0 last index so that no changes occur
252- var result = new QueryResult < KVPair >
253- {
254- LastIndex = 0 ,
255- StatusCode = HttpStatusCode . OK
256- } ;
257- _kvMock
258- . Setup ( kv => kv . Get ( "Test" , It . IsAny < QueryOptions > ( ) , _cancellationToken ) )
259- . ReturnsAsync ( result ) ;
280+ watchKvTaskSource1 . SetResult (
281+ new QueryResult < KVPair >
282+ {
283+ LastIndex = 1 ,
284+ StatusCode = HttpStatusCode . OK
285+ } ) ;
286+ await watchCompletion1 . Task ;
260287
261- // Watch for changes
288+ // The KV result from the second watch returns with an updated index so that it can be determined that it ran inside the watch
289+ var watchCompletion2 = new TaskCompletionSource < bool > ( ) ;
262290 _consulConfigurationClient
263- . Watch (
264- exceptionContext =>
291+ . Watch ( null )
292+ . RegisterChangeCallback ( o => watchCompletion2 . SetResult ( true ) , new object ( ) ) ;
293+
294+ watchKvTaskSource2 . SetResult (
295+ new QueryResult < KVPair >
265296 {
266- _cancellationTokenSource . Cancel ( ) ;
267- configChangedCompletion . SetException ( exceptionContext . Exception ) ;
268- } )
269- . RegisterChangeCallback (
270- o => configChangedCompletion . SetResult ( true ) ,
271- new object ( ) ) ;
272-
273- // Update mocked result to return a higher last index, which is what happens when changes occur
274- result . LastIndex = lastIndex ;
275- _kvMock
276- . Setup ( kv => kv . Get ( "Test" , It . IsAny < QueryOptions > ( ) , _cancellationToken ) )
277- . ReturnsAsync ( result ) ;
297+ LastIndex = 2 ,
298+ StatusCode = HttpStatusCode . OK
299+ } ) ;
300+ await watchCompletion2 . Task ;
278301
279- return await configChangedCompletion . Task ;
302+ // The wait index sent the second time should be the value returned from the first KV result
303+ waitIndex . Should ( ) . Be ( 1 ) ;
280304 }
281305 }
282306 }
0 commit comments