Understanding RxJS use cases (part I)
Original cover photo by Oskar Yildiz on Unsplash.
When to use RxJS?
On the web, there are plenty of articles and courses that can help us learn and understand RxJS functions, Observables, Subjects and operators fairly well. But sometimes knowing that certain functions exist is not enough to really harness the power of reactive programming - we also need to become conscious of the scenarios in which different tools from the RxJS toolkit apply - in essence, to understand when to use which. In this article, I am going to go in depth into scenarios where different tools can be used to make our code cleaner, more maintainable and more readable.
In the first part, we are going to look into the most popular functions (usually used for combining Observables) that work with streams, rather than operators. There are too many operators to fit into one article, so we will discuss their use cases in upcoming articles.
Combining Observables
We have probably heard about a bunch of functions that help us combine different Observables into a single stream of data, like combineLatest, merge, or zip. We might even be familiar with their differences - but sometimes recognizing that a certain problem has this solution can be tougher. Let’s now examine the functions that allow combining streams and see which apply when:
merge: we probably do not care about the nature of the events
The merge function is used when we want to create a stream that emits whenever any one of a bunch of source Observables emits a value. Here is an example:
import { interval, merge } from 'rxjs';
import { map, take } from 'rxjs/operators';

// create three observable streams:
// one of numbers, one of strings, and one of booleans
const numbers$ = interval(1000).pipe(
  map(i => i + 1),
  take(3), // take only the first three
);
const strings$ = interval(2000).pipe(
  map(i => 'a' + i),
  take(3),
);
const booleans$ = interval(3000).pipe(
  map(i => i % 2 === 0),
  take(3),
);

// merge the three streams together
// (merge takes the Observables as separate arguments, not as one array)
const merged$ = merge(numbers$, strings$, booleans$);

// subscribe to the merged stream
merged$.subscribe(console.log);
Now, in this example we have a stream of numbers, strings and booleans merged into one. They are emitted at various intervals, so we get a stream which at some point emits a number, at some point a boolean, and maybe a string at another point - so, naturally a very mixed stream.
Since the emitted value can differ in type every time, it is very probable that we do not really care about the emitted value itself, just about the fact that something was emitted. While this does not sound very useful, there are actually a bunch of scenarios where this might be helpful.
Imagine this: we want to detect whether the user has been inactive for a while, and maybe log them out or show a notification. To do this, we need to monitor user activity, like clicks, scrolls, right clicks and such, and act when there have been no events for a certain period of time. Here is a code example:
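import { fromEvent, interval, merge } from 'rxjs';
import { bufferWhen, filter } from 'rxjs/operators';

const ACTIVE_EVENTS = [
  'click', 'scroll', 'contextmenu', 'dblclick', 'mousemove',
];
// you can add as many events as you want to define what counts as "being active"

merge(...ACTIVE_EVENTS.map(event => fromEvent(document, event))).pipe(
  // collect all events of each ten-second period into an array
  bufferWhen(() => interval(10_000)),
  // keep only the periods in which nothing happened
  filter(events => events.length === 0),
).subscribe(() => alert('You have been inactive for ten seconds!'));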
In this example, we create a bunch of Observable streams (using fromEvent) to monitor different browser events that might indicate user activity, and then merge them into one stream to act when there have been no events for a while. We do not care that the event is a double click, a scroll, a usual click, but just about the fact that the event happened. So, the line of thinking is as follows:
I need to know when some event happened, and only care about the fact of that event, so I probably have to use “merge”
(We use bufferWhen in this example; explaining how it works is out of the scope of this article. Here you can read about the operator itself, and here is a link to a Twitter discussion that offers other explanations of this example.)
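As a rough mental model of the buffering step, here is a small plain-TypeScript sketch that does not use RxJS at all. It assumes that the ten-second notifier effectively splits time into consecutive fixed windows, and the function name findInactiveWindows and the sample timestamps are made up for illustration:

```typescript
// Plain-TypeScript model (no RxJS) of "buffer events into fixed windows,
// then keep only the windows that stayed empty".
// `eventTimes` are hypothetical timestamps in ms, counted from subscription.
function findInactiveWindows(
  eventTimes: number[],
  windowMs: number,
  totalMs: number,
): number[] {
  const windowCount = Math.floor(totalMs / windowMs);
  const buffers: number[][] = Array.from({ length: windowCount }, () => []);

  for (const t of eventTimes) {
    // Events outside the observed time span are simply ignored.
    buffers[Math.floor(t / windowMs)]?.push(t);
  }

  // Counterpart of filter(events => events.length === 0):
  // report the closing time of every window that collected no events.
  return buffers
    .map((events, i) => ({ events, closesAt: (i + 1) * windowMs }))
    .filter(w => w.events.length === 0)
    .map(w => w.closesAt);
}

const inactiveAt = findInactiveWindows(
  [1_200, 4_000, 9_500, 31_000],
  10_000,
  40_000,
);
console.log(inactiveAt); // [20000, 30000]
```

One thing this model makes visible: the check runs per fixed window, not ten seconds after the last event, so an event early in one window followed by silence is only reported when a later window stays completely empty.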
combineLatest: we care about values from other sources too
Sometimes we have scenarios when two (or more) separate events both trigger a change in the same part of the UI; but contrary to the previous scenario, we do care about the emitted values, and, more specifically, we need both (or more) of them to calculate some final value which will be reflected in some form in the UI.
Let’s imagine the following: we have a form where the user has to enter some data in several fields, but the fields are dynamic (let’s say, coming from a backend API endpoint), and, to make the scenario even more complex, what the user inputs in some fields may affect the validations of other fields. For example, if the user selects that they are going to provide their address separately, the “Address Line” field must become required.
So, on one hand, we have a form, which can emit a value that might trigger a change in the UI, and, on the other hand, we have a dynamic value coming from a backend (for this example, we load that data from the backend via a custom service using HttpClient, which returns an Observable). Now, any change in either the form or the dynamic fields must trigger a change in the UI; and we care about the emitted values themselves, and need both to calculate a final result (and, in this case, perform some side effects: set the validations for some FormControls).
Let’s see how we can wrap this beautifully with “combineLatest”:
export class ExampleComponent {
  // rest of the component code omitted for the sake of brevity

  dynamicControls$ = this.controlsService.getDynamicControls();

  formValue$ = combineLatest([
    this.form.valueChanges,
    this.dynamicControls$,
  ]).pipe(
    tap(([value]) => {
      if (value.attachmentsRequired) {
        this.controls.attachments
          .setValidators(Validators.required);
      } else {
        this.controls.attachments.clearValidators();
      }
    }),
    map(([value, controls]) => {
      const controlsValue = { ...value, ...controls };
      return controlsValue;
    }),
  );

  constructor(
    private readonly controlsService: ControlsService,
  ) { }
}
So here we take both values, perform some side effects depending on a condition from one of them, and then merge them into a single result. merge from the previous example would not have worked in this case, as it only emits one value at a time, whereas combineLatest emits the new value together with the latest emitted value from the other Observable.
forkJoin: only the final result matters
Of course, forkJoin is probably the best-known function that combines Observables in RxJS. Usually, when talking about this function, we say it acts the same way as Promise.all.
If we need several pieces of data which are retrieved from different APIs, we can use forkJoin to wait for all of them to be available and only then do something on the UI:
homePageData$ = forkJoin([
  this.userService.getUserInfo(),
  this.dataService.getData(),
  this.otherDataService.getOtherData(),
]).pipe(
  map(([userInfo, data, otherData]) => ({
    userInfo,
    data,
    otherData,
  })),
  catchError(error => of({/*error object*/})),
);
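So, we usually use this function with HTTP calls: forkJoin emits only once, after every source Observable has completed, and an HTTP call completes right after it delivers its response.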
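To make the Promise.all comparison concrete, here is a minimal sketch of the same "wait for everything, then build one object" shape written with plain Promises. The fakeRequest helper, its delays and the sample values are hypothetical stand-ins for the three HTTP calls, not part of any real service:

```typescript
// Hypothetical stand-in for an HTTP call:
// resolves with `value` after `ms` milliseconds.
function fakeRequest<T>(value: T, ms: number): Promise<T> {
  return new Promise<T>(resolve => setTimeout(() => resolve(value), ms));
}

// Promise.all resolves once, after ALL inputs have resolved, and keeps the
// results in input order regardless of which request finished first.
function loadHomePageData() {
  return Promise.all([
    fakeRequest({ name: 'Ann' }, 30), // slowest
    fakeRequest([1, 2, 3], 10),       // fastest
    fakeRequest('other data', 20),
  ]).then(([userInfo, data, otherData]) => ({ userInfo, data, otherData }));
}

loadHomePageData().then(result => console.log(result));
```

The analogy is not perfect: Promises start working as soon as they are created, while the Observables passed to forkJoin do nothing until something subscribes, and forkJoin reports the last value of each source at the moment it completes.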
pairwise: examining previous values
Sometimes we need to combine the value of an Observable with some value previously emitted by the same Observable.
Imagine this scenario: we have a form that is prefilled with some value from the get-go, for example, an edit profile form. We have a “Save” button, but we want it disabled until the user changes something in the form.
For simple cases, we can use the “dirty” property of the form, but what if we also want to account for the case when the user changes something, then changes it back to how it was? The form would still be marked as dirty, but the value is exactly the same as in the beginning. So we need to compare the form value to the previous one to make sure something is different. Here is where pairwise comes to our aid:
disabled$ = this.form.valueChanges.pipe(
  pairwise(),
  map(([prev, current]) => {
    // will disable the button if the form has not changed
    return this.utilitiesService.isEqual(prev, current);
  }),
);
(The isEqual method deep compares two objects, which are the previous and the current value of the form.)
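The article does not show isEqual itself, so the following is only a sketch of what such a helper might look like, together with a plain-array model of how pairwise groups values. Both functions are hypothetical illustrations: this isEqual only handles plain JSON-like values, and pairs works on an array rather than on a real Observable:

```typescript
// Hypothetical stand-in for utilitiesService.isEqual: a minimal deep comparison
// for plain JSON-like values (no Dates, Maps, Sets or cyclic references).
function isEqual(a: unknown, b: unknown): boolean {
  if (a === b) return true;
  if (typeof a !== 'object' || typeof b !== 'object' || a === null || b === null) {
    return false;
  }
  if (Array.isArray(a) !== Array.isArray(b)) return false;

  const aRecord = a as Record<string, unknown>;
  const bRecord = b as Record<string, unknown>;
  const aKeys = Object.keys(aRecord);
  if (aKeys.length !== Object.keys(bRecord).length) return false;

  return aKeys.every(
    key =>
      Object.prototype.hasOwnProperty.call(bRecord, key) &&
      isEqual(aRecord[key], bRecord[key]),
  );
}

// Plain-array model of pairwise(): n values produce n - 1 pairs,
// so nothing comes out until the second value arrives.
function pairs<T>(values: T[]): Array<[T, T]> {
  const result: Array<[T, T]> = [];
  let previous: T | undefined;
  let hasPrevious = false;
  for (const current of values) {
    if (hasPrevious) result.push([previous as T, current]);
    previous = current;
    hasPrevious = true;
  }
  return result;
}

const formValues = [
  { name: 'Ann', city: 'Oslo' },
  { name: 'Anna', city: 'Oslo' },
  { name: 'Anna', city: 'Oslo' },
];
console.log(pairs(formValues).map(([prev, current]) => isEqual(prev, current)));
// [false, true]
```

Two things are worth keeping in mind when reading the model: the very first value produces no pair at all, and each pair holds two consecutive values, not the current value and the initial one.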
withLatestFrom: take into account some other value, but ignore its emissions
Sometimes there are scenarios where we have to include a value from some other Observable in calculations performed on our source Observable, but we do not want to trigger those calculations when the other Observable emits, as opposed to combineLatest, which triggers every time any one of the source Observables emits.
Imagine we want to redirect from a page on successful login, but only when a “redirectUrl” query parameter is present. We can take that value from the queryParamMap Observable, but we do not want to trigger a redirect when the query parameter changes for whatever reason, only when the successful login HTTP call has finished:
this.authService.login(credentials).pipe(
  withLatestFrom(
    this.route.queryParamMap.pipe(startWith(new Map())),
  ),
).subscribe(([, params]) => {
  // redirect only when the query parameter is present
  const navUrl = params.get('redirectUrl');
  if (navUrl) {
    this.router.navigateByUrl(decodeURIComponent(navUrl));
  }
});
In this case we used withLatestFrom to get the query params when the login is successful; the action is not performed when the query params change, only when the login call has successfully completed.
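The difference from combineLatest can be sketched with a small plain-TypeScript model that does not use RxJS. The event type, the two model functions and the sample timeline are all made up for illustration; they only mimic when each function would emit:

```typescript
// Plain-TypeScript model (no RxJS). Each event says which stream emitted and what.
type TimelineEvent =
  | { source: 'login'; value: string }
  | { source: 'params'; value: string };

// combineLatest-like: emit on EVERY event once both streams have a value.
function combineLatestModel(events: TimelineEvent[]): Array<[string, string]> {
  let login: string | undefined;
  let params: string | undefined;
  const out: Array<[string, string]> = [];
  for (const e of events) {
    if (e.source === 'login') login = e.value;
    else params = e.value;
    if (login !== undefined && params !== undefined) out.push([login, params]);
  }
  return out;
}

// withLatestFrom-like: emit only when the SOURCE ('login') emits,
// reading whatever 'params' value has been seen most recently.
function withLatestFromModel(events: TimelineEvent[]): Array<[string, string]> {
  let params: string | undefined;
  const out: Array<[string, string]> = [];
  for (const e of events) {
    if (e.source === 'params') params = e.value;
    else if (params !== undefined) out.push([e.value, params]);
  }
  return out;
}

const timeline: TimelineEvent[] = [
  { source: 'params', value: '/cart' },
  { source: 'login', value: 'ok' },
  { source: 'params', value: '/profile' }, // query params change after login
];

console.log(combineLatestModel(timeline));
// [['ok', '/cart'], ['ok', '/profile']] - would redirect twice
console.log(withLatestFromModel(timeline));
// [['ok', '/cart']] - redirects once, on login only
```

The model also shows why the login example seeds queryParamMap with startWith: if the source emits before the other stream has produced any value, that emission is dropped.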
What’s next?
In this article, we discussed what use cases exist for functions and operators that allow combining Observables. In the next article, we are going to examine use cases for individual operators that work on the streams themselves, to hopefully start using them more.