A Promise.any-like Observable operator
The ECMAScript 2021 specification adds a new Promise.any function. This post
explains how to create a new RxJS operator porting the Promise.any behavior to
Observables.
Specification of Promise.any
Let’s first have a look at the Promise.any signature:
Promise.any<T>(values: Iterable<Promise<T>>): Promise<T>. The full signature
is a little more complicated, but for the sake of simplicity let’s focus on this
simpler signature.
Promise.any expects to be passed an Iterable of Promise elements. The
returned promise is fulfilled with the value of the first input promise that
fulfils, or rejected with an error of type AggregateError if all of the passed
promises reject. The AggregateError holds a list of all the rejection reasons.
For more information about the new Promise.any function, see the
ECMAScript Language Specification or
the excellent explanation by Dr. Axel Rauschmayer in
JavaScript for impatient programmers.
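To make these semantics concrete, here is a minimal sketch of both outcomes. The helper names firstFulfilled and allRejected are made up for this illustration, and the snippet assumes a runtime and TypeScript lib setting that include ES2021 (Promise.any and AggregateError).

```typescript
// Hypothetical helper: one input rejects, but Promise.any still fulfils
// with the value of the first input that fulfils.
async function firstFulfilled(): Promise<string> {
  return Promise.any([
    Promise.reject<string>(new Error('a failed')),
    Promise.resolve('b'),
    new Promise<string>((resolve) => setTimeout(() => resolve('c'), 10)),
  ]);
}

// Hypothetical helper: every input rejects, so Promise.any rejects with an
// AggregateError. Its `errors` array holds the individual rejection reasons.
async function allRejected(): Promise<string[]> {
  try {
    await Promise.any([
      Promise.reject<string>(new Error('a failed')),
      Promise.reject<string>(new Error('b failed')),
    ]);
    return [];
  } catch (e) {
    return (e as AggregateError).errors.map((err: Error) => err.message);
  }
}
```

In this sketch firstFulfilled() resolves with 'b', while allRejected() resolves with the two collected messages, in input order.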
Related RxJS Operators
Before starting to implement the operator, let's have a look at some of the existing operators.
Race
The race operator will mirror the
first source observable to emit a value. This operator is similar to the
Promise.race function. Both differ from Promise.any in how they handle
failures.
Promise.race will reject if the first promise to settle results in a failure.
This differs from the Promise.any behavior, which will reject only if all
promises reject.
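The difference can be sketched by feeding the same two promises to both functions. The names makeInputs, settle and compare are invented for this example, and the timings (5 ms and 20 ms) are arbitrary; the only assumption is that the rejecting promise settles first.

```typescript
// Hypothetical inputs: one promise rejects quickly, the other fulfils later.
function makeInputs(): Promise<string>[] {
  const failsFast = new Promise<string>((_, reject) =>
    setTimeout(() => reject(new Error('fast failure')), 5),
  );
  const succeedsLater = new Promise<string>((resolve) =>
    setTimeout(() => resolve('slow success'), 20),
  );
  return [failsFast, succeedsLater];
}

// Turns either outcome of a promise into a descriptive string.
async function settle(p: Promise<string>): Promise<string> {
  try {
    return `fulfilled: ${await p}`;
  } catch (e) {
    return `rejected: ${(e as Error).message}`;
  }
}

// Runs Promise.race and Promise.any against the same inputs.
async function compare(): Promise<[string, string]> {
  const inputs = makeInputs();
  return Promise.all([settle(Promise.race(inputs)), settle(Promise.any(inputs))]);
}
```

Here Promise.race settles with the early rejection, whereas Promise.any ignores it and waits for the later fulfilment.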
Similarly, the RxJS race operator will report an error notification when the
first notification emitted by any of the observables is an error notification.
Even if some of the other observables would go on to complete successfully, the
first one to emit wins the race.
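A small sketch of this behavior, assuming RxJS 7 (for the factory form of throwError). The names failsFast, succeedsLater and outcome are illustrative only.

```typescript
import { Observable, race, throwError, timer } from 'rxjs';
import { map } from 'rxjs/operators';

// Errors synchronously on subscription, before the other source can emit.
const failsFast: Observable<string> = throwError(() => new Error('fast failure'));

// Would emit a value after 10 ms if it were given the chance.
const succeedsLater: Observable<string> = timer(10).pipe(map(() => 'slow success'));

// Capture whichever notification race forwards first.
const outcome = new Promise<string>((resolve) => {
  race(failsFast, succeedsLater).subscribe({
    next: (value) => resolve(`value: ${value}`),
    error: (e: Error) => resolve(`error: ${e.message}`),
  });
});
```

In this setup the error is forwarded to the subscriber and the slower, successful source never gets to emit, which is the opposite of what a Promise.any-like operator should do.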
Fork Join
The forkJoin operator will
subscribe to all observables simultaneously and return an array containing the
last value notification emitted from each observable. This differs from
Promise.any because the values of all observables are returned, not just one.
If one of the passed observables emits an error notification, that error
notification will be passed on to the subscribers. Therefore the semantics of
forkJoin are not the same as we would like them to be for our new operator.
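Both forkJoin behaviors can be sketched with synchronous sources. This assumes RxJS 7; the results array is just a hypothetical log used for the illustration.

```typescript
import { forkJoin, of, throwError } from 'rxjs';

// Hypothetical log of what each subscription observed.
const results: string[] = [];

// All sources complete: forkJoin emits one array with the last value of each.
forkJoin([of(1, 2, 3), of('a', 'b')]).subscribe((values) => {
  results.push(`values: ${JSON.stringify(values)}`);
});

// One source errors: the error is passed on and no array is emitted.
forkJoin([of(1, 2, 3), throwError(() => new Error('boom'))]).subscribe({
  next: (values) => results.push(`values: ${JSON.stringify(values)}`),
  error: (e: Error) => results.push(`error: ${e.message}`),
});
```

The first subscription logs the combined last values, the second logs only the error, so a single failing source is enough to fail the whole result.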
Behavior of the new operator
Before implementing the new operator let’s specify its behavior:
- The values passed to the operator should be an iterable of observables
(Iterable<Observable>).
- The result of the operator will be the last non-error notification of the
first observable to emit a complete notification.
- If all observables emit an error notification, the operator should emit a
downstream error notification with an AggregateError holding the individual
errors from the passed source observables.
Implementation of the new operator
To implement the new operator we will use the Observable constructor
(new Observable). It is also possible to combine existing RxJS operators via
the pipe() function to create the operator. That approach requires careful
orchestration of existing operators and is quite complex. For demonstration
purposes I also created the operator by using existing operators. You can
find that implementation in the
Stackblitz
accompanying this post. For now let’s concentrate on the simpler version built
directly on the Observable constructor.
The any operator subscribes to each of the input observables and tracks the
completion or error notifications of those observables by storing the respective
error or the last value notification of each observable. If one of the
observables emits a complete notification, the last observed value notification
(if present) of that observable is emitted to any downstream consumer, followed
by a complete notification. If all observables error, the errors are aggregated
by using the AggregateError constructor and a downstream error notification is
issued.
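```typescript
import { from, Observable, ObservableInputTuple } from 'rxjs';

export function any<T extends readonly unknown[]>(
  sources: [...ObservableInputTuple<T>],
): Observable<T[number]> {
  return new Observable((subscriber) => {
    const length = sources.length;
    // Nothing to wait for: complete right away.
    if (!length) {
      subscriber.complete();
      return;
    }
    // Per source: the last value seen so far, or the error once it has failed.
    const values = new Array(length);
    let completed = false;
    // Number of sources that have not errored yet.
    let errorCount = length;
    // Stop subscribing once a synchronous source has already settled the result.
    for (let sourceIndex = 0; sourceIndex < length && !subscriber.closed; sourceIndex++) {
      let hasValue = false;
      // Adding the inner subscription ties its teardown to the subscriber.
      subscriber.add(
        from(sources[sourceIndex]).subscribe({
          next(v) {
            hasValue = true;
            values[sourceIndex] = v;
          },
          error(e) {
            values[sourceIndex] = e;
            // Only when every source has failed is the aggregate error emitted.
            if (!--errorCount) {
              subscriber.error(new AggregateError(values));
            }
          },
          complete() {
            // The first source to complete wins; later completions are ignored.
            if (!completed) {
              completed = true;
              if (hasValue) {
                subscriber.next(values[sourceIndex]);
              }
              subscriber.complete();
            }
          },
        }),
      );
    }
  });
}
```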
If you would like to explore this operator a bit more, take a look at the Stackblitz. The example also contains unit tests covering the specified behavior of the operator. Have fun exploring!