Category Archives: Reactive Extensions

Using Rx to read from UI and write on a worker thread

I have a problem whereby I need to iterate over a potentially large number of rows in a UI grid control, the iteration needs to take place on the UI thread but the writing (which in this instance write the data to Excel) can occur on a background thread which should make the UI a little more responsive.

Now this might not be the best solution but it seems to work better than other more synchronous solutions. One thing to note is that this current design expects the call to the method to be on the UI thread and hence doesn’t marshal the call to the grid control onto the UI thread (it assumes it’s on it).

Within the UI iteration method I create a new observable using

var rowObservable = Observable.Create<string>(observer =>
{
   // iterate over grid records calling the OnNext method 
   // for example
   foreach(var cell in Cells)
   {
      observer.OnNext(cell.Value);
   }

   observer.OnCompleted();
   return Disposable.Empty;
});

In the above code we loop through the cells of our UI grid control and then place each value onto the observer using OnNext. When the process completes we then call the OnCompleted method of the observer to tell any subscribers that the process is finished.

Let’s look at the subscriber code

var tsk = new TaskCompletionSource<object>();

rowObservable.Buffer(20).
   SubscribeOn(SynchronizationContext.Current).
   ObserveOn(Scheduler.Default).
   Subscribe(r =>
   {
      foreach (var item in r)
      {
          // write each value to Excel (in this example)
      }
   }, () =>
   {
      tsk.SetResult(null);
   });

return tsk.Task;

In the above code we buffer pushed items from rowObserver so we only process every 20 items. We ObserveOn the default scheduler, so this will be a background thread (i.e. threadpool) but we SubscribeOn the current synchronization context – remember I mentioned this method should be called from the UI thread and hence the SubscribeOn (not ObserveOn) relates to the code we’re observing and this is on the UI thread. When the rowObserver completes we’ll still write out the last few items (if they’re less than the buffer size).

Note: It’s important to remember that SubscribeOn schedules the observable (in this case the UI code) not the code within the Subscribe method. This is scheduled using the ObserveOn method.

You’ll notice we use a puppet task, controlling a TaskCompletionSource and on completion of the rowObserver we set the result on the puppet task, thus allowing our method to be used in async/await scenarios.

Like I mentioned, this actually might not be the best solution to the original problem, but it was interesting getting it up and running.

Observable.FromEventPattern discussed

I was actually looking through my past blog posts and noticed I’ve never published anything on Reactive Extensions (Rx). I actually have a draft “beginners” guide which I started a couple of years back, but never completed, so I may end up publishing that at a later date.

Note: The best resource for Rx information that I’ve found is Introduction to Rx, I’d definitely recommend you visit that site for complete and excellent documentation on Rx.

Okay, for this post I’m just going to focus on the FromEventPattern. Even though the syntax is pretty simple I have a habit of forgetting it everytime I come to use it, so as this blog is about refreshing my memory, I thought I’d write a blog post dedicated to this method.

Why use Rx instead of handling events in the conventional way?

One of the most obvious reasons to use Rx to handle events is that it can help stop those pesky memory leaks when we forget to unsubscribe to an event. Another reason is the IObservable returned from FromEventPattern is composable and we can filter events that we’re not interested in and/or marshal events from one thread onto another – for example when events are coming from a worker thread we might marshal them onto the UI thread.

Take a look at Observable.FromEventPattern Method to see the current syntax/overloads for this method.

I will not be covering all the overloads here but will instead look at the three I’ve used the most. So let’s get started…

FromEventPattern with an EventHandler which takes no EventArgs

In some cases you might have an event handler which doesn’t use EventArgs, for example

public event EventHandler Reverted;

in such a case we use the overload

public static IObservable<EventPattern<EventArgs>> FromEventPattern(
	Action<EventHandler> addHandler,
	Action<EventHandler> removeHandler)

So our code might look like this

var o = Observable.FromEventPattern(
   x => domainObject.Changed += x,
   x => domainObject.Changed -= x).
   Subscribe(_ => DoSomething());

FromEventPattern with standard event patterns (using the EventHandler class)

Building on the previous syntax…

In many cases, where the events that we want to subscribe to, use the “standard .NET event pattern”, (for example where the handler takes an EventArgs derived class) and were declared using the syntax

public event EventHandler<CellsInViewChangedEventArgs> CellsInViewChanged;

we will use this overload. Note: This version expects an EventHandler<> object.

