Using Rx to read from UI and write on a worker thread

I have a problem: I need to iterate over a potentially large number of rows in a UI grid control. The iteration has to take place on the UI thread, but the writing (which in this instance writes the data to Excel) can occur on a background thread, which should make the UI a little more responsive.

Now this might not be the best solution, but it seems to work better than other, more synchronous solutions. One thing to note is that the current design expects the method to be called on the UI thread, and hence doesn’t marshal the call to the grid control onto the UI thread (it assumes it’s already on it).

Within the UI iteration method I create a new observable using

var rowObservable = Observable.Create<string>(observer =>
{
   // iterate over grid records calling the OnNext method 
   // for example
   foreach(var cell in Cells)
   {
      observer.OnNext(cell.Value);
   }

   observer.OnCompleted();
   return Disposable.Empty;
});

In the above code we loop through the cells of our UI grid control and then place each value onto the observer using OnNext. When the process completes we then call the OnCompleted method of the observer to tell any subscribers that the process is finished.

Let’s look at the subscriber code

var tsk = new TaskCompletionSource<object>();

rowObservable.Buffer(20).
   SubscribeOn(SynchronizationContext.Current).
   ObserveOn(Scheduler.Default).
   Subscribe(r =>
   {
      foreach (var item in r)
      {
          // write each value to Excel (in this example)
      }
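   }, ex =>
   {
      // surface failures so an awaiting caller is not left waiting forever
      tsk.SetException(ex);
   }, () =>
   {
      tsk.SetResult(null);
   });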

return tsk.Task;

In the above code we buffer the items pushed from rowObservable so that the subscriber receives them in batches of 20. We ObserveOn the default scheduler, so the writing happens on a background thread (i.e. the threadpool), but we SubscribeOn the current synchronization context. Remember I mentioned this method should be called from the UI thread, hence the SubscribeOn (not ObserveOn) relates to the code we’re observing, and this runs on the UI thread. When rowObservable completes we’ll still write out the last few items (if there are fewer than the buffer size).
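To see the batching on its own, here is a minimal sketch that swaps the grid for a plain range of numbers. The BufferDemo class, the BatchSizes helper and the figure of 45 items are just illustrative assumptions, and the code assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class BufferDemo
{
    // Hypothetical helper: pushes 'total' values through Buffer(size)
    // and records how many items each batch contained.
    public static List<int> BatchSizes(int total, int size)
    {
        var sizes = new List<int>();

        // Observable.Range stands in for the grid cells; with its default
        // scheduler the subscription has run to completion when Subscribe returns.
        Observable.Range(1, total)
            .Buffer(size)
            .Subscribe(batch => sizes.Add(batch.Count));

        return sizes;
    }

    static void Main()
    {
        // 45 items in batches of 20: two full batches, then the 5 left over.
        Console.WriteLine(string.Join(", ", BatchSizes(45, 20))); // prints "20, 20, 5"
    }
}
```

The last, smaller batch is flushed when the source completes, which is why the final few rows are not lost.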

Note: It’s important to remember that SubscribeOn schedules the observable (in this case the UI code), not the code within the Subscribe method. The latter is scheduled using the ObserveOn method.
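A console application has no UI synchronization context, so the sketch below uses two ordinary schedulers instead (NewThreadScheduler for the producer, TaskPoolScheduler for the consumer) purely to show which operator affects which side. The SchedulingDemo class and its Run method are hypothetical names, and the System.Reactive package is assumed.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

static class SchedulingDemo
{
    // Returns the managed thread ids seen by the producer and by the consumer.
    public static (int Producer, int Consumer) Run()
    {
        int producer = 0, consumer = 0;
        var done = new ManualResetEventSlim();

        var source = Observable.Create<int>(observer =>
        {
            // This delegate runs where SubscribeOn says it should.
            producer = Thread.CurrentThread.ManagedThreadId;
            observer.OnNext(1);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        source
            .SubscribeOn(NewThreadScheduler.Default)  // affects the Create delegate above
            .ObserveOn(TaskPoolScheduler.Default)     // affects the callbacks below
            .Subscribe(
                _ => consumer = Thread.CurrentThread.ManagedThreadId,
                () => done.Set());

        done.Wait();
        return (producer, consumer);
    }

    static void Main()
    {
        var (producer, consumer) = Run();
        Console.WriteLine($"producer thread {producer}, consumer thread {consumer}");
    }
}
```

The two ids should differ: the producer runs on the dedicated thread and the consumer on a pool thread, mirroring the UI thread and background thread split described above.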

You’ll notice we use a puppet task, controlled by a TaskCompletionSource. On completion of rowObservable we set the result on the puppet task, thus allowing our method to be used in async/await scenarios.
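From the caller’s side this might look something like the sketch below. The Exporter class, the WriteAllAsync name and the Task<int> return type (a count of items written) are assumptions for illustration only; a counter stands in for the Excel write, a range of numbers stands in for the grid, and the System.Reactive package is assumed.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class Exporter
{
    // Hypothetical helper: consumes 'values' in batches on a background
    // scheduler and completes the returned task once the source finishes.
    public static Task<int> WriteAllAsync(IObservable<string> values)
    {
        var tcs = new TaskCompletionSource<int>();
        var written = 0;

        values.Buffer(20)
            .ObserveOn(Scheduler.Default)
            .Subscribe(
                batch => written += batch.Count,  // stand-in for writing to Excel
                ex => tcs.SetException(ex),       // faults the task instead of hanging
                () => tcs.SetResult(written));    // completes the puppet task

        return tcs.Task;
    }
}

static class Program
{
    static async Task Main()
    {
        var rows = Observable.Range(1, 45).Select(i => i.ToString());

        // The caller simply awaits; the Rx pipeline stays hidden behind the Task.
        var count = await Exporter.WriteAllAsync(rows);
        Console.WriteLine(count); // prints 45
    }
}
```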

Like I mentioned, this actually might not be the best solution to the original problem, but it was interesting getting it up and running.