Skip to content

Commit c49a0f5

Browse files
iwillspeakChoc13
authored andcommitted
Configuration Provider Cancellation Refactor (#78)
* Add Configuration Overloads without Cancellation Updates the cofiguration builder extensions to allow adding the consul configuration provider without specifying a cancellation token. If no token is provided then the default cancellation token is used instead. * Cancel Watching if Config Provider Disposed Add an `IDisposable` implemnetation to the `ConsulConfigurationProvider`. When disposed the provider cancels the watch of the KV node. This brings the cancellation behavior more in line with that of other config providers. There are still a few open questions with this however: * The cancellation token source _and_ chnage token registration are a bit clunky. Ideally we could just have one or the other but the change token doesn't cancel the watch properly if that's all we have. * Ideally if we _do_ need two cancellation tokens, one from the source and one to pass along the `Dispose` call we would combine them with `CancellationTokenSource.CreateLinkedTokenSource` rather than the registration we have right now. This may work better if the watching of the KV node is moved more into the configuration provider. * Remove `CancellationToken` from `ConsulConfigurationSource` With the `IDisposable` support for the `ConsulConfiugrationProvider` this token is no longer needed. semver: breaking. * Fixup Unit Test Mocking Switch to accepting any cancellation token to fixup the failing test cases * Switch to Task for Polling Changes Move away from using change tokens to provide better `async` support when checking for changes. This brings the polling for changes more in line with other config providers with background polling task. * Fixup Documentation Violations * Address Race in Reload Test Fixup the race between the test and the background task performing a reload. * Make `PollWaitTime` Configurable Addresses the confugrability of the wait when polling for changes. Asrequested in PR comments. * Extract Polling into Separate Method This runs the first part of the poll within the context of the constructor to bring the behaviour back in line with the origional changetoken based polling. * Code Review Fixups Address issues from PR review. * Refactor to remove ConsulConfigurationClient class. * Re-add net461 and add test for default KeyToRemove. * Fix comments.
1 parent 9888d29 commit c49a0f5

13 files changed

Lines changed: 753 additions & 748 deletions

src/Winton.Extensions.Configuration.Consul/ConfigurationBuilderExtensions.cs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,12 @@ public static class ConfigurationBuilderExtensions
1919
/// </summary>
2020
/// <param name="builder">The builder to add consul to.</param>
2121
/// <param name="key">The key in consul where the configuration is located.</param>
22-
/// <param name="cancellationToken">
23-
/// The <see cref="CancellationToken" /> used to cancel any open Consul connections or
24-
/// watchers.
25-
/// </param>
2622
/// <returns>The builder.</returns>
2723
public static IConfigurationBuilder AddConsul(
2824
this IConfigurationBuilder builder,
29-
string key,
30-
CancellationToken cancellationToken)
25+
string key)
3126
{
32-
return builder.AddConsul(key, cancellationToken, options => { });
27+
return builder.AddConsul(key, options => { });
3328
}
3429

3530
/// <summary>
@@ -38,21 +33,16 @@ public static IConfigurationBuilder AddConsul(
3833
/// </summary>
3934
/// <param name="builder">The builder to add consul to.</param>
4035
/// <param name="key">The key in consul where the configuration is located.</param>
41-
/// <param name="cancellationToken">
42-
/// The <see cref="CancellationToken" /> used to cancel any open Consul connections or
43-
/// watchers.
44-
/// </param>
4536
/// <param name="options">An action used to configure the options of the <see cref="IConsulConfigurationSource" />.</param>
4637
/// <returns>The builder.</returns>
4738
public static IConfigurationBuilder AddConsul(
4839
this IConfigurationBuilder builder,
4940
string key,
50-
CancellationToken cancellationToken,
5141
Action<IConsulConfigurationSource> options)
5242
{
53-
var consulConfigSource = new ConsulConfigurationSource(key, cancellationToken);
43+
var consulConfigSource = new ConsulConfigurationSource(key);
5444
options(consulConfigSource);
5545
return builder.Add(consulConfigSource);
5646
}
5747
}
58-
}
48+
}

src/Winton.Extensions.Configuration.Consul/ConsulConfigurationClient.cs

Lines changed: 0 additions & 126 deletions
This file was deleted.

src/Winton.Extensions.Configuration.Consul/ConsulConfigurationProvider.cs

Lines changed: 119 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,74 +2,86 @@
22
// Licensed under the Apache License, Version 2.0. See LICENCE in the project root for license information.
33

44
using System;
5-
using System.Linq;
5+
using System.Net;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Consul;
89
using Microsoft.Extensions.Configuration;
9-
using Microsoft.Extensions.Primitives;
1010
using Winton.Extensions.Configuration.Consul.Extensions;
1111

