RxJS subscription management with Angular
Subscription of observables is the bread and butter of using RxJS. With each subscription, we create a Subscription
and it is being held in memory. If not handled, the subscription will be kept in memory and potentially cause memory leak.
With this article, I'm going to cover the various methods to manage subscriptions and how to decide on which method to use. RxJS subscription management is one of the common mistakes among developers, partly due to the learning curve of RxJS, partly the opaqueness of subscription, yet it is essential to using RxJS effectively.
Put simply, managing RxJS subscription is to know when to unsubscribe. Anytime there is a subscription (a .subscribe()
call), developer should know or be aware of when it will be unsubscribed, be it after nth emission, or when the component has been destroyed.
I'm going to cover 6 ways we can use to manage our subscription, and when to use them, not including any external libraries / plugins besides RxJS itself. Namely async
pipe, first
operator, take
operator, takeWhile
operator, takeUntil
operator, and finally the .unsubscribe()
call. Which to use is dependent on the context, ie. is the piped function (ie. side effects) complex? How often or how long should the subscription be kept alive?
async pipe
The first method that we should always try to reach for is the async
pipe method. With async
pipe, we won't need to handle the unsubscribe manually, both subscribe and unsubscribe will be handled for you in the Pipe
. It unsubscribes from the observable as soon as the component is destroyed. Internally, it also handles the change detection for you. With async
pipe, there'll be a lot less code in your .ts file. Less code, less bug.
async pipe used in app.component.html
<p>{{ data$ | async }}</p>
However, there's one caveat with async
pipe method, it is that every time the UI re-renders in Angular, any async
piped observable will trigger, causing any functions, operations or side effects in between to be triggered, as many times as the UI re-renders. If you have an expensive operation in between the pipe, it'll be heavy on the resources. Keep this in mind and move the heavy operations to a different stream of observable and handle them in the component's class (.ts) file.
Also, async
pipe is really only applicable when the data needs to be printed in template. Still, it should be the first method we reach for to manage observable subscription when the situation allows.
Reference: AsyncPipe
first operator
Despite the name, the first
operator is the second method we consider. With first
operator, your observable subscription will be unsubscribed as soon as there's one emission that passes through. We can pass in a function as our predicate / validator to ensure that the one emission through this operator is the value that we want. This is the operator to go for when we know very surely that we'll only need one emission from the subscription.
const data$ = from([0, 1, 2, 3]);
// without requirement
// unsubscribed with one emission
data$.pipe(
first()
).subscribe();
// output: 0
// with guard / validator function
// ensures only truthy value can pass through
// will only unsubscribe after one truthy value
data$.pipe(
first(value => !!value)
).subscribe();
// output: 1
Reference: first
take operator
Similar to first
operator, take
operator accepts a finite number of emission, difference being that it can take more than one emission. One other difference being first
will emit an error if the stream completes before a value is emitted, while take
won't. Use this operator when you know only a finite number of emission is needed. You may also consider adding a filter
operator to guard against the emissions to ensure that the nth number of emission that you get are of value to you.
const data$ = of(0, 1, 2, 3);
// takes 3 truthy value
data$.pipe(
filter(x => !!x)
take(3)
).subscribe();
// output: 1, 2, 3
Reference: take
takeWhile operator
takeWhile
operator will keep the subscription alive while a condition is true. This operator will take in a predicate / validator function to determine the condition to be true or false.
const data$ = of(1, 1, 1, 2, 4, 8);
// take while value is less than 4
data$.pipe(
takeWhile(value => value < 4)
).subscribe();
// output: 1, 1, 1, 2
It is not common to reach for this operator, however there are certain situation that calls for it, ie. we want the subscription to stop as soon as a condition is fulfilled. Another instance, we want to remain subscribed as soon as we hover on an element, and unsubscribe as soon as we leave.
Reference: takeWhile
takeUntil operator
takeUntil
operator accepts an observable as its notifier that will tell it when to end the subscription. When the notifier is passed in to takeUntil
operator, it will subscribe to that observable internally, and as soon as there's one emit from the notifier, it will unsubscribe from both the source and the notifier observable. Do note that the notifier observable is unsubscribed by takeUntil
internally, hence it is unecessary to unsubscribe or complete the notifier observable if nothing else is subscribed to the notifier.
This operator is likely the most common operator that we'll use. If none of the above mentioned operators worked for your situation, then takeWhile
will most likely be the one for the job. It is most often used to keep a subscription alive until an event happened, i.e. when a component is destroyed.
takeUntil to end subscription when component is destroyed
class AppComponent implements OnInit, OnDestroy {
private readonly destroyed$ = new Subject<void>();
ngOnInit() {
const interval$ = interval(1000);
interval$.pipe(
tap(val => console.log(val)),
takeUntil(this.destroyed$)
).subscribe();
}
ngOnDestroy() {
this.destroyed$.next();
}
}
When using takeUntil
to end a subscription, make sure that you place it last in the chain of operators to ensure that it covers all the streams in between. This will prevent the subscription leak to streams that are after the takeUntil
operator.
takeUntil subscription leak
const streamA$ = interval(1000);
const streamB$ = interval(50);
const notifier$ = new Subject();
streamA$.pipe(
takeUntil(notifier$),
switchMap(() => streamB$)
).subscribe();
notifier$.next();
In the snippet above, streamA$
would end after the notifier$
emitted, but streamB$
would not end, it's subscription would be kept alive and that is the takeUntil
leak.
Reference: takeUntil
.unsubscribe()
Finally, we can simply call .unsubscribe()
of a Subscription
to end the subscription. You'll have to first assign your subscription to a variable, or a class property, then call .unsubscribe()
when it is time to end the subscription.
single subscription .unsubscribe()
const interval$ = interval(1000);
const subscription = interval$.subscribe();
subscription.unsubscribe();
However, because we'll have to assign each subscription to a variable, it is a lot more work / code to do, and it is done very manually and imperatively compared to the few above mentioned methods. It is especially so when there are more than one subscription.
batch subscriptions .unsubscribe()
const subsciptions = [];
const interval$ = interval(1000);
const subscriptionA = interval$.subscribe();
const subscriptionB = interval$.subscribe();
subscriptions.push(subscriptionA);
subscriptions.push(subscriptionB);
subscriptions.forEach(subscription => subscription.unsubscribe());
Reference: Subscription
Conclusion
So I've covered 6 ways we can manage our RxJS subscriptions, ordered according to a heirarchy of decisions. Learn them, and decide for yourself which one is best suited to solve your problem in the context.
Worth mentioning, there's a library named UntilDestroy that can help you unsubscribe from observables when the component is destroyed. It's like takeUntil
except you write less code.
That's all I have to share. Happy coding!