Image of Thomas Mair tmair.dev

Promise.any like Observable operator

rxjsoperatorpromise

The ECMAScript 2021 specification adds a new Promise.any function. This post explains how to create a new RxJS operator porting the Promise.any behavior to Observables.

Specification of Promise.any

Let’s first have a look at the Promise.any signature: Promise.any<T>(values: Iterable<Promise<T>>): Promise<T>. The full signature is a little more complicated, but for the sake of simplicity let’s focus on this simpler signature.

Promise.any expects to be passed an Iterable of Promise elements. The promise will be fulfilled with the first promise that fulfils or reject with an error of type AggregateError if all of the passed promises reject. The AggregateError holds a list of all the rejection reasons. For more information about the new Promise.any function see the ECMAScript Language Specification or the excellent explanation of Dr. Axel Rauschmayer in JavaScript for impatient programmers (ES2021 edition).

Before starting to implement the operator, lets have a look at some of the existing operators.

Race

The race operator will mirror the first source observer to emit a value. This operator is similar to the Promise.race function. They differ in how the race operator and the Promise.race functions handle failures.

Promise.race will reject if the first promise to settle results in a failure. This differs from the Promise.any behavior which will reject if all Promises reject.

Similar the RxJS race operator will report an error notification when the first observable to emit a notification will emit an error notification. Even if some of the other observables complete the first one to emit a value will win the race.

Fork Join

The forkJoin operator will subscribe to all observables simultaneously and return an array containing the last value notification emitted from each observable. This differs from Promise.any because all values are returned. In case one of the passed Observables emit an error notification that error notification will be passed on to the subscribers. Therefore the semantics of forkJoin are not the same as we would like them to be for our new operator.

Behavior of the new operator

Before implementing the new operator let’s specify its behavior:

  1. The values passed to the operator should be an iterable of observables (Iterable<Observable>).
  2. The result of the operator will be the last non error notification of the first observable will emit a complete notification.
  3. If all observables will emit an error notification the operator should emit a downstream error notification with an AggregateError holding the individual errors from the passed source observables.

Implementation of the new operator

To implement the new operator we will use the new Observable constructor function. It is also possible to combine existing RxJS operators via the pipe() function to create the operator. That approach requires careful orchestration of existing operators and is quite complex. For demonstration purposes I created the new operator also by using existing operators. You can find the implementation in the Stackblitz accompanying this post. For now let’s concentrate on the simpler version by creating a new operator.

The any operator subscribes to each of the input observables and tracks the completion or error notifications of those observables by storing the respective error or the last notification of the observables. If one of the observables emits a complete notification the last observed value notification (if present) of the corresponding observable is emitted to any downstream consumer followed by a complete notification. If all observables error the errors are aggregated by using the AggregateError constructor and a downstream error notification is issued.

ts
import { from, Observable, ObservableInputTuple } from 'rxjs';
 
export function any<T extends readonly unknown[]>(
sources: [...ObservableInputTuple<T>]
): Observable<T[number]> {
return new Observable((subscriber) => {
const length = sources.length;
if (!length) {
subscriber.complete();
return;
}
 
let values = new Array(length);
let completed = false;
let errorCount = length;
 
for (let sourceIndex = 0; sourceIndex < length; sourceIndex++) {
let hasValue = false;
subscriber.add(
from(sources[sourceIndex]).subscribe({
next(v) {
hasValue = true;
values[sourceIndex] = v;
},
error(e) {
values[sourceIndex] = e;
if (!--errorCount) {
subscriber.error(new AggregateError(values));
}
},
complete() {
if (!completed) {
completed = true;
if (hasValue) {
subscriber.next(values[sourceIndex]);
}
subscriber.complete();
}
},
})
);
}
 
return subscriber;
});
}

If you would like to explore this operator a bit more take a look at the Stackblitz. The example also contains unit tests to test the specified behavior of the operator. Have fun exploring!