Our Way To Reactive Extensions

By Wojciech Gomoła, in Technology

In the modern world we are inundated with information (TV, radio, newspapers, the internet) to the extent that we often need some kind of filter to get only the pieces of information we find interesting. That’s why the most recent news app from Google uses artificial intelligence to customize the newsfeed and make sure users receive only the kind of news they want. The same applies to the APIs that we use in both desktop and web development: sometimes the number of notifications and the amount of data sent by even a simple interface may be overwhelming, and it needs to be filtered before it is presented to the end user.

Architectural background

One of the products that we develop is a desktop app based on a complex library that creates an abstraction over communication with the servers and different REST APIs. The listing below shows part of a class that wraps one piece of such an API, consisting of “Description” and “Size”:

public class SomeClass
{
    public void SetDescription(string description, CompletionHandler handler)
    {
        /* Some logic here */
    }

    public void SetSize(int size, CompletionHandler handler)
    {
        /* Some logic here */
    }

    public string Description { get; private set; }

    public int Size { get; private set; }

    public event EventHandler DescriptionChanged;

    public event EventHandler SizeChanged;
}
  • The methods “SetDescription” and “SetSize” accept the value argument and a CompletionHandler object, which is called when the operation succeeds or fails. The completion handler provides information about the operation status (e.g. details about the error that occurred) and an optional result. To make our further work easier, we have added wrappers around those methods that translate completion handlers to the .NET Task class [1 - https://msdn.microsoft.com/en-us/library/system.threading.tasks.task(v=vs.110).aspx] (similar to promises in Java and JavaScript).
  • The properties contain the last known state of the object.
  • The events notify the application about state changes, regardless of whether a change was caused by calling a method locally or by the activity of other applications or services.
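
The Task wrappers mentioned in the first bullet are not shown in this article. The following is only a minimal sketch of how such a wrapper could look. It assumes a hypothetical CompletionHandler delegate that receives null on success and the error on failure; the real library's handler type may look different, and CompletionHandlerTasks is a name made up for this example.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical handler shape: error is null when the operation succeeded.
public delegate void CompletionHandler(Exception error);

public static class CompletionHandlerTasks
{
    // Starts a callback-based operation and exposes its outcome as a Task.
    public static Task Run(Action<CompletionHandler> startOperation)
    {
        var completion = new TaskCompletionSource<bool>();

        startOperation(error =>
        {
            if (error == null)
                completion.TrySetResult(true);      // operation succeeded
            else
                completion.TrySetException(error);  // operation failed
        });

        return completion.Task;
    }
}
```

With such a helper, a call could be awaited as: await CompletionHandlerTasks.Run(handler => someObject.SetSize(42, handler));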

The library processes all requests and responses on its own thread pool. This has advantages as well as disadvantages. The advantage is that the UI is not affected by the processing of network traffic. The disadvantage is that events and completion handlers are called on a non-UI thread, so the application has to use some kind of queue to ensure that they are handled on the UI thread, and the library has to route them properly to the provided thread or queue.

Manual Threads Management – You Don’t Want To Do This

During development we found that posting each event to the UI thread manually produces a large amount of long, repetitive code, which looks like this:

public class SomeOtherClass : IDisposable
{
    private readonly SomeClass _someObject;
    private readonly DispatcherService _dispatcherService;

    public SomeOtherClass(
        SomeClass someObject,
        DispatcherService dispatcherService)
    {
        _someObject = someObject;
        _dispatcherService = dispatcherService;

        _someObject.DescriptionChanged += SomeObjectOnDescriptionChanged;
        _someObject.SizeChanged += SomeObjectOnSizeChanged;
    }
    
    private void SomeObjectOnDescriptionChanged(object sender, EventArgs e)
    {
        _dispatcherService.Invoke(LoadState);
    }

    private void SomeObjectOnSizeChanged(object sender, EventArgs e)
    {
        _dispatcherService.Invoke(LoadState);
    }

    private void LoadState()
    {
        PropertyVisibleOnUI = $"Description: {_someObject.Description} Size: {_someObject.Size}";
    }
    
    public string PropertyVisibleOnUI { get; set; }

    public void Dispose()
    {
        _someObject.DescriptionChanged -= SomeObjectOnDescriptionChanged;
        _someObject.SizeChanged -= SomeObjectOnSizeChanged;
    }
}

The code becomes even more complex if we have to consider multiple nested objects. Just imagine the code required to display a “Your message has been read” notification if you have the following API:

public interface IConversationService
{
    event EventHandler ConversationStarted;
    event EventHandler ConversationEnded;
    IEnumerable<IConversation> Conversations { get; }
}

public interface IConversation
{
    event EventHandler MessageAdded;
    void SendMessage(string body, CompletionHandler handler);
    IEnumerable<IMessage> Messages { get; }
}

public interface IMessage
{
    event EventHandler IsReadChanged;
    bool IsMine { get; }
    string Body { get; }
    bool IsRead { get; }
}

Event Aggregator To The Rescue

An event aggregator (event bus, mediator) seemed to be a pretty good solution to the described problem, especially because we already had wrappers around the library’s objects. Our notification area subscribes to a message type and displays the notification when the event occurs.

_messageAggregator.Register(MessageHandlingMethod);

It makes cross-thread calls much easier. We have written a method that extends the event aggregator and forces the notification handler to be invoked on the UI thread:

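// TMessage is the message type; the name of the type parameter is assumed.
public static void RegisterOnDispatcher<TMessage>(
    this MessageAggregator self,
    DispatcherService dispatcherService,
    Action<TMessage> handler)
{
    // Wrap the handler so that it is always invoked through the dispatcher.
    self.Register<TMessage>(message => dispatcherService.Invoke(() => handler(message)));
}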

It takes a few seconds to understand the method, but it is extremely easy to use later on:

messageAggregator
    .RegisterOnDispatcher(
        dispatcherService,
        MessageHandlingMethod);

Problems With The Event Aggregator

This pattern was convenient enough to dominate most of our code. At some point we realized that we were using it even where it was the cause of the problem.

First of all, many classes required two dependencies: a class that keeps the state and an event aggregator to subscribe to messages.

Sending a message by means of the aggregator requires creating a new class for the message. Developers like to take shortcuts, and they tend to skip this step if they can, even if it breaks the abstraction. In many cases, we had a service that subscribed to a couple of other services and computed some value. Other services then used this value, but because the computing service itself didn’t send any messages, they subscribed to the messages sent by the underlying services instead.

In our app, some settings can be changed on the fly, without having to restart the app. When at least one of them changes, a SettingsChangedMessage is sent.

A lot of classes subscribed to this message and were notified about every setting change, even if they weren’t interested in that particular setting. We added a field to the message that indicates which setting has changed. However, this forced us to add a new enum value, and logic that checks whether the setting’s value has changed, every time we add a new field to the settings.

There was another performance problem that was non-trivial to fix in this architecture. Dispatching a method invocation to another thread is not very expensive if done rarely, but if there is a lot of thread-hopping, it starts to affect performance and causes temporary hangs of the UI. There were cases where multiple instances of some class depended on a couple of values from different services. Those instances had to subscribe to all the messages that were sent when those values changed. Frequently, a message didn’t affect the overall state of the subscriber, but the handler was dispatched to the UI thread anyway. We weren’t able to fix this problem easily in our architecture.

Observer and Observable

The observer pattern is rarely seen in .NET, mostly because events are sufficient in common scenarios. Microsoft noticed that there are cases, like ours, that are less common but also not that simple, and added the IObserver<T> and IObservable<T> interfaces. They are slightly different from the ones you can find in books about design patterns. First of all, both interfaces are generic. The observable contains only one method: Subscribe, which accepts an observer as its argument. There is no Unsubscribe method. Instead, Subscribe returns an object that implements IDisposable, and disposing this object unregisters the observer. The observer contains three methods (OnNext, OnError and OnCompleted) and is focused more on the values passed via OnNext than on the observable itself.
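
To make the Subscribe/IDisposable contract concrete, here is a minimal sketch that uses only the two interfaces from the System namespace. ValueSource and RecordingObserver are hypothetical names invented for this example, and the code is deliberately not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// A tiny, non-thread-safe observable that pushes values to its subscribers.
public sealed class ValueSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // There is no Unsubscribe method: the returned IDisposable removes the observer.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Subscription(_observers, observer);
    }

    public void Publish(T value)
    {
        // Iterate over a copy so that an observer may unsubscribe from inside OnNext.
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;

        public Subscription(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() => _observers.Remove(_observer);
    }
}

// An observer that simply records the values it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();

    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

After the object returned by Subscribe is disposed, further calls to Publish no longer reach that observer.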

RX – LINQ for observables

LINQ is a very powerful part of the .NET standard library that is built on two very simple interfaces: IEnumerable and IEnumerator. It made manipulating collections easy and became so popular that most .NET developers would probably rather quit their jobs than give it up. It is based on extension methods and the decorator pattern: an extension method takes the IEnumerable on which it is called and a delegate (for example a predicate), and creates a new object that also implements IEnumerable. When GetEnumerator is called on this object, it calls the same method on the wrapped IEnumerable and yields its items filtered, mapped or merged using the specified delegate.
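
The decorator idea can be sketched with a hand-written filter. This is not how the framework's own Where is implemented; WhereSketch and WhereEnumerable are hypothetical names used only to illustrate the pattern.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

public static class EnumerableSketch
{
    // The extension method only wraps the source; nothing is enumerated yet.
    public static IEnumerable<T> WhereSketch<T>(
        this IEnumerable<T> source,
        Func<T, bool> predicate)
    {
        return new WhereEnumerable<T>(source, predicate);
    }

    // The decorator: an IEnumerable that wraps another IEnumerable.
    private sealed class WhereEnumerable<T> : IEnumerable<T>
    {
        private readonly IEnumerable<T> _source;
        private readonly Func<T, bool> _predicate;

        public WhereEnumerable(IEnumerable<T> source, Func<T, bool> predicate)
        {
            _source = source;
            _predicate = predicate;
        }

        public IEnumerator<T> GetEnumerator()
        {
            // Pull items from the wrapped sequence and yield only the matching ones.
            foreach (var item in _source)
            {
                if (_predicate(item))
                    yield return item;
            }
        }

        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    }
}
```

The wrapped sequence is enumerated only when the client code starts pulling items from the decorator.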

Both interfaces, IObservable and IEnumerable, provide a way to access a sequence of data. Both deliver items and indicate whether more items are available, but the key difference between them is the way items are delivered: IEnumerable waits for client code to pull a value, while IObservable pushes it.

It seems reasonable to apply the LINQ approach to IObservable, and this is how Reactive Extensions were invented. It turned out to be such a good idea that most modern programming platforms have implemented the same paradigm. Since great platform-independent documentation is available, we decided to try RX with one of the new features we were delivering. Surprisingly, even though the integration wasn’t included in the estimate, we were able to deliver the feature on time.

Delivering property value when event is called

Because of the architecture described above, we needed a good way of delivering the new property value every time an event was raised. Fortunately, RX contains just the right method.

var sizeObservable =
    Observable
        .FromEventPattern(
            handler => someObject.SizeChanged += handler,
            handler => someObject.SizeChanged -= handler)
        .Select(o => someObject.Size);

Because in most cases our code also needs to load the current property value initially, we have created our own extension method (FromEventAndProperty) which does just that.
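
The article does not show FromEventAndProperty itself. One possible shape, given purely as an assumption, combines the System.Reactive operators FromEventPattern, Select, StartWith and Defer. The signature and the ObservableProperty class name are invented for this sketch.

```csharp
using System;
using System.Reactive.Linq;

public static class ObservableProperty
{
    // Hypothetical signature: emits the current value on subscription,
    // then the new value every time the event is raised.
    public static IObservable<T> FromEventAndProperty<T>(
        Action<EventHandler> addHandler,
        Action<EventHandler> removeHandler,
        Func<T> getValue)
    {
        // Defer postpones reading the property until somebody subscribes.
        return Observable.Defer(() =>
            Observable
                .FromEventPattern(addHandler, removeHandler)
                .Select(_ => getValue())
                .StartWith(getValue()));
    }
}
```

It could be used as: ObservableProperty.FromEventAndProperty(h => someObject.SizeChanged += h, h => someObject.SizeChanged -= h, () => someObject.Size). Note that in this sketch the initial value is read just before the handler is attached, so a change raised from another thread in that short window could be missed.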

Cross-thread communication

Because a lot of messages have to be passed between threads, support for multithreading was important. RX contains an ObserveOn extension that accepts a reference to the dispatcher. Unfortunately, our app used a wrapper around the dispatcher that prevented us from using it directly, so we had to write our own extension. Luckily, it was pretty simple: one extension method and two classes (an observable and an observer), less than 100 lines of code in total.
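
Our extension is not reproduced here, but the "one extension method, two classes" structure can be sketched as follows. Everything in this block is an assumption made for illustration: IDispatcherService stands in for the app's dispatcher wrapper (only an Invoke(Action) method is assumed), and QueueDispatcherService merely queues the work so that the example can run without a UI.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the dispatcher wrapper.
public interface IDispatcherService
{
    void Invoke(Action action);
}

// Demo implementation: queues actions until RunPending is called.
public sealed class QueueDispatcherService : IDispatcherService
{
    private readonly Queue<Action> _pending = new Queue<Action>();

    public void Invoke(Action action) => _pending.Enqueue(action);

    public void RunPending()
    {
        while (_pending.Count > 0)
            _pending.Dequeue()();
    }
}

public static class DispatcherObservableExtensions
{
    // The extension method: wraps the source in a dispatching observable.
    public static IObservable<T> ObserveOnDispatcherService<T>(
        this IObservable<T> source,
        IDispatcherService dispatcher)
    {
        return new DispatchedObservable<T>(source, dispatcher);
    }

    // The observable: subscribes a dispatching observer to the source.
    private sealed class DispatchedObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly IDispatcherService _dispatcher;

        public DispatchedObservable(IObservable<T> source, IDispatcherService dispatcher)
        {
            _source = source;
            _dispatcher = dispatcher;
        }

        public IDisposable Subscribe(IObserver<T> observer) =>
            _source.Subscribe(new DispatchedObserver<T>(observer, _dispatcher));
    }

    // The observer: forwards every notification through the dispatcher.
    private sealed class DispatchedObserver<T> : IObserver<T>
    {
        private readonly IObserver<T> _inner;
        private readonly IDispatcherService _dispatcher;

        public DispatchedObserver(IObserver<T> inner, IDispatcherService dispatcher)
        {
            _inner = inner;
            _dispatcher = dispatcher;
        }

        public void OnNext(T value) => _dispatcher.Invoke(() => _inner.OnNext(value));
        public void OnError(Exception error) => _dispatcher.Invoke(() => _inner.OnError(error));
        public void OnCompleted() => _dispatcher.Invoke(() => _inner.OnCompleted());
    }
}
```

A production version would probably also drop notifications that are still queued when the subscription is disposed; the sketch leaves that out for brevity.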

Combining multiple observables

Many UI components depend on more than one value, so combining multiple observables into one was a must-have. RX contains the CombineLatest method, which accepts several Observables and a function that computes a result from the latest values they have provided.

Observable
    .CombineLatest(
        firstObservable,
        secondObservable,
        thirdObservable,
        (firstValue, secondValue, thirdValue) => firstValue * secondValue * thirdValue);

The result of combining Observables is also an Observable, so there is no need for workarounds to use it in place of a regular observable.

Reducing workload

Filtering events
Just like LINQ, RX contains a Where extension that passes on only the items which match the criteria. Most other implementations of RX call this operator Filter.

Removing duplicates
Reactive Extensions contain the Distinct method, which ensures that each value is delivered to the observer only once. In our experience, DistinctUntilChanged was more useful, because it removes only consecutive duplicates.
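
The difference between these operators is easiest to see on a small, finite sequence. The helper below is a hypothetical demo (the WorkloadDemo class is not part of our code base) and it assumes that the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class WorkloadDemo
{
    // Keeps only the even values.
    public static IList<int> OnlyEven(IEnumerable<int> values) =>
        values.ToObservable().Where(v => v % 2 == 0).ToList().Wait();

    // Delivers each distinct value once for the whole sequence.
    public static IList<int> WithDistinct(IEnumerable<int> values) =>
        values.ToObservable().Distinct().ToList().Wait();

    // Drops a value only when it equals the one delivered just before it.
    public static IList<int> WithDistinctUntilChanged(IEnumerable<int> values) =>
        values.ToObservable().DistinctUntilChanged().ToList().Wait();
}
```

For the input 1, 1, 2, 1, WithDistinct yields 1, 2, while WithDistinctUntilChanged yields 1, 2, 1, because the last 1 does not directly follow another 1.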

Debouncing
Debouncing skips the delivery of some values if they are emitted too often. It is useful if the consumer cannot handle data at the rate enforced by the source. Imagine a sensor that checks the level of coolant in a car driving on a bumpy road. If the coolant level is somewhere around the minimum safe level, the sensor sends readings that change all the time. Without debouncing, this would cause the indicator to flicker, which is annoying from the user’s perspective. Note that in RX for .NET the debouncing operator is named Throttle, while other implementations typically call it Debounce.
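
The coolant example could be sketched as below. The threshold value and the CoolantIndicator class are assumptions made up for this illustration; Throttle is the System.Reactive operator that provides debouncing.

```csharp
using System;
using System.Reactive.Linq;

public static class CoolantIndicator
{
    // Hypothetical threshold, chosen only for this example.
    private const double MinimumSafeLevel = 50.0;

    // Turns raw level readings into a warning flag that changes only after
    // the readings have been quiet for the given period.
    public static IObservable<bool> LowCoolantWarning(
        IObservable<double> coolantLevel,
        TimeSpan quietPeriod)
    {
        return coolantLevel
            .Select(level => level < MinimumSafeLevel) // true means "too low"
            .Throttle(quietPeriod)                     // debounce rapid changes
            .DistinctUntilChanged();                   // report only real state changes
    }
}
```

The indicator would then subscribe to LowCoolantWarning(levels, TimeSpan.FromSeconds(2)) instead of the raw sensor readings; the two-second period is an arbitrary choice.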

Problems with unit testing

We are constantly trying to increase our unit test code coverage, and we use mocking libraries where they speed up our work. We use both Moq and NSubstitute, which return null for observable properties, so every observable property used by the code under test has to be set up explicitly. It extends our test setup, but it is not as painful as it looks.

Post-mortem debug

Similarly to LINQ or async methods, RX makes the call stack extremely long. Sometimes it doesn’t even clearly point to the place in our code where the crash occurred, so we have to keep that in mind.

We did not say goodbye to the event aggregator

The event aggregator still has its place in our project. There are even areas in the code where we use both observable properties and messages sent via the aggregator.

It was a very instructive journey from directly attaching event handlers to using Reactive Extensions. We have gathered a lot of knowledge about the differences between the approaches and have learned to use the right tool for the job.