public static IObservable<EventPattern<TEventArgs>> FromEventPattern<TEventArgs>(
	Action<EventHandler<TEventArgs>> addHandler,
	Action<EventHandler<TEventArgs>> removeHandler
)
where TEventArgs : EventArgs

To use this we simple supply the EventArgs derived type, for example

var o = Observable.FromEventPattern<CellsInViewChangedEventArgs>(
   x => grid.CellsInViewChanged += x,
   x => grid.CellsInViewChanged -= x).
   Subscribe(_ => DoSomething());

FromEvent pattern with standard event patterns (not using the Event Handler class)

Finally, for this post, in some cases we have events which do not use the EventHandler<> class, for example the WPF RoutedEvents use

public delegate void RoutedEventHandler(
   object sender, 
   System.Windows.RoutedEventArgs e)

In such cases we need to supply both the Delegate type and EventArgs type to the FromEventPattern, hence we use the syntax

public static IObservable<EventPattern<TEventArgs>> FromEventPattern<TDelegate, TEventArgs>(
	Action<TDelegate> addHandler,
	Action<TDelegate> removeHandler
)
where TEventArgs : EventArgs

our code will now look like this

var o = Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
   x => button.Click += x,
   x => button.Click -= x).
   Subscribe(_ => DoSomething());

More…

I don’t intend to cover all the possible uses of the IObservable which is returned by the FromEventPattern, but suffice to say (as mentioned earlier). We can ensure events are marshaled onto the thread which created the observable, so for example if we create this from the UI thread we can write

var o = Observable.FromEventPattern<ValuesChangedEventArgs>(
   x => server.ValuesChanged += x,
   x => server.ValuesChanged -= x).
   ObserveOn(SynchronizationContext.Current).
   Subscribe(_ => DoSomething());

here, the ObserveOn will ensure that the events that might be coming from a worker thread are marshaled onto the current thread. Obviously if this was created on the UI thread this will marshal back onto that thread.

Testing Reactive Extension code using the TestScheduler

I often use the RX Throttle method, which is brilliant for scenario’s such as allowing a user to type text into a textbox and then only calling my code when the user pauses typing for an assigned amount of time. Or when scrolling up and down lists and when the user stops selecting items getting more data for the selected item from a webservice or the likes.

But what do we do about testing such interactions ?

Sidetrack

Instead of looking at the Throttle method, I needed to create a Throttle that actually buffers the data sent to it and when the timeout occurs gives the subscriber all the data not just the last piece of data (as Throttle would). So here’s some code I wrote (based heavily upon a StackOverflow post)

This post is meant to be about how we test such code, so let’s look at one of the unit test I have for this

[Fact]
public void ThrottleWithBuffer_EnsureValuesCorrectWhenOnNext()
{
   var scheduler = new TestScheduler();
   var expected = new[] { 'a', 'b', 'c' };
   var actual = new List<List<char>>();

   var subject = new Subject<char>();

   subject.AsObservable().
      ThrottleWithBuffer(TimeSpan.FromMilliseconds(500), scheduler).
      Subscribe(i => actual.Add(new List<char>(i)));

   scheduler.Schedule(TimeSpan.FromMilliseconds(100), () => subject.OnNext('a'));
   scheduler.Schedule(TimeSpan.FromMilliseconds(200), () => subject.OnNext('b'));
   scheduler.Schedule(TimeSpan.FromMilliseconds(300), () => subject.OnNext('c'));
   scheduler.Start();

   Assert.Equal(expected, actual[0]);
}

Now the key things here are the use of the TestScheduler which allows us the bend time (well not really, but it allows us to simulate the passing of time).

The TestScheduler is available in the Microsoft.Reactive.Testing.dll or from NuGet using Install-Package Rx-Testing

As you can see from the test, we now create the subscription to the observable and following that the scheduler is passed the “simulated” times. The first argument tells it the time an item is scheduled for, i.e. the first item is scheduled at 100ms, the next 200ms and finally 300ms. We then call Start on the scheduler to begin the time simulation.

Just think, if your code relied on something happening after a pause of a second then each test would have to wait one second before it could verify data, making a large test suit a lot slower to run. Using the TestScheduler we simply simulate a second passing.

WPF behaviors

Behaviors came into WPF via Expression Blend. They’re basically a standardized way of adding extra functionality to WPF classes using XAML. So why not use attached properties you might ask. I’ll discuss the differences as we go.

Let’s look at a real world example…

