Dealing with Late Subscribers in RxJS.

Michael Hladky | Trilon Consulting
Michael Hladky

Who is this article for?

This article is written for Angular developers, but a lot of the RxJS content here can be applied to any framework!

I came across the “late subscriber” problem when trying to understand common RxJS difficulties developers can experience when trying to manage local state, http calls, and many other things in combination within their Angular Components.

In the end I was able to combine all the different solutions together in a light-weight component state management library called 📦 @rx-angular/state, and even a vanilla JS version for the other frameworks like vue, reactor svelte! 📦 rxjs-state.

If you’re interested in what went into all of this, check out the complete research paper here: research-on-reactive-ephemeral-state-in-component-oriented-frameworks


Before we get started, let’s make sure we’re all on the same page with a few Rx concepts we’ll be looking at throughout this article:

Producer

I use the word “producer” to describe the part beginning of an RxJS process where values are produced. This can be a RxJS creation function like interval that produces a series of incrementing numbers, or some logic that calls a Subjects next method for example.

Consumer

“Consumer” is the part of your code that subscribes to the produced values and processes them. A consumer could be in a subscription callback or even a side effect operator.

Composition

“Composition” means we process newly arriving values - whether it be applying additional behavior and/or combining the values with other values. Think of composition as how we’re piercing together all of the reactive streams data.


Table of Content


What is the RxJS Late Subscriber Problem?

In a nutshell, this problem occurs when incoming Rx values arrive before the subscription has happened.

Let's take a look at an example:

Let’s say we have some state coming in through an @Input() decorator which arrives before the view gets rendered and we’re using an async pipe in the template - which is unable to receive the value right away.

@Component({
selector: 'app-late-subscriber',
template: ` {{ state$ | async | json }} `,
})
export class LateSubscriberComponent {
state$ = new Subject<ApplicationState>();
@Input()
set state(v) {
this.state$.next(v);
}
}
Dealing with values and late subscribers

In this case, the view is late in subscribing to the values from the @Input() properties. We call this situation the “late subscriber” problem.

There are several other situations that can cause a similar problem:

  • Input Decorators
    • Transporting values from @Input to AfterViewInit hook
    • Transporting values from @Input to the view
    • Transporting values from @Input to the constructor
  • Component And Directive Life Cycle Hooks
    • Transporting OnChanges to the view
    • Getting the state of any Life Cycle hook later in time (important when hooks are composed)
  • Local State
    • Transporting the current local state to the view
    • Getting the current local state for other compositions that involve global state

A quick solution here would be replaying the latest notification. As common quick fix we could place a BehaviourSubject or better a ReplaySubject with a bufferSize of 1.

This would in fact cache the latest emitted value and replay it when the async pipe subscribes.

Late Subscriber - First Solution

Primitive Solution

@Component({
selector: 'app-late-subscriber',
template: ` {{ state$ | async | json }} `,
})
export class LateSubscriberComponent {
state$ = new ReplaySubject<ApplicationState>(1);
@Input()
set state(v) {
this.state$.next(v);
}
}

(used RxJS parts: ReplaySubject)

BUT - This quick solution has 2 major caveats!

First Caveat:

The downside here is that we can only replay the latest value emitted. Replaying more values would cause problems for later compositions of this stream, as a new subscriber would get all past values of the @Input binding. And we definitely don’t want that to happen in our application!

More importantly is the fact that we had to push workload to the consumer. We can not assume everybody will handle this the same way.

Caveat Workload

If we would make every source replay at least the last value we would have to implement this logic in the following places:

  • View Input bindings (multiple times)
  • View events (multiple times)
  • Other Service Changes (multiple times)
  • Component Internal interval (multiple times)

It would also force different parts of your application to cache values and increase overall memory consumption. This method also forces our self to put a replay behaviour in our local state management as well as the third party user to implement this too!

IMHO this isn’t scalable. 👎

Another downside we could think of is the bundle size of ShareReplay. This can be ignored IMHO as a reply operator might be used somewhere else too in our architecture. Therefore this might not be the biggest problem.

Second Caveat:

The second and even trickier caveat is that the composition is still cold. As we rely on the consumer to initialize the state composition.


Before we dive deeper, let's quickly clarify hot/cold and unicast/multicast and make sure we’re all on the same page.

So in RxJS, what do we mean by unicast or multicast?

Unicast

The producer is unique PER subscription.

Any creation operator is unicast. (publish operators are not yet refactored to creation operators, but they would be the only exception) interval for example would call setInterval for every subscriber separately.

Multicast

