Skip to content

Commit 2b1570e

Browse files
committed
Change dequeue behavior to improve resource utilization.
Previously, TryDequeue would dequeue only the first item in the ready queue, and only if the predicate accepted it. Now, TryDequeue will scan the entire ready queue and dequeue the first entry that the predicate accepts.
1 parent e659df6 commit 2b1570e

4 files changed

Lines changed: 53 additions & 51 deletions

File tree

DependencyQueue.Tests/DependencyQueueEntryQueueViewTests.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace DependencyQueue;
55

6-
using Collection = Queue<DependencyQueueEntry<Value>>;
6+
using Collection = PredicateQueue<DependencyQueueEntry<Value>>;
77
using View = DependencyQueueEntryQueueView<Value>;
88
using Inner = DependencyQueueEntry<Value>;
99
using Outer = DependencyQueueEntry<Value>.View;
@@ -25,7 +25,6 @@ public void Peek()
2525
h.View.Invoking(v => v.Peek()).Should().Throw<ObjectDisposedException>();
2626
}
2727

28-
#if NETCOREAPP
2928
[Test]
3029
public void TryPeek()
3130
{
@@ -41,7 +40,6 @@ public void TryPeek()
4140

4241
h.View.Invoking(v => v.Peek()).Should().Throw<ObjectDisposedException>();
4342
}
44-
#endif
4543

4644
private protected override Inner ItemA { get; } = new Entry("a");
4745
private protected override Inner ItemB { get; } = new Entry("b");

DependencyQueue.Tests/DependencyQueueTests.cs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -744,43 +744,45 @@ async Task WorkerMainAsync(Context_ context)
744744
}
745745

