Skip to content

Commit 49e1af2

Browse files
committed
New ConcurrentForEach library
1 parent 667b2f1 commit 49e1af2

File tree

9 files changed

+361
-0
lines changed

9 files changed

+361
-0
lines changed

Source/Redgate.MicroLibraries.sln

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ULibs.FullExceptionString",
1515
EndProject
1616
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ULibs.TinyJsonDeser", "ULibs.TinyJsonDeser\ULibs.TinyJsonDeser.csproj", "{F46791E7-5434-400B-B5B4-6659909C262E}"
1717
EndProject
18+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ULibs.ConcurrentForEach", "ULibs.ConcurrentForEach\ULibs.ConcurrentForEach.csproj", "{B9950135-5A39-4B4C-8A61-D6105CA03396}"
19+
EndProject
1820
Global
1921
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2022
Debug|Any CPU = Debug|Any CPU
@@ -45,6 +47,10 @@ Global
4547
{F46791E7-5434-400B-B5B4-6659909C262E}.Debug|Any CPU.Build.0 = Debug|Any CPU
4648
{F46791E7-5434-400B-B5B4-6659909C262E}.Release|Any CPU.ActiveCfg = Release|Any CPU
4749
{F46791E7-5434-400B-B5B4-6659909C262E}.Release|Any CPU.Build.0 = Release|Any CPU
50+
{B9950135-5A39-4B4C-8A61-D6105CA03396}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
51+
{B9950135-5A39-4B4C-8A61-D6105CA03396}.Debug|Any CPU.Build.0 = Debug|Any CPU
52+
{B9950135-5A39-4B4C-8A61-D6105CA03396}.Release|Any CPU.ActiveCfg = Release|Any CPU
53+
{B9950135-5A39-4B4C-8A61-D6105CA03396}.Release|Any CPU.Build.0 = Release|Any CPU
4854
EndGlobalSection
4955
GlobalSection(SolutionProperties) = preSolution
5056
HideSolutionNode = FALSE
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
using System.Runtime.CompilerServices;
2+
3+
[assembly: InternalsVisibleTo("Ulibs.Tests")]
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
using System;
2+
using System.Collections.Generic;
3+
/***using System.Diagnostics.CodeAnalysis;***/
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace /***$rootnamespace$.***/ULibs.ConcurrentForEach
8+
{
9+
/***[ExcludeFromCodeCoverage]***/
10+
internal static class Concurrent
11+
{
12+
/// <summary>
13+
/// Applies an asynchronous operation to each element in a sequence.
14+
/// </summary>
15+
/// <typeparam name="T">The type of elements in the sequence.</typeparam>
16+
/// <param name="source">The input sequence of elements.</param>
17+
/// <param name="func">The asynchronous operation applied to each element in the sequence. It's strongly
18+
/// recommended that the operation should take care to handle its own expected exceptions.</param>
19+
/// <param name="maxConcurrentTasks">
20+
/// <para>
21+
/// The maximum number of concurrent operations. Must be greater than or equal to 1.
22+
/// </para>
23+
/// <para>
24+
/// The number of simultaneous operations can vary depending on the use case. For cpu intensive
25+
/// operations, consider using <see cref="Environment.ProcessorCount">Environment.ProcessorCount</see>.
26+
/// For operations that invoke the same web service for each item, RFC 7230 suggests that the number
27+
/// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4).
28+
/// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is
29+
/// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a
30+
/// different web service for each item, a search for the connection limits used by common web-browsers
31+
/// suggests that a value in the range 10-20 is appropriate.
32+
/// </para>
33+
/// </param>
34+
/// <param name="cancellationToken">Used to cancel the operations.</param>
35+
/// <returns>A task that can be awaited upon for all operations to complete. Awaiting on the task will
36+
/// raise an <see cref="AggregateException"/> if any operation fails, or work is cancelled via the
37+
/// <paramref name="cancellationToken"/>.</returns>
38+
public static Task ForEach<T>(
39+
this IEnumerable<T> source,
40+
Func<T, Task> func,
41+
int maxConcurrentTasks,
42+
CancellationToken cancellationToken)
43+
{
44+
if (func == null) throw new ArgumentNullException(nameof(func));
45+
return source.ForEach((item, _) => func(item), maxConcurrentTasks, cancellationToken);
46+
}
47+
48+
/// <summary>
49+
/// Applies an asynchronous operation to each element in a sequence.
50+
/// </summary>
51+
/// <typeparam name="T">The type of elements in the sequence.</typeparam>
52+
/// <param name="source">The input sequence of elements.</param>
53+
/// <param name="func">The asynchronous operation applied to each element in the sequence. It's strongly
54+
/// recommended that the operation should take care to handle its own expected exceptions.</param>
55+
/// <param name="maxConcurrentTasks">
56+
/// <para>
57+
/// The maximum number of concurrent operations. Must be greater than or equal to 1.
58+
/// </para>
59+
/// <para>
60+
/// The number of simultaneous operations can vary depending on the use case. For cpu intensive
61+
/// operations, consider using <see cref="Environment.ProcessorCount">Environment.ProcessorCount</see>.
62+
/// For operations that invoke the same web service for each item, RFC 7230 suggests that the number
63+
/// of simultaneous requests/connections should be limited (https://tools.ietf.org/html/rfc7230#section-6.4).
64+
/// A search for the connection limits used by common web-browsers suggests that a value in the range 6-8 is
65+
/// appropriate (any more, and you risk triggering abuse detection mechanisms). For operations that invoke a
66+
/// different web service for each item, a search for the connection limits used by common web-browsers
67+
/// suggests that a value in the range 10-20 is appropriate.
68+
/// </para>
69+
/// </param>
70+
/// <param name="cancellationToken">Used to cancel the operations.</param>
71+
/// <returns>A task that can be awaited upon for all operations to complete. Awaiting on the task will
72+
/// raise an <see cref="AggregateException"/> if any operation fails, or work is cancelled via the
73+
/// <paramref name="cancellationToken"/>.</returns>
74+
public static async Task ForEach<T>(
75+
this IEnumerable<T> source,
76+
Func<T, CancellationToken, Task> func,
77+
int maxConcurrentTasks,
78+
CancellationToken cancellationToken)
79+
{
80+
if (maxConcurrentTasks < 1)
81+
throw new ArgumentException("Value cannot be less than 1", nameof(maxConcurrentTasks));
82+
if (source == null) throw new ArgumentNullException(nameof(source));
83+
if (func == null) throw new ArgumentNullException(nameof(func));
84+
85+
using (var semaphore = new SemaphoreSlim(maxConcurrentTasks, maxConcurrentTasks))
86+
{
87+
var tasks = new List<Task>();
88+
foreach (var item in source)
89+
{
90+
// Wait for the next available slot.
91+
try
92+
{
93+
await semaphore.WaitAsync(cancellationToken);
94+
}
95+
catch (OperationCanceledException exception)
96+
{
97+
tasks.Add(Task.FromException(exception));
98+
break;
99+
}
100+
101+
// Discard completed tasks. Not strictly necessary, but keeps the list size down.
102+
tasks.RemoveAll(task => task.IsCompleted);
103+
104+
// Kick-off the next task.
105+
tasks.Add(CreateTask(func, item, cancellationToken).ReleaseSemaphoreOnCompletion(semaphore));
106+
}
107+
108+
await Task.WhenAll(tasks);
109+
}
110+
}
111+
112+
private static Task CreateTask<T>(
113+
Func<T, CancellationToken, Task> func, T item, CancellationToken cancellationToken)
114+
{
115+
try
116+
{
117+
return func(item, cancellationToken);
118+
}
119+
catch (Exception exception)
120+
{
121+
return Task.FromException(exception);
122+
}
123+
}
124+
125+
private static async Task ReleaseSemaphoreOnCompletion(this Task task, SemaphoreSlim semaphore)
126+
{
127+
try
128+
{
129+
await task;
130+
}
131+
finally
132+
{
133+
semaphore.Release();
134+
}
135+
}
136+
}
137+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Provides a way to asynchronously apply an operation to each element in a sequence, whilst limiting the maximum number of concurrent operations.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# ULibs.ConcurrentForEach release notes
2+
3+
## 1.0.0
4+
5+
### Features
6+
7+
- New `ForEach` extension method for `IEnumerable<T>` that applies an asynchronous operation to each element, whilst limiting the maximum number of concurrent operations.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netstandard2.0</TargetFramework>
5+
<Copyright>2018</Copyright>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Microsoft.DotNet.Analyzers.Compatibility" Version="0.2.12-alpha" />
10+
</ItemGroup>
11+
12+
</Project>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?xml version="1.0"?>
2+
<!-- http://docs.nuget.org/docs/reference/nuspec-reference -->
3+
<package >
4+
<metadata>
5+
<id>RedGate.ULibs.ConcurrentForEach.Sources</id>
6+
<version>$version$</version>
7+
<authors>red-gate</authors>
8+
<owners>red-gate</owners>
9+
<requireLicenseAcceptance>false</requireLicenseAcceptance>
10+
<summary><![CDATA[$summary$]]></summary>
11+
<description><![CDATA[$description$]]></description>
12+
<releaseNotes><![CDATA[$releaseNotes$]]></releaseNotes>
13+
<copyright>Copyright $copyrightYear$ Red Gate Software Ltd</copyright>
14+
<projectUrl>https://github.com/red-gate/MicroLibraries</projectUrl>
15+
</metadata>
16+
<files>
17+
<file src="*.pp" target="content\App_Packages\RedGate.ULibs.ConcurrentForEach.$version$"/>
18+
</files>
19+
</package>
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using NUnit.Framework;
7+
using ULibs.ConcurrentForEach;
8+
9+
namespace Ulibs.Tests.ConcurrentForEach
10+
{
11+
[TestFixture]
12+
public class ConcurrentTests
13+
{
14+
private readonly Random _random = new Random();
15+
16+
private int GetRandomDelay()
17+
{
18+
lock (_random)
19+
{
20+
return 50 + _random.Next(50);
21+
}
22+
}
23+
24+
[Test]
25+
public void ForEach_ArgChecks()
26+
{
27+
IEnumerable<int> sequence = Enumerable.Range(0, 10);
28+
IEnumerable<int> nullSequence = null;
29+
30+
Func<int, Task> operation = i => Task.CompletedTask;
31+
Func<int, Task> nullOperation = null;
32+
33+
Func<int, CancellationToken, Task> cancellableOperation = (i, c) => Task.CompletedTask;
34+
Func<int, CancellationToken, Task> nullCancellableOperation = null;
35+
36+
Assert.That(() => sequence.ForEach(nullOperation, 1, CancellationToken.None), Throws.ArgumentNullException);
37+
Assert.That(() => sequence.ForEach(nullCancellableOperation, 1, CancellationToken.None), Throws.ArgumentNullException);
38+
Assert.That(() => nullSequence.ForEach(operation, 1, CancellationToken.None), Throws.ArgumentNullException);
39+
Assert.That(() => nullSequence.ForEach(cancellableOperation, 1, CancellationToken.None), Throws.ArgumentNullException);
40+
Assert.That(() => sequence.ForEach(operation, 0, CancellationToken.None), Throws.ArgumentException);
41+
Assert.That(() => sequence.ForEach(cancellableOperation, 0, CancellationToken.None), Throws.ArgumentException);
42+
}
43+
44+
[Test]
45+
public void ForEach_ExecutesTheOperationAgainstEachElement()
46+
{
47+
// ARRANGE
48+
var count = 0;
49+
Task Operation(int value) => Task.Run(() => Interlocked.Increment(ref count));
50+
var sequence = Enumerable.Range(0, 10);
51+
52+
// ACT
53+
var task = sequence.ForEach(Operation, 2, CancellationToken.None);
54+
55+
// ASSERT
56+
Assert.That(task.Wait(2000), Is.True);
57+
Assert.That(count, Is.EqualTo(10));
58+
}
59+
60+
[Test]
61+
public void ForEach_LimitsTheNumberOfSimultaneousOperations()
62+
{
63+
// ARRANGE
64+
var currentOperationCount = 0;
65+
var maxOperationCount = 0;
66+
var monitor = new object();
67+
async Task Operation(int value)
68+
{
69+
lock (monitor)
70+
{
71+
currentOperationCount++;
72+
maxOperationCount = Math.Max(maxOperationCount, currentOperationCount);
73+
}
74+
await Task.Delay(GetRandomDelay());
75+
lock (monitor)
76+
{
77+
currentOperationCount--;
78+
}
79+
}
80+
var sequence = Enumerable.Range(0, 10);
81+
82+
// ACT
83+
var task = sequence.ForEach(Operation, 2, CancellationToken.None);
84+
85+
// ASSERT
86+
Assert.That(task.Wait(2000), Is.True);
87+
Assert.That(maxOperationCount, Is.EqualTo(2));
88+
}
89+
90+
[Test]
91+
public void ForEach_ExecutesTheOperationAgainstAllElements_InSpiteOfExceptions()
92+
{
93+
// ARRANGE
94+
var count = 0;
95+
96+
async Task Operation(int value)
97+
{
98+
Interlocked.Increment(ref count);
99+
if (value % 5 == 0)
100+
{
101+
// Sometimes the func should fail immediately.
102+
throw new ApplicationException("Error");
103+
}
104+
await Task.Delay(GetRandomDelay());
105+
if (value % 2 == 0)
106+
{
107+
// Sometimes the func should fail after the first await.
108+
throw new ApplicationException("Error");
109+
}
110+
}
111+
var sequence = Enumerable.Range(0, 10);
112+
113+
// ACT
114+
var task = sequence.ForEach(Operation, 2, CancellationToken.None);
115+
116+
// ASSERT
117+
Assert.That(() => Assert.That(task.Wait(2000), Is.True),
118+
Throws.InstanceOf<AggregateException>());
119+
Assert.That(count, Is.EqualTo(10));
120+
}
121+
122+
[Test]
123+
public void ForEach_AbortsWhenTheCancellationTokenIsCancelled_WhereEachTaskReceivesTheCancellationSignal()
124+
{
125+
// ARRANGE
126+
var count = 0;
127+
Task Operation(int value, CancellationToken token) => Task.Run(async () =>
128+
{
129+
Interlocked.Increment(ref count);
130+
await Task.Delay(GetRandomDelay(), token);
131+
});
132+
var sequence = Enumerable.Range(0, 10);
133+
var cancellationTokenSource = new CancellationTokenSource(200);
134+
var cancellationToken = cancellationTokenSource.Token;
135+
136+
// ACT
137+
var task = sequence.ForEach(Operation, 2, cancellationToken);
138+
139+
// ASSERT
140+
Assert.That(() => Assert.That(task.Wait(2000), Is.True),
141+
Throws.InstanceOf<AggregateException>());
142+
Assert.That(count, Is.GreaterThan(0).And.LessThan(10));
143+
}
144+
145+
[Test]
146+
public void ForEach_AbortsWhenTheCancellationTokenIsCancelled_WhereNoTaskReceivesTheCancellationSignal()
147+
{
148+
// ARRANGE
149+
var count = 0;
150+
Task Operation(int value) => Task.Run(async () =>
151+
{
152+
Interlocked.Increment(ref count);
153+
await Task.Delay(GetRandomDelay());
154+
});
155+
var sequence = Enumerable.Range(0, 10);
156+
var cancellationTokenSource = new CancellationTokenSource(200);
157+
var cancellationToken = cancellationTokenSource.Token;
158+
159+
// ACT
160+
var task = sequence.ForEach(Operation, 2, cancellationToken);
161+
162+
// ASSERT
163+
Assert.That(() => Assert.That(task.Wait(2000), Is.True),
164+
Throws.InstanceOf<AggregateException>());
165+
Assert.That(count, Is.GreaterThan(0).And.LessThan(10));
166+
}
167+
}
168+
}

0 commit comments

Comments
 (0)