Building Your Own State Utility
Our goal is to make our lives a bit easier by using some kind of utility to help manage our state management process. We are going to start by investigating creating our own little utility to help with this.
As an example, let’s consider the reducers for the ChecklistService from
the Quicklists application we built earlier:
constructor() { // reducers this.checklistsLoaded$.pipe(takeUntilDestroyed()).subscribe({ next: (checklists) => this.state.update((state) => ({ ...state, checklists, loaded: true, })), error: (err) => this.state.update((state) => ({ ...state, error: err })), });
this.add$.pipe(takeUntilDestroyed()).subscribe((checklist) => this.state.update((state) => ({ ...state, checklists: [...state.checklists, this.addIdToChecklist(checklist)], })) );
this.remove$.pipe(takeUntilDestroyed()).subscribe((id) => this.state.update((state) => ({ ...state, checklists: state.checklists.filter((checklist) => checklist.id !== id), })) );
this.edit$.pipe(takeUntilDestroyed()).subscribe((update) => this.state.update((state) => ({ ...state, checklists: state.checklists.map((checklist) => checklist.id === update.id ? { ...checklist, title: update.data.title } : checklist ), })) ); }There is clearly a lot of repetition here:
- pipe
- takeUntilDestroyed
- subscribe
- update
We repeat this process for just about every source we have. Let’s create a little utility to remove at least some of that repeated effort.
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';import { Observable } from 'rxjs';
export function reducer<T>( source$: Observable<T>, next: (value: T) => void, error?: (err: any) => void) { source$.pipe(takeUntilDestroyed()).subscribe({ next, error: error ? error : (err) => console.error(err), });}The idea here is that we have created a function called reducer that will take
in our source and it will handle subscribing to that source for us and piping
on the takeUntilDestroyed operator. It also allows us to supply functions that
we want to be triggered whenever there is a next value on the source or
optionally an error. It just calls our next function when the next on the
observable is triggered, and likewise for our error function if we supply it.
We are using another TypeScript concept called generics here as well. This
is a complex TypeScript topic that we are not going to cover in detail, but the
basic idea is that we have created a generic type T. By typing our source$
as Observable<T> our T type will become whatever type that observable emits.
If we pass reducer an observable that emits strings, then T will be of the
type string. This allows us to supply that same T type to our next values.
If our observable emits strings, then our value will have the type string.
This allows us to use this function generically with all different types of observable streams, whilst still retaining accurate type information.
Now to use our utility, instead of doing this to set up a reducer:
this.checklistsLoaded$.pipe(takeUntilDestroyed()).subscribe({ next: (checklists) => this.state.update((state) => ({ ...state, checklists, loaded: true, })), error: (err) => this.state.update((state) => ({ ...state, error: err })), });We could do this:
reducer( this.checklistsLoaded$, (checklists) => this.state.update((state) => ({ ...state, checklists, loaded: true, })), (error) => this.state.update((state) => ({ ...state, error })) );Now all we have to do is pass reducer our source and the functions we want
to use to update the state.
This removes a little bit of the boilerplate for us, which is nice. We could
take this further — we are still making a this.state.update call in all of our
reducers. This would be a good candidate for refactoring out into our reusable
utility.
This might be a good exercise in building something for the sake of learning,
but it’s also a case of reinventing the wheel. Luckily for us, there is
a fantastic utility available in the ngxtension library for Angular called
connect.
In the following lesson, we are going to take a look at using this connect
utility to handle our reducers, which does basically everything we want. To give
you a sense of why I mentioned not reinventing the wheel, just take a look at
the source code for the connect utility we will be using:
import { DestroyRef, Injector, type WritableSignal } from '@angular/core';import { takeUntilDestroyed } from '@angular/core/rxjs-interop';import { assertInjector } from 'ngxtension/assert-injector';import { Subscription, isObservable, type Observable } from 'rxjs';
type PartialOrValue<TValue> = TValue extends object ? Partial<TValue> : TValue;type Reducer<TValue, TNext> = ( previous: TValue, next: TNext) => PartialOrValue<TValue>;
type ConnectedSignal<TSignalValue> = { with<TObservableValue extends PartialOrValue<TSignalValue>>( observable: Observable<TObservableValue> ): ConnectedSignal<TSignalValue>; with<TObservableValue>( observable: Observable<TObservableValue>, reducer: Reducer<TSignalValue, TObservableValue> ): ConnectedSignal<TSignalValue>; subscription: Subscription;};
export function connect<TSignalValue>( signal: WritableSignal<TSignalValue>, injectorOrDestroyRef?: Injector | DestroyRef): ConnectedSignal<TSignalValue>;export function connect< TSignalValue, TObservableValue extends PartialOrValue<TSignalValue>>( signal: WritableSignal<TSignalValue>, observable: Observable<TObservableValue>, injectorOrDestroyRef?: Injector | DestroyRef): Subscription;export function connect<TSignalValue, TObservableValue>( signal: WritableSignal<TSignalValue>, observable: Observable<TObservableValue>, reducer: Reducer<TSignalValue, TObservableValue>, injectorOrDestroyRef?: Injector | DestroyRef): Subscription;export function connect( signal: WritableSignal<unknown>, ...args: [ (Observable<unknown> | (Injector | DestroyRef))?, (Reducer<unknown, unknown> | (Injector | DestroyRef))?, (Injector | DestroyRef)? ]) { const [observable, reducer, injectorOrDestroyRef] = parseArgs(args);
if (observable) { let destroyRef = null;
if (injectorOrDestroyRef instanceof DestroyRef) { destroyRef = injectorOrDestroyRef; // if it's a DestroyRef, use it } else { const injector = assertInjector(connect, injectorOrDestroyRef); destroyRef = injector.get(DestroyRef); }
return observable.pipe(takeUntilDestroyed(destroyRef)).subscribe((x) => { signal.update((prev) => { if (typeof prev === 'object' && !Array.isArray(prev)) { return { ...prev, ...((reducer?.(prev, x) || x) as object) }; }
return reducer?.(prev, x) || x; }); }); }
return { with(this: ConnectedSignal<unknown>, ...args: unknown[]) { if (!this.subscription) { this.subscription = new Subscription(); } else if (this.subscription.closed) { console.info(`[ngxtension connect] ConnectedSignal has been closed.`); return this; } this.subscription.add( connect( signal, ...(args as any), injectorOrDestroyRef ) as unknown as Subscription ); return this; }, subscription: null!, } as ConnectedSignal<unknown>;}
function parseArgs( args: [ (Observable<unknown> | (Injector | DestroyRef))?, (Reducer<unknown, unknown> | (Injector | DestroyRef))?, (Injector | DestroyRef)? ]): [ Observable<unknown> | null, Reducer<unknown, unknown> | null, Injector | DestroyRef | null] { if (args.length > 2) { return [ args[0] as Observable<unknown>, args[1] as Reducer<unknown, unknown>, args[2] as Injector | DestroyRef, ]; }
if (args.length === 2) { const [arg, arg2] = args; const parsedArgs: [ Observable<unknown>, Reducer<unknown, unknown> | null, Injector | DestroyRef | null ] = [arg as Observable<unknown>, null, null]; if (typeof arg2 === 'function') { parsedArgs[1] = arg2; } else { parsedArgs[2] = arg2 as Injector | DestroyRef; }
return parsedArgs; }
const arg = args[0]; if (isObservable(arg)) { return [arg, null, null]; }
return [null, null, arg as Injector | DestroyRef];}All of this results in an innocent little connect function that is super
simple to use. When you want something to work generically in a multitude of
different circumstances, it must by necessity be complex — dealing with the
typing of the utility and all possible usages is typically a challenging aspect.
Your own utilities can generally be simpler, because you can design them for just the specific use cases you need them for, but it can still be a rather complex task.
However, I wouldn’t discourage you from trying to build something like this. As complicated as the code above looks, it was written by a human person who once knew nothing about Angular or TypeScript.
It is also generally much more intimidating to look at completed code with no context — these solutions are generally built up over time. Your own code is generally easier to understand because you wrote it and slowly built it up over time — if you were able to look at your own completed code with no prior context it might feel similarly complicated.
If you strip out all of the TypeScript from the code above and just slowly focus on what is happening at each step, it becomes quite possible to understand. This is a bit of a tangent, but let’s try that. We will be heavily using this utility anyway so it won’t hurt to have deeper knowledge of how it works behind the scenes.
Here is the code with all of the typing information removed:
import { DestroyRef, Injector, type WritableSignal } from '@angular/core';import { takeUntilDestroyed } from '@angular/core/rxjs-interop';import { assertInjector } from 'ngxtension/assert-injector';import { Subscription, isObservable, type Observable } from 'rxjs';
export function connect(signal, injectorOrDestroyRef?);export function connect(signal, observable, injectorOrDestroyRef?);export function connect(signal, observable, reducer, injectorOrDestroyRef?);export function connect(signal, ...args) { const [observable, reducer, injectorOrDestroyRef] = parseArgs(args);
if (observable) { let destroyRef = null;
if (injectorOrDestroyRef instanceof DestroyRef) { destroyRef = injectorOrDestroyRef; // if it's a DestroyRef, use it } else { const injector = assertInjector(connect, injectorOrDestroyRef); destroyRef = injector.get(DestroyRef); }
return observable.pipe(takeUntilDestroyed(destroyRef)).subscribe((x) => { signal.update((prev) => { if (typeof prev === 'object' && !Array.isArray(prev)) { return { ...prev, ...((reducer?.(prev, x) || x) as object) }; }
return reducer?.(prev, x) || x; }); }); }
return { with(this, ...args) { if (!this.subscription) { this.subscription = new Subscription(); } else if (this.subscription.closed) { console.info(`[ngxtension connect] ConnectedSignal has been closed.`); return this; } this.subscription.add(connect(signal, ...args, injectorOrDestroyRef)); return this; }, subscription: null!, };}
function parseArgs(args) { if (args.length > 2) { return [args[0], args[1], args[2]]; }
if (args.length === 2) { const [arg, arg2] = args; const parsedArgs = [arg]; if (typeof arg2 === 'function') { parsedArgs[1] = arg2; } else { parsedArgs[2] = arg2; }
return parsedArgs; }
const arg = args[0]; if (isObservable(arg)) { return [arg, null, null]; }
return [null, null, arg];}Still, far from simple, but it probably looks much less arcane and a little more approachable at this point. Now we can try to break it down step-by-step. First, we have this:
export function connect(signal, injectorOrDestroyRef?);export function connect(signal, observable, injectorOrDestroyRef?);export function connect(signal, observable, reducer, injectorOrDestroyRef?);export function connect(signal, ...args) {This is a bit strange because we are defining the connect function multiple
times. Maybe you already know about the concept of function overloading
which is what is being used here. But if you didn’t, you can make this the
object of your focus and research things like:
In JavaScript, why would the same function be defined multiple times?
Soon enough you will discover the concept of function overloading, and you can learn all about it.
What this means is that we are able to use the connect function in different
ways by supplying it with different parameters.
For example, the first overload allows us to use it like this:
connect(mySignal);or this:
connect(mySignal, destroyRef);We can pass it just a signal, or optionally we can also supply it
a DestroyRef. The connect function works just like our manual subscribes we
have been doing — it uses the takeUntilDestroyed operator to automatically
unsubscribe when the thing that called it is destroyed. This works automatically
if it is called within an injection context (e.g. within the constructor) but
if it is called elsewhere then you will need to manually supply the
DestroyRef — that is why this optional parameter is here.
The second overload allows us to use it like this:
connect(mySignal, myStream$);Again, we can optionally supply a DestroyRef if we want as well. You might
notice that we can now do both of these:
connect(mySignal, destroyRef);connect(mySignal, myStream$);Both of these calls have the same number of parameters, but with function
overloading we can change the behaviour of the function based on the type of
the parameters being passed in. The destroyRef and myStream$ will be
different types and will match different function overload signatures.
The third overload allows us to supply a reducer:
connect(mySignal, myStream$, (prevState, valueFromStream) => ({}));We will see why this is a little later, but for now we just know it is an option. The last function definition looks a little different:
export function connect(signal, ...args)What ...args does generally is that it would allow us to provide an indefinite
number of parameters to this function and we would be able to use them, e.g:
connect(mySignal, 1, 2, 3, 4, 5, 6, 7)We could access all of those extra parameters in the function with args.
However, in the context of function overloading, this is being used for
a specific purpose. All of the previous declarations of the function define the
different signatures for the function that we can make use of, but this:
export function connect(signal, ...args)Is the beginning of the actual definition — we need to capture the parameters
from whichever way we are using this function, and then deal with those args
dynamically to determine how the function should behave. The first parameter is
always the signal we want to connect to, but the rest can vary.
Now we can look inside the function itself. The first thing it does is parse the
arguments that were passed in as args:
const [observable, reducer, injectorOrDestroyRef] = parseArgs(args);This is done using the parseArgs function:
function parseArgs(args) { if (args.length > 2) { return [args[0], args[1], args[2]]; }
if (args.length === 2) { const [arg, arg2] = args; const parsedArgs = [arg]; if (typeof arg2 === 'function') { parsedArgs[1] = arg2; } else { parsedArgs[2] = arg2; }
return parsedArgs; }
const arg = args[0]; if (isObservable(arg)) { return [arg, null, null]; }
return [null, null, arg];}The idea here is that we want it to return to us an array containing the observable, reducer, and DestroyRef — in that order — but depending on what was supplied, some of these might be null.
We just have some logic running to determine what to return based on the supplied arguments.
- If there was more than 2 arguments provided, just return all of them in the array
- If there are exactly 2 arguments provided, we need to check if the second argument is the
reducerfunction or aDestroyRef. If it is the reducer we assign it to spot[1]in theargs, otherwise we know it is theDestroyRefand we assign it to spot[2] - If we have just one argument it could either be an
observableor aDestroyRef— if it is anobservablewe set it as the first argument, and set the others tonull, otherwise it must be theDestroyRefand we set it as the last argument
Now we have a nice array that we know the structure of: [observable, reducer, destroyref] — if any of them were not supplied they will be null.
Now we can move on to dealing with what happens in the function if an
observable is supplied:
if (observable) { let destroyRef = null;
if (injectorOrDestroyRef instanceof DestroyRef) { destroyRef = injectorOrDestroyRef; // if it's a DestroyRef, use it } else { const injector = assertInjector(connect, injectorOrDestroyRef); destroyRef = injector.get(DestroyRef); }
return observable.pipe(takeUntilDestroyed(destroyRef)).subscribe((x) => { signal.update((prev) => { if (typeof prev === 'object' && !Array.isArray(prev)) { return { ...prev, ...((reducer?.(prev, x) || x) as object) }; }
return reducer?.(prev, x) || x; }); }); }We have been referring to just a DestroyRef but technically it could actually
either be a DestroyRef or an Injector. If it is a DestroyRef we just use
that directly, otherwise we use the injector to get access to the
DestroyRef.
Then we subscribe to the observable that was supplied — using takeUntilDestroyed with the destroyRef — and then we update the supplied signal with the new values.
First we check if the value in the signal is an object (an actual object, not
just an array which is also technically an object).
If it is an object, we use this as the value for the signal:
return { ...prev, ...((reducer?.(prev, x) || x) as object) };It uses all the previous values in the object, and then overwrites a portion of
the object with whatever value was emitted on the observable. If a reducer was
being used then it uses the value from that instead by calling it with the
current previous state and the value from the observable.
If the value in the signal is not an object, then we just directly use the
value from the reducer if it is being used, or we directly use the value from
the observable if it is not.
The last bit of this function is the return statement:
return { with(this, ...args) { if (!this.subscription) { this.subscription = new Subscription(); } else if (this.subscription.closed) { console.info(`[ngxtension connect] ConnectedSignal has been closed.`); return this; } this.subscription.add(connect(signal, ...args, injectorOrDestroyRef)); return this; }, subscription: null!, };We are returning an object that contains two properties:
withwhich is a functionsubscriptionwhich is initiallynullbut is set later with a reference to the current observable subscription
The general idea with with here is that it lets us chain onto the initial
connect call, effectively allowing us to call it as many times as we like. We
will use this a lot in the following lessons, and we will talk about why
specifically, but it allows us to do this:
connect(mySignal) .with(someStream$) .with(someOtherStream$, someReducer) .with(yetAnotherStream$, yetAnotherReducer);Each time we call with it will set up another subscription for us, and again
return us another object with a with function that we can then again use to
chain on more with calls:
this.subscription.add(connect(signal, ...args, injectorOrDestroyRef));So, breaking it down bit by bit like this, we have a pretty good understanding of how this all actually works behind the scenes. Maybe we learned a thing or two about building our own utilities as well!
Again, understanding specifically how connect works is not required, most
people will just use it and not worry about it and that’s fine. But there is
certainly something to be learned here.