Commonly Used Operators
In the last lesson, we covered the general idea behind the pipe method and
operators. This lesson is going to focus on the most commonly used RxJS
operators in detail. We will start with the more basic operators, and work our
way up to the more complex ones.
As well as explaining the general idea behind each operator, I will give you specific real-world examples of how they are actually used. All of the examples come from the example applications in this course, so you will see their full context later.
map, filter, tap
We already discussed how these three operators work in the last lesson, so we won’t rehash that again here. To quickly recap:
- map will allow you to transform values emitted on the stream by supplying a mapping function
- filter will allow you to prevent values from being emitted from the stream if they fail the provided predicate
- tap will allow you to use the value of the stream without modifying the stream; you can use this for debugging and side effects
The example we looked at in the previous lesson was this:
import { map, filter, tap } from "rxjs/operators";

myObservable$
  .pipe(
    tap((value) => console.log("Before map: ", value)),
    map((value) => value * 2),
    tap((value) => console.log("Before filter: ", value)),
    filter((value) => value < 7)
  )
  .subscribe((val) => console.log("Stream emitted:", val));

Which would result in the following output:

Before map: 1
Before filter: 2
Stream emitted: 2
Before map: 2
Before filter: 4
Stream emitted: 4
Before map: 3
Before filter: 6
Stream emitted: 6
Before map: 4
Before filter: 8
Before map: 5
Before filter: 10

Notably, nothing reaches the subscriber after the 6 value, because the doubled values 8 and 10 fail the predicate for the filter. The stream itself keeps running: filter only drops the values that fail, and a later value that passed the predicate would still be emitted.
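To make that behaviour concrete, here is a minimal plain-TypeScript sketch (no RxJS involved) that applies the same two callbacks to each value in turn. The runPipeline helper and the extra trailing 1 in the input are assumptions added for illustration; the real operators work on a stream rather than an array.

```typescript
// The same callbacks that were passed to map and filter above.
const double = (value: number): number => value * 2;
const isLessThanSeven = (value: number): boolean => value < 7;

// Hypothetical helper: models what the subscriber receives for each source value.
function runPipeline(sourceValues: number[]): number[] {
  const received: number[] = [];
  for (const value of sourceValues) {
    const doubled = double(value);
    // A failing value is skipped, but processing carries on with the next one.
    if (isLessThanSeven(doubled)) {
      received.push(doubled);
    }
  }
  return received;
}

const result = runPipeline([1, 2, 3, 4, 5, 1]);
console.log(result); // [2, 4, 6, 2]
```

The final 2 shows that a failing predicate only drops individual values; it does not end the stream.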
One important thing to keep in mind is that the map and filter operators are
different to the map and filter methods of an array. The map and filter
operators apply to each individual stream emission. To demonstrate what I mean,
imagine a stream that emits arrays of values:
const myObservable$ = of([1, 2, 3], [4, 5, 6], [7, 8, 9]);

This would emit the following three arrays:

[1, 2, 3]
[4, 5, 6]
[7, 8, 9]

Now let’s say we want to double the values of all of the elements in these arrays. If you’re not careful, you might try to do something like this:

myObservable$.pipe(
  map((val) => val * 2)
)

But this would result in the following:

NaN
NaN
NaN

We get Not a Number for all three emissions. Do you know why?
Solution
The map operator will map each individual stream emission. That means we are
trying to do this:
[1, 2, 3] * 2
[4, 5, 6] * 2
[7, 8, 9] * 2

This doesn’t work. What we need to do is use both the map operator and the map array method. We use the map operator to modify the stream emission in some way, and the way in which we modify that stream emission is to use the map array method on it:

myObservable$.pipe(
  map((array) => array.map((val) => val * 2))
)

This will give us the result we want:

[2, 4, 6]
[8, 10, 12]
[14, 16, 18]

The same goes for filtering arrays that are emitted on a stream. We would do this to filter out odd values from data emissions:

myObservable$.pipe(
  map((array) => array.filter((val) => val % 2 === 0))
)

Which would result in:

[ 2 ]
[ 4, 6 ]
[ 8 ]

Example use case
Although we have already talked about these three operators, we haven’t seen real world examples yet — let’s do that now:
getChecklistById(id: string) {
  return this.getChecklists().pipe(
    filter((checklists) => checklists.length > 0), // don't emit if checklists haven't loaded yet
    map((checklists) => checklists.find((checklist) => checklist.id === id))
  );
}

The purpose of this method is to return a stream of a single checklist that
matches the id passed in. We start with a stream of all checklists that
the getChecklists() method returns, and then we pipe filter and map.
The map is a very typical use case. We map the emission which will be an
array of all checklists, and then we use the standard find array method to
return only the checklist that matches the id.
The usage of filter here is a little more creative. It is possible that
getChecklists() will emit an empty array if the checklists have not been
loaded into storage yet, which will result in no checklist being found by the
map. To deal with this, the getChecklistById method will ignore any
emissions from getChecklists that are just an empty array.
Imagine on a page in our application we are trying to get a specific checklist
with getChecklistById and the data has not been loaded from storage yet.
Without the filter our stream would first emit undefined (find returns undefined when nothing matches), which would
cause us some trouble, and then it would emit the actual checklist after that.
With the filter the first stream emission we get will be the checklist after
it has been loaded in from storage.
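The effect of the guard can be sketched with plain arrays, without any RxJS. In this sketch the Checklist shape, the sample data, and the helper names hasLoaded and findById are all assumptions made for illustration, and the outer array merely stands in for the sequence of stream emissions.

```typescript
// Hypothetical shape; the real Checklist type in the course may differ.
interface Checklist {
  id: string;
  title: string;
}

// Mirrors the predicate passed to filter: only non-empty arrays pass.
const hasLoaded = (checklists: Checklist[]): boolean => checklists.length > 0;

// Mirrors the function passed to map: pick the checklist with this id.
const findById = (checklists: Checklist[], id: string): Checklist | undefined =>
  checklists.find((checklist) => checklist.id === id);

const notLoadedYet: Checklist[] = [];
const loaded: Checklist[] = [
  { id: "a", title: "Groceries" },
  { id: "b", title: "Packing" },
];

// Without the guard, the empty emission is mapped to undefined.
const withoutGuard = findById(notLoadedYet, "b");

// With the guard, the empty emission is skipped and only the loaded one is mapped.
const withGuard = [notLoadedYet, loaded]
  .filter(hasLoaded)
  .map((checklists) => findById(checklists, "b"));
```

Here withoutGuard is undefined, while withGuard contains only the "Packing" checklist.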
We will use a separate example for tap. The debugging use case is reasonably
obvious for tap but let’s look at a case where we will use it to trigger
a side effect:
getPhotos() {
  return this.photos$.pipe(
    tap((photos) => this.storage?.set('photos', photos))
  );
}

The photos$ stream will contain all of the photos that are present in the application. Every time a photo is added or deleted, this photos$ stream will emit. We are using tap here to trigger a side effect on every one of those emissions: we take the current photos and save them into storage.
startWith
The startWith operator is reasonably simple. Let’s take a look at an example:

import { from } from "rxjs";
import { startWith } from "rxjs/operators";

const myObservable$ = from([1, 2, 3]).pipe(
  startWith(0)
);

This will cause the stream to emit:

0
1
2
3

The startWith operator comes first: it emits any values it was supplied, and then it subscribes to the source observable (from([1, 2, 3])) and emits all of its values.
Example use case
I find this to be particularly useful for streams that might not emit a value until some user interaction occurs. Sometimes, we need an initial value to kick things off, but if a stream doesn’t emit any values by default we can be stuck waiting.
This occurs quite often when we use the valueChanges observable from Angular’s ReactiveFormsModule:
const result$ = myFormControl.valueChanges.pipe(
  // do something
);

The valueChanges observable will emit every time the user changes a value in
the form control. In one of the example applications, we let users view posts
from a particular subreddit on Reddit. To let them specify the subreddit they
want, we use a FormControl and we listen to valueChanges to set the
subreddit appropriately.
However, valueChanges doesn’t emit anything by default. But, we still want to
display a default subreddit. To handle this, we do something like this:
const result$ = myFormControl.valueChanges.pipe(
  startWith('gifs'),
  // fetch from subreddit with HTTP request
);

Now, even if the user has not entered anything into the form control, our stream
will still emit a default value of gifs and kick off the process of fetching
results from Reddit.
distinctUntilChanged
The distinctUntilChanged operator is another one that is reasonably simple. The basic idea is that it will prevent the stream from emitting the same value that was just emitted:

import { from } from "rxjs";
import { distinctUntilChanged } from "rxjs/operators";

const myObservable$ = from([1, 2, 3, 3, 2]).pipe(
  distinctUntilChanged()
);

This will cause the stream to emit:

1
2
3
2

Note that in the spot where we had 3, 3 only one 3 is emitted on the
resulting stream. We can have the same values emitting multiple times on the
stream (like the 2) as long as they are not immediately after each other.
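One detail worth knowing is that, by default, distinctUntilChanged compares values with strict equality (===), so two separate objects with identical contents still count as different. The sketch below is a plain-TypeScript model of that behaviour, not RxJS's implementation; the dropConsecutiveDuplicates helper and its isSame parameter are hypothetical names used only for illustration.

```typescript
// A plain-TypeScript model of the behaviour, not RxJS's implementation.
function dropConsecutiveDuplicates<T>(
  values: T[],
  isSame: (previous: T, current: T) => boolean = (a, b) => a === b
): T[] {
  const kept: T[] = [];
  let last: { value: T } | null = null;
  for (const value of values) {
    // Keep the value only if it differs from the last value that was kept.
    if (last === null || !isSame(last.value, value)) {
      kept.push(value);
      last = { value };
    }
  }
  return kept;
}

// Matches the example above: only the repeated 3 is dropped.
const numbers = dropConsecutiveDuplicates([1, 2, 3, 3, 2]);

// Two distinct objects are never === even when their contents match,
// so both are kept.
const objects = dropConsecutiveDuplicates([{ id: 1 }, { id: 1 }]);

// A custom comparison treats them as duplicates, so only one is kept.
const byId = dropConsecutiveDuplicates(
  [{ id: 1 }, { id: 1 }],
  (a, b) => a.id === b.id
);
```

The real operator accepts an optional comparison function in the same spirit, e.g. distinctUntilChanged((previous, current) => previous.id === current.id).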
Example use case
We can again use our form example for this. Imagine the same scenario with the
subreddit. Whatever the user enters, we want to go and fetch that data from
the Reddit API. If they entered gifs we would fetch from the gifs subreddit.
If the next stream emission is also gifs then there is no need to run the
request again because we already did it.
We could improve our stream a little more by adding distinctUntilChanged:
const result$ = myFormControl.valueChanges.pipe(
  startWith('gifs'),
  distinctUntilChanged(),
  // fetch from subreddit with HTTP request
);

debounceTime
The debounceTime operator is similar in spirit to distinctUntilChanged but
rather than preventing emissions that are the same as the last one, it will
prevent emissions that happened too quickly compared to the last one.
If we use our typical example:
import { from } from "rxjs";
import { debounceTime } from "rxjs/operators";

const myObservable$ = from([1, 2, 3]).pipe(
  debounceTime(200)
);

This would emit:

3

This is because our from creation operator will emit all of the values instantly. What debounceTime will do is wait for the stream to be “idle” (i.e. emitting no values) for 200ms, then it will emit whatever the last value was.
It doesn’t emit 1 because the 2 is emitted too soon after it, and it doesn’t emit 2 because the 3 emits too quickly after it. The 3 is the last value, so nothing arrives to replace it and it is the one that gets emitted. On a stream that stays open, debounceTime would wait for 200ms of silence before emitting the 3. Here, from completes straight after the 3, and debounceTime emits the pending value as soon as the source completes.
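The rule can be modelled with timestamps instead of real timers. This is a simplified plain-TypeScript sketch, not how RxJS implements debounceTime; the TimedValue type and the debounceModel function are hypothetical, and the behaviour at exactly the 200ms boundary is glossed over.

```typescript
interface TimedValue<T> {
  at: number; // milliseconds since the start of the stream
  value: T;
}

// Hypothetical model: a value survives only if nothing else arrives
// within quietMs after it, or if it is the very last value.
function debounceModel<T>(events: TimedValue<T>[], quietMs: number): T[] {
  const emitted: T[] = [];
  events.forEach((event, index) => {
    const next = events[index + 1];
    if (next === undefined || next.at - event.at >= quietMs) {
      emitted.push(event.value);
    }
  });
  return emitted;
}

// All three values arrive at once, like from([1, 2, 3]): only 3 survives.
const instant = debounceModel(
  [
    { at: 0, value: 1 },
    { at: 0, value: 2 },
    { at: 0, value: 3 },
  ],
  200
);

// A 300ms pause after 1 lets it through; 2 is replaced by 3 too quickly.
const spaced = debounceModel(
  [
    { at: 0, value: 1 },
    { at: 300, value: 2 },
    { at: 350, value: 3 },
  ],
  200
);
```

In this model instant is [3] and spaced is [1, 3].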
Example use case
We can again use our form example with Reddit. Again, we are expecting the user
to type a new subreddit into the form field. But unlike our from creation
operator, a user can’t instantly type values. If they are changing the subreddit
from gifs to funny there is going to be a lot of intermediate values:
f
fu
fun
funn
funny
We don’t want to launch an HTTP request for each of these because most of them
will fail. We can use debounceTime to give the user a bit of time to finish
typing, and the value won’t emit until they have stopped typing for at least
200ms or whatever value we use:
const result$ = myFormControl.valueChanges.pipe(
  debounceTime(200),
  distinctUntilChanged(),
  startWith('gifs'),
  // fetch from subreddit with HTTP request
);

An important note here is that we do the startWith after the
debounceTime(). We use startWith so that we can fetch data from a default
subreddit as soon as the component loads. If we put the debounceTime after the
startWith it is going to cause our initial value to be delayed by 200ms
which will cause our data to load 200ms slower than it needs to.
catchError
As we mentioned earlier, an observable can accept an observer that has three notifier methods:
- next
- error
- complete
Most of the time we just focus on the next notifier which is what gives us our
data:
myObservable$.subscribe((data) => console.log(data));

As in the example above, most of the time we don’t even supply the error or
complete notifiers. We just supply a single next handler to handle the data
emitted on the stream.
When a stream triggers the complete notifier, it will cause the stream to be
unsubscribed/cleaned up/deleted. In effect, the stream no longer exists. This is
generally something we want after our stream has emitted all of its values, e.g:
from([1, 2, 3])

Once the 3 emits the stream will complete and be cleaned up.
However… the same thing also happens when an error occurs. The stream will
be unsubscribed/cleaned up/deleted. This might cause us problems. We are never
going to run into this problem with a simple example like the above, because it
can’t emit an error. But, let’s get creative. Let’s force a stream to throw an
error:
import { from } from "rxjs";
import { map } from "rxjs/operators";

const numberStream$ = from([1, 2, 3]);

const myObservable$ = numberStream$.pipe(
  map(() => {
    throw new Error('Oops');
  })
);

If we subscribe to myObservable$ we will get:

Uncaught Error: Oops

We just get an error and no stream emissions. The error has broken the stream. This is fine in situations where our code is just wrong, but sometimes errors are a normal part of an application running. Sometimes invalid values are entered, sometimes an HTTP request will fail. Sometimes we need to be able to handle these without breaking our streams.
Let’s keep our map in there to break the stream. But now what we are going to
do is use catchError to save the stream. The bad news is, the original stream
is dead. Sorry, there is no saving it. It errored. It’s gone. But! We can supply
a replacement stream with catchError. If our original stream errors, we
can just replace it with a new one:
import { from } from "rxjs";
import { map, catchError } from "rxjs/operators";

const numberStream$ = from([1, 2, 3]);

const myObservable$ = numberStream$.pipe(
  map(() => {
    throw new Error('Oops');
  }),
  catchError(() => numberStream$)
);

Now if we subscribe to myObservable$ we will get:

1
2
3

We have correctly handled the error, and we have replaced the broken stream with the original number stream.
Example use case
The example above works for a simple demonstration, but it’s not exactly
realistic. One simple error handling strategy we will use later is to replace
the stream with an EMPTY stream:
import { EMPTY } from 'rxjs';

return this.http.get('...').pipe(
  catchError(() => EMPTY)
);

This is again from the Reddit example. We are making an HTTP request that
might fail. However, this stream is used as part of another stream. It’s fine
if our HTTP request fails, but by default it is going to error which will also
break the other stream it is being used in. What we do is catch the error, and
replace the errored stream with an EMPTY stream. EMPTY is just a special
RxJS stream that emits nothing and completes — the important part is it won’t
cause an error that will break the other stream.
We will see this example, and other more advanced error handling strategies, in more detail later.
Creation Operators
The operators we have discussed above are all pipeable operators that are
used on an existing stream with the pipe method. The following operators are
creation operators that are used to create entirely new observable streams.
of, from
We have already talked about these ones quite a lot so we won’t cover them again, but let’s take a look at a real-world use case rather than just using them to emit an array of numbers like we have been doing.
Example use case
These two creation operators usually become useful in a practical situation when, for some reason, we need to convert some normal value into a stream of that value (or values). Often, this is because we want to combine this value into some existing stream in some way.
This is a slightly more advanced/confusing use case, but these operators can be used in conjunction with each other in interesting ways:
this.currentPhoto$ = from(value).pipe(
  // For each emission, switch to a stream of just that one value
  concatMap((photo) =>
    of(photo).pipe(
      // Wait 500 ms before making it the currentPhoto
      // Then concatMap will move on to the next photo
      delay(500)
    )
  )
);

The value being used here comes from an @Input(); it is an array of photos.
Our goal here is to create a stream called currentPhoto$ that will emit values
that are just one photo from that array. The reason for this is that we want
to display one photo at a time like a slideshow, the currentPhoto$ stream
emitting one photo at a time with a slight delay will allow us to do that.
So, we take our array of photos and create a stream from it:
from(value)

This will instantly emit every photo in the array one at a time, e.g:

photo1
photo2
photo3
photo4

This isn’t what we want though, because we are only ever going to see the last
photo because the others are all emitted too fast. We are about to get into
flattening operators now, which is what concatMap is. This is a more
advanced concept that we are going to cover in the next lesson. That makes this
example a bit hard to explain in full, so I am just going to go light on the
details here for now.
The important point is that to get the result we want, we need to create
individual streams of each of those photo emissions and delay the emissions on
that stream by the time we want: 500ms in this case. There is no operator that
allows us to make an individual stream pause 500ms between each emission, e.g.
something like this would be nice:
this.currentPhoto$ = from(value).pipe(
  delayEach(500)
);

But delayEach does not exist. What does exist is the delay operator, which shifts every emission later by a fixed amount of time. Applied directly to from(value), it would hold everything back for 500ms and then emit all of the photos together. This doesn’t work in our case because we need the delay between every emission, not just before the first one.
This is why we need to use of to create a stream of each individual photo, so
that we can delay those individual streams by 500ms each:
of(photo).pipe(
  delay(500)
)

Since of returns an observable, and we are already inside of an observable, we
are now dealing with an observable that emits observables which is why we need
concatMap. That is what the next lesson is going to be about.
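Until then, the timing that the concatMap, of, and delay combination is aiming for can be pictured with a small plain-TypeScript model. It is only a sketch of the intended schedule: the slideshowSchedule function and the ScheduledPhoto type are hypothetical, photos are represented as strings for simplicity, and the times are approximate because real timers add a little overhead.

```typescript
interface ScheduledPhoto {
  photo: string;
  emittedAtMs: number;
}

// Hypothetical model: each inner stream starts only after the previous
// one has completed, so the individual delays add up.
function slideshowSchedule(photos: string[], delayMs: number): ScheduledPhoto[] {
  return photos.map((photo, index) => ({
    photo,
    emittedAtMs: (index + 1) * delayMs,
  }));
}

const schedule = slideshowSchedule(["photo1", "photo2", "photo3", "photo4"], 500);
// photo1 at 500ms, photo2 at 1000ms, photo3 at 1500ms, photo4 at 2000ms
```

Each photo gets its own 500ms wait, and because the waits run one after another rather than in parallel, the photos arrive spaced out like a slideshow.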
combineLatest
The last operator we are going to cover is combineLatest, and it is one we
will use quite a lot. It’s actually a reasonably simple one. Often, we will be
working with multiple different streams at once. For example, maybe we have the
following streams:
const name$ = of('Josh');
const luckyNumber$ = from([5, 22, 587]);
const attempts$ = new BehaviorSubject(0);

Now let’s say we want the values from all of those streams. We would need to subscribe to all of them:

name$.subscribe((val) => console.log(val));
luckyNumber$.subscribe((val) => console.log(val));
attempts$.subscribe((val) => console.log(val));

This would give us the following result:

Josh
5
22
587
0

If we were using these values to display inside of a template, we would actually only see the last values for each:

Josh
587
0

It’s a little annoying that we need three separate subscriptions to get these
values, in some cases we might even have more streams. This is where
combineLatest becomes useful. It will create a new observable that emits the
latest values from all of its input observables, e.g:
const data$ = combineLatest([name$, luckyNumber$, attempts$]);

Subscribing to this would give us:

['Josh', 587, 0]

It emits an array with the latest results from each stream.
Example use case
As you will see later, combineLatest is useful for creating a view model
(usually named vm$) for our templates.
We will often have a bunch of data we want to display in our template from
different streams. We can use combineLatest to combine all of those streams
into one, subscribe to it once in the template with the async pipe, and then
easily use the values:
vm$ = combineLatest([name$, luckyNumber$, attempts$]).pipe(
  map(([name, luckyNumber, attempt]) => {
    return {
      name: name,
      luckyNumber: luckyNumber,
      attempt: attempt
    };
  })
);

It’s awkward having the values be in an array, so we use a map with array
destructuring to name and pull all of those values out of the array, and then
return them as an object. This means when this stream emits, rather than
getting:
['Josh', 587, 0]

We will get:

{ name: 'Josh', luckyNumber: 587, attempt: 0 }

Which is much easier to use. I’ve used the long form syntax to make it more obvious that we are mapping the values from the array into an object, but we could also do this:

vm$ = combineLatest([name$, luckyNumber$, attempts$]).pipe(
  map(([name, luckyNumber, attempt]) => ({ name, luckyNumber, attempt }))
);

With our vm$ stream created, we could use it in our template like this:

<ng-container *ngIf="vm$ | async as vm">
  <p>{{ vm.name }}</p>
  <p>{{ vm.luckyNumber }}</p>
  <p>{{ vm.attempt }}</p>
</ng-container>

An ng-container will not actually be rendered to the DOM; we just use this to
conditionally render everything inside of it based on our *ngIf. If the vm$
stream has emitted, then all of the paragraphs will render, and we are able to
access the data through the vm object (because we used the as vm syntax).
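If you want the view model to be explicitly typed, the mapping from array to object can be pulled out into a small function. This is only a sketch: the ViewModel interface and the toViewModel name are assumptions for illustration and are not taken from the example applications.

```typescript
// Hypothetical type describing what the template reads from vm.
interface ViewModel {
  name: string;
  luckyNumber: number;
  attempt: number;
}

// Turns the positional array into a named object.
const toViewModel = ([name, luckyNumber, attempt]: [string, number, number]): ViewModel => ({
  name,
  luckyNumber,
  attempt,
});

const vm = toViewModel(["Josh", 587, 0]);
// vm is { name: "Josh", luckyNumber: 587, attempt: 0 }
```

Assuming combineLatest infers the tuple type from its inputs, a function like this could be passed straight to the operator as map(toViewModel).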
If we didn’t use combineLatest we would have had to do something like this:
<p *ngIf="name$ | async as name">{{ name }}</p>
<p *ngIf="luckyNumber$ | async as luckyNumber">{{ luckyNumber }}</p>
<p *ngIf="attempts$ | async as attempt">{{ attempt }}</p>

We will talk more about the | async pipe in a lesson soon. Again, this is
something that existing Angular applications very heavily rely on — but it is
not generally used in signals based applications.