Who is this article for?
This article is written for Angular developers, but a lot of the RxJS content here can be applied to any framework!
I came across the “late subscriber” problem when trying to understand common RxJS difficulties developers can experience when trying to manage local state, http calls, and many other things in combination within their Angular Components.
In the end I was able to combine all the different solutions in a lightweight component state management library called 📦 @rx-angular/state, and even a vanilla JS version for other frameworks like Vue, React, or Svelte: 📦 rxjs-state.
If you’re interested in what went into all of this, check out the complete research paper here: research-on-reactive-ephemeral-state-in-component-oriented-frameworks
Before we get started, let’s make sure we’re all on the same page with a few Rx concepts we’ll be looking at throughout this article:
Producer
I use the word “producer” to describe the beginning of an RxJS process, where values are produced. This can be an RxJS creation function like interval, which produces a series of incrementing numbers, or some logic that calls a Subject's next method, for example.
Consumer
“Consumer” is the part of your code that subscribes to the produced values and processes them. A consumer could be in a subscription callback or even a side effect operator.
Composition
“Composition” means we process newly arriving values, whether by applying additional behavior and/or by combining them with other values. Think of composition as how we’re piecing together the data of all our reactive streams.
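To make the three terms concrete, here is a minimal sketch in plain RxJS. The names clicks$, doubled$ and consumed are made up for illustration and are not part of any library.

```typescript
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';

// Producer: some logic that calls a Subject's next method.
const clicks$ = new Subject<number>();

// Composition: process newly arriving values before they reach the consumer.
const doubled$ = clicks$.pipe(map((n) => n * 2));

// Consumer: the subscription callback that processes the values.
const consumed: number[] = [];
doubled$.subscribe((value) => consumed.push(value));

clicks$.next(1);
clicks$.next(2);
// consumed now holds [2, 4]
```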
What is the RxJS Late Subscriber Problem?
In a nutshell, this problem occurs when incoming Rx values arrive before the subscription has happened.
Let's take a look at an example:
Let’s say we have some state coming in through an @Input() decorator, which arrives before the view gets rendered, and we’re using an async pipe in the template. The async pipe is unable to receive that value because it has not subscribed yet.
@Component({
  selector: 'app-late-subscriber',
  template: `
    {{ state$ | async | json }}
  `,
})
export class LateSubscriberComponent {
  state$ = new Subject<ApplicationState>();

  @Input()
  set state(v) {
    this.state$.next(v);
  }
}
In this case, the view is late in subscribing to the values from the @Input() properties.
We call this situation the “late subscriber” problem.
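The same effect can be reproduced without Angular. The following sketch assumes a made-up ApplicationState shape and uses a plain subscribe call as a stand-in for the async pipe.

```typescript
import { Subject } from 'rxjs';

// Hypothetical state shape, only for this sketch.
interface ApplicationState {
  count: number;
}

const state$ = new Subject<ApplicationState>();

// The input binding fires before the view exists.
state$.next({ count: 1 });

// Later the view (here: a plain subscriber) subscribes.
const rendered: ApplicationState[] = [];
state$.subscribe((s) => rendered.push(s));

// Nothing has been rendered yet: the first value was missed.
const renderedAtSubscribe = rendered.length;

// Only values emitted after the subscription arrive.
state$.next({ count: 2 });
```

After the last line, rendered contains only the second value.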
There are several other situations that can cause a similar problem:
- Input Decorators
  - Transporting values from @Input to the AfterViewInit hook
  - Transporting values from @Input to the view
  - Transporting values from @Input to the constructor
- Component And Directive Life Cycle Hooks
  - Transporting OnChanges to the view
  - Getting the state of any Life Cycle hook later in time (important when hooks are composed)
- Local State
  - Transporting the current local state to the view
  - Getting the current local state for other compositions that involve global state
A quick solution here would be replaying the latest notification.
As a common quick fix we could use a BehaviorSubject or, better, a ReplaySubject with a bufferSize of 1.
This would in fact cache the latest emitted value and replay it when the async pipe subscribes.
Primitive Solution
@Component({
  selector: 'app-late-subscriber',
  template: `
    {{ state$ | async | json }}
  `,
})
export class LateSubscriberComponent {
  state$ = new ReplaySubject<ApplicationState>(1);

  @Input()
  set state(v) {
    this.state$.next(v);
  }
}
(used RxJS parts: ReplaySubject)
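A small sketch of how the two subjects behave for a late subscriber. One plausible reason to prefer ReplaySubject here (my reading, not stated in the text above) is that a BehaviorSubject needs an initial value, so the view would render a placeholder before any real input arrived. The value 0 below is such an arbitrary placeholder.

```typescript
import { BehaviorSubject, ReplaySubject } from 'rxjs';

// ReplaySubject(1): caches the latest value and replays it on subscribe.
const replayed$ = new ReplaySubject<number>(1);
replayed$.next(1);
replayed$.next(2);
const fromReplay: number[] = [];
replayed$.subscribe((v) => fromReplay.push(v)); // receives only 2

// ReplaySubject(1) without emissions: the late subscriber gets nothing yet.
const emptyReplay$ = new ReplaySubject<number>(1);
const fromEmptyReplay: number[] = [];
emptyReplay$.subscribe((v) => fromEmptyReplay.push(v));

// BehaviorSubject: always has a value, even before the first real emission.
const behavior$ = new BehaviorSubject<number>(0);
const fromBehavior: number[] = [];
behavior$.subscribe((v) => fromBehavior.push(v)); // receives the placeholder 0
```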
BUT - This quick solution has 2 major caveats!
First Caveat:
The downside here is that we can only replay the latest value emitted.
Replaying more values would cause problems for later compositions of this stream,
as a new subscriber would get all past values of the @Input binding.
And we definitely don’t want that to happen in our application!
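To illustrate why a larger buffer hurts, here is a sketch with a bufferSize of 3. The input$ name and the numbers are made up.

```typescript
import { ReplaySubject } from 'rxjs';

// A source that replays the last three values instead of only the latest.
const input$ = new ReplaySubject<number>(3);
input$.next(1);
input$.next(2);
input$.next(3);

// A composition that subscribes late is flooded with stale values.
const lateValues: number[] = [];
input$.subscribe((v) => lateValues.push(v));
// lateValues holds 1, 2 and 3, although only 3 is the current input value
```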
More important is the fact that we had to push workload to the consumer. We cannot assume everybody will handle this the same way.
If we made every source replay at least the last value, we would have to implement this logic in the following places:
- View Input bindings (multiple times)
- View events (multiple times)
- Other Service Changes (multiple times)
- Component Internal interval (multiple times)
It would also force different parts of your application to cache values, increasing overall memory consumption. This approach also forces us to put replay behavior into our local state management, and forces third-party users to implement it too!
IMHO this isn’t scalable. 👎
Another downside we could think of is the bundle size of shareReplay.
IMHO this can be ignored, as a replay operator might be used somewhere else in our architecture too.
Therefore this might not be the biggest problem.
Second Caveat:
The second and even trickier caveat is that the composition is still cold, as we rely on the consumer to initialize the state composition.
Before we dive deeper, let's quickly clarify hot/cold and unicast/multicast and make sure we’re all on the same page.
So in RxJS, what do we mean by unicast or multicast?
Unicast
The producer is unique PER subscription.
Any creation operator is unicast. (publish operators are not yet refactored to creation operators, but they would be the only exception)
interval, for example, would call setInterval for every subscriber separately.
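A sketch of unicast behavior. Instead of interval it uses a hand-written Observable with a counter (producerRuns, an illustrative name), so the effect is visible synchronously.

```typescript
import { Observable } from 'rxjs';

let producerRuns = 0;

// The producer logic lives inside the subscribe function,
// so it runs once per subscription.
const unicast$ = new Observable<number>((subscriber) => {
  producerRuns += 1;
  subscriber.next(producerRuns);
});

const a: number[] = [];
const b: number[] = [];
unicast$.subscribe((v) => a.push(v));
unicast$.subscribe((v) => b.push(v));
// producerRuns is 2: each subscriber got its own producer
```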
Multicast
The producer is shared over ALL subscriptions.
Subject, for example, emits its values to multiple subscribers without executing any producer logic again.
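The multicast counterpart, again as a small sketch with made-up names: one next call reaches every subscriber.

```typescript
import { Subject } from 'rxjs';

const multicast$ = new Subject<number>();

const first: number[] = [];
const second: number[] = [];
multicast$.subscribe((v) => first.push(v));
multicast$.subscribe((v) => second.push(v));

// A single emission is shared by all current subscribers.
multicast$.next(42);
```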
Cold composition
The internal logic of the observable is executed only UPON subscription.
The consumer controls the moment when the internal logic is executed through the subscribe function call.
The interval creation operator, for example, will only start its internal tick if we subscribe to it. Nearly every pipeable operator will likewise ONLY execute if we have an active subscriber.
An interesting example of a cold operator is share.
Even though it multicasts its notifications to multiple subscribers, it will not emit any notifications until at least one subscriber is present.
So it's cold at the beginning but multicast once the first subscriber is present. ⭐️
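A sketch of that cold-then-multicast behavior. The source is a hand-written Observable that never completes, and producerRuns is an illustrative counter.

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

let producerRuns = 0;
const source$ = new Observable<number>((subscriber) => {
  producerRuns += 1;
  subscriber.next(producerRuns);
  // never completes, so the shared subscription stays open
});

const shared$ = source$.pipe(share());

// Cold: nothing has run before the first subscriber shows up.
const runsBeforeSubscribe = producerRuns;

const first: number[] = [];
const second: number[] = [];
shared$.subscribe((v) => first.push(v));
// Multicast: the second subscriber joins the running producer
// and therefore misses the value that was already emitted.
shared$.subscribe((v) => second.push(v));
```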
Hot composition
The internal logic is executed independently from any consumer.
A Subject, for example, can emit values without any consumer present.
There are also operators that can turn all logic into a hot path.
The multicast and publish operators mostly return a ConnectableObservable.
If we call connect on it, we connect to the source, meaning we subscribe to its notifications.
In turn, this starts executing the logic of all the operators between publish and its source observable.
So now, even if we have no subscriber present, incoming emissions will get processed.
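A minimal sketch of connect in action, assuming the RxJS 6 style publish operator (deprecated in RxJS 7, but still available there). The processed array is only there to observe which values run through the operators.

```typescript
import { ConnectableObservable, Subject } from 'rxjs';
import { publish, tap } from 'rxjs/operators';

const source$ = new Subject<number>();
const processed: number[] = [];

// pipe() is typed as a plain Observable, hence the cast to reach connect().
const hot$ = source$.pipe(
  tap((v) => processed.push(v)),
  publish()
) as ConnectableObservable<number>;

source$.next(1); // not processed: not connected yet

const connection = hot$.connect();
source$.next(2); // processed, although nobody subscribed to hot$

connection.unsubscribe();
source$.next(3); // not processed: the connection is closed again
```

Only the value emitted while the connection was open ends up in processed.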
Now that we have a better understanding of cold/hot composition, let’s look at an example use case when dealing with local state within our components.
When we’re dealing with the "state" of a component, there are many things we often have to handle such as:
- View Interactions ( e.g. button click )
- Global State Changes ( e.g. HTTP update )
- Component State Changes ( e.g. triggered internal logic )
We know that putting all this (state management) logic in the component class isn’t a best practice.
So what’s the problem here?
Doing so lacks any separation of concerns! Not to mention the fact that we would have to implement similar code over and over again (in many other components that may need to handle their own local “state”).
Ideally we want to create logic to handle the "state" within our components in a compositional way that can be reused and independent of any one component!
So far, all of our RxJS sources got subscribed to when our component's view was ready and we rendered the component's state.
As the @Input() from the view is a hot producer of values (same with injected services), we have to decouple the service(s) that handle component state from these other sources.
What are some ways we can solve our local state problem?
Our components have hot sources, and we have to compose them in a cleaner, more compositional fashion.
If we try to “compose” our state with an operator like scan, we have to consider that scan also returns a cold observable. Remember that nearly every operator returns a cold source, even if the source was hot before.
So no matter what we do (before or after an operation) - we get a cold observable, and we end up having to subscribe to that operation in order to trigger the entire composition.
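A sketch of this effect with a hot Subject and scan. The State type and the sum field mirror the shape used in this article's service examples; everything else is illustrative.

```typescript
import { Subject } from 'rxjs';
import { scan } from 'rxjs/operators';

type State = { sum: number };

const commands$ = new Subject<State>(); // hot source
const composedState$ = commands$.pipe(
  scan((acc: State, cmd: State): State => ({ sum: acc.sum + cmd.sum }), { sum: 0 })
);

// No subscriber yet: scan does not run, both commands are lost.
commands$.next({ sum: 1 });
commands$.next({ sum: 1 });

const states: State[] = [];
composedState$.subscribe((s) => states.push(s));

// Accumulation starts from the seed only now.
commands$.next({ sum: 1 });
// states holds a single entry with sum 1, not 3
```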
So if some of our sources might be cold, what’s the best way to solve this issue?
1. Make all sources replay (at least) the latest value, pushing the workload to all relevant sources.
2. Make the composition hot as early as possible, pushing the workload to the component-related part.
We already looked at #1 above by utilizing the ReplaySubject, so let's dive into how we can make sure the composition is "hot" as early as possible.
What could be the earliest moment to make the composition hot?
In Angular, we know that Services, even if locally provided, are instantiated first, before the component.
If we made the composition hot there, we could take over the workload from:
- View Input bindings (multiple times)
- View events (multiple times)
- Component Internal interval (multiple times)
- Locally provided services
- Global services
Let's take a look at a simple example where we rely on the consumer to start the composition.
Service:
export class SomeService {
  commands$ = new Subject<SomeCommands>();
  composedState$ = this.commands$.pipe(
    tap((v) => console.log('compute state ', v)),
    scan(
      (acc, i) => {
        return { sum: acc['sum'] + i['sum'] };
      },
      { sum: 0 }
    ),
    // operator here
    shareReplay({ refCount: true, bufferSize: 1 })
  );
}
(used RxJS parts: scan)
In this service we could try to solve our problem by using:
- share()
- shareReplay({refCount: true, bufferSize: 1})
- shareReplay({refCount: false, bufferSize: 1})
Component:
@Component({
  selector: 'cold-composition',
  template: `
    <h1>Cold Composition</h1>
    <button (click)="updateState()">update state</button><br />
    <label><input [(ngModel)]="isOpen" type="checkbox" /> Show result</label>
    <div *ngIf="isOpen">
      someService.composedState$: {{ someService.composedState$ | async | json }}
    </div>
  `,
  providers: [SomeService],
})
export class ColdCompositionComponent {
  isOpen = false;

  constructor(public someService: SomeService) {}

  updateState() {
    this.someService.commands$.next({ sum: 1 });
  }
}
If we run the code, click the button first, and then open the result area, we see that we missed the values emitted before opening the area.
No matter which of the above operators we try, nothing seems to work! The reason is that all those operators rely on the subscriber to initialize their logic. We always lose values if no subscriber is present.
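The three variants can be compared without Angular. In this sketch, missedValuesWith is a hypothetical helper that emits one command before anyone subscribes (the "button click") and then subscribes (the "async pipe").

```typescript
import { Observable, Subject } from 'rxjs';
import { scan, share, shareReplay } from 'rxjs/operators';

type State = { sum: number };

// Builds a fresh composition per variant so the runs cannot affect each other.
function missedValuesWith(
  shareOperator: (source: Observable<State>) => Observable<State>
): State[] {
  const commands$ = new Subject<State>();
  const composedState$ = commands$.pipe(
    scan((acc: State, cmd: State): State => ({ sum: acc.sum + cmd.sum }), { sum: 0 }),
    shareOperator
  );

  commands$.next({ sum: 1 }); // emitted before the result area is open

  const seen: State[] = [];
  composedState$.subscribe((s) => seen.push(s)); // the late subscriber
  return seen;
}

const results = [
  missedValuesWith(share<State>()),
  missedValuesWith(shareReplay<State>({ refCount: true, bufferSize: 1 })),
  missedValuesWith(shareReplay<State>({ refCount: false, bufferSize: 1 })),
];
// every entry in results is an empty array: the early command was lost each time
```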
Even if the source is hot (the subject in the service is defined on instantiation), using scan made the stream cold again.
This means the composed values can be received only if there is at least 1 subscriber.
Since we utilized Angular's async pipe, a subscription was created for us automatically (behind the scenes), and from that point on everything works as expected.
Let's see how we can implement the above in a way we could run the processing of emitted values immediately (make the composition hot):
Hot Composition Service:
export class SomeService {
  subscription: Subscription;
  commands$ = new Subject<{ sum: number }>();
  composedState$ = this.commands$.pipe(
    tap((v) => console.log('compute state ', v)),
    scan(
      (acc, i) => {
        return { sum: acc['sum'] + i['sum'] };
      },
      { sum: 0 }
    ),
    // operator here
    publishReplay(1)
    // pipe() is typed as a plain Observable, so we cast to reach connect()
  ) as ConnectableObservable<{ sum: number }>;

  constructor() {
    // Composition is hot from here on
    this.subscription = this.composedState$.connect();
  }
}
(used RxJS parts: publishReplay, ConnectableObservable, Subscription)
In this example, we kept the component untouched and only applied changes to the service.
We used the publishReplay operator to make the source replay the last emitted value by using 1 as our bufferSize.
In the service constructor, we called connect to make it hot, meaning we subscribe to the source within the publishReplay operator.
That's it! :)
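For trying this out without Angular, here is a plain-class sketch of the same idea. HotStateService and its destroy method are hypothetical names; in an Angular service the teardown would typically live in ngOnDestroy, which the service above does not show.

```typescript
import { ConnectableObservable, Subject, Subscription } from 'rxjs';
import { publishReplay, scan } from 'rxjs/operators';

type State = { sum: number };

class HotStateService {
  readonly commands$ = new Subject<State>();

  readonly composedState$ = this.commands$.pipe(
    scan((acc: State, cmd: State): State => ({ sum: acc.sum + cmd.sum }), { sum: 0 }),
    publishReplay(1)
  ) as ConnectableObservable<State>;

  // The composition is hot as soon as the instance exists.
  private readonly subscription: Subscription = this.composedState$.connect();

  // Hypothetical teardown so the connection does not outlive its owner.
  destroy(): void {
    this.subscription.unsubscribe();
  }
}

const service = new HotStateService();

// Two commands arrive before any consumer subscribes.
service.commands$.next({ sum: 1 });
service.commands$.next({ sum: 1 });

// The late subscriber receives the already computed state.
const seen: State[] = [];
service.composedState$.subscribe((s) => seen.push(s));

service.destroy();
```

Here seen ends up with a single state whose sum is 2, because both early commands were processed before the subscription.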
So what did we learn here?
If we take a look at our operator reference list at the end of this document we can see the concepts we needed to understand to solve our problem were:
- unicast vs. multicast
- hot vs. cold
The main outcome here was that we should ensure that the moment of computation of states (or in other words the composition of observables triggered by a subscribe call) is not controlled by the subscriber. It should be HOT.
An example implementation of our learning can be found in the resources.
Based on that solution and other related problems of reactive ephemeral state I implemented a lightweight flexible state management lib called 📦 @rx-angular/state.
Resources
- 📦 @rx-angular/state
- 💾 The late subscriber problem - source code
- 💾 Cold Composition - source code
- 💾 research-on-reactive-ephemeral-state-in-component-oriented-frontend-frameworks
- 📦 rxjs-state
Used RxJS operator reference list: