Skip to content

Higher Order Observables

The term higher order (something) probably sounds a lot fancier than it really is. It is also really common to hear the term higher order functions in programming. The basic idea of a higher order function is that it does either of the following (or both):

  • Accepts a function as a parameter
  • Returns a function

A higher order function is a function that returns a function, or accepts a function as a parameter. A higher order observable is an observable that emits observables.

You may find this supplementary video helpful to provide some context:

Play

Most of what we have looked at so far deals with observables that return simple values. For example, this:

const myObservable$ = from([1,2,3,4,5]);

Is an observable that emits numbers:

1
2
3
4
5

Let’s imagine a different scenario, we are going to borrow the general concept of the example in the documentation. Let’s say we have an observable of APIs we want to make a request to in order to fetch some data:

const myObservable$ = from([
'https://api-one.com/data',
'https://api-two.com/data',
'https://api-three.com/data',
'https://api-four.com/data'
]);

Everything is looking pretty normal so far — we have an observable that emits four strings:

https://api-one.com/data
https://api-two.com/data
https://api-three.com/data
https://api-four.com/data

That’s not of much use to us — we want the actual data from those URLs. This is a good scenario for our pipe — we want to transform this stream in some way. We want to transform this stream from being an observable of URLs into an observable of data fetched from those URLs.

The first thing that might spring to mind is to use the map operator:

myObservable$.pipe(
map(url => this.http.get(url))
)

Now we are taking the URL that is passed into the map and making an HTTP request with Angular’s HttpClient to fetch the data for us. This looks promising, but there is a problem…

A get request made with the HttpClient will return an observable. What we want is the data from the HTTP request being emitted on this stream, but if we subscribe to the stream above what we actually get is:

Observable
Observable
Observable
Observable

We still need to subscribe to those observables to execute the HTTP request and to get the data out of them. So, how should we handle that? The most natural step might then be to… just subscribe to them. Maybe we do something like this:

myObservable$.pipe(
map(url => this.http.get(url))
).subscribe((result) => {
result.subscribe((data) => console.log(data))
})

This would work… but it’s getting a bit ugly with nested subscribes. Not only that, we probably want to do something more than just console.log this data — but how do we use it? Maybe we take each emission and push it into an array:

const results = [];
myObservable$.pipe(
map(url => this.http.get(url))
).subscribe((result) => {
result.subscribe((data) => results.push(data))
})

In case my tone hasn’t been obvious you absolutely should not do this. It works, results will now contain the results of all the HTTP requests, but it is messy and is not coding in a reactive way (since we explicitly need to subscribe to pull the data out of streams in order to work with it). We are making things way harder than they need to be.

Flattening Operators

The whole point of the example above was to show why the flattening operators we are about to talk about are useful. These operators solve this situation for us, in different ways depending on what exactly we want to do. When we have a higher order observable, or an observable that returns an observable, we can use these operators to subscribe to the inner observable (the inner observable is the observable being returned by the observable) for us. They “flatten” the inner observables by subscribing to all of them for us, and just giving us the simple values from within those streams.

To demonstrate these operators, we are going to continue using the same scenario we just discussed:

const myObservable$ = from([
'https://api-one.com/data',
'https://api-two.com/data',
'https://api-three.com/data',
'https://api-four.com/data'
]);

We want to transform this stream by taking these values and making HTTP requests to the urls.

switchMap

Let’s take a look at switchMap first, as it is probably the most common.

import { switchMap } from 'rxjs/operators';
myObservable$.pipe(
switchMap(url => this.http.get(url))
).subscribe(val => console.log(val));

This example might seem oddly familiar. It is exactly the same as our “bad” example, except instead of the normal map operator we swapped it out with switchMap. This will give us the following result:

(data from api-four)

We kind of get the result we want — this will return the actual data from the URL rather than an observable for the HTTP request to get that data. But… it only gives us the last one. The result from making a GET request to: https://api-four.com/data.