746746
[Test]
747-
public async Task RunAsync_Funky()
747+
public async Task RunAsync_UsingPredicate()
748748
{
749+
const int
750+
WorkerSpecificEntryCount = 4,
751+
Parallelism = 6;
752+
749753
var entryA = Entry("a", requires: Items("b"));
750754
var entryB0 = Entry("b0", provides: Items("a", "b"));
751755
var entryB1 = Entry("b1", provides: Items("a", "b"));
752756
var entryB2 = Entry("b2", provides: Items("a", "b"));
753757
var entryB3 = Entry("b3", provides: Items("a", "b"));
754-
var entryB4 = Entry("b4", provides: Items("a", "b"));
755-
var entryB5 = Entry("b5", provides: Items("a", "b"));
756-
var entryB6 = Entry("b6", provides: Items("a", "b"));
757-
var entryB7 = Entry("b7", provides: Items("a", "b"));
758758
var entryC = Entry("c", requires: Items("a"));
759759

760-
var entriesB = new[] { entryB0, entryB1, entryB2, entryB3, entryB4, entryB5, entryB6, entryB7 };
760+
var entriesB = Items(entryB0, entryB1, entryB2, entryB3);
761761

762-
using var queue = Queue(entryA, entryB0, entryB1, entryB2, entryB3, entryB4, entryB5, entryB6, entryB7, entryC);
762+
using var queue = Queue(entryA, entryB0, entryB1, entryB2, entryB3, entryC);
763763
using var cts = new CancellationTokenSource();
764764

765765
queue.Should().BeValid();
766766

767767
async Task WorkerMainAsync(Context_ context)
768768
{
769-
var takeable = context.WorkerId is var n and <= 8
770-
? new[] { entryA.Value, entriesB[n - 1].Value, entryC.Value }
771-
: new[] { entryA.Value, entryC.Value };
769+
var takeable = context.WorkerId is var n and <= WorkerSpecificEntryCount
770+
? Items(entryA.Value, entriesB[n - 1].Value, entryC.Value)
771+
: Items(entryA.Value, entryC.Value);
772772

773773
for (;;)
774774
{
775-
await Task.Delay((17 - context.WorkerId) * 10);
776-
var entry = await context.GetNextEntryAsync(takeable.Contains);
775+
// Slow down workers with lower IDs to increase chances of the
776+
// predicate rejecting entries
777+
await Task.Delay((Parallelism - context.WorkerId + 1) * 10); // ms
778+
var entry = await context.GetNextEntryAsync(predicate: takeable.Contains);
777779
if (entry is null) break;
778780
}
779781
}
780782

781783
var data = new Data();
782784

783-
await queue.RunAsync(WorkerMainAsync, data, parallelism: 16, cancellation: cts.Token);
785+
await queue.RunAsync(WorkerMainAsync, data, Parallelism, cts.Token);
784786

785787
queue.Should().HaveReadyEntries(/*none*/);
786788
queue.Should().HaveTopicCount(0);

DependencyQueue/DependencyQueue.cs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace DependencyQueue;
1313
public class DependencyQueue<T> : IDependencyQueue<T>, IDisposable
1414
{
1515
// Entries that are ready to dequeue
16-
private readonly Queue<DependencyQueueEntry<T>> _ready;
16+
private readonly PredicateQueue<DependencyQueueEntry<T>> _ready;
1717

1818
// Topics keyed by name
1919
private readonly Dictionary<string, DependencyQueueTopic<T>> _topics;
@@ -50,7 +50,7 @@ public DependencyQueue(StringComparer? comparer = null)
5050
/// <summary>
5151
/// Gets the collection of entries that are ready to dequeue.
5252
/// </summary>
53-
internal Queue<DependencyQueueEntry<T>> ReadyEntries => _ready;
53+
internal PredicateQueue<DependencyQueueEntry<T>> ReadyEntries => _ready;
5454

5555
/// <summary>
5656
/// Gets the dictionary that maps topic names to topics.
@@ -145,6 +145,8 @@ public void Enqueue(DependencyQueueEntry<T> entry)
145145
if (!_isValid)
146146
throw Errors.NotValid();
147147

148+
predicate ??= AcceptAny;
149+
148150
using var @lock = _monitor.Acquire();
149151

150152
for (;;)
@@ -157,18 +159,15 @@ public void Enqueue(DependencyQueueEntry<T> entry)
157159
if (!_topics.Any())
158160
return null;
159161

160-
// Check if the ready queue has an entry to dequeue
161-
if (_ready.Any())
162-
// Check if caller accepts the entry
163-
if (predicate is null || predicate(_ready.Peek().Value))
164-
// Dequeue it
165-
return _ready.Dequeue();
162+
// Check if the ready queue has an entry to dequeue that the caller accepts
163+
if (_ready.TryDequeue(GetValue, predicate, out var entry))
164+
return entry;
166165

167166
// Some entries are in progress, and either there are no more ready
168-
// entries, or the predicate rejected the next ready entry. Wait
169-
// for in-progress entries to complete and unblock some ready
170-
// entry(ies), or for one second to elapse, after which the
171-
// predicate might change its mind.
167+
// entries, or the predicate rejected all of them. Wait for any
168+
// in-progress entries to complete and unblock some ready entries,
169+
// or for one second to elapse, after which the predicate might
170+
// change its mind.
172171
@lock.ReleaseUntilPulse(OneSecond);
173172
}
174173
}
@@ -214,6 +213,8 @@ public void Enqueue(DependencyQueueEntry<T> entry)
214213
if (!_isValid)
215214
throw Errors.NotValid();
216215

216+
predicate ??= AcceptAny;
217+
217218
using var @lock = await _monitor.AcquireAsync(cancellation);
218219

219220
for (;;)
@@ -226,22 +227,25 @@ public void Enqueue(DependencyQueueEntry<T> entry)
226227
if (!_topics.Any())
227228
return null;
228229

229-
// Check if the ready queue has an entry to dequeue
230-
if (_ready.Any())
231-
// Check if caller accepts the entry
232-
if (predicate is null || predicate(_ready.Peek().Value))
233-
// Dequeue it
234-
return _ready.Dequeue();
230+
// Check if the ready queue has an entry to dequeue that the caller accepts
231+
if (_ready.TryDequeue(GetValue, predicate, out var entry))
232+
return entry;
235233

236234
// Some entries are in progress, and either there are no more ready
237-
// entries, or the predicate rejected the next ready entry. Wait
238-
// for in-progress entries to complete and unblock some ready
239-
// entry(ies), or for one second to elapse, after which the
240-
// predicate might change its mind.
235+
// entries, or the predicate rejected all of them. Wait for any
236+
// in-progress entries to complete and unblock some ready entries,
237+
// or for one second to elapse, after which the predicate might
238+
// change its mind.
241239
await @lock.ReleaseUntilPulseAsync(OneSecond, cancellation);
242240
}
243241
}
244242

