Limiting the number of threads
In our project we faced the problem of how to make running some calculations concurrently. We can simply start each calculation in a separate thread but the problem is how to limit the number of threads.
The first idea was to use ThreadPool from .NET library. We could easily limit the number of threads using ThreadPool.SetMaxThreads(). But here we have another problem: imagine, that our application uses threads somewhere else, or even more - some of the referenced libraries use threads internally. ThreadPool
is a static class and setting its maximal number of concurrent threads may break other stuff.
All we want is to limit the number of threads that we use locally. So, here we come to the necessity of implementing our own local thread pool. Of course, you can use existing implementations such as Smart Thread Pool. Unfortunately, in our case we are limited to .NET 3.5, and adding an additional third-party library is a problem.
After numerous hours of investigating how to get a solution with minimal effort, I wrote the next simple class:
public sealed class ThreadManager
{
private readonly WaitHandle[] _syncObjects;
public ThreadManager() : this(Environment.ProcessorCount)
{
}
public ThreadManager(int maxThreads)
{
MaxThreads = maxThreads;
_syncObjects = new WaitHandle[maxThreads];
for (var i = 0; i < maxThreads; i++)
{
_syncObjects[i] = new AutoResetEvent(true);
}
}
public int MaxThreads { get; private set; }
public void StartTask(Action action)
{
var freeIndex = WaitHandle.WaitAny(_syncObjects);
ThreadPool.QueueUserWorkItem(
state =>
{
action();
((AutoResetEvent)state).Set();
},
_syncObjects[freeIndex]);
}
}
First of all, we don't want to manage each thread, observe when to send it to sleep, wake up, how to reuse, etc. - all that work that ThreatPool
does. So, the approach is very simple: use threads from ThreadPool
, let him do all work for us :), and limit their number using sync objects.
In the class constructor we create the array of sync objects according to the specified number of threads. By default, to achieve maximal performance, the class creates the number of sync objects equal to the number of machine's processors. Each object is in the signaled state from the start.
To run the calculation in a separate thread, we simply call StartTask()
method and pass the action. Inside StartTask()
method the first thing that we do is waiting for any sync object using WaitHandle.WaitAny() method. For sync objects I use AutoResetEvent because WaitHandle.WaitAny()
automatically switches them to a non-signaled state and doing it as an atomic operation, preventing from taking the same sync object by two threads.
StartTask()
blocks code execution until some sync object will be freed (which means that some thread has finished work). Then it takes thread from ThreatPool
, executes passed action in it and switches sync object back to signaled state after finishing.
Lets take a look on the example scenario to understand how it works:
- Created
ThreadManager
with limitation to 2 concurrent threads. - Called
StartTask()
for calculation 1. The first internal sync object became non-signaled. Calculation 1 is executing in a separate thread. - Called
StartTask()
for calculation 2. The second internal sync object became non-signaled. Calculation 2 is executing in a separate thread. - Called
StartTask()
for calculation 3.StartTask()
blocked execution waiting for any sync object with the signaled state. - Calculation 1 finished. Method
Set()
was called on an appropriate sync object and switched it to the signaled state. StartTask()
continued execution, took freed signaled sync object and started calculation 3 in a separate thread.
Such approach to limit threads has some inconvenience:
- we can't check if we have free threads and how many;
- it's quite difficult to test it because ideally we need to inject
ThreadPool
to have the ability to mock it.
To solve (at least, partially) listed problems lets make some improvements:
internal sealed class ThreadManager
{
private readonly WaitHandle[] _syncObjects;
public ThreadManager() : this(Environment.ProcessorCount)
{
}
public ThreadManager(int maxThreads)
{
MaxThreads = maxThreads;
_syncObjects = new WaitHandle[maxThreads];
for (var i = 0; i < maxThreads; i++)
{
_syncObjects[i] = new AutoResetEvent(true);
}
}
public int MaxThreads { get; private set; }
public bool StartTask(Action action, bool wait)
{
var timeout = wait ? Timeout.Infinite : 0;
var freeIndex = WaitHandle.WaitAny(_syncObjects, timeout);
if (freeIndex == WaitHandle.WaitTimeout)
{
return false;
}
ThreadPool.QueueUserWorkItem(
state =>
{
action();
((AutoResetEvent)state).Set();
},
_syncObjects[freeIndex]);
return true;
}
}
From now we can call StartTask()
method with or without waiting for a free thread. We simply use a possibility to specify the timeout in WaitHandle.WaitAny()
. If we pass 0 and no sync objects in the signaled state are available then it returns an index equal to WaitHandle.WaitTimeout
. This modification allows checking for free threads without blocking the execution of the current thread.
Now its much easier to write unit-test to check how ThreadManager
limits number of threads and executes code in separate threads:
[TestFixture]
public class ThreadManagerTests
{
[Test]
[Timeout(2000)]
public void CodeExecutionTest()
{
// Setup
const int maxThreads = 3;
const int tasksNumber = maxThreads + 2;
var manager = new ThreadManager(maxThreads);
var syncObjects = new WaitHandle[tasksNumber];
for (var i = 0; i < tasksNumber; i++)
{
syncObjects[i] = new ManualResetEvent(false);
}
var n = new[] { 0 };
// Execute
for (var i = 0; i < tasksNumber; i++)
{
var syncObjectId = i;
manager.StartTask(
() =>
{
lock (n)
{
n[0]++;
}
((ManualResetEvent)syncObjects[syncObjectId]).Set();
},
true);
}
WaitHandle.WaitAll(syncObjects);
// Verify
Assert.AreEqual(tasksNumber, n[0]);
}
[Test]
[Timeout(2000)]
public void ThreadsUsageTest()
{
// Setup
const int maxThreads = 3;
var manager = new ThreadManager(maxThreads);
var syncObjects1 = new WaitHandle[maxThreads];
var syncObjects2 = new WaitHandle[maxThreads];
for (var i = 0; i < maxThreads; i++)
{
syncObjects1[i] = new ManualResetEvent(false);
syncObjects2[i] = new ManualResetEvent(false);
}
var threadIds = new List<int>();
// Execute
for (var i = 0; i < maxThreads; i++)
{
var syncObjectId = i;
manager.StartTask(
() =>
{
lock (threadIds)
{
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
}
((ManualResetEvent)syncObjects1[syncObjectId]).Set();
syncObjects2[syncObjectId].WaitOne();
},
true);
}
WaitHandle.WaitAll(syncObjects1);
foreach (var waitHandle in syncObjects2)
{
((ManualResetEvent)waitHandle).Set();
}
// Verify
Assert.AreEqual(maxThreads, threadIds.Distinct().Count());
}
[Test]
[Timeout(2000)]
public void ThreadsLimitTest()
{
// Setup
const int maxThreads = 3;
var manager = new ThreadManager(maxThreads);
var syncObjects = new WaitHandle[maxThreads];
for (var i = 0; i < maxThreads; i++)
{
syncObjects[i] = new ManualResetEvent(false);
}
// Execute
for (var i = 0; i < maxThreads; i++)
{
var syncObjectId = i;
manager.StartTask(() => syncObjects[syncObjectId].WaitOne(), true);
}
// Verify
var result = manager.StartTask(() => { }, false);
Assert.IsFalse(result);
((ManualResetEvent)syncObjects[0]).Set();
result = manager.StartTask(() => { }, true);
Assert.IsTrue(result);
}
}
The first test checks that code in each thread really executes. The second test checks that every task runs in a separate thread (we collect thread Ids and check that they all are different). In the third test we verify the ability of ThreadManager
to limit the number of threads.
That's all. We have a pretty simple class for using a limited number of threads, useful in situations when we have to use the old .NET Framework versions.