A Promise.any-like Observable operator
The ECMAScript 2021 specification adds a new Promise.any function. This post explains how to create a new RxJS operator that ports the Promise.any behavior to Observables.
Specification of Promise.any
Let’s first have a look at the Promise.any signature: Promise.any<T>(values: Iterable<Promise<T>>): Promise<T>. The full signature is a little more complicated, but for the sake of simplicity let’s focus on this simpler one.
Promise.any expects to be passed an Iterable of Promise elements. The returned promise is fulfilled with the value of the first input promise that fulfils, or rejects with an error of type AggregateError if all of the passed promises reject. The AggregateError holds a list of all the rejection reasons. For more information about the new Promise.any function, see the ECMAScript Language Specification or the excellent explanation by Dr. Axel Rauschmayer in JavaScript for impatient programmers (ES2021 edition).
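The two outcomes described above can be sketched as follows. This is a minimal illustration, assuming a runtime and compiler target that support ES2021; the helper `describeAny` and the sample messages are made up for this example.

```ts
// `describeAny` is a hypothetical helper that turns the outcome of
// Promise.any into a readable string.
async function describeAny(promises: Promise<string>[]): Promise<string> {
  try {
    return `fulfilled: ${await Promise.any(promises)}`;
  } catch (err) {
    // When every input rejects, the AggregateError carries all rejection
    // reasons in its `errors` property, in input order.
    const reasons = (err as { errors: Error[] }).errors;
    return `rejected: ${reasons.map((e) => e.message).join(', ')}`;
  }
}

// One rejection does not matter as long as another promise fulfils.
const firstFulfilled = describeAny([
  Promise.reject(new Error('a failed')),
  Promise.resolve('b ok'),
]);

// Only when all promises reject does Promise.any reject.
const allRejected = describeAny([
  Promise.reject(new Error('x failed')),
  Promise.reject(new Error('y failed')),
]);
```

Here `firstFulfilled` resolves to a string describing the fulfilled value, while `allRejected` resolves to a string listing both rejection messages.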
Related RxJS Operators
Before starting to implement the operator, let’s have a look at some of the existing operators.
Race
The race operator mirrors the first source observable to emit a value. This operator is similar to the Promise.race function. Both differ from Promise.any in how they handle failures.
Promise.race will reject if the first promise to settle results in a failure. This differs from the Promise.any behavior, which will reject only if all promises reject.
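To make the difference concrete, the sketch below passes the same inputs to both functions. It assumes an ES2021 target; the helper `settle` and the sample values are illustrative only.

```ts
// `settle` is a hypothetical helper that reports how a promise settled.
function settle(promise: Promise<string>): Promise<string> {
  return promise.then(
    (value) => `fulfilled: ${value}`,
    (reason: Error) => `rejected: ${reason.message}`
  );
}

// The first input is already rejected, the second already fulfilled,
// so the rejection is the first settlement either function observes.
const inputs = (): Promise<string>[] => [
  Promise.reject(new Error('first failed')),
  Promise.resolve('second ok'),
];

// Promise.race settles with the first settlement, here the rejection.
const raceResult = settle(Promise.race(inputs()));

// Promise.any skips the rejection and waits for a fulfilment.
const anyResult = settle(Promise.any(inputs()));
```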
Similarly, the RxJS race operator will report an error notification when the first notification emitted by any of the observables is an error notification. Even if some of the other observables would go on to complete successfully, the first one to emit wins the race.
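A small sketch of this behavior, assuming RxJS 6 or 7 is installed. The names `failsFast`, `succeedsSlowly` and `raceOutcome` as well as the timings are chosen for illustration.

```ts
import { race, timer } from 'rxjs';
import { map } from 'rxjs/operators';

// Errors after 10 ms, before the other source has emitted anything.
const failsFast = timer(10).pipe(
  map((): string => {
    throw new Error('fast failure');
  })
);

// Would emit a value after 50 ms if it were given the chance.
const succeedsSlowly = timer(50).pipe(map(() => 'slow success'));

// Capture whatever race reports first as a readable string.
const raceOutcome: Promise<string> = new Promise((resolve) => {
  race(failsFast, succeedsSlowly).subscribe({
    next: (value) => resolve(`value: ${value}`),
    error: (err: Error) => resolve(`error: ${err.message}`),
  });
});
```

With these timings the error arrives first, so the subscriber never sees the value from `succeedsSlowly`. An operator modelled on Promise.any would instead have to ignore that error as long as another source can still succeed.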
Fork Join
The forkJoin operator will subscribe to all observables simultaneously and return an array containing the last value notification emitted from each observable. This differs from Promise.any because all values are returned. If one of the passed observables emits an error notification, that error notification will be passed on to the subscribers. Therefore the semantics of forkJoin are not the same as we would like them to be for our new operator.
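Both properties of forkJoin can be seen in the following sketch, assuming RxJS 6.5 or later. The `failing` observable and the `outcomes` array are helpers introduced only for this example.

```ts
import { forkJoin, Observable, of } from 'rxjs';

// A source that errors immediately on subscription.
const failing = new Observable<number>((subscriber) => {
  subscriber.error(new Error('boom'));
});

// Collects what each forkJoin call reports (all sources here are synchronous).
const outcomes: string[] = [];

// All sources complete: the last value of every source is returned.
forkJoin([of(1, 2, 3), of('a', 'b')]).subscribe({
  next: (lastValues) => outcomes.push(`values: ${JSON.stringify(lastValues)}`),
  error: (err: Error) => outcomes.push(`error: ${err.message}`),
});

// A single failing source is enough to fail the whole result.
forkJoin([of(1, 2, 3), failing]).subscribe({
  next: (lastValues) => outcomes.push(`values: ${JSON.stringify(lastValues)}`),
  error: (err: Error) => outcomes.push(`error: ${err.message}`),
});
```

The first call yields the last values of both sources as one array, the second yields only the error, which is the opposite of the "one success is enough" semantics of Promise.any.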
Behavior of the new operator
Before implementing the new operator let’s specify its behavior:
- The values passed to the operator should be an iterable of observables (Iterable<Observable>).
- The result of the operator will be the last value notification of the first observable that emits a complete notification.
- If all observables emit an error notification, the operator should emit a downstream error notification with an AggregateError holding the individual errors from the passed source observables.
Implementation of the new operator
To implement the new operator we will use the Observable constructor (new Observable). It is also possible to create the operator by combining existing RxJS operators via the pipe() function. That approach requires careful orchestration of existing operators and is quite complex. For demonstration purposes I also created the new operator from existing operators. You can find that implementation in the Stackblitz accompanying this post. For now let’s concentrate on the simpler version built directly on the Observable constructor.
The any operator subscribes to each of the input observables and tracks their complete or error notifications by storing the respective error or the last value notification of each observable. If one of the observables emits a complete notification, the last observed value notification (if present) of that observable is emitted to any downstream consumer, followed by a complete notification. If all observables error, the errors are aggregated by using the AggregateError constructor and a downstream error notification is issued.
```ts
import { from, Observable, ObservableInputTuple } from 'rxjs';

export function any<T extends readonly unknown[]>(
  sources: [...ObservableInputTuple<T>]
): Observable<T[number]> {
  return new Observable((subscriber) => {
    const length = sources.length;
    if (!length) {
      subscriber.complete();
      return;
    }
    // Holds the last value of each source, or its error once it has failed.
    let values = new Array(length);
    let completed = false;
    let errorCount = length;
    for (let sourceIndex = 0; sourceIndex < length; sourceIndex++) {
      let hasValue = false;
      subscriber.add(
        from(sources[sourceIndex]).subscribe({
          next(v) {
            hasValue = true;
            values[sourceIndex] = v;
          },
          error(e) {
            values[sourceIndex] = e;
            // Every source has failed: report all errors at once.
            if (!--errorCount) {
              subscriber.error(new AggregateError(values));
            }
          },
          complete() {
            // Only the first source to complete determines the result.
            if (!completed) {
              completed = true;
              if (hasValue) {
                subscriber.next(values[sourceIndex]);
              }
              subscriber.complete();
            }
          },
        })
      );
    }
    return subscriber;
  });
}
```
If you would like to explore this operator a bit more, take a look at the Stackblitz. The example also contains unit tests that verify the specified behavior of the operator. Have fun exploring!