This is often the behaviour we want, but not for this scenario. What switchMap will do is the following:

  1. Receives a new url value from the outer observable (myObservable$)
  2. Passes the url into the get call, and subscribes to it (the inner observable)
  3. Emits the value from the inner observable

If the http.get() call returned instantly/synchronously, then switchMap would indeed return all the data for us:

(data from api-one)
(data from api-two)
(data from api-three)
(data from api-four)

But, an HTTP request might take some time to complete, maybe even a few seconds. What happens with switchMap is that it will receive its first value, and subscribe to the inner http.get() observable. However, if it receives its second value from the outer observable (myObservable$) before it gets a result from the inner observable (http.get), it will cancel the request and launch a new request for the second value instead. The process will go more like this:

  1. Receives a api-one value from the outer observable (myObservable$)
  2. Passes the api-one URL into the get call, and subscribes to it (the inner observable)
  3. Receives a new api-two value from the outer observable
  4. Unsubscribes from the http.get() for api-one and subscribes to http.get() for api-two instead
  5. Receives a new api-three value from the outer observable
  6. Unsubscribes from the http.get() for api-two and subscribes to http.get() for api-three instead
  7. Receives a new api-four value from the outer observable
  8. Unsubscribes from the http.get() for api-three and subscribes to http.get() for api-four instead
  9. Emits the value from the inner observable for api-four

All of the values get emitted from myObservable$ before it has time to finish its http.get() request for each value, so it only has time to fetch and emit the value for the last value (because no more values are coming in that cause it to be cancelled before it can finish).

This is the special behaviour of switchMap: whenever it receives a new value it will cancel the previous subscription.

Although this is not suited to this particular situation, it often is the appropriate flattening operator to use. For example, imagine the scenario of a user entering a search term into a search box. When they enter a value, we make an API request to get the results. If the user enters:

socks

We will take that value and use it to launch an HTTP request to get the results for socks. But, if the user enters a new search term before that one returns:

pants

It doesn’t make any sense to finish the request for socks, we don’t need it any more. We would use a switchMap to cancel the in progress request, and just return whatever the latest one is instead.

concatMap

With switchMap explained, we should be able to go a little lighter on the explanations for all the rest. They all do basically the same thing, they are only differentiated by the way they handle the situation of new values emitted from the outer observable before the inner observable has completed. As we saw with switchMap, it will cancel older values and switch to the new value instead.

Let’s see what happens with concatMap:

import { concatMap } from 'rxjs/operators';
myObservable$.pipe(
concatMap(url => this.http.get(url))
).subscribe(val => console.log(val));

Again, it looks exactly the same apart from the fact we are using the concatMap operator. The result this will give us is:

(data from api-one)
(data from api-two)
(data from api-three)
(data from api-four)

That seems like what we want! It would work, but it’s actually not the most efficient way to do this. Imagine that each http.get() takes 1000ms to complete. The result of this stream would actually look like this:

(data from api-one) // arrives after 1 second
(data from api-two) // arrives after 2 seconds
(data from api-three) // arrives after 3 seconds
(data from api-four) // arrives after 4 seconds

In total, it takes us 4000ms to get all of the results. The special behaviour of concatMap is that it will wait for each inner observable to complete before moving on to the next value. It sort of creates a nice orderly queue as additional values arrive — like a bank or a post office (that can only help one customer at a time). The process in more detail would look like this:

  1. Receives a api-one value from the outer observable (myObservable$)
  2. Passes the api-one URL into the get call, and subscribes to it (the inner observable)
  3. Receives a new api-two value from the outer observable
  4. Receives a new api-three value from the outer observable
  5. Receives a new api-four value from the outer observable
  6. Emits the result of the api-one URL
  7. Passes the api-two URL into the get call, and subscribes to it (the inner observable)
  8. Emits the result of the api-two URL
  9. Passes the api-three URL into the get call, and subscribes to it (the inner observable)
  10. Emits the result of the api-three URL
  11. Passes the api-four URL into the get call, and subscribes to it (the inner observable)
  12. Emits the result of the api-four URL

