Reactive service to service communication with RSocket – Introduction

This article is the first one of the mini-series which will help you to get familiar with RSocket – a new binary protocol which may revolutionize machine-to-machine communication. In the following paragraphs, we discuss the problems of the distributed systems and explain how these issues may be solved with RSocket. We focus on the communication between microservices and the interaction model of RSocket.

Please notice that code examples presented in the article are available on GitHub →

Communication problem in distributed systems

Microservices are everywhere, literally everywhere. We went through the long journey from the monolithic applications, which were terrible to deploy and maintain, to the fully distributed, tiny, scalable microservices. Such architecture design has many benefits; however, it also has drawbacks, worth mentioning. Firstly, to deliver value to the end customers, services have to exchange tons of data. In the monolithic application that was not an issue, as the entire communication occurred within a single JVM. In the microservice architecture, where services are deployed in the separate containers and communicate via an internal or external network, networking is a first-class citizen. Things get more complicated if you decide to run your applications in the cloud, where network issues and periods of increased latency is something you cannot fully avoid. Rather than trying to fix network issues, it is better to make your architecture resilient and fully operational even during a turbulent time.

Let’s dive a bit deeper into the concept of the microservices, data, communication and the cloud. As an example, we will discuss the enterprise-grade system which is accessible through a website and mobile app as well as communicates with small, external devices (e.g home heater controller). The system consists of multiple microservices, mostly written in Java and it has a few Python and node.js components. Obviously, all of them are replicated across multiple availability zones to assure that the whole system is highly available.

To be IaaS provider agnostic and improve developer experience the applications are running on top of PaaS. We have a wide range of possibilities here: Cloud Foundry, Kubernetes or both combined in Cloudboostr are suitable. In terms of communication between services, the design is simple. Each component exposes plain REST APIs – as shown in the diagram below.

At first glance, such an architecture does not look bad. Components are separated and run in the cloud – what could go wrong? Actually, there are two major issues – both of them related to communication.

The first problem is the request/response interaction model of HTTP. While it has a lot of use cases, it was not designed for machine to machine communication. It is not uncommon for the microservice to send some data to another component without taking care about the result of the operation (fire and forget) or stream data automatically when it becomes available (data streaming). These communication patterns are hard to achieve in an elegant, efficient way using a request/response interaction model. Even performing simple fire and forget operation has side effects – the server has to send a response back to the client, even if the client is not interested in processing it.

The second problem is the performance. Let’s assume that our system is massively used by the customers, the traffic increases, and we have noticed that we are struggling to handle more than a few hundred requests per second. Thanks to the containers and the cloud, we are able to scale up our services with ease. However, if we track resource consumption a bit more, we will notice that while we are running out of memory, the CPUs of our VMs are almost idle. The issue comes from the thread per request model usually used with HTTP 1.x, where every single request has its own stack memory. In such a scenario, we can leverage the reactive programming model and non-blocking IO. It will significantly cut down memory usage, nevertheless, it will not reduce the latency. HTTP 1.x is a text-based protocol thus size of data that need to be transferred is significantly higher than in the case of binary protocols.

In the machine to machine communication we should not limit ourselves to HTTP (especially 1.x), its request/response interaction model and poor performance. There are many more suitable and robust solutions out there (on the market). Messaging based on the RabbitMQ, gRPC or even HTTP 2 with its support for multiplexing and binarized payloads will do way better in terms of performance and efficiency than plain HTTP 1.x.

Using multiple protocols allow us to link the microservices in the most efficient and suitable way in a given scenario. However, the adoption of multiple protocols forces us to reinvent the wheel again and again. We have to enrich our data with extra information related to security and create multiple adapters which handle translation between protocols. In some cases, transportation requires external resources (brokers, services, etc.) which need to be highly available. Extra resources entail extra costs, even though all we need is simple, message-based fire and forget operation. Besides, a multitude of different protocols may introduce serious problems related to application management, especially if our system consists of hundreds of microservices.

The issues mentioned above are the core reasons why RSocket was invented and why it may revolutionize communication in the cloud. By its reactiveness and built-in robust interaction model, RSocket may be applied in various business scenarios and eventually unify the communication patterns that we use in the distributed systems.

RSocket to the rescue

RSocket is a new, message-driven, binary protocol which standardizes the approach to communication in the cloud. It helps to resolve common application concerns with a consistent manner as well as it has support for multiple languages (e.g java, js, python) and transport layers (TCP, WebSocket, Aeron).
In the following sections, we will dive deeper into protocol internals and discuss the interaction model.

Framed and message-driven

Interaction in RSocket is broken down into frames. Each frame consists of a frame header which contains the stream id, frame type definition and other data specific to the frame type. The frame header is followed by meta-data and payload – these parts carry data specified by the user.

There are multiple types of frames which represent different actions and available methods of the interaction model. We’re not going to cover all of them as they are extensively described in the official documentation (http://rsocket.io/docs/Protocol). Nevertheless, there are few which are worth noting. One of them is the Setup Frame which the client sends to the server at the very beginning of the communication. This frame can be customized so that you can add your own security rules or other information required during connection initialization. It should be noted that RSocket does not distinguish between the client and the server after the connection setup phase. Each side can start sending the data to the other one – it makes the protocol almost entirely symmetrical.

Performance

The frames are sent as a stream of bytes. It makes RSocket way more efficient than typical text-based protocols. From a developer perspective, it is easier to debug a system while JSONs are flying back and forth through the network, but the impact on the performance makes such convenience questionable. The protocol does not impose any specific serialization/deserialization mechanism, it considers the frame as a bag of bits which could be converted to anything. That makes possible to use JSON serialization or more efficient solutions like Protobuf or AVRO.

The second factor, which has a huge impact on RSocket performance is the multiplexing. The protocol creates logical streams (channels) on the top of the single physical connection. Each stream has its unique ID which, to some extent, can be interpreted as a queue we know from messaging systems. Such design deals with major issues known from HTTP 1.x – connection per request model and weak performance of “pipelining”. Moreover, RSocket natively supports transferring of the large payloads. In such a scenario the payload frame is split into several frames with an extra flag – the ordinal number of the given fragment.

Reactiveness & Flow Control

RSocket protocol fully embraces the principles stated in the Reactive Manifesto . Its asynchronous character and thrift in terms of the resources helps decrease the latency experienced by the end users and costs of the infrastructure. Thanks to streaming we don’t need to pull data from one service to another, instead, the data is pushed when it becomes available. It is an extremely powerful mechanism, but it might be risky as well. Let’s consider a simple scenario: in our system, we are streaming events from service A to service B. The action performed on the receiver side is non-trivial and require some computation time. If service A pushes events faster than B is able to process them, eventually, B will run out of resources – the sender will kill the receiver. Since RSocket uses the reactor, it has built-in support for the flow control, which helps to avoid such situations.

We can easily provide the backpressure mechanism implementation, adjusted to our needs. The receiver can specify how much data it would like to consume and will not get more than that until it notifies the sender that it is ready to process more. On the other hand, to limit the number of incoming frames from the requester, RSocket implements a lease mechanism. The responder can specify how many requests requester may send within a defined time frame.

The API

As mentioned in the previous section, RSocket uses Reactor, so that on the API level we are mainly operating on Mono and Flux objects. It has full support for reactive signals as well – we can easily implement “reaction” on different events – onNext, onError, onClose, etc.

The following paragraphs will cover the API and each and every interaction option available in RSocket. The discussion will be backed with the code snippets and the description for all the examples. Before we jump into the interaction model, it is worth describing the API basics, as it will come up in the multiple code examples.

Setting up the connection with RSocketFactory

Setting up the RSocket connection between the peers is fairly easy. The API provides factory (RSocketFactory) with factory methods receive and connect to create RSocket and CloseableChannel instances on the client and the server side respectively. Second common property present in both parties of the communication (the requester and the responder) is a transport. RSocket can use multiple solutions as a transport layer (TCP, WebSocket, Aeron). Whichever you choose the API provides the factory methods which allows you to tweak and tune the connection.

RSocketFactory.receive()
        .acceptor(new HelloWorldSocketAcceptor())
        .transport(TcpServerTransport.create(HOST, PORT))
        .start()
        .subscribe();
RSocketFactory.connect()
        .transport(TcpClientTransport.create(HOST, PORT))
        .start()
        .subscribe();

Moreover, in the case of the responder, we have to create a socket acceptor instance. The SocketAcceptor is an interface which provides the contract between the peers. It has a single method accept which accepts the RSocket for sending requests and returns an instance of RSocket that will be used for handling the requests from the peer. Besides providing the contract the SocketAcceptor enables us to access the setup frame content. On the API level, it is reflected by ConnectionSetupPayload object.

public interface SocketAcceptor {
   Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket);
}