I’m using Infragistics XamDataGrid to display rows of data, but I would like a simple filtering mechanism so that when the user enters text into a text box, all columns are filtered to show data with the filter text in it. I also want it so that when the text box is cleared the filters are cleared. Then I want a Button to enable me to clear the filter text for me with the click of the button.

How might we implement this ? Well the title of the post is a giveaway, but let’s look at some other possibilities first

  • We could write this in the code-behind but generally we try to stay clear of writing such code and instead would generally prefer use XAML to help make our code reusable across projects
  • We could derive a new class from the XamDataGrid and add dependency properties, but this means the code will only be usable via our new type, so it’s not as useful across other projects as it requires anyone wanting to use a XamDataGrid to use our version
  • We could use attached properties, which allow us to create “external” code, i.e. code not in code behind or in a derived class, which can work in conjunction with a XamDataGrid, but the problem here is that attached properties are written in static classes and we will want to store instance variables with the data (see my reasons for this requirement below). With a static class implementation we would have to handle the management of such data ourselves, not difficult, but not ideal.

The attached properties route looked promising – I’m going to need a property for the associated TextBox (acting as our filter text) and the Button (used to clear the filter) and ofcourse these may be different per instance of a XamDataGrid – I also need to handle the attachment and detachment of event handlers and any other possible state. As mentioned, we could implement such state management ourselves, but behaviors already give us this capability out of the box as they are created on a per instance basis.

So the best way for to think of a behavior is that it’s like attached properties but allows us to create multiple instances of the code and thus saves us a lot of the headaches that might occur managing multiple instance data.

Note: The code I’m about to show/discuss includes Reactive Extension code. I will not go into any depth on what it does or how it works but the main use here is to handle attachment and detachment of events and also to allow throttling of the input, this means as the user types in the filter text box, we do not update the filter until the user stops typing for half a second. This ensures we’re not continually updating the filters on the XamDataGrid as the user types

Creating a behavior

To create a behavior we simply create a class and derive it from the Behavior class which is part of the System.Windows.Interactivity namespace. The Behavior takes a generic argument which defines the type it can be used on. So to start off our code would look like this

public class XamDataGridFilterBehavior : Behavior<XamDataGrid>
{
   protected override void OnAttached()
   {
      base.OnAttached();
   }

   protected override void OnDetaching()
   {
      base.OnDetaching();
   }
}

So the key parts here (apart from the base class which has already been mentioned) are the OnAttached and OnDetaching overrides. So here we can attach and detach from events on the associated class (i.e. the XamDataGrid) and/or handle initialization/disposal of data/objects as required.

Before we look at a possible implementation of these methods, I wrote a simple list of requirements at the top of this post. One was the requirement for a TextBox to be associated with the XamDataGrid to act as the filter text and the other a Button to be associated to clear the filter. So let’s add the dependency properties to our class to implement these requirements.

public static readonly DependencyProperty FilterTextBoxProperty =
   DependencyProperty.Register(
   "FilterTextBox",
   typeof(TextBox),
   typeof(XamDataGridFilterBehavior));

public TextBox FilterTextBox
{
   get { return (TextBox)GetValue(FilterTextBoxProperty); }
   set { SetValue(FilterTextBoxProperty, value); }
}

public static readonly DependencyProperty ResetButtonProperty =
   DependencyProperty.Register(
   "ResetButton",
   typeof(Button),
   typeof(XamDataGridFilterBehavior));

public Button ResetButton
{
   get { return (Button)GetValue(ResetButtonProperty); }
   set { SetValue(ResetButtonProperty, value); }
}

So nothing exciting there, just standard stuff.

Now to the more interesting stuff, let’s implement the OnAttached and OnDetaching code. As I’m using Reactive Extensions we’ll need to have two instance variables, both of type IDisposable to allow us to clean up/detach any event handling. Let’s see all the code

private IDisposable disposableFilter;
private IDisposable disposableReset;