The producer is shared over ALL subscriptions.

Subject for example emits it's value to multiple subscribers without executing some producer logic again.

Cold composition

The internal logic of the observable is executed only UPON subscription.

The consumer controls the moment when internal logic is executed over the subscribe function call. The interval creation operator, for example, will only start it's internal tick if we subscribe to it. Also, nearly every pipe-able operator will also ONLY execute if we have an active subscriber.

Caveat Cold Composition

An interesting example for a cold operator is share. Even if it multicasts its notifications to multiple subscribers, it will not emit any notifications until at least one subscriber is present.

So it's cold at the beginning but multicast when the first one subscribed. ⭐️

Hot composition

The internal logic is executed independently from any consumer.

A Subject for example can emit values without any consumer present.

There are also operators that can turn all logic into a hot path. Multicast and publish operators mostly return a ConnectableObservable. If we call connect on it, we can connect to the source. This means we subscribe to its notifications. In turn this starts to execute the logic and all the operators in between publish and it's source observable.

So now even if we have no subscriber present, incoming emissions will get processed.


Hot vs Cold, Unicast vs Multicast

Now that we have a better understanding of cold/hot composition, let’s look at an example use case when dealing with local state within our components.

When we’re dealing with the "state" of a component, there are many things we often have to handle such as:

  • View Interactions ( e.g. button click )
  • Global State Changes ( e.g. HTTP update )
  • Component State Changes ( e.g. triggered internal logic )

We know that putting all this (state management) logic in the component class isn’t a best practice.

So what’s the problem here?

Doing so lacks any separation of concerns! Not to mention the fact that we would have to implement similar code over and over again (in many other components that may need to handle their own local “state”).

Ideally we want to create logic to handle the "state" within our components in a compositional way that can be reused and independent of any one component!

So far all of our RxJS sources got subscribed to when our components view was ready and we rendered the components state. As the @Input() from the view is a hot producer of values (same with injected services), we have to decouple the service(s) that handle component state from these other sources.

What are some ways we can solve our local state problem?

Since our components have hot sources and we have to compose them in a more ideal compositional fashion.

If we try to “compose” our state, we have to consider that even if we try to use an operator like scan to achieve this - the scan operator also returns a cold observable. Remember that nearly every operator returns a cold source - even if it was hot before.

So no matter what we do (before or after an operation) - we get a cold observable, and we end up having to subscribe to that operation in order to trigger the entire composition.

So if some of sources might be cold, what’s the best way to solve this issue?

  1. Make all sources replay (at least) the latest value, pushing the workload to all relevant sources.
  2. Make the composition hot as early as possible (push workload to the component related part)

We already looked at #1 above by utilizing the ReplaySubject(), so let's dive into how can we make sure the composition is "hot" as early as possible.

What could be the earliest moment to make the composition hot?

In Angular, we know that Services, even if locally provided, are instantiated first, before the component.

If we would put it there we could take over the workload from:

  • View Input bindings (multiple times)
  • View events (multiple times)
  • Component Internal interval (multiple times)
  • Locally provided services
  • Global services

Let's take a look at a simple example where we rely on the consumer to start the composition.

Service:

export class SomeService {
commands$ = new Subject<SomeCommands>();
composedState$ = this.commands$.pipe(
tap((v) => console.log('compute state ', v)),
scan(
(acc, i) => {
return { sum: acc['sum'] + i['sum'] };
},
{ sum: 0 }
),
// operator here
shareReplay({ refCount: true, bufferSize: 1 })
);
}

(used RxJS parts: scan)

In this service we could try to solve our problem by using:

  • share()
  • shareReplay({refCount: true, bufferSize: 1})
  • shareReplay({refCount: false, bufferSize: 1})

Component:

@Component({
selector: 'cold-composition',
template: `
<h1>Cold Composition</h1>
<button (click)="updateState()">update state</button><br />
<label><input [(ngModel)]="isOpen" type="checkbox" /> Show result</label>
<div *ngIf="isOpen">
someService.composedState$:
{{ someService.composedState$ | async | json }}
</div>
`,
providers: [SomeService],
})
export class ColdCompositionComponent {
isOpen = false;
constructor(public someService: SomeService) {}
updateState() {
this.someService.commands$.next({ sum: 1 });
}
}

If we run the code and click the button first and then open the result area we see we missed the values emitted before opening the area.

No matter which above method we try, nothing seems to work! The reason for that is all those operators relying on the subscriber to initialize logic. We always lose values if no subscriber is present.

StackBlitz - RxJS Hot vs Cold

Cold Composition - Problem

