The Road to (Functional) Reactive Programming in JavaScript, Part 3

This is the final piece of our three-part series on the road to reactive programming in JavaScript.


The road ahead: Streams

At the time of writing, there’s a proposal in progress to add Observables natively to the JavaScript spec. It’s in the first stage of the TC39 technical committee process, meaning that it could still have a long way to go, but the fact that it’s there is promising.

As an alternative to the slow TC39 acceptance process, another proposal has been submitted to the WHATWG to add Observable types directly to the DOM.

The Observer pattern

What is a stream?

A stream can be considered as a sequence of ongoing events ordered in time. It can emit three different things: a value, an error, or a completed signal.

Now consider that we capture those emitted events asynchronously, with separate functions that execute when a value, an error, or a completed signal is emitted. You can even omit the last two and just define the function for values. Listening to the stream is called subscribing. The functions we define are observers. The stream is the subject, or observable, being observed.

This is the Observer Pattern.

What is an observable?

Observables are an evolution of the promises + generator concept, but with a slightly different approach. You can consider them as a “push” equivalent to Iterables, e.g. Arrays, which behave in a “pull” way. Recalling the generator example:

   // (...)
   // With the generator, we create an iterator: it
   it = gen.apply( this, args );
   var next = it.next();
   return next.value.then(
      function(value) {
         it.next(value);
      },
      function(error) {
         it.throw(error);
      }
   );
   // (...)

When you want a new value, you call the next method to pull the value. By contrast, with an observable, the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.
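The difference between pulling and pushing can be sketched in a few lines of plain JavaScript. This is only an illustration: `numbers` and `pushNumbers` are hypothetical names, and the observer is just an object with `next` and `complete` functions rather than anything from a real library.

```javascript
// Pull: the consumer asks for each value by calling next().
function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

const pulled = [];
const iterator = numbers();
let step = iterator.next();
while (!step.done) {
  pulled.push(step.value);
  step = iterator.next();
}

// Push: the producer decides when each value is delivered to the consumer.
// `pushNumbers` is a hypothetical helper written for this sketch.
function pushNumbers(observer) {
  [1, 2, 3].forEach((n) => observer.next(n));
  observer.complete();
}

const pushed = [];
pushNumbers({
  next: (value) => pushed.push(value),
  complete: () => pushed.push('done'),
});

console.log(pulled); // [ 1, 2, 3 ]
console.log(pushed); // [ 1, 2, 3, 'done' ]
```

In the pull half the `while` loop drives the iteration. In the push half the consumer only hands over its callbacks and the producer drives everything else.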

Observables work somewhat like an inverted generator function and are a natural fit for asynchronous values. Like generator functions, they embrace the idea of iteration, and they generally do so by providing familiar Array functions like forEach() and map() in their spec.

This is a basic observable example (assuming we have an Observable class that follows the shape of the Observable proposal API):

// Assume we already have an Observable class
const basicStream = new Observable((observer) => {
    setTimeout(() => observer.next('hello'), 1000); // First value emitted
    setTimeout(() => observer.next('world'), 2000); // Second value emitted
    setTimeout(() => observer.complete(), 3000); // Complete signal emitted
});

// `.subscribe` starts the listening process
basicStream.subscribe(  
   (msg) => console.log(`Received event message: ${msg}`), // next
   (error) => console.log(`Error: ${error.message}`), // error
   () => console.log('Stream complete') // complete
);
// Generally the subscribe method returns a subscription object, which can then be used to unsubscribe.

In this example, basicStream is an Observable. Observables are lazy, meaning nothing happens until .subscribe is called. Once called, the three handler functions are mapped to a subscriber object and registered.

  • The first function will fire every time the Observable’s internal .next() method is triggered.
  • The second function will fire if the Observable’s internal .error() method is triggered.
  • The last function will fire when the Observable’s internal .complete() method is triggered, provided there have been no errors.
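
To make the laziness and the three handlers more concrete, here is a minimal sketch of a class with that shape. `MiniObservable` is a hypothetical name and a deliberate simplification. It is not the proposal's implementation or the RxJS one, and the producer emits synchronously to keep the example short.

```javascript
// A deliberately small, hypothetical stand-in for an Observable class.
class MiniObservable {
  constructor(subscriber) {
    // The producer function is only stored here; it does not run yet.
    this._subscriber = subscriber;
  }

  subscribe(next, error, complete) {
    let closed = false;
    // The three handlers are mapped onto a single observer object.
    const observer = {
      next: (value) => {
        if (!closed && next) next(value);
      },
      error: (err) => {
        if (closed) return;
        closed = true;
        if (error) error(err);
      },
      complete: () => {
        if (closed) return;
        closed = true;
        if (complete) complete();
      },
    };
    const cleanup = this._subscriber(observer);
    // The returned subscription object can be used to unsubscribe.
    return {
      unsubscribe() {
        closed = true;
        if (typeof cleanup === 'function') cleanup();
      },
    };
  }
}

const log = [];
const stream = new MiniObservable((observer) => {
  observer.next('hello');
  observer.next('world');
  observer.complete();
  observer.next('ignored'); // nothing is delivered after completion
});

log.push('before subscribe'); // the producer has not run at this point
stream.subscribe(
  (msg) => log.push(`next: ${msg}`),
  (err) => log.push(`error: ${err.message}`),
  () => log.push('complete')
);

console.log(log);
// [ 'before subscribe', 'next: hello', 'next: world', 'complete' ]
```

Note that nothing is logged by the producer until `subscribe` is called, and that once the stream has completed, later calls to `next` are ignored.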

What observables imply

The approach to handling async with observables is somewhat different. But if async/await already offers almost all of the needed functionality, why should we still consider observables?

Well, the concept isn’t new. Event buses or typical click events are really asynchronous event streams, on which you can listen and react accordingly. With observables, you can create data streams of anything, not just mouse events. Anything can be a stream: variables, properties, user inputs, etc. For example, a websocket connection would be a data stream in the same fashion that click events are. You can observe that stream and create some side effects.

This doesn’t mean that everything should necessarily be treated as an observable. Simple, one-time asynchronous calls can still be handled with callbacks or promises. Complex series of promises can also be simplified into synchronous-looking code with async functions. Also, in classic/simple webpages, interaction is essentially about submitting a form to the backend and performing basic rendering to the frontend.

But the benefits of observables are evident in modern web apps that offer a highly interactive experience to the user, with multiple UI events, data events, and real-time data. They can be really helpful when a data connection is not reliable, when the data behaves like a continuous stream instead of a single request-response exchange, or when your async code starts to look messy from having diverse endpoints that behave differently.

The following are some of the features of Observable:

  • Declarative syntax
  • Clean way of handling errors
  • Replayable/retryable
  • Composable
  • Cancellable
  • Cache-able
  • Shareable
  • Can emit multiple values

If you remember the concerns we’ve highlighted on async flow management architecture throughout this series, you can see that this pattern takes care of them all. Observables can be considered the ideal way to interact with asynchronous sequences (streams) of multiple items.

(Functional) Reactive Programming

Now that we have a pattern that shapes these (async) streams, what can we do to empower it? Well, it would be great, considering that a stream is analogous to an iterator, if we could work with tools like the ones available to manage Arrays or other iterables.

Reactive programming is programming with asynchronous data streams.

André Staltz

Enter the ReactiveX spec.

ReactiveX is a library, or a series of libraries really (available for Java, Scala, JavaScript…), that extends the observer pattern to support sequences of data and/or events. It also adds operators that allow you to compose sequences together declaratively, while abstracting away concerns about inner implementations and associated issues like synchronization, concurrent data structures, or non-blocking I/O.

RxJS is the ReactiveX implementation for JavaScript. In RxJS, the operators are built as pure functions.

[Observer pattern + Operators = RxJS => (Functional) Reactive Programming in JavaScript]

Misnomer

Throughout this series, we noticed that the term Functional Reactive Programming must be handled carefully, because that paradigm is different from what can be achieved with the techniques we’ve described thus far. I’ve used the phrase (Functional) Reactive Programming in an attempt to point out that this can be considered reactive programming with added concepts of functional programming, hence the parens. Though the name can be misleading, I can’t think of any other term that wouldn’t lead to confusion too: functional, reactive programming, reactive functional programming, evented functional programming?

As stated by the ReactiveX documentation:

It is sometimes called “functional reactive programming” but this is a misnomer. ReactiveX may be functional, and it may be reactive, but “functional reactive programming” is a different animal. One main point of difference is that functional reactive programming operates on values that change continuously over time, while ReactiveX operates on discrete values that are emitted over time. (See Conal Elliott’s work for more-precise information on functional reactive programming).

What are operators?

To answer that, let’s first say that a pure function is a function where the return value is only determined by its input values, without detectable side effects. Operators fit this definition:

An Operator is a method on the Observable type which, when called, creates a new Observable based on the input one, without modifying it.

RxJS is mostly useful for its operators, even though the Observable is the foundation. Operators are the essential pieces that allow complex asynchronous code to be easily composed in a declarative manner. They enable a functional programming style of dealing with streams/collections with operations like map, filter, concat, merge, etc.
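The same style is already familiar from plain Arrays, which makes for a quick warm-up before applying it to streams. The sketch below uses only built-in Array methods, and the values are made up for illustration. Each call returns a new Array and leaves its input untouched, which is the property operators share.

```javascript
const input = [1, 2, 3, 4];

// Each step returns a new Array; `input` is never modified.
const output = input
  .filter((x) => x % 2 === 0) // keep even values: [2, 4]
  .map((x) => x * 10)         // transform them:  [20, 40]
  .concat([99]);              // append another sequence

console.log(output); // [ 20, 40, 99 ]
console.log(input);  // [ 1, 2, 3, 4 ]
```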

They can be categorized as different types based on their purpose:

  • Creation: create, range
  • Transformation: buffer, map
  • Filtering: distinct, take
  • Combination: concat, merge
  • Multicasting: publish, share
  • Error handling: catch, retry
  • Utility: do, delay
  • Conditional: isEmpty, find
  • Mathematical: count, reduce

If none of these fit your needs, creating a custom operator is easy if you follow that definition. For example, we can create a custom operator function that multiplies each value received from an input observable by 10:

function multiplyByTen(input) {
   // We return a new Observable; the input one is left untouched
   return Observable.create((observer) => {
      // We subscribe to the input observable and forward its
      // next, error and complete notifications to the new observer.
      // Returning the inner subscription means that unsubscribing
      // from the output also unsubscribes from the input.
      return input.subscribe(
         (value) => observer.next(value * 10), // next
         (error) => observer.error(error), // error
         () => observer.complete() // complete
      );
   });
}

const input = Observable.from([1, 2, 3, 4]);
const output = multiplyByTen(input);
output.subscribe(x => console.log(x)); // 10, 20, 30, 40
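
The same recipe works for other operators. As a rough, library-free sketch, the code below models an observable as a plain function that takes an observer and returns an unsubscribe function. `fromArray`, `map` and `filter` are hypothetical helpers written for this example, not the RxJS versions.

```javascript
// Hypothetical model: an "observable" is a function that accepts an
// observer and returns an unsubscribe function.
const fromArray = (items) => (observer) => {
  items.forEach((item) => observer.next(item));
  observer.complete();
  return () => {}; // nothing to clean up for a synchronous source
};

// An operator takes a source observable and returns a new one,
// without modifying the source.
const map = (fn) => (source) => (observer) =>
  source({
    next: (value) => observer.next(fn(value)),
    error: (err) => observer.error(err),
    complete: () => observer.complete(),
  });

const filter = (predicate) => (source) => (observer) =>
  source({
    next: (value) => {
      if (predicate(value)) observer.next(value);
    },
    error: (err) => observer.error(err),
    complete: () => observer.complete(),
  });

const source = fromArray([1, 2, 3, 4]);
const evenTimesTen = map((x) => x * 10)(filter((x) => x % 2 === 0)(source));

const received = [];
const unsubscribe = evenTimesTen({
  next: (value) => received.push(value),
  error: (err) => received.push(`error: ${err.message}`),
  complete: () => received.push('complete'),
});

console.log(received); // [ 20, 40, 'complete' ]
```

Because each operator simply wraps the observer it is given, operators can be stacked in any order, and the unsubscribe function of the source is passed back through every layer.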

A good reference for checking the behaviour of each operator is the project RxMarbles, which offers interactive marble diagrams, or this other alternative, which shows them as data flow visualizations.

What RxJS implies

RxJS combines the Observer pattern with the Iterator pattern and functional programming concepts to look for an ideal way of managing value sequences spread over time. This model allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays.

Using RxJS frees you from creating tangled webs of callbacks or promise chains, thereby making your code more readable and less prone to bugs. Besides that, relying on data generated by pure functions will make your code easier to maintain and control. Impure functions, by contrast, can affect parts of the code that are unrelated to the origin of the change.

Let’s see an example:

var count = 0;
const button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${++count} times`));

Using RxJS, the shared mutable state can be avoided:

const button = document.querySelector('button');
Observable.fromEvent(button, 'click')
   // scan accumulates a value across emissions; the second argument (0) is the initial value
   .scan(count => count + 1, 0)
   .subscribe(count => console.log(`Clicked ${count} times`));
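
To see what the accumulation does without a DOM or a library, here is a small model of it on a plain Array. The `scan` function below is a hypothetical helper written for this sketch, not the RxJS operator. It only mirrors the idea that the state lives inside the accumulation instead of in an outer variable.

```javascript
// Hypothetical Array-based model of the accumulation performed by scan.
function scan(values, accumulator, seed) {
  const results = [];
  let state = seed; // the state is private to this function
  for (const value of values) {
    state = accumulator(state, value);
    results.push(state); // every intermediate state is emitted
  }
  return results;
}

// Three made-up click events stand in for the event stream.
const clicks = ['click', 'click', 'click'];
const counts = scan(clicks, (count) => count + 1, 0);
const messages = counts.map((count) => `Clicked ${count} times`);

console.log(counts);   // [ 1, 2, 3 ]
console.log(messages); // [ 'Clicked 1 times', 'Clicked 2 times', 'Clicked 3 times' ]
```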

The main idea of (functional) reactive programming is to build programs in a declarative way, by defining the streams, how they are linked together, and what happens if a new stream value arrives over time.

Programs designed following this principle can be built with very few application state variables, or none at all, and such variables are in general a source of errors. The application will have state, but it’s typically stored in certain controlled places, not spread out over the application code itself.

A final example

Let’s see a pretty straightforward, and real-world, use case where we implement an input autocomplete functionality using RxJS:

// We are creating an Observable from a DOM event.
// The chain ends in .subscribe, so the constant holds the
// resulting subscription rather than the stream itself.
const subscription = Observable.fromEvent(input, 'keyup')
   .map(e => e.target.value)
   .filter(value => value.length > 2)
   .distinctUntilChanged()
   .debounceTime(500)
   .mergeMap(word => this.http.get('...word...'))
   .retry(2)
   .subscribe(res => console.log(res));

What’s happening here can easily be followed:

  • Listen for key presses.
  • Extract the value of the input from the key event.
  • Filter out values shorter than 3 characters.
  • Only emit when the current value is different from the previous one.
  • Wait for 500 ms without a new value before emitting, discarding values that arrive in quicker succession.
  • Make an external HTTP request and get a result.
  • In case of failure, retry a maximum of two times.
  • Finally, we react to this result, printing it through the console.
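
The filtering steps above can be traced by hand on a recorded sequence of key events. The sketch below is a simplified, synchronous model using plain Arrays: the `events` data and the helpers `longEnough`, `distinctUntilChanged` and `debounce` are made up for illustration, and the real RxJS operators work on live, timed streams instead.

```javascript
// Made-up keyup values with the time (in ms) at which each one arrived.
const events = [
  { at: 0, value: 'r' },
  { at: 100, value: 'rx' },
  { at: 200, value: 'rxj' },
  { at: 300, value: 'rxjs' },
  { at: 1000, value: 'rxjs' },
  { at: 2000, value: 'rx' },
  { at: 2100, value: 'rxj' },
];

// Keep values of at least 3 characters.
const longEnough = (event) => event.value.length > 2;

// Drop a value when it equals the one right before it.
const distinctUntilChanged = (list) =>
  list.filter((event, i) => i === 0 || event.value !== list[i - 1].value);

// Keep a value only if nothing else arrives within `ms` after it
// (the last value is always kept in this simplified model).
const debounce = (list, ms) =>
  list.filter(
    (event, i) => i === list.length - 1 || list[i + 1].at - event.at >= ms
  );

const queries = debounce(
  distinctUntilChanged(events.filter(longEnough)),
  500
).map((event) => event.value);

console.log(queries); // [ 'rxjs', 'rxj' ]
```

Out of seven key presses, only two would reach the HTTP request step in this model, which is the point of putting the filtering operators ahead of `mergeMap`.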

As you can see, RxJS allows you to solve hard problems with less code, and it promotes maintainability, readability, flexibility, and composability. Learning how to handle asynchrony this way instead of using promises and callbacks will also greatly reduce the probability that you are leaking resources.

Just like it does for backend, big-data management, microservices communication, etc., a (functional) reactive programming approach works great for event-heavy frontends and webapps, and it is arguably one of the best ways to handle async in JavaScript.

