Skip to content

Commonly Used Operators

In the last lesson, we covered the general idea behind the pipe method and operators. This lesson is going to focus on the most commonly used RxJS operators in detail. We will start with the more basic operators, and work our way up to the more complex ones.

As well as explaining the general idea behind each operator, I am also going to give you specific real world examples to demonstrate how they would actually be used — all of the examples will be from the example applications in this course, so you will also get the context for these later as well.

map, filter, tap

We already discussed how these three operators work in the last lesson, so we won’t rehash that again here. To quickly recap:

  • map will allow you to transform values emitted on the stream by supplying a mapping function
  • filter will allow you to prevent values from being emitted from the stream if they fail the provided predicate
  • tap will allow you to use the value of the stream without modifying the stream — you can use this for debugging and side effects

The example we looked at in the previous lesson was this:

import { map, filter, tap } from "rxjs/operators"
myObservable$.pipe(
tap((value) => console.log("Before map: ", value)),
map((value) => value * 2),
tap((value) => console.log("Before filter: ", value)),
filter((value) => value < 7)
).subscribe((val) => console.log("Stream emitted:", val));

Which would result in the following output:

Before map: 1
Before filter: 2
Stream emitted: 2
Before map: 2
Before filter: 4
Stream emitted: 4
Before map: 3
Before filter: 6
Stream emitted: 6
Before map: 4
Before filter: 8
Before map: 5
Before filter: 10

Notably, the stream stops emitting data after the 6 value is emitted, because the predicate for the filter fails at that point.

One important thing to keep in mind is that the map and filter operators are different to the map and filter methods of an array. The map and filter operators apply to each individual stream emission. To demonstrate what I mean, imagine a stream that emits arrays of values:

const myObservable$ = of([1, 2, 3], [4, 5, 6], [7, 8, 9])

This would emit the following three arrays:

[1, 2, 3]
[4, 5, 6]
[7, 8, 9]

Now let’s say we want to double the values of all of the elements in these arrays. If you’re not careful, you might try to do something like this:

myObservable$.pipe(
map((val) => val * 2)
)

But, this would result in the following:

NaN
NaN
NaN

We get Not a Number for all three emissions. Do you know why?

The map operator will map each individual stream emission. That means we are trying to do this:

[1, 2, 3] * 2
[4, 5, 6] * 2
[7, 8, 9] * 2

This doesn’t work. What we need to do is use both the map operator and the map array method. We use the map operator to modify the stream emission in some way, and the way in which we modify that stream emission is to use the map array method on it:

myObservable$.pipe(
map((array) => array.map(val => val * 2))
)

This will give us the result we want:

[2, 4, 6]
[8, 10, 12]
[14, 16, 18]

The same goes for filtering arrays that are emitted on a stream. We would do this to filter out odd values from data emissions:

myObservable$.pipe(
map((array) => array.filter(val => val % 2 === 0))
)

Which would result in:

[ 2 ]
[ 4, 6 ]
[ 8 ]

Example use case

Although we have already talked about these three operators, we haven’t seen real world examples yet — let’s do that now:

getChecklistById(id: string) {
return this.getChecklists().pipe(
filter((checklists) => checklists.length > 0), // don't emit if checklists haven't loaded yet
map((checklists) => checklists.find((checklist) => checklist.id === id))
);
}

The purpose of this method is to return a stream of a single checklist that matches the id passed in. We start with a stream of all checklists that the getChecklists() method returns, and then we pipe filter and map.

The map is a very typical use case. We map the emission which will be an array of all checklists, and then we use the standard find array method to return only the checklist that matches the id.

The usage of filter here is a little more creative. It is possible that getChecklists() will emit an empty array if the checklists have not been loaded into storage yet, which will result in no checklist being found by the map. To deal with this, the getChecklistById method will ignore any emissions from getChecklists that are just an empty array.

Imagine on a page in our application we are trying to get a specific checklist with getChecklistById and the data has not been loaded from storage yet. Without the filter our stream would emit a null value first which would cause us some trouble, and then it would emit the actual checklist after that. With the filter the first stream emission we get will be the checklist after it has been loaded in from storage.

We will use a separate example for tap. The debugging use case is reasonably obvious for tap but let’s look at a case where we will use it to trigger a side effect:

getPhotos(){
return this.photos$.pipe(
tap((photos) => this.storage?.set('photos', photos))
)
}