protected override void OnAttached()
{
   base.OnAttached();

   var filter = FilterTextBox;
   if (filter != null)
   {
      disposableFilter = Observable.FromEventPattern<TextChangedEventHandler, TextChangedEventArgs>(
         x => filter.TextChanged += x,
         x => filter.TextChanged -= x).
         Throttle(TimeSpan.FromMilliseconds(500)).
         ObserveOn(SynchronizationContext.Current).
         Subscribe(_ =>
         {
            var dp = AssociatedObject as DataPresenterBase;

            if (dp != null && dp.DefaultFieldLayout != null)
            {
               dp.DefaultFieldLayout.RecordFilters.Clear();
               dp.DefaultFieldLayout.Settings.RecordFiltersLogicalOperator = LogicalOperator.Or;

               foreach (var field in dp.DefaultFieldLayout.Fields)
               {
                  var recordFilter = new RecordFilter(field);
                  recordFilter.Conditions.Add(
                     new ComparisonCondition(ComparisonOperator.Contains, filter.Text));
								                  
                  dp.DefaultFieldLayout.RecordFilters.Add(recordFilter);
               }
           }
      });
   }

   var reset = ResetButton;
   if (reset != null)
   {
      disposableReset = Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
         x => reset.Click += x,
         x => reset.Click -= x).
         ObserveOn(SynchronizationContext.Current).
         Subscribe(_ =>
         {
             FilterTextBox.Text = String.Empty;
             // whilst the above will clear the filter it's throttled so can
             // look delayed - better we clear the filter immediately
             var dp = AssociatedObject as DataPresenterBase;

             if (dp != null && dp.DefaultFieldLayout != null)
             {
                dp.DefaultFieldLayout.RecordFilters.Clear();
             }
        });
    }
}

protected override void OnDetaching()
{
   base.OnDetaching();

   if (disposableFilter != null)
   {
      disposableFilter.Dispose();
      disposableFilter = null;
   }
   if (disposableReset != null)
   {
      disposableReset.Dispose();
      disposableReset = null;
   }
}

This post isn’t mean’t to be about using the RX library or Infragistics, but the basics are that when OnAttached is called we use the RX FromEventPattern method to create our event handler attachment/detachment points. In the case of the TextBox we attach to the KeyDown event on the TextBox, we throttle the Observable for half a second so (as previously mentioned) we don’t update the filters on every change of the TextBox, we delay for half a second to allow the user to pause and then we filter. We also ensure the Subscribe code is run on the UI thread (well as the code that call OnAttached will be on the UI thread we’re simply observing on the current thread, which should be the UI thread). In the Subscribe method we get the AssociatedObject, this is where our Behavior generic argument comes in. The AssociatedObject is the object this behavior is applied to (we’ll see a sample of the XAML code next). Now we clear any current filters and create new ones based upon the supplied TextBox Text property. Finally we connect to the Click event on the supplied ResetButton. When the button is pressed we clear the FilterText and clear the filters.

In the code above I felt that the UX of a delay between clearing the FilterText and the filters clearing (the half a second delay) didn’t feel right when the user presses a button, so in this instance we also clear the filters immediately.

The OnDetaching allows us cleanup, so we dispose of our Observables and that will detach our event handlers and nicely clean up after usage.

How do we use this code in XAML

Finally, we need to see how we use this code in our XAML, so here it is

<TextBox x:Name="filter"/>
<Button Content="Reset" x:Name="reset" />

<XamDataGrid>
   <i:Interaction.Behaviors>
     <controls:XamDataGridFilterBehavior 
        FilterTextBox="{Binding ElementName=filter}" 
        ResetButton="{Binding ElementName=reset}" />
   </i:Interaction.Behaviors>
</XamDataGrid>

And that’s it, now we have a reusable class which a designer could use to add “behavior” to our XamDataGrid.

Reactive Extensions Tips & Tricks

Creating an observable from an event

Observing a generic event such as the StatusChangedEvent (EventHandler<StatusChangedEvent>) is as simple as

var statusChanged = Observable.FromEventPattern<StatusChangedEventArgs>(
                e => KinectSensor.KinectSensors.StatusChanged += e,
                e => KinectSensor.KinectSensors.StatusChanged -= e);

Cast and OfType

Cast and OfType allow us to observe an observable and in the case of Cast will cast each item to the specified type, causing on OnError if the type cannot be cast (due to an InvalidCastException). OfType does much the same except when a type is not castable to the specified type it’s ignored.

Cast in use…

// cast in use, failing due to InvalidCastException
object[] items = new object[3];
items[0] = 23;
items[1] = "Hello";
items[2] = 444;

var o = items.ToObservable().Cast<int>().Subscribe(Console.WriteLine);

The result of the above will be 23, then an error.

OfType in use…

// OfType used, only int's will be output
object[] items = new object[3];
items[0] = 23;
items[1] = "Hello";
items[2] = 444;

var o = items.ToObservable().OfType<int>().Subscribe(Console.WriteLine);

The result of this will be 23 & 44, with the string ignored.