Category Archives: Multi-threading

Creating a TaskScheduler

In my previous post, ExecutorService based multi-threading in Java, I looked at the ExecutorService which allows us to create our own fixed size threadpool which can, at times, be extremely useful.

As part of a port of some Java code (which used the ExecutorService) to C# I had a need for a similar class in .NET and came across the LimitedConcurrencyLevelTaskScheduler which is part of the Samples for Parallel Programming with the .NET Framework. The code and an example of this class can also be found here, TaskScheduler Class.

The Task factory’s StartNew method comes with an overload which takes a TaskScheduler, and this is where the LimitedConcurrencyLevelTaskScheduler is used. For example

var taskScheduler = new LimitedConcurrencyLevelTaskScheduler(3);
var tsk = Task.Factory.StartNew(() =>
{
   // do something
}, 
CancellationToken.None, 
TaskCreationOptions.None, 
taskScheduler);

So, if we’re in a situation where we need to create a new TaskScheduler then we simply derive from the TaskScheduler class; here are the required overrides.

public class MyTaskScheduler : TaskScheduler
{
   protected override IEnumerable<Task> GetScheduledTasks()
   {
      throw new NotImplementedException();
   }

   protected override void QueueTask(Task task)
   {
      throw new NotImplementedException();
   }

   protected override bool TryExecuteTaskInline(
      Task task, bool taskWasPreviouslyQueued)
   {
      throw new NotImplementedException();
   }
}

The QueueTask method is called when a Task is added to the scheduler. In the case of an implementation such as LimitedConcurrencyLevelTaskScheduler, this is where we add a task to a queue prior to executing the task.

TryExecuteTaskInline is called when trying to execute a task on the current thread; it’s passed a Boolean, taskWasPreviouslyQueued, which denotes whether the task has already been queued.

GetScheduledTasks is called to get an enumerable of the tasks currently scheduled within our TaskScheduler.

Optionally, we may wish to override the MaximumConcurrencyLevel property which stipulates the maximum concurrency level supported (i.e. the number of concurrent tasks); the default is int.MaxValue. Also, where we might be maintaining a queue of tasks, we would want to override the TryDequeue method, which is called when the scheduler tries to remove a previously scheduled task.
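
As a rough sketch of what these optional overrides might look like, assuming (hypothetically) that our scheduler tracks its queued tasks in a LinkedList<Task> field and was constructed with a maxConcurrency value

private readonly LinkedList<Task> queuedTasks = new LinkedList<Task>();
private readonly int maxConcurrency;

// report the limit we were constructed with, rather than the int.MaxValue default
public override int MaximumConcurrencyLevel
{
   get { return maxConcurrency; }
}

// called when the scheduler tries to take back a previously queued task
protected override bool TryDequeue(Task task)
{
   lock (queuedTasks)
   {
      return queuedTasks.Remove(task);
   }
}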

Obviously implementing the TaskScheduler will also require the developer to handle any thread synchronization etc. within the implementation.

Pulse and Wait

The Monitor class contains the static methods Pulse and Wait. Well, it has more than those two of course, but these are the two this post is interested in.

Both methods must be enclosed in a lock, or more explicitly, the object that we pass to Pulse and Wait must have had a lock acquired against it. For example

private readonly object sync = new object();

// thread A
lock(sync)
{
   Monitor.Wait(sync);
}

// thread B
lock(sync)
{
   Monitor.Pulse(sync);
}

In the above code, we have two threads. Thread A acquires a lock on sync and then enters a Wait, at which point the lock is released but the thread now blocks until it can reacquire the lock. Meanwhile, thread B acquires the lock on the sync object. It calls Monitor.Pulse which moves the waiting thread (thread A) to the ready queue. When thread B releases the lock (i.e. exits the lock block), the next ready thread (in this case thread A) reacquires the lock and any code after the Monitor.Wait executes until thread A exits the lock block and releases the lock.
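
In practice a Monitor.Wait is almost always guarded by a condition checked in a loop, since a thread may wake without the condition it’s waiting for actually holding. Here’s a minimal sketch of that canonical pattern (the ready flag is purely for illustration)

private readonly object sync = new object();
private bool ready;

// thread A: block until the condition holds
void WaitForReady()
{
   lock (sync)
   {
      // re-check the condition on every wake-up
      while (!ready)
      {
         Monitor.Wait(sync);
      }
   }
}

// thread B: establish the condition, then pulse
void SignalReady()
{
   lock (sync)
   {
      ready = true;
      Monitor.Pulse(sync);
   }
}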

Producer-consumer pattern

Okay, I’ll admit to a lack of imagination – the producer consumer pattern, also known as the producer consumer queue, is a standard sample for showing Pulse and Wait in use, and the excellent Threading in C# posts by Joseph Albahari are probably the best place to look for all C# threading information, but we’re going to walk through the producer consumer queue here anyway.

So the producer consumer queue is, simply put, a queue whereby multiple threads may add to the queue, and as data is added a thread within the queue does something with the data; see Producer–consumer problem.

This example creates one or more threads (as specified in the threadCount) which will be used to process items from the queue.

The Enqueue method takes the lock, adds the action to the queue and then pulses. The “processing” threads wait for the lock on the sync object to be released and then check there are still items in the queue (otherwise the thread goes into a wait again). Assuming there is an item in the queue, we dequeue the item within the lock to ensure no other thread will have removed it, then we release the lock and invoke the action before checking for more items to be processed.

public class ProducerConsumerQueue
{
   private readonly Task[] tasks;
   private readonly object sync = new object();
   private readonly Queue<Action> queue;

   public ProducerConsumerQueue(int threadCount, CancellationToken cancellationToken)
   {
      Contract.Requires(threadCount > 0);

      queue = new Queue<Action>();
      tasks = new Task[threadCount];
      for (int i = 0; i < threadCount; i++)
      {
         tasks[i] = Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
      }

      // register after the tasks array is created, so a token which is
      // already cancelled can't invoke Close before the tasks exist
      cancellationToken.Register(() => Close(false));
   }

   public void Enqueue(Action action)
   {
      lock (sync)
      {
         queue.Enqueue(action);
         Monitor.Pulse(sync);
      }
   }

   public void Close(bool waitOnCompletion = true)
   {
      for (int i = 0; i < tasks.Length; i++)
      {
         Enqueue(null);
      }
      if (waitOnCompletion)
      {
         Task.WaitAll(tasks);
      }
   }

   private void Process()
   {
      while (true)
      {
         Action action;
         lock (sync)
         {
            while(queue.Count == 0)
            {
                Monitor.Wait(sync);
            }
            action = queue.Dequeue();
         }
         if (action == null)
         {
            break;
         }
         action();
      }
   }
}

Here’s a simple sample of code to interact with the ProducerConsumerQueue

CancellationTokenSource cts = new CancellationTokenSource();

ProducerConsumerQueue pc = new ProducerConsumerQueue(1, cts.Token);
pc.Enqueue(() => Console.WriteLine("1"));
pc.Enqueue(() => Console.WriteLine("2"));
pc.Enqueue(() => Console.WriteLine("3"));
pc.Enqueue(() => Console.WriteLine("4"));

// various ways to exit the queue in an orderly manner
cts.Cancel();
//pc.Enqueue(null);
//pc.Close();

So in this code we create a ProducerConsumerQueue with a single thread. Actions are added to the queue via Enqueue, and as the ProducerConsumerQueue has only a single thread and all the items were added from a single thread, each item is simply invoked in the order it was added to the queue. However, we could have been adding from multiple threads, as the ProducerConsumerQueue is thread safe. Had we created the ProducerConsumerQueue with multiple threads then the order of processing might also differ.

Waiting for a specified number of threads using the CountdownEvent Class

In a previous post I discussed the Barrier class. The CountdownEvent class is closely related to the Barrier in that it can be used to block until a set number of signals have been received.

One thing you’ll notice is that the signal on the Barrier is tied to a wait, in other words you call SignalAndWait on a thread, whereas the CountdownEvent has a Signal and a separate Wait method. Hence threads could signal they’ve reached a point and then continue anyway, or call Wait after they call Signal; equally a thread could signal multiple times.

The biggest difference with the Barrier class is that the CountdownEvent does not automatically reset once the specified number of signals have been received. So, once the CountdownEvent counter reaches zero any attempt to Signal it will result in an InvalidOperationException.

You can increment the participants (or in this case we say that we’re incrementing the count) at runtime, but only before the counter reaches zero.
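
For example, a quick illustration of incrementing the count

CountdownEvent ev = new CountdownEvent(2);
ev.AddCount();    // three signals are now required
ev.Signal();
ev.Signal();
ev.Signal();      // the counter reaches zero and any waiters unblock

// ev.AddCount() would now throw an InvalidOperationException, whereas
// ev.TryAddCount() simply returns false once the count has reached zero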

Time for some sample code

class Program
{
   static CountdownEvent ev = new CountdownEvent(5);
   static Random random = new Random();

   static void Main()
   {
      Task horse1 = Task.Run(() => 
            GetReadyToRace("Horse1", random.Next(1, 1000)));
      Task horse2 = Task.Run(() => 
            GetReadyToRace("Horse2", random.Next(1, 1000)));
      Task horse3 = Task.Run(() => 
            GetReadyToRace("Horse3", random.Next(1, 1000)));
      Task horse4 = Task.Run(() => 
            GetReadyToRace("Horse4", random.Next(1, 1000)));
      Task horse5 = Task.Run(() => 
            GetReadyToRace("Horse5", random.Next(1, 1000)));

      Task.WaitAll(horse1, horse2, horse3, horse4, horse5);
   }

   static void GetReadyToRace(string horse, int speed)
   {
      Console.WriteLine(horse + " arrives at the race course");
      ev.Signal();

      // wait a random amount of time before the horse reaches the starting gate
      Task.Delay(speed).Wait();
      Console.WriteLine(horse + " arrives at the start gate");
      ev.Wait();
  }
}

In the above code, each thread signals when it “arrives at the race course”, then we have a delay for effect and then we wait for all threads to signal. The CountdownEvent counter is decremented for each signal and threads wait until the final thread signals the CountdownEvent at which time the threads are unblocked and able to complete.

As stated previously, unlike the Barrier, once all signals have been received there’s no automatic reset of the CountdownEvent; it has simply finished its job (although we can call its Reset method to manually reinstate the count).

Using ThreadStatic and ThreadLocal

First off, these are two different things altogether; ThreadStatic is actually the ThreadStaticAttribute and you mark a static variable with the attribute, whereas ThreadLocal<T> is a generic class.

So why are we looking at both in the one post?

Both ThreadStatic and ThreadLocal are used to allow us to declare thread specific values/variables.

ThreadStatic

A static variable marked with the ThreadStatic attribute is not shared between threads, therefore each thread gets its own instance of the static variable.

Let’s look at some code

[ThreadStatic]
static int value;

static void Main()
{
   Task t1 = Task.Run(() =>
   {
      value++;
      Console.WriteLine("T1: " + value);
   });
   Task t2 = Task.Run(() =>
   {
      value++;
      Console.WriteLine("T2: " + value);
   });
   Task t3 = Task.Run(() =>
   {
      value++;
      Console.WriteLine("T3: " + value);
   });

   Task.WaitAll(t1, t2, t3);
}

The output from this (obviously the ordering may be different) is

T3: 1
T1: 1
T2: 1

One thing to watch out for is that if we initialize the ThreadStatic variable, for example if we write the following

[ThreadStatic]
static int value = 10;

you need to be aware that this is initialized only on the thread it’s declared on; all the other threads which use value will get a variable initialized with its default value, i.e. 0.

For example, if we change the code very slightly to get

[ThreadStatic]
static int value = 10;

static void Main()
{
   Task t1 = Task.Run(() =>
   {
      value++;
      Console.WriteLine("T1: " + value);
   });
   Task t2 = Task.Run(() =>
   {
      value++;
      Console.WriteLine("T2: " + value);
   });
   Task t3 = Task.Run(() =>
   {
      value++;
      Console.WriteLine("T3: " + value);
   });

   Console.WriteLine("Main thread: " + value);
   
   Task.WaitAll(t1, t2, t3);
}

The output will look something like

Main thread: 10
T2: 1
T1: 1
T3: 1

Finally, as by definition each instance of the variable is per thread, operations upon it are thread safe.

ThreadLocal

Like the ThreadStatic attribute, the ThreadLocal<T> class allows us to declare a variable which is not shared between threads. One of the extra capabilities of this class is that we can initialize each instance of the variable, as the class uses the supplied factory method to create and/or initialize the value to be returned. Also, unlike ThreadStatic which only works on static fields, ThreadLocal can be applied to static or instance variables.

Let’s look at the code

static void Main()
{
   ThreadLocal<int> local = new ThreadLocal<int>(() =>
   {
      return 10;
   });

   Task t1 = Task.Run(() =>
   {
      local.Value++;
      Console.WriteLine("T1: " + local.Value);
   });
   Task t2 = Task.Run(() =>
   {
      local.Value++;
      Console.WriteLine("T2: " + local.Value);
   });
   Task t3 = Task.Run(() =>
   {
      local.Value++;
      Console.WriteLine("T3: " + local.Value);
   });

   Task.WaitAll(t1, t2, t3);
   local.Dispose();
}

The output order may be different, but the output will read something like

T2: 11
T3: 11
T1: 11

As you can see, each thread altered its own instance of the thread local variable and, moreover, we were able to set the default value to 10.

The ThreadLocal class implements IDisposable, so we should Dispose of it when we’ve finished with it.

Apart from Dispose all methods and protected members are thread safe.
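
As an aside, if we want to see the value each thread ended up with, the ThreadLocal constructor can optionally track all the values for us. For example

// trackAllValues: true lets us enumerate every thread's value afterwards
ThreadLocal<int> local = new ThreadLocal<int>(() => 10, true);

Task.WaitAll(
   Task.Run(() => local.Value++),
   Task.Run(() => local.Value++));

// Values holds one entry per thread that touched local.Value
Console.WriteLine(String.Join(", ", local.Values)); // e.g. 11, 11

local.Dispose();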

Waiting for a specified number of threads using the Barrier Class

First off, if you are looking for the definitive online resource on threading in C#, don’t waste your time here. Go to Threading in C# by Joseph Albahari. This is without doubt the best resource anywhere on C# threading, in my opinion (for what it’s worth).

This said, I’m still going to create this post for my own reference.

The Barrier Class

The Barrier class is a synchronization class which allows us to block until a set number of threads have signalled the Barrier. Each thread signals and waits, and is thus blocked by the barrier until the set number of signals has been reached, at which point they are all unblocked and allowed to continue. Unlike the CountdownEvent, when the Barrier has been signalled the specified number of times it resets and can block again and again, waiting for the specified number of threads to signal.

Where would a synchronization class be without a real world analogy – so, let’s assume we are waiting for a known number of race horses (our threads) to arrive at a start barrier (our Barrier class) and once they all arrive we can release them. Only when they’ve all reached the end line (the Barrier class again) do we output the result of the race.

Let’s look at some code (note: this sample can just be dropped straight into a console app’s Program class)

static Barrier barrier = new Barrier(5, b => 
         Console.WriteLine("--- All horses have reached the barrier ---"));
static Random random = new Random();

static void Main()
{
   Task horse1 = Task.Run(() => Race("Horse1", random.Next(1, 1000)));
   Task horse2 = Task.Run(() => Race("Horse2", random.Next(1, 1000)));
   Task horse3 = Task.Run(() => Race("Horse3", random.Next(1, 1000)));
   Task horse4 = Task.Run(() => Race("Horse4", random.Next(1, 1000)));
   Task horse5 = Task.Run(() => Race("Horse5", random.Next(1, 1000)));

   // this is solely here to stop the app closing until all threads have completed
   Task.WaitAll(horse1, horse2, horse3, horse4, horse5);
}

static void Race(string horse, int speed)
{
   Console.WriteLine(horse + " is at the start gate");
   barrier.SignalAndWait();

   // wait a random amount of time before the horse reaches the finish line
   Task.Delay(speed).Wait();
   Console.WriteLine(horse + " reached the finishing line");
   barrier.SignalAndWait();
}

Maybe I’ve taken the horse race analogy a little too far :)

Notice that the Barrier constructor allows us to add an action which is executed after each phase, i.e. when the specified number of threads have signalled the barrier.

We can add participants to the barrier at runtime, so if we were initially waiting for three threads and these created another three threads (for example), we could then notify the barrier that we want to add three more using the AddParticipant or AddParticipants methods. Likewise we could reduce the participant count using RemoveParticipant or RemoveParticipants.
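
For example (the numbers here are arbitrary)

Barrier barrier = new Barrier(3);
barrier.AddParticipants(3);   // six signals are now required per phase
barrier.RemoveParticipant();  // back down to five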

As you can see from the code above, when a thread wishes to signal it’s completed a phase of its processing, it calls the SignalAndWait method on the barrier. This, as the name suggests, signals the Barrier and then waits until the Barrier releases it.

Mutex – Running a single instance of an application

Occasionally you might want to create an application that can only have a single instance running on a machine at a time. For example, maybe you’ve a tray icon application or maybe an application that caches data for other applications to use etc.

We can achieve this by using a Mutex. A Mutex is very much like a Monitor/lock but can exist across multiple processes.

class Program
{
   static void Main(string[] args)
   {
      using(Mutex mutex = new Mutex(false, "some_unique_application_key"))
      {
         if(!mutex.WaitOne(TimeSpan.FromSeconds(3), false))
         {
            // another instance of the application must be running so exit
            return;
         }
         // Put code to run the application here
      }
   }
}

So in the above code we create a mutex with a unique key (“some_unique_application_key”). It’s best to use something like a URL and application name, so maybe “putridparrot.com\MyApp” as an example. The mutex is held onto for the life of the application for obvious reasons.

Next we try to WaitOne; we need a timeout on this otherwise we’ll just block at this point until the other instance closes down (or more specifically, until the Mutex on the other instance is released). So choose a time span of your choice. If the timeout occurs then false is returned from WaitOne and we can take it that another instance of the application is running, so we exit this instance of the application.

On the other hand if (within the timeout) WaitOne returns true then no other instance of the application (well more specifically the named Mutex) exists and we can go ahead and do whatever we need to run our application.

Semaphore & SemaphoreSlim

Basically, a Semaphore allows us to set the initial count and the maximum number of threads that can enter a critical section.

The standard analogy of how a Semaphore works is the nightclub and bouncer analogy. So a nightclub has a capacity of X and the bouncer stops any clubbers entering once the nightclub reaches its capacity. Those clubbers can only enter when somebody leaves the nightclub.

In the case of a Semaphore we use WaitOne (or Wait on SemaphoreSlim) to act as the “bouncer” to the critical section and Release to inform the “bouncer” that a thread has left the critical section. For example

private readonly Semaphore semaphore = new Semaphore(3, 3);

private void MyCriticalSection(object o)
{
    semaphore.WaitOne();
    try
    {
        // do something
    }
    finally
    {
        // always release, even if the critical section throws
        semaphore.Release();
    }
}

In the code above, say our calling method creates 20 threads all running the MyCriticalSection code, i.e.

for (int i = 0; i < 20; i++)
{
   Thread thread = new Thread(MyCriticalSection);
   thread.Start();
}

What happens is that the first three threads to arrive at semaphore.WaitOne will be allowed access to the code between the WaitOne and Release. All other threads block until one or more threads calls Release, thus ensuring that at any one time a maximum of (in this case) 3 threads can have access to the critical section.

Okay, but the Semaphore allows an initial count (the first argument), so what’s that about?

So let’s assume instead we have an initial count of 0, what happens then?

private readonly Semaphore semaphore = new Semaphore(0, 3);

The above code still says we have a maximum of 3 threads allowed in our critical section, but it basically reserves all 3 slots (maximum threads – initial count = reserved). In essence this is like saying the calling thread called WaitOne three times. The point being that when we fire off our 20 threads none will gain access to the critical section as all slots are reserved. So we would need to Release some slots before the blocked threads would be allowed into the critical section.

Obviously this is useful if we wanted to start a bunch of threads but we weren’t ready for the critical section to be entered yet.

However we can also set the initial count to another number; so let’s say we set it to 1 and the maximum is still 3. Now we have a capacity of 3 threads for our critical section but currently only one is allowed to enter the section until the reserved slots are released.
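
For example, here’s a small sketch with all the slots initially reserved (the thread start-up code is omitted)

Semaphore semaphore = new Semaphore(0, 3);

// ... start threads which call semaphore.WaitOne() ...
// none of them can enter the critical section yet

semaphore.Release(3); // open all 3 slots, up to 3 threads may now enter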

Note: It’s important to be sure that you only release the same number of times that you WaitOne or in the case of reserved slots you can only release up to the number of reserved slots.

To put it more simply, think reference counting. If you WaitOne you must call Release once and only once for each WaitOne. In the case where we reserved 3 slots you can call Release(3) (or release fewer than 3) but you cannot release 4 as this would cause a SemaphoreFullException.

Important: Unlike a Mutex or Monitor/lock a Semaphore does not have thread affinity, in other words we can call Release from any thread, not just the thread which called WaitOne.

SemaphoreSlim

SemaphoreSlim, as the name suggests, is a lightweight implementation of a Semaphore. The first thing to notice is that it uses Wait instead of WaitOne. The real purpose of the SemaphoreSlim is to supply a faster Semaphore (typically a Semaphore might take 1 ms per WaitOne and per Release; the SemaphoreSlim takes about a quarter of this time, source).
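
As a sketch, the earlier critical section rewritten to use SemaphoreSlim might look like the following (SemaphoreSlim also offers WaitAsync, which fits naturally with async code)

private readonly SemaphoreSlim slim = new SemaphoreSlim(3, 3);

private async Task MyCriticalSectionAsync()
{
   // asynchronous equivalent of Wait/WaitOne
   await slim.WaitAsync();
   try
   {
      // do something
   }
   finally
   {
      slim.Release();
   }
}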

See also

Semaphore and SemaphoreSlim
Overview of Synchronization Primitives

and the brilliant Threading in C#

Puppet Task – TaskCompletionSource

Occasionally we will want to wrap some code in a Task to allow us to use async await, where creating an actual Task is maybe not required. It could be legacy code; for example maybe we’re running on a threadpool thread and want to make this appear as an async await capable method, or maybe we are mocking a method in our unit tests.

In such cases we can use the TaskCompletionSource.

Let’s look at a slightly convoluted example. We have a third party (or binary only) library with an ExternalMethod class and a method called CalculateMeanValueAsync which uses the threadpool to execute some process and, when the process is completed, calls the supplied callback Action passing a double value as a result. So its method signature might look like

public static void CalculateMeanValueAsync(Action<double> callback)

Now we want to use this method in a more async await style i.e. we’d like to call the method like this

double d = await CalculateMeanValueAsync();

We want to basically create a task to handle this but we want to manually control it from outside of the external method. We can thus use the TaskCompletionSource as a puppet task, writing something like the code below to wrap the external method call in an async await compatible piece of code.

public Task<double> CalculateMeanValueAsync()
{
   var tcs = new TaskCompletionSource<double>();

   ExternalMethod.CalculateMeanValueAsync(result => 
   {
      tcs.SetResult(result);
   });

   return tcs.Task;
}

What happens here is we create a TaskCompletionSource and return the Task from it (which obviously may or may not complete prior to the ExternalMethod.CalculateMeanValueAsync completion). Our calling code awaits our new CalculateMeanValueAsync method and when the callback Action is called we set the result using tcs.SetResult. This will now cause our TaskCompletionSource task to complete, allowing our code to continue to any code after the await in the calling method.

So we’ve essentially made a method appear as a Task style async method but controlling the flow via the TaskCompletionSource.
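
Note that, given the tcs from the example above, the puppet task can also be completed in other states; each of the lines below is an alternative way to finish the same task, not a sequence

tcs.SetException(new InvalidOperationException("failed")); // the task faults
tcs.SetCanceled();                                         // the task is cancelled

// the TrySet* variants (e.g. TrySetResult) return false rather than
// throwing if the task has already been completed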

Another example might be in unit testing. When mocking an interface which returns a Task, we can create a TaskCompletionSource, create a setup that returns its Task property and then set the mock result on the TaskCompletionSource. An example of such a test is listed below:

[Fact]
public async Task CalculateMean_ExpectCallToIStatisticCalculator_ResultShouldBeSuppliedByMock()
{
   TaskCompletionSource<double> tc = new TaskCompletionSource<double>();

   var mock = new Mock<IStatisticsCalculator>();
   mock.Setup(s => s.CalculateMeanAsync()).Returns(tc.Task);

   tc.SetResult(123.45);

   Calculator c = new Calculator(mock.Object);
   double mean = await c.CalculateMeanAsync();
   Assert.Equal(123.45, mean);

   mock.Verify();
}

So in this example we have a Calculator class which takes an IStatisticsCalculator (maybe it allows us to swap in and out different code for the calculations – I didn’t say it was a perfect example). Now in our test we want to create a mock (I’m using xUnit and Moq for this). We expect the Calculator to call the mock code and return the result from it.

As you can see the mock sets up the Task and then we set the result on the TaskCompletionSource to emulate a completion of the task.

Note: In this example you must return Task on your async test method or you may find the test passing even when it clearly shouldn’t.

The Parallels

The Parallel static class gives us a few nice helper methods to carry out For, ForEach and Invoke operations in a parallel way. In reality we should say a potentially parallel way, as there’s no guarantee they will be executed in separate tasks.

Parallel.ForEach

The ForEach method has several overloads, but basically this will loop through an IEnumerable passing each value into an Action. For example

int[] values = new[] { 3, 5, 1, 45, 12, 6, 49 };
Parallel.ForEach(values, Console.WriteLine);

The order in which the Console.WriteLine method is passed values is non-deterministic. In other words there’s no guarantee that 1 will be processed before 45 and so on. A key thing to remember is that all values will be processed before the Parallel.ForEach call returns control to the calling code. In other words, the call blocks the calling code from continuing until all items in the enumerable passed to the ForEach method have been processed.

This means if you run Parallel.ForEach from a UI thread then it will block even though each item placed into the ForEach list is potentially run in parallel. So you need to do something like

Task.Factory.StartNew(() => Parallel.ForEach(values, SomeAction));

Obviously if anything within the action needs to update the UI thread you’ll need to marshal it onto the UI thread.

Parallel.For

Along with the ForEach we also have the For loop, which allows us to loop from index a (inclusive) to b (exclusive), calling the supplied Action with each index. For example

int[] values = new[] { 3, 5, 1, 45, 12, 6, 49 };
Parallel.For(0, 2, i => Console.WriteLine(values[i]));

In this instance only the “values” 3 and 5 will be passed to the action, as we’re starting at (and including) index 0 and stopping before index 2 (as it’s exclusive). The same issues/traits exist for the For loop in that it is blocking etc.

Invoke

Finally we have Invoke which takes an array of Actions and calls them in a potentially parallel manner. Thus if we had several actions that we want to call potentially in parallel we can pass them to the Invoke method to be executed. For example

Parallel.Invoke(() => Console.WriteLine(3), 
                () => Console.WriteLine(4), 
                () => Console.WriteLine(5));

As per For and ForEach this method will potentially execute the actions in parallel but will block the calling thread.

Exceptions

If exceptions occur within the loops or invoke, the Parallel class will throw an AggregateException which will contain one or more InnerExceptions.
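
For example, something along these lines

int[] values = new[] { 3, 5, 1, 45, 12, 6, 49 };

try
{
   Parallel.ForEach(values, v =>
   {
      if (v > 10)
         throw new InvalidOperationException("value too large: " + v);
   });
}
catch (AggregateException ex)
{
   // one inner exception per failed iteration
   foreach (var inner in ex.InnerExceptions)
   {
      Console.WriteLine(inner.Message);
   }
}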

ParallelLoopState

We can also pass a ParallelLoopState object into either the ForEach or For loops, which allows us to Break or Stop a parallel loop, as well as allowing us to find out whether a loop has thrown an exception or is stopped etc.
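
For example, a small sketch using Break

Parallel.For(0, 100, (i, state) =>
{
   if (i >= 10)
   {
      // no iteration with a higher index will be started, although
      // lower-indexed iterations already running will complete
      state.Break();
      return;
   }
   Console.WriteLine(i);
});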

ParallelOptions

Each of the For, ForEach and Invoke methods has an overload which takes ParallelOptions. This enables us to set the maximum degree of parallelism, the task scheduler and the cancellation token.

The maximum degree of parallelism allows us to limit the possible number of threads used in a Parallel method.

The cancellation token gives us a way to cancel a parallel operation; however this is cooperative, in other words the algorithm writer must poll the cancellation token and take the action to cancel the algorithm etc. when the token is set to Cancel. Tasks are not interrupted or stopped in any way; it’s down to the algorithm to detect the cancellation and stop.

The task scheduler allows us to assign a custom scheduler to the parallel code.
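
Finally, a small sketch pulling these options together

CancellationTokenSource cts = new CancellationTokenSource();

ParallelOptions options = new ParallelOptions
{
   MaxDegreeOfParallelism = 2,     // at most two concurrent iterations
   CancellationToken = cts.Token   // observed by the Parallel loop
};

int[] values = new[] { 3, 5, 1, 45, 12, 6, 49 };
Parallel.ForEach(values, options, v => Console.WriteLine(v));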