The photos$ stream will contain all of the photos that are present in the application. Every time a new photo is added (or deleted), this photos$ stream will emit. We are using tap here to trigger a side effect every time a new photo is added. We take the current photos and save them into storage.

startWith

The startWith operator is reasonably simple, let’s take a look at an example:

import { from } from "rxjs";
import { startWith } from "rxjs/operators"
const myObservable$ = from([1, 2, 3]).pipe(
startWith(0)
)

This will cause the stream to emit:

0
1
2
3

The startWith will be subscribed to first, it will emit any values it was supplied, and then it will subscribe to the source observable (from([1, 2, 3])) and emit all of its values.

Example use case

I find this to be particularly useful for streams that might not emit a value until some user interaction occurs. Sometimes, we need an initial value to kick things off, but if a stream doesn’t emit any values by default we can be stuck waiting.

This occurs quite often when we use the valueChanges observable from Angular’s ReactiveFormsModule:

const result$ = myFormControl.valueChanges.pipe(
// do something
)

The valueChanges observable will emit every time the user changes a value in the form control. In one of the example applications, we let users view posts from a particular subreddit on Reddit. To let them specify the subreddit they want, we use a FormControl and we listen to valueChanges to set the subreddit appropriately.

However, valueChanges doesn’t emit anything by default. But, we still want to display a default subreddit. To handle this, we do something like this:

const result$ = myFormControl.valueChanges.pipe(
startWith('gifs'),
// fetch from subreddit with HTTP request
)

Now, even if the user has not entered anything into the form control, our stream will still emit a default value of gifs and kick off the process of fetching results from Reddit.

distinctUntilChanged

The distinctUntilChanged operator is another one that is reasonably simple, the basic idea is that it will prevent the stream from emitting the same value that was just emitted:

import { from } from "rxjs";
import { distinctUntilChanged } from "rxjs/operators"
const myObservable$ = from([1, 2, 3, 3, 2]).pipe(
distinctUntilChanged()
)

This will cause the stream to emit:

1
2
3
2

Note that in the spot where we had 3, 3 only one 3 is emitted on the resulting stream. We can have the same values emitting multiple times on the stream (like the 2) as long as they are not immediately after each other.

Example use case

We can again use our form example for this. Imagine the same scenario with the subreddit. Whatever the user enters, we want to go and fetch that data from the Reddit API. If they entered gifs we would fetch from the gifs subreddit. If the next stream emission is also gifs then there is no need to run the request again because we already did it.

We could improve our stream a little more by adding distinctUntilChanged:

const result$ = myFormControl.valueChanges.pipe(
startWith('gifs'),
distinctUntilChanged(),
// fetch from subreddit with HTTP request
)

debounceTime

The debounceTime operator is similar in spirit to distinctUntilChanged but rather than preventing emissions that are the same as the last one, it will prevent emissions that happened too quickly compared to the last one.

If we use our typical example:

import { from } from "rxjs";
import { debounceTime } from "rxjs/operators"
const myObservable$ = from([1, 2, 3]).pipe(
debounceTime(200)
)

This would emit:

3

This is because our from creation operator will emit all of the values instantly. What debounceTime will do is wait for the stream to be “idle” (i.e. emitting no values) for 200ms then it will emit whatever the last value was.

It doesn’t emit 1 because the 2 is emitted too soon after it, and it doesn’t emit 2 because the 3 emits too quickly after it, but 3 is the last value so it will wait 200ms, see that no new values have been emitted, and then it will emit the 3.

Example use case

We can again use our form example with Reddit. Again, we are expecting the user to type a new subreddit into the form field. But unlike our from creation operator, a user can’t instantly type values. If they are changing the subreddit from gifs to funny there is going to be a lot of intermediate values:

  • f
  • fu
  • fun
  • funn
  • funny

We don’t want to launch an HTTP request for each of these because most of them will fail. We can use debounceTime to give the user a bit of time to finish typing, and the value won’t emit until they have stopped typing for at least 200ms or whatever value we use:

const result$ = myFormControl.valueChanges.pipe(
debounceTime(200),
distinctUntilChanged(),
startWith('gifs'),
// fetch from subreddit with HTTP request
)

