Skip to content

Commit 5483c77

Browse files
authored
Support expanded keys that all contain json config. (#38)
* Support expanded keys that all contain json config. * Remove typo in documentation.
1 parent 1d01fc3 commit 5483c77

13 files changed

Lines changed: 656 additions & 378 deletions

src/Winton.Extensions.Configuration.Consul/ConsulConfigurationClient.cs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See LICENCE in the project root for license information.
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Net;
67
using System.Threading;
78
using System.Threading.Tasks;
@@ -15,38 +16,36 @@ internal sealed class ConsulConfigurationClient : IConsulConfigurationClient
1516
{
1617
private readonly IConsulClientFactory _consulClientFactory;
1718
private readonly object _lastIndexLock = new object();
18-
private readonly IConsulConfigurationSource _source;
1919

2020
private ulong _lastIndex;
2121
private ConfigurationReloadToken _reloadToken = new ConfigurationReloadToken();
2222

23-
public ConsulConfigurationClient(IConsulClientFactory consulClientFactory, IConsulConfigurationSource source)
23+
public ConsulConfigurationClient(IConsulClientFactory consulClientFactory)
2424
{
2525
_consulClientFactory = consulClientFactory;
26-
_source = source;
2726
}
2827

29-
public async Task<QueryResult<KVPair>> GetConfig()
28+
public async Task<QueryResult<KVPair[]>> GetConfig(string key, CancellationToken cancellationToken)
3029
{
31-
QueryResult<KVPair> result = await GetKvPair().ConfigureAwait(false);
30+
QueryResult<KVPair[]> result = await GetKvPairs(key, cancellationToken).ConfigureAwait(false);
3231
UpdateLastIndex(result);
3332
return result;
3433
}
3534

36-
public IChangeToken Watch(Action<ConsulWatchExceptionContext> onException)
35+
public IChangeToken Watch(string key, Action<ConsulWatchExceptionContext> onException, CancellationToken cancellationToken)
3736
{
38-
Task.Run(() => PollForChanges(onException));
37+
Task.Run(() => PollForChanges(key, onException, cancellationToken));
3938
return _reloadToken;
4039
}
4140

42-
private async Task<QueryResult<KVPair>> GetKvPair(QueryOptions queryOptions = null)
41+
private async Task<QueryResult<KVPair[]>> GetKvPairs(string key, CancellationToken cancellationToken, QueryOptions queryOptions = null)
4342
{
4443
using (IConsulClient consulClient = _consulClientFactory.Create())
4544
{
46-
QueryResult<KVPair> result =
45+
QueryResult<KVPair[]> result =
4746
await consulClient
4847
.KV
49-
.Get(_source.Key, queryOptions, _source.CancellationToken)
48+
.List(key, queryOptions, cancellationToken)
5049
.ConfigureAwait(false);
5150

5251
switch (result.StatusCode)
@@ -61,25 +60,25 @@ await consulClient
6160
}
6261
}
6362

64-
private async Task<bool> HasValueChanged()
63+
private async Task<bool> HasValueChanged(string key, CancellationToken cancellationToken)
6564
{
6665
QueryOptions queryOptions;
6766
lock (_lastIndexLock)
6867
{
6968
queryOptions = new QueryOptions { WaitIndex = _lastIndex };
7069
}
7170

72-
QueryResult<KVPair> result = await GetKvPair(queryOptions).ConfigureAwait(false);
71+
QueryResult<KVPair[]> result = await GetKvPairs(key, cancellationToken, queryOptions).ConfigureAwait(false);
7372
return result != null && UpdateLastIndex(result);
7473
}
7574

76-
private async Task PollForChanges(Action<ConsulWatchExceptionContext> onException)
75+
private async Task PollForChanges(string key, Action<ConsulWatchExceptionContext> onException, CancellationToken cancellationToken)
7776
{
78-
while (!_source.CancellationToken.IsCancellationRequested)
77+
while (!cancellationToken.IsCancellationRequested)
7978
{
8079
try
8180
{
82-
if (await HasValueChanged().ConfigureAwait(false))
81+
if (await HasValueChanged(key, cancellationToken).ConfigureAwait(false))
8382
{
8483
ConfigurationReloadToken previousToken = Interlocked.Exchange(
8584
ref _reloadToken,
@@ -90,7 +89,7 @@ private async Task PollForChanges(Action<ConsulWatchExceptionContext> onExceptio
9089
}
9190
catch (Exception exception)
9291
{
93-
var exceptionContext = new ConsulWatchExceptionContext(_source, exception);
92+
var exceptionContext = new ConsulWatchExceptionContext(cancellationToken, exception);
9493
onException?.Invoke(exceptionContext);
9594
}
9695
}

src/Winton.Extensions.Configuration.Consul/ConsulConfigurationProvider.cs

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Generic;
66
using System.IO;
7+
using System.Linq;
78
using System.Threading.Tasks;
89
using Consul;
910
using Microsoft.Extensions.Configuration;
@@ -32,7 +33,7 @@ public ConsulConfigurationProvider(
3233
if (source.ReloadOnChange)
3334
{
3435
ChangeToken.OnChange(
35-
() => _consulConfigClient.Watch(_source.OnWatchException),
36+
() => _consulConfigClient.Watch(_source.Key, _source.OnWatchException, _source.CancellationToken),
3637
async () =>
3738
{
3839
await DoLoad(true).ConfigureAwait(false);
@@ -53,27 +54,14 @@ public override void Load()
5354
}
5455
}
5556

56-
private Dictionary<string, string> ConvertResultToDictionary(QueryResult<KVPair> queryResult)
57-
{
58-
if (!queryResult.HasValue())
59-
{
60-
return new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
61-
}
62-
63-
using (var configStream = new MemoryStream(queryResult.Value()))
64-
{
65-
return new Dictionary<string, string>(
66-
_source.Parser.Parse(configStream),
67-
StringComparer.OrdinalIgnoreCase);
68-
}
69-
}
70-
7157
private async Task DoLoad(bool reloading)
7258
{
7359
try
7460
{
75-
QueryResult<KVPair> queryResult = await _consulConfigClient.GetConfig().ConfigureAwait(false);
76-
if (!queryResult.HasValue() && !_source.Optional)
61+
QueryResult<KVPair[]> result = await _consulConfigClient
62+
.GetConfig(_source.Key, _source.CancellationToken)
63+
.ConfigureAwait(false);
64+
if (!result.HasValue() && !_source.Optional)
7765
{
7866
if (!reloading)
7967
{
@@ -85,7 +73,10 @@ private async Task DoLoad(bool reloading)
8573
return;
8674
}
8775

88-
Data = ConvertResultToDictionary(queryResult);
76+
Data = (result?.Response ?? new KVPair[0])
77+
.Where(kvp => kvp.HasValue())
78+
.SelectMany(kvp => kvp.ConvertToConfig(_source.Key, _source.Parser))
79+
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value, StringComparer.OrdinalIgnoreCase);
8980
}
9081
catch (Exception exception)
9182
{

src/Winton.Extensions.Configuration.Consul/ConsulConfigurationSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public ConsulConfigurationSource(string key, CancellationToken cancellationToken
4848
public IConfigurationProvider Build(IConfigurationBuilder builder)
4949
{
5050
var consulClientFactory = new ConsulClientFactory(this);
51-
var consulConfigClient = new ConsulConfigurationClient(consulClientFactory, this);
51+
var consulConfigClient = new ConsulConfigurationClient(consulClientFactory);
5252
return new ConsulConfigurationProvider(this, consulConfigClient);
5353
}
5454
}

src/Winton.Extensions.Configuration.Consul/ConsulWatchExceptionContext.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ namespace Winton.Extensions.Configuration.Consul
1111
/// </summary>
1212
public sealed class ConsulWatchExceptionContext
1313
{
14-
internal ConsulWatchExceptionContext(IConsulConfigurationSource source, Exception exception)
14+
internal ConsulWatchExceptionContext(CancellationToken cancellationToken, Exception exception)
1515
{
1616
Exception = exception;
17-
Source = source;
17+
CancellationToken = cancellationToken;
1818
}
1919

2020
/// <summary>
@@ -23,9 +23,8 @@ internal ConsulWatchExceptionContext(IConsulConfigurationSource source, Exceptio
2323
public Exception Exception { get; }
2424

2525
/// <summary>
26-
/// Gets the <see cref="IConsulConfigurationSource" /> of the provider that caused the exception.
27-
/// Can be used to access the <see cref="CancellationToken" /> which can terminate the watcher.
26+
/// Gets the <see cref="CancellationToken" /> for the watch task which can be used to terminate it.
2827
/// </summary>
29-
public IConsulConfigurationSource Source { get; }
28+
public CancellationToken CancellationToken { get; }
3029
}
3130
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright (c) Winton. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See LICENCE in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.IO;
7+
using System.Linq;
8+
using Consul;
9+
using Winton.Extensions.Configuration.Consul.Parsers;
10+
11+
namespace Winton.Extensions.Configuration.Consul.Extensions
12+
{
13+
internal static class KVPairExtensions
14+
{
15+
internal static IEnumerable<KeyValuePair<string, string>> ConvertToConfig(
16+
this KVPair kvPair,
17+
string rootKey,
18+
IConfigurationParser parser)
19+
{
20+
using (Stream stream = new MemoryStream(kvPair.Value))
21+
{
22+
return parser
23+
.Parse(stream)
24+
.Select(pair =>
25+
{
26+
var key = $"{kvPair.Key.TrimEnd('/')}:{pair.Key}"
27+
.Replace('/', ':')
28+
.TrimStart(rootKey.ToCharArray())
29+
.TrimStart(':')
30+
.TrimEnd(':');
31+
if (string.IsNullOrEmpty(key))
32+
{
33+
throw new InvalidKeyPairException(
34+
"The key must not be null or empty. Ensure that there is at least one key under the root of the config or that the data there contains more than just a single value.");
35+
}
36+
37+
return new KeyValuePair<string, string>(key, pair.Value);
38+
});
39+
}
40+
}
41+
42+
internal static bool HasValue(this KVPair kvPair)
43+
{
44+
return kvPair.IsLeafNode() && kvPair.Value != null && kvPair.Value.Any();
45+
}
46+
47+
internal static bool IsLeafNode(this KVPair kvPair)
48+
{
49+
return !kvPair.Key.EndsWith("/");
50+
}
51+
}
52+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright (c) Winton. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See LICENCE in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Net;
8+
using Consul;
9+
using Winton.Extensions.Configuration.Consul.Parsers;
10+
11+
namespace Winton.Extensions.Configuration.Consul.Extensions
12+
{
13+
internal static class KVPairQueryResultExtensions
14+
{
15+
internal static bool HasValue(this QueryResult<KVPair[]> queryResult)
16+
{
17+
return queryResult != null
18+
&& queryResult.StatusCode != HttpStatusCode.NotFound
19+
&& queryResult.Response != null
20+
&& queryResult.Response.Any(kvp => kvp.HasValue());
21+
}
22+
}
23+
}

src/Winton.Extensions.Configuration.Consul/Extensions/KvPairQueryResultExtensions.cs

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/Winton.Extensions.Configuration.Consul/IConsulConfigurationClient.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See LICENCE in the project root for license information.
33

44
using System;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Consul;
78
using Microsoft.Extensions.Primitives;
@@ -12,12 +13,16 @@ namespace Winton.Extensions.Configuration.Consul
1213
internal interface IConsulConfigurationClient
1314
{
1415
/// <summary>Gets the config from consul asynchronously.</summary>
16+
/// <param name="key">The key at which the config is located.</param>
17+
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
1518
/// <returns>A task containing the result of the query for the config.</returns>
16-
Task<QueryResult<KVPair>> GetConfig();
19+
Task<QueryResult<KVPair[]>> GetConfig(string key, CancellationToken cancellationToken);
1720

18-
/// <summary>Watches the config for changes.</summary>
21+
/// <summary>Watches for config changes at a specified key.</summary>
22+
/// <param name="key">The key whose value should be watched for changes.</param>
1923
/// <param name="onException">An action to be invoked if an exception occurs during the watch.</param>
24+
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
2025
/// <returns>An <see cref="IChangeToken" /> that will indicated when changes have occured.</returns>
21-
IChangeToken Watch(Action<ConsulWatchExceptionContext> onException);
26+
IChangeToken Watch(string key, Action<ConsulWatchExceptionContext> onException, CancellationToken cancellationToken);
2227
}
2328
}

0 commit comments

Comments
 (0)