Pulse and Wait

The Monitor class contains the static methods Pulse and Wait. It has more than those two, of course, but these are the two this post is interested in.

Both methods must be called from within a lock. More precisely, the calling thread must hold the lock on the object that it passes to Pulse or Wait. For example:

private readonly object sync = new object();

// thread A
lock(sync)
{
   Monitor.Wait(sync);
}

// thread B
lock(sync)
{
   Monitor.Pulse(sync);
}

In the above code, we have two threads. Thread A acquires the lock on sync and then calls Wait, at which point the lock is released and the thread blocks until it is pulsed and can reacquire the lock. Meanwhile, thread B acquires the lock on the sync object and calls Monitor.Pulse, which moves the waiting thread (thread A) to the ready queue. When thread B releases the lock (i.e. exits the lock block), the next ready thread (in this case thread A) reacquires the lock and returns from Monitor.Wait. Any code after the Wait then runs until thread A exits its own lock block and releases the lock.
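The handoff described above can be tried out with a small console sketch. The class name PulseWaitDemo, the ready flag and both method names are made up for this illustration. The flag is there because a Pulse sent before thread A reaches its Wait would otherwise be lost, so the Wait sits in a loop that checks the flag. The second method shows what happens if Pulse is called without holding the lock: a SynchronizationLockException is thrown.

```csharp
using System;
using System.Threading;

public static class PulseWaitDemo
{
    private static readonly object sync = new object();

    // Hypothetical flag: guards against a Pulse that arrives before the Wait.
    private static bool ready;

    public static string Run()
    {
        string result = null;

        var threadA = new Thread(() =>
        {
            lock (sync)
            {
                // Wait releases the lock while blocked and reacquires it before returning.
                while (!ready)
                {
                    Monitor.Wait(sync);
                }
                result = "thread A resumed";
            }
        });
        threadA.Start();

        // The calling thread plays the part of thread B.
        lock (sync)
        {
            ready = true;
            Monitor.Pulse(sync);
        }

        threadA.Join();
        return result;
    }

    public static bool PulseWithoutLockThrows()
    {
        try
        {
            // No lock is held on this object, so Pulse is not allowed.
            Monitor.Pulse(new object());
            return false;
        }
        catch (SynchronizationLockException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run());
        Console.WriteLine(PulseWithoutLockThrows());
    }
}
```

Whichever thread gets the lock first, Run should come back with the same result: either thread A sees the flag already set and never waits, or it waits and is woken by the pulse.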

Producer-consumer pattern

Okay, I’ll admit to a lack of imagination. The producer consumer pattern, also known as the producer consumer queue, is the standard sample for showing Pulse and Wait in use, and the excellent Threading in C# posts by Joseph Albahari are probably the best place to look for all C# threading information. We’re going to walk through the producer consumer queue here anyway.

Simply put, the producer consumer queue is a queue to which multiple threads may add data while one or more threads owned by the queue take each item off and do something with it. See the Producer–consumer problem.

This example creates one or more threads (as specified by threadCount) which are used to process items from the queue.

The Enqueue method takes the lock, adds the action to the queue and then pulses. Each “processing” thread takes the lock on the sync object and checks whether there are any items in the queue; if there are none, it goes into a wait and checks again once it has been pulsed. When there is an item, the thread dequeues it while still holding the lock, to ensure no other thread removes it first, then releases the lock and invokes the action before going round again to check for more items. A null action acts as the signal for a processing thread to exit, which is how Close shuts the threads down.

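using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;

public class ProducerConsumerQueue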
{
   private readonly Task[] tasks;
   private readonly object sync = new object();
   private readonly Queue<Action> queue;

   public ProducerConsumerQueue(int threadCount, CancellationToken cancellationToken)
   {
      Contract.Requires(threadCount > 0);

      queue = new Queue<Action>();
      // on cancellation, signal the processing threads to stop without blocking the caller
      cancellationToken.Register(() => Close(false));

      tasks = new Task[threadCount];
      for (int i = 0; i < threadCount; i++)
      {
         tasks[i] = Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
      }
   }

   public void Enqueue(Action action)
   {
      lock (sync)
      {
         queue.Enqueue(action);
         Monitor.Pulse(sync);
      }
   }

   public void Close(bool waitOnCompletion = true)
   {
      for (int i = 0; i < tasks.Length; i++)
      {
         // a null action tells one processing thread to exit, so enqueue one per thread
         Enqueue(null);
      }
      if (waitOnCompletion)
      {
         Task.WaitAll(tasks);
      }
   }

   private void Process()
   {
      while (true)
      {
         Action action;
         lock (sync)
         {
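            // re-check after every wake-up, as another thread may have taken the item
            while (queue.Count == 0)
            {
               Monitor.Wait(sync);
            }
            // dequeue while still holding the lock
            action = queue.Dequeue();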
         }
         if (action == null)
         {
            break;
         }
         action();
      }
   }
}

Here’s a simple sample of code which interacts with the ProducerConsumerQueue:

CancellationTokenSource cts = new CancellationTokenSource();

ProducerConsumerQueue pc = new ProducerConsumerQueue(1, cts.Token);
pc.Enqueue(() => Console.WriteLine("1"));
pc.Enqueue(() => Console.WriteLine("2"));
pc.Enqueue(() => Console.WriteLine("3"));
pc.Enqueue(() => Console.WriteLine("4"));

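// various ways to exit the queue in an orderly manner
// (Cancel and Enqueue(null) return without waiting for the queued actions
// to finish, Close() waits for them)
cts.Cancel();
//pc.Enqueue(null);
//pc.Close();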

In this code we create a ProducerConsumerQueue with a single thread and add actions to the queue via Enqueue. As the ProducerConsumerQueue has only a single processing thread, and as all the items were added from a single thread, the items are simply invoked in the order they were added to the queue. We could just as well have added items from multiple threads, as the ProducerConsumerQueue is thread safe. Had we created the ProducerConsumerQueue with multiple processing threads, the order of processing might also have been different.
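To try the multi-threaded case, here is a rough sketch with several producers and several processing threads. MiniQueue is a condensed copy of the queue above with the cancellation token left out, so that the listing compiles on its own; MiniQueue, MultiProducerDemo and the parameter names are made up for this illustration. Rather than printing, each action increments a counter, because with more than one processing thread the order in which the actions run is not something we can rely on, whereas the total is.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Condensed copy of the queue above (no cancellation token), for illustration only.
public class MiniQueue
{
    private readonly object sync = new object();
    private readonly Queue<Action> queue = new Queue<Action>();
    private readonly Task[] tasks;

    public MiniQueue(int threadCount)
    {
        tasks = new Task[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            tasks[i] = Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
        }
    }

    public void Enqueue(Action action)
    {
        lock (sync)
        {
            queue.Enqueue(action);
            Monitor.Pulse(sync);
        }
    }

    public void Close()
    {
        // one null per processing thread, then wait for them all to exit
        for (int i = 0; i < tasks.Length; i++) Enqueue(null);
        Task.WaitAll(tasks);
    }

    private void Process()
    {
        while (true)
        {
            Action action;
            lock (sync)
            {
                while (queue.Count == 0) Monitor.Wait(sync);
                action = queue.Dequeue();
            }
            if (action == null) break;
            action();
        }
    }
}

public static class MultiProducerDemo
{
    public static int Run(int producers, int itemsPerProducer, int consumers)
    {
        int processed = 0;
        var pc = new MiniQueue(consumers);

        // each producer enqueues from its own thread pool thread
        var producerTasks = new Task[producers];
        for (int p = 0; p < producers; p++)
        {
            producerTasks[p] = Task.Run(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                {
                    pc.Enqueue(() => Interlocked.Increment(ref processed));
                }
            });
        }

        Task.WaitAll(producerTasks); // everything is queued before we close
        pc.Close();                  // returns once every action has run
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(4, 250, 3));
    }
}
```

Because the producers have all finished before Close is called, the nulls sit behind every real action in the queue, so each processing thread only exits once there is no work left to pick up.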