1212
namespace Winton.Extensions.Configuration.Consul
1313
{
14-
internal sealed class ConsulConfigurationProvider : ConfigurationProvider
14+
/// <summary>
15+
/// Each instance loads configuration for the key in Consul that is specified in
16+
/// the contained <see cref="IConsulConfigurationSource"/>.
17+
/// It has the ability to automatically reload the config if it changes in Consul.
18+
/// </summary>
19+
/// <remarks>
20+
/// Each instance maintains its own <c>lastIndex</c> and uses this to detect changes.
21+
/// Each instance ensures calls to Consul are serialised, to avoid concurrent access to <c>lastIndex</c>.
22+
/// </remarks>
23+
internal sealed class ConsulConfigurationProvider : ConfigurationProvider, IDisposable
1524
{
16-
private readonly IConsulConfigurationClient _consulConfigClient;
25+
private readonly CancellationTokenSource _cancellationTokenSource;
26+
private readonly IConsulClientFactory _consulClientFactory;
1727
private readonly IConsulConfigurationSource _source;
28+
private ulong _lastIndex;
29+
private Task _pollTask;
1830

1931
public ConsulConfigurationProvider(
2032
IConsulConfigurationSource source,
21-
IConsulConfigurationClient consulConfigClient)
33+
IConsulClientFactory consulClientFactory)
2234
{
2335
if (source.Parser == null)
2436
{
2537
throw new ArgumentNullException(nameof(source.Parser));
2638
}
2739

28-
_consulConfigClient = consulConfigClient;
2940
_source = source;
41+
_consulClientFactory = consulClientFactory;
42+
_cancellationTokenSource = new CancellationTokenSource();
43+
}
3044

31-
if (source.ReloadOnChange)
32-
{
33-
ChangeToken.OnChange(
34-
() => _consulConfigClient.Watch(_source.Key, _source.OnWatchException, _source.CancellationToken),
35-
async () =>
36-
{
37-
await DoLoad(true).ConfigureAwait(false);
38-
OnReload();
39-
});
40-
}
45+
public void Dispose()
46+
{
47+
_cancellationTokenSource.Cancel();
48+
_pollTask?.Wait(500);
49+
_cancellationTokenSource.Dispose();
4150
}
4251

4352
public override void Load()
4453
{
45-
DoLoad(false).GetAwaiter().GetResult();
54+
// If polling has already begun then calling load is pointless
55+
if (_pollTask != null)
56+
{
57+
return;
58+
}
59+
60+
DoLoad().GetAwaiter().GetResult();
61+
62+
// Polling starts after the initial load to ensure no concurrent access to the key from this instance
63+
if (_source.ReloadOnChange)
64+
{
65+
_pollTask = Task.Run(PollingLoop);
66+
}
4667
}
4768

48-
private async Task DoLoad(bool reloading)
69+
private async Task DoLoad()
4970
{
5071
try
5172
{
52-
QueryResult<KVPair[]> result = await _consulConfigClient
53-
.GetConfig(_source.Key, _source.CancellationToken)
54-
.ConfigureAwait(false);
55-
if (!result.HasValue() && !_source.Optional)
56-
{
57-
if (!reloading)
58-
{
59-
throw new Exception(
60-
$"The configuration for key {_source.Key} was not found and is not optional.");
61-
}
73+
QueryResult<KVPair[]> result = await GetKvPairs(false).ConfigureAwait(false);
6274

63-
// Don't overwrite mandatory config with empty data if not found when reloading
64-
return;
75+
if (result.HasValue())
76+
{
77+
SetData(result);
78+
}
79+
else if (!_source.Optional)
80+
{
81+
throw new Exception($"The configuration for key {_source.Key} was not found and is not optional.");
6582
}
6683

67-
string keyToRemove = _source.KeyToRemove ?? _source.Key;
68-
69-
Data = (result?.Response ?? new KVPair[0])
70-
.Where(kvp => kvp.HasValue())
71-
.SelectMany(kvp => kvp.ConvertToConfig(keyToRemove, _source.Parser))
72-
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value, StringComparer.OrdinalIgnoreCase);
84+
SetLastIndex(result);
7385
}
7486
catch (Exception exception)
7587
{
@@ -81,5 +93,76 @@ private async Task DoLoad(bool reloading)
8193
}
8294
}
8395
}
96+
97+
private async Task<QueryResult<KVPair[]>> GetKvPairs(bool waitForChange)
98+
{
99+
using (IConsulClient consulClient = _consulClientFactory.Create())
100+
{
101+
var queryOptions = new QueryOptions
102+
{
103+
WaitTime = _source.PollWaitTime,
104+
WaitIndex = waitForChange ? _lastIndex : 0
105+
};
106+
107+
QueryResult<KVPair[]> result =
108+
await consulClient
109+
.KV
110+
.List(_source.Key, queryOptions, _cancellationTokenSource.Token)
111+
.ConfigureAwait(false);
112+
113+
switch (result.StatusCode)
114+
{
115+
case HttpStatusCode.OK:
116+
case HttpStatusCode.NotFound:
117+
return result;
118+
default:
119+
throw new Exception(
120+
$"Error loading configuration from consul. Status code: {result.StatusCode}.");
121+
}
122+
}
123+
}
124+
125+
private async Task PollingLoop()
126+
{
127+
var consecutiveFailureCount = 0;
128+
while (!_cancellationTokenSource.Token.IsCancellationRequested)
129+
{
130+
try
131+
{
132+
QueryResult<KVPair[]> result = await GetKvPairs(true).ConfigureAwait(false);
133+
134+
if (result.HasValue() && result.LastIndex > _lastIndex)
135+
{
136+
SetData(result);
137+
OnReload();
138+
}
139+
140+
SetLastIndex(result);
141+
consecutiveFailureCount = 0;
142+
}
143+
catch (Exception exception)
144+
{
145+
TimeSpan wait =
146+
_source.OnWatchException?.Invoke(
147+
new ConsulWatchExceptionContext(exception, ++consecutiveFailureCount, _source)) ??
148+
TimeSpan.FromSeconds(5);
149+
await Task.Delay(wait, _cancellationTokenSource.Token);
150+
}
151+
}
152+
}
153+
154+
private void SetData(QueryResult<KVPair[]> result)
155+
{
156+
Data = result.ToConfigDictionary(_source.KeyToRemove, _source.Parser);
157+
}
158+
159+
private void SetLastIndex(QueryResult result)
160+
{
161+
_lastIndex = result.LastIndex == 0
162+
? 1
163+
: result.LastIndex < _lastIndex
164+
? 0
165+
: result.LastIndex;
166+
}
84167
}
85168
}

0 commit comments

Comments
 (0)