As shown above, setting up the connection between the peers is relatively easy, especially for those of you who worked with WebSockets previously – in terms of the API both solutions are quite similar.

Interaction model

After setting up the connection we are able to move on to the interaction model. RSocket supports following operations:

The fire and forget, as well as the metadata push, were designed to push the data from the sender to the receiver. In both scenarios the sender does not care about the result of the operation – it is reflected on API level in a return type (Mono). The difference between these actions sits in the frame. In case of fire and forget the fully-fledged frame is sent to the receiver, while for the metadata push action the frame does not have payload – it consists only of the header and the metadata. Such a lightweight message can be useful in sending notifications to the mobile or peer-to-peer communication of IoT devices.

RSocket is also able to mimic HTTP behavior. It has support for request-response semantics, and probably that will be the main type of interaction you are going to use with RSocket. In streams context, such an operation can be represented as a stream which consists of the single object. In this scenario, the client is waiting for the response frame, but it does it in a fully non-blocking manner.

More interesting in the cloud applications are the request stream and the request channel interactions which operate on the streams of data, usually infinite. In case of the request stream operation, the requester sends a single frame to the responder and gets back the stream of data. Such interaction method enables services to switch from the pull data to the push data strategy. Instead of sending periodical requests to the responder requester can subscribe to the stream and react on the incoming data – it will arrive automatically when it becomes available.

Thanks to the multiplexing and the bi-directional data transfer support, we can go a step further using the request channel method. RSocket is able to stream the data from the requester to the responder and the other way around using a single physical connection. Such interaction may be useful when the requester updates the subscription – for example, to change the subscription criteria. Without the bi-directional channel, the client would have to cancel the stream and re-request it with the new parameters.

In the API, all operations of the interaction model are represented by methods of RSocket interface shown below.

public interface RSocket extends Availability, Closeable {

    Mono<Void> fireAndForget(Payload payload);

    Mono<Payload> requestResponse(Payload payload);

    Flux<Payload> requestStream(Payload payload);

    Flux<Payload> requestChannel(Publisher<Payload> payloads);

    Mono<Void> metadataPush(Payload payload);

}

To improve the developer experience and avoid the necessity of implementing every single method of the RSocket interface, the API provides abstract AbstractRSocket we can extend. By putting the SocketAcceptor and the AbstractRSocket together, we get the server-side implementation, which in the basic scenario may look like this:

@Slf4j
public class HelloWorldSocketAcceptor implements SocketAcceptor {

    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
        log.info("Received connection with setup payload: [{}] and meta-data: [{}]", setup.getDataUtf8(), setup.getMetadataUtf8());
        return Mono.just(new AbstractRSocket() {
            @Override
            public Mono<Void> fireAndForget(Payload payload) {
                log.info("Received 'fire-and-forget' request with payload: [{}]", payload.getDataUtf8());
                return Mono.empty();
            }

            @Override
            public Mono<Payload> requestResponse(Payload payload) {         
                log.info("Received 'request response' request with payload: [{}] ", payload.getDataUtf8());
                return Mono.just(DefaultPayload.create("Hello " + payload.getDataUtf8()));
            }

            @Override
            public Flux<Payload> requestStream(Payload payload) {
                log.info("Received 'request stream' request with payload: [{}] ", payload.getDataUtf8());
                return Flux.interval(Duration.ofMillis(1000))
                        .map(time -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()));
            }

            @Override
            public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                return Flux.from(payloads)
                        .doOnNext(payload -> {
                            log.info("Received payload: [{}]", payload.getDataUtf8());
                        })
                        .map(payload -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()))
                        .subscribeOn(Schedulers.parallel());
            }

            @Override
            public Mono<Void> metadataPush(Payload payload) {
                log.info("Received 'metadata push' request with metadata: [{}]", payload.getMetadataUtf8());
                return Mono.empty();
            }
        });
    }
}

On the sender side using the interaction model is pretty simple, all we need to do is invoke a particular method on the RSocket instance we have created using RSocketFactory, e.g.

socket.fireAndForget(DefaultPayload.create("Hello world!"));
For more examples of the methods available in the RSocket interaction model please visit GitHub →

More interesting on the sender side is the implementation of the backpressure mechanism. Let’s consider the following example of the requester side implementation:

public class RequestStream {

    public static void main(String[] args) {

        RSocket socket = RSocketFactory.connect()
                .transport(TcpClientTransport.create(HOST, PORT))
                .start()
                .block();
        socket.requestStream(DefaultPayload.create("Jenny", "example-metadata"))
                .subscribe(new BackPressureSubscriber());

        socket.dispose();
}

    @Slf4j
    private static class BackPressureSubscriber implements Subscriber<Payload> {

        private static final Integer NUMBER_OF_REQUESTED_ITEMS = 5;
        private Subscription subscription;
        int receivedItems;

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            subscription.request(NUMBER_OF_REQUESTED_ITEMS);
        }

        @Override
        public void onNext(Payload payload) {
            receivedItems++;
            if (receivedItems % NUMBER_OF_REQUESTED_ITEMS == 0) {
                log.info("Requesting next [{}] elements", NUMBER_OF_REQUESTED_ITEMS);
                subscription.request(NUMBER_OF_REQUESTED_ITEMS);
            }
        }

        @Override
        public void onError(Throwable t) {
            log.error("Stream subscription error [{}]", t);
        }

        @Override
        public void onComplete() {
            log.info("Completing subscription");
        }
    }
    
}

In this example, we are requesting the stream of data, but to ensure that the incoming frames will not kill the requester we have the backpressure mechanism put in place. To implement this mechanism we use request_n frame which on the API level is reflected by the subscription.request(n) method. At the beginning of the subscription [onSubscribe(Subscription s)], we are requesting 5 objects, then we are counting received items in onNext(Payload payload). When all expected frames arrived to the requester, we are requesting the next 5 objects – again using subscription.request(n) method. The flow of this subscriber is shown in the diagram below:

Implementation of the backpressure mechanism presented in this section is very basic. In the production, we should provide a more sophisticated solution based on more accurate metrics e.g. predicted/average time of computation. After all, the backpressure mechanism does not make the problem of an overproducing responder disappear. It shifts the issue to the responder side, where it can be handled better. Further reading about backpressure is available here on Medium and here on GitHub.

Summary

In this article, we discuss the communication issues in the microservice architecture, and how these problems can be solved using RSocket. We covered its API and the interaction model backed with simple “hello world” example and basic backpressure mechanism implementation.

In the next articles of this series, we will cover more advanced features of RSocket including LoadBalancing and Resumability as well as we will discuss abstraction over RSocket – RPC and Spring Reactor.

Please notice that fully working examples are provided here →
Share the story

Related