An important note here is that we do the startWith after the debounceTime(). We use startWith so that we can fetch data from a default subreddit as soon as the component loads. If we put the debounceTime after the startWith it is going to cause our initial value to be delayed by 200ms which will cause our data to load 200ms slower than it needs to.

catchError

As we mentioned earlier, an observable can accept an observer that has three notifier methods:

  • next
  • error
  • complete

Most of the time we just focus on the next notifier which is what gives us our data:

myObservable$.subscribe((data) => console.log(data))

As in the example above, most of the time we don’t even supply the error or complete notifiers. We just supply a single next handler to handle the data emitted on the stream.

When a stream triggers the complete notifier, it will cause the stream to be unsubscribed/cleaned up/deleted. In effect, the stream no longer exists. This is generally something we want after our stream has emitted all of its values, e.g:

from([1, 2, 3])

Once the 3 emits the stream will complete and be cleaned up.

However… the same thing also happens when an error occurs. The stream will be unsubscribed/cleaned up/deleted. This might cause us problems. We are never going to run into this problem with a simple example like the above, because it can’t emit an error. But, let’s get creative. Let’s force a stream to throw an error:

import { from } from "rxjs";
import { map } from "rxjs/operators";
const numberStream$ = from([1, 2, 3]);
const myObservable$ = numberStream$.pipe(
map(() => { throw new Error('Oops')})
)

If we subscribe to myObservable$ we will get:

Uncaught Error: Oops

We just get an error and no stream emissions. The error has broken the stream. This is fine in situations where our code is just wrong — but sometimes errors are a normal part of an application running. Sometimes invalid values are entered, sometimes an HTTP request will fail. Sometimes we need to be able to handle these without breaking our streams.

Let’s keep our map in there to break the stream. But now what we are going to do is use catchError to save the stream. The bad news is, the original stream is dead. Sorry, there is no saving it. It errored. It’s gone. But! We can supply a replacement stream with catchError. If our original stream errors, we can just replace it with a new one:

import { from } from "rxjs";
import { map, catchError } from "rxjs/operators";
const numberStream$ = from([1, 2, 3]);
const myObservable$ = numberStream$.pipe(
map(() => { throw new Error('Oops')}),
catchError(() => numberStream$)
)

Now if we subscribe to myObservable$ we will get:

1
2
3

We have correctly handled the error, and we have replaced the broken stream with the original number stream.

Example use case

The example above works for a simple demonstration, but it’s not exactly realistic. One simple error handling strategy we will use later is to replace the stream with an EMPTY stream:

import { EMPTY } from 'rxjs';
return this.http.get('...').pipe(
catchError(() => EMPTY)
)

This is again from the Reddit example. We are making an HTTP request that might fail. However, this stream is used as part of another stream. It’s fine if our HTTP request fails, but by default it is going to error which will also break the other stream it is being used in. What we do is catch the error, and replace the errored stream with an EMPTY stream. EMPTY is just a special RxJS stream that emits nothing and completes — the important part is it won’t cause an error that will break the other stream.

We will see this example, and other more advanced error handling strategies, in more detail later.

Creation Operators

The operators we have discussed above are all pipeable operators that are used on an existing stream with the pipe method. The following operators are creation operators that are used to create entirely new observable streams.

of, from

We have already talked about these ones quite a lot so we won’t cover them again, but let’s take a look at a real world use cases rather than just using them to do things like emit an array of numbers like we have been doing.

Example use case

These two creation operators usually become useful in a practical situation when, for some reason, we need to convert some normal value into a stream of that value (or values). Often, this is because we want to combine this value into some existing stream in some way.

This is a slightly more advanced/confusing use case, but these operators can be used in conjunction with each other in interesting ways:

this.currentPhoto$ = from(value).pipe(
// For each emission, switch to a stream of just that one value
concatMap((photo) =>
of(photo).pipe(
// Wait 500 ms before making it the currentPhoto
// Then concatMap will move on to the next photo
delay(500)
)
)
);

The value being used here comes from an @Input() — it is an array of photos. Our goal here is to create a stream called currentPhoto$ that will emit values that are just one photo from that array. The reason for this is that we want to display one photo at a time like a slideshow, the currentPhoto$ stream emitting one photo at a time with a slight delay will allow us to do that.

So, we take our array of photos and create a stream from it:

from(value)

This will instantly emit every photo in the stream one at a time, e.g:

photo1
photo2
photo3
photo4