Our concatMap receives all of the values very quickly, but it just deals with them one at a time. Once one is done, the next one is started. This behaviour is desirable where you want the observables to complete in order, but in this case we probably don’t care about the order, we just want them to be all fetched as fast as possible and at the same time.

That’s where our next operator comes into play.

mergeMap

The mergeMap operator is very similar to concatMap — it will also subscribe to and emit the values for all inner observables. The difference is that a mergeMap is not like an orderly queue at the post office or bank. Maybe it’s more like a really good bank where everyone gets served instantly by different tellers.

A mergeMap will instantly subscribe to the inner observable for any new values it receives, and will emit the results as soon as it has them. It doesn’t care about order or timing:

import { mergeMap } from 'rxjs/operators';
myObservable$.pipe(
mergeMap(url => this.http.get(url))
).subscribe(val => console.log(val));

In our last example, we assumed all requests took 1000ms to perform. This isn’t really realistic. The time it takes for a request to complete is going to vary. Let’s say each request might take anywhere between 800ms and 1200ms.

With mergeMap, this might result in a result like this:

(data from api-one) // arrives after 0.8 seconds
(data from api-two) // arrives after 0.9 seconds
(data from api-three) // arrives after 0.9 seconds
(data from api-four) // arrives after 1.1 seconds

or it might result in this:

(data from api-one) // arrives after 0.8 seconds
(data from api-four) // arrives after 0.8 seconds
(data from api-three) // arrives after 1 second
(data from api-two) // arrives after 1 second

Notice that in this case the api-four result is received second. The mergeMap operator doesn’t care about the order — everything gets subscribed to as soon as it arrives, and it will emit as soon as it completes.

You can use this operator when the order doesn’t matter. This is probably the most suitable operator to this example — we get all of the data from all of the URLs as quickly as possible — but it really depends on the specific situation.

exhaustMap

The last one we are going to look at is probably the least used (at least in my experience) but can still be useful. This one is more similar in spirit to switchMap. To recap, switchMap will cancel any inner observable in progress if a new value arrives from the outer observable. The exhaustMap operator will ignore values from the outer observable if it already has an active subscription for the inner observable. Once the inner observable completes, it will start accepting values from the outer observable again, but any values that were emitted before the completion of the inner observable will be missed.

To use our bank/post office analogy again, imagine you arrive at the bank but there is a customer already being served. Too bad, you’re just going to go home and you never get served. Maybe the next customer to come along is a bit luckier than you. When they arrive the first customer has finished being served, so now the customer that just arrived will get served.

Unlike the concatMap which will store/queue values that arrive during an active subscription, and then address them later, the exhaustMap will just drop them completely if they arrive during an active subscription:

import { exhaustMap } from 'rxjs/operators';
myObservable$.pipe(
exhaustMap(url => this.http.get(url))
).subscribe(val => console.log(val));

This might lead to a situation like this:

(data from api-one) // arrives after 1 second
(data from api-four) // arrives after 2 seconds

Whilst the first request is being processed, the values for api-two and api-three might be received. However, since the inner observable/HTTP request for api-one hasn’t completed yet, these values are ignored. If the api-four value arrives after the api-one request has been completed, then it will be processed. Depending on the timing of things, we might just end up with only the first api-one request being successfully completed.

Recap

All of the following questions assume we are dealing with higher-order observables:

    Which of the following best describes a higher-order observable?

    What types of operators do we use to deal with higher-order observables?

    Which operator should you use if you need the results from all inner observables in order?

    Which operator should you use if you only need the result from the latest inner observable?

    Which operator should you use if you only care about the current inner observable?

    Which operator should you use if you want all results as quick as possible, disregarding order?