Even if the source is hot (the subject in the service is defined on instantiation) since we used scan, it actually made the stream cold again.

This means the composed values can be received only if there is at least 1 subscriber. Since we utilized Angular's async pipe, this created a subscription automatically for us (behind the scenes), allowing everything to work as expected!

Let's see how we can implement the above in a way we could run the processing of emitted values immediately (make the composition hot):

Cold Composition - Solution

Hot Composition Service:

export class SomeService {
subscription: Subscription;
commands$ = new Subject<{ sum: number }>();
composedState$ = this.commands$.pipe(
tap((v) => console.log('compute state ', v)),
scan(
(acc, i) => {
return { sum: acc['sum'] + i['sum'] };
},
{ sum: 0 }
),
// operator here
publishReplay(1)
);
constructor() {
// Composition is hot from here on
this.subscription = this.composedState$.connect();
}
}

(used RxJS parts: publishReplay, Subscription)

In this example, we kept the component untouched and only applied changes to the service.

We used the publishReplay operator to make the source replay the last emitted value by using 1 as our bufferSize.

In the service constructor, we called connect to make it hot, meaning we subscribe to the source within the publishReplay operator.

That's it! :)

So what did we learn here?

If we take a look at our operator reference list at the end of this document we can see the concepts we needed to understand to solve our problem were:

  • unicast vs. multicast
  • hot vs. cold

The main outcome here was that we should ensure that the moment of computation of states (or in other words the composition of observables triggered by a subscribe call) is not controlled by the subscriber. It should be HOT.

An example implementation of our learning can be found in the resources.

Based on that solution and other related problems of reactive ephemeral state I implemented a lightweight flexible state management lib called 📦 @rx-angular/state.


Resources

Used RxJS operator reference list:


RxJs Enterprise Training & Workshops

Enterprise Workshop

We offer Remote Enterprise trainings & workshops on RxJs and a variety of other topics including:

  • Reactive Programming
  • Angular (basic & advanced concepts)
  • NgRx
  • NestJS (and Node.js ecosystem)
  • ...and much more!

Are you looking to help level-up your team? Reach out, we'd love to hear from you!

Request Workshop

Learn NestJS - Official NestJS Courses 📚

Level-up your NestJS and Node.js ecosystem skills in these incremental workshop-style courses, from the NestJS Creator himself, and help support the NestJS framework! 🐈

🚀 The NestJS Fundamentals Course is now LIVE and 25% off for a limited time!

🎉 NEW - NestJS Course Extensions now live!
#RxJS
#ReactiveProgramming
#Angular

Share this Post!

📬 Trilon Newsletter

Stay up to date with all the latest Articles & News!

More from the Trilon Blog .

Jay McDoniel | Trilon Consulting
Jay McDoniel

NestJS Metadata Deep Dive

In this article we'll be doing a deep-dive and learning about how NestJS uses Metadata internally for everything from dependency injection, to decorators we use everyday!

Read More
Kamil Mysliwiec | Trilon Consulting
Kamil Mysliwiec

NestJS v10 is now available

Today I am excited to announce the official release of Nest 10: A progressive Node.js framework for building efficient and enterprise-grade, server-side applications.

Read More
Manuel Carballido | Trilon Consulting
Manuel Carballido

Implementing data source agnostic services with NestJS

Learn how to implement data source logic in an agnostic way in yours NestJS applications.

Read More

What we do at Trilon .

At Trilon, our goal is to help elevate teams - giving them the push they need to truly succeed in today's ever-changing tech world.

Trilon - Consulting

Consulting .

Let us help take your Application to the next level - planning the next big steps, reviewing architecture, and brainstorming with the team to ensure you achieve your most ambitious goals!

Trilon - Development and Team Augmentation

Development .

Trilon can become part of your development process, making sure that you're building enterprise-grade, scalable applications with best-practices in mind, all while getting things done better and faster!

Trilon - Workshops on NestJS, Node, and other modern JavaScript topics

Workshops .

Have a Trilon team member come to YOU! Get your team up to speed with guided workshops on a huge variety of topics. Modern NodeJS (or NestJS) development, JavaScript frameworks, Reactive Programming, or anything in between! We've got you covered.

Trilon - Open-source contributors

Open-source .

We love open-source because we love giving back to the community! We help maintain & contribute to some of the largest open-source projects, and hope to always share our knowledge with the world!

Explore more

Write us a message .

Let's talk about how Trilon can help your next project get to the next level.

Rather send us an email? Write to:

hello@trilon.io
© 2019-2023 Trilon.