This isn’t what we want though, because we are only ever going to see the last photo because the others are all emitted too fast. We are about to get into flattening operators now, which is what concatMap is. This is a more advanced concept that we are going to cover in the next lesson. That makes this example a bit hard to explain in full, so I am just going to go light on the details here for now.

The important point is that to get the result we want, we need to create individual streams of each of those photo emissions and delay the emissions on that stream by the time we want: 500ms in this case. There is no operator that allows us to make an individual stream pause 500ms between each emission, e.g. something like this would be nice:

this.currentPhoto$ = from(value).pipe(
delayEach(500)
);

But delayEach does not exist. However, we can add a delay to the beginning of a stream so that it will wait before its first emission. This doesn’t work in our case because we need the delay between every emission, not just the first one.

This is why we need to use of to create a stream of each individual photo, so that we can delay those individual streams by 500ms each:

of(photo).pipe(
delay(500)
)

Since of returns an observable, and we are already inside of an observable, we are now dealing with an observable that emits observables which is why we need concatMap. That is what the next lesson is going to be about.

combineLatest

The last operator we are going to cover is combineLatest, and it is one we will use quite a lot. It’s actually a reasonably simple one. Often, we will be working with multiple different streams at once. For example, maybe we have the following streams:

const name$ = of('Josh');
const luckyNumber$ = from([5, 22, 587]);
const attempts$ = new BehaviorSubject(0);

Now let’s say we want the values from all of those streams. We would need to subscribe to all of them:

name$.subscribe((val) => console.log(val));
luckyNumber$.subscribe((val) => console.log(val));
attempts$.subscribe((val) => console.log(val));

This would give us the following result:

Josh
5
22
587
0

If we were using these values to display inside of a template, we would actually only see the last values for each:

Josh
587
0

It’s a little annoying that we need three separate subscriptions to get these values, in some cases we might even have more streams. This is where combineLatest becomes useful. It will create a new observable that emits the latest values from all of its input observables, e.g:

const data$ = combineLatest([name$, luckyNumber$, attempts$]);

Subscribing to this would give us:

['Josh', 587, 0]

It emits an array with the latest results from each stream.

Example use case

As you will see later, combineLatest is useful for creating a view model (usually named vm$) for our templates.

We will often have a bunch of data we want to display in our template from different streams. We can use combineLatest to combine all of those streams into one, subscribe to it once in the template with the async pipe, and then easily use the values:

vm$ = combineLatest([name$, luckyNumber$, attempts$]).pipe(
map(([name, luckyNumber, attempt]) => {
return {
name: name,
luckyNumber: luckyNumber,
attempt: attempt
}
})
)

It’s awkward having the values be in an array, so we use a map with array destructuring to name and pull all of those values out of the array, and then return them as an object. This means when this stream emits, rather than getting:

['Josh', 587, 0]

We will get:

{
name: 'Josh',
luckNumber: 587,
attempt: 0
}

Which is much easier to use. I’ve used the long form syntax to make it more obvious that we are mapping the values from the array into an object, but we could also do this:

vm$ = combineLatest([name$, luckyNumber$, attempts$]).pipe(
map(([name, luckyNumber, attempt]) => ({name, luckyNumber, attempt}))
)

With our vm$ stream created, we could use it in our template like this:

<ng-container *ngIf="vm$ | async as vm">
<p>{{ vm.name }}</p>
<p>{{ vm.luckyNumber }}</p>
<p>{{ vm.attempt }}</p>
</ng-container>

An ng-container will not actually be rendered to the DOM, we just use this to conditionally render everything inside of it based on our *ngIf. If the vm$ stream has emitted, then all of the paragraphs will render, and we are able to access the data through the vm object (because we used the as vm syntax).

If we didn’t use combineLatest we would have had to do something like this:

<p *ngIf="name$ | async as name">{{ name }}</p>
<p *ngIf="luckyNumber$ | async as luckyNumber">{{ luckyNumber }}</p>
<p *ngIf="attempt$ | async as attempt">{{ attempt }}</p>

We will talk a more about the | async pipe in a lesson soon. Again, this is something that existing Angular applications very heavily rely on — but it is not generally used in signals based applications.

Recap

    Which operator allows us to modify individual stream emissions?

    Which of the following can NOT prevent some stream emissions?

    In relation to combineLatest, which of the following statements are FALSE

    Which of the following most accurately describes catchError