243+
private static readonly Func<DependencyQueueEntry<T>, T>
244+
GetValue = e => e.Value;
245+
246+
private static readonly Func<T, bool>
247+
AcceptAny = _ => true;
248+
245249
/// <summary>
246250
/// Marks the specified entry as done.
247251
/// </summary>

DependencyQueue/DependencyQueueEntryQueueView.cs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ namespace DependencyQueue;
1313
public readonly struct DependencyQueueEntryQueueView<T>
1414
: IReadOnlyCollection<DependencyQueueEntry<T>.View>
1515
{
16-
private readonly Queue<DependencyQueueEntry<T>> _queue;
17-
private readonly AsyncMonitor.Lock _lock;
16+
private readonly PredicateQueue<DependencyQueueEntry<T>> _queue;
17+
private readonly AsyncMonitor.Lock _lock;
1818

1919
internal DependencyQueueEntryQueueView(
20-
Queue<DependencyQueueEntry<T>> queue,
21-
AsyncMonitor.Lock @lock)
20+
PredicateQueue<DependencyQueueEntry<T>> queue,
21+
AsyncMonitor.Lock @lock)
2222
{
2323
_queue = queue;
2424
_lock = @lock;
@@ -27,7 +27,7 @@ internal DependencyQueueEntryQueueView(
2727
/// <summary>
2828
/// Gets the underlying queue.
2929
/// </summary>
30-
internal Queue<DependencyQueueEntry<T>> Queue => _queue;
30+
internal PredicateQueue<DependencyQueueEntry<T>> Queue => _queue;
3131

3232
/// <inheritdoc/>
3333
/// <exception cref="ObjectDisposedException">
@@ -42,7 +42,7 @@ public int Count
4242
}
4343
}
4444

45-
/// <inheritdoc cref="Queue{T}.Peek" />
45+
/// <inheritdoc cref="PredicateQueue{T}.Peek" />
4646
/// <exception cref="ObjectDisposedException">
4747
/// The underlying lock has been released.
4848
/// </exception>
@@ -52,8 +52,7 @@ public DependencyQueueEntry<T>.View Peek()
5252
return new(_queue.Peek(), _lock);
5353
}
5454

55-
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP
56-
/// <inheritdoc cref="Queue{T}.TryPeek(out T)" />
55+
/// <inheritdoc cref="PredicateQueue{T}.TryPeek(out T)" />
5756
/// <exception cref="ObjectDisposedException">
5857
/// The underlying lock has been released.
5958
/// </exception>
@@ -64,7 +63,6 @@ public bool TryPeek([MaybeNullWhen(false)] out DependencyQueueEntry<T>.View resu
6463
? (r: true, result = new(obj, _lock)).r
6564
: (r: false, result = default).r;
6665
}
67-
#endif
6866

6967
/// <inheritdoc cref="IEnumerable{T}.GetEnumerator"/>
7068
/// <exception cref="ObjectDisposedException">
@@ -97,12 +95,12 @@ IEnumerator IEnumerable.GetEnumerator()
9795
/// </summary>
9896
public struct Enumerator : IEnumerator<DependencyQueueEntry<T>.View>
9997
{
100-
private Queue<DependencyQueueEntry<T>>.Enumerator _enumerator;
101-
private readonly AsyncMonitor.Lock _lock;
98+
private PredicateQueue<DependencyQueueEntry<T>>.Enumerator _enumerator;
99+
private readonly AsyncMonitor.Lock _lock;
102100

103101
internal Enumerator(
104-
Queue<DependencyQueueEntry<T>>.Enumerator enumerator,
105-
AsyncMonitor.Lock @lock)
102+
PredicateQueue<DependencyQueueEntry<T>>.Enumerator enumerator,
103+
AsyncMonitor.Lock @lock)
106104
{
107105
_enumerator = enumerator;
108106
_lock = @lock;

0 commit comments

Comments
 (0)