Watching through a window: Building a moving window with RxJS
04.02.2020
Imagine yourself sitting in a train, landscapes passing by: trees, houses, sheep, people, hills, ... When I find myself in a situation like that, I can't help but think of reactive data streams.
All the things that rush by are items of a huge stream, and you subscribe to that Observable by watching through the window. However, here's an important detail: whenever a new object appears outside, the window moves forward and opens the view not just to the most recent item, but to the last few emitted items.
(GIF source: giphy.com)
Luckily there's more behind this than my peculiar interest in watching through train windows, so let's bring this example to the world of software. Imagine an event stream: this could be log messages, button clicks, or whatever Observable stream you like. We want to build a UI around this that displays a chronological log of those events. Just like the train window limits the number of trees we can see at a time, we only want to display a certain range of the event history: the last few.
When you look in the direction of travel, new items always appear at the front and disappear behind you. We want to adopt this analogy and display the newest element at the top of the list before it moves down to the bottom and eventually disappears.
A look into the documentation
My first approach to tackling reactive problems is a quick look into the RxJS API documentation.
It's very likely that an operator exists for solving the problem.
However, it's important not to let yourself be confused by the sheer number of operators, especially those that sound suitable at first glance.
For a few minutes, I was obsessed with the thought that one of the buffer or window operators might be the solution.
They are not! Both buffer and window (and their relatives) collect values from the source and emit them all at once, either after a period of time or when a signal appears.
This sounds good but is still not the right thing for us when we want to move a window forward.
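To make the difference concrete, here is a small sketch that uses plain arrays as stand-ins for streams. It is not RxJS code: chunk and movingWindows are hypothetical helpers written for this illustration. The first one imitates what a count-based buffer produces, the second one produces what we are actually after.

```typescript
// Plain-array stand-ins for stream behaviour; no RxJS involved.

// Chunking: what a count-based buffer does.
// Every item lands in exactly one group.
function chunk<T>(items: T[], size: number): T[][] {
  const groups: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

// Moving window: one output per item, each holding
// the latest `size` items with the newest one first.
function movingWindows<T>(items: T[], size: number): T[][] {
  return items.map((_, i) =>
    items.slice(Math.max(0, i - size + 1), i + 1).reverse()
  );
}

const events = ['a', 'b', 'c', 'd', 'e'];

const chunks = chunk(events, 2);
// [['a', 'b'], ['c', 'd'], ['e']]

const windows = movingWindows(events, 2);
// [['a'], ['b', 'a'], ['c', 'b'], ['d', 'c'], ['e', 'd']]
```

The buffered version only produces a result once a group is complete, while the moving window produces a fresh result for every single item.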
Finally, I cleared my mind and started all over again.
If there is no suitable operator for you, it's best to think in fundamentals and build from the ground up using low-level operators like map, filter, reduce and scan.
About the scan operator
When you read about the scan operator, you often stumble upon examples that add up values, like this one:
import { of } from 'rxjs';
import { scan } from 'rxjs/operators';
const source$ = of(1, 2, 3, 4, 5);
const result$ = source$.pipe(
  scan((acc, item) => acc + item, 0)
);
// Result: 1, 3, 6, 10, 15
It transforms the stream of numbers into a stream of intermediate results from the addition performed as acc + item.
The scan operator is also the functional basis for Redux-style state management, where we reduce a stream of actions to state objects.
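As a rough sketch of that idea, the following example reduces a list of actions to a list of states. The Action and State types, stateReducer and scanArray are all invented for this illustration, and the array helper only imitates what a seeded scan does on a stream. With RxJS, the same reducer could presumably be used as scan(stateReducer, initialState) on a stream of actions.

```typescript
// Hypothetical actions and state, invented for this illustration.
type Action = { type: 'increment' } | { type: 'decrement' } | { type: 'reset' };
interface State { count: number; }

const initialState: State = { count: 0 };

// Pure reducer: previous state + action => next state.
function stateReducer(state: State, action: Action): State {
  switch (action.type) {
    case 'increment': return { count: state.count + 1 };
    case 'decrement': return { count: state.count - 1 };
    case 'reset': return initialState;
  }
}

// Array stand-in for a seeded scan: collects every intermediate accumulator.
function scanArray<T, A>(items: T[], reducer: (acc: A, item: T) => A, seed: A): A[] {
  const results: A[] = [];
  let acc = seed;
  for (const item of items) {
    acc = reducer(acc, item);
    results.push(acc);
  }
  return results;
}

const actions: Action[] = [
  { type: 'increment' },
  { type: 'increment' },
  { type: 'decrement' },
  { type: 'reset' },
];

const states = scanArray(actions, stateReducer, initialState);
// counts of the emitted states: 1, 2, 1, 0
```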
But let's take a closer look at the scan operator.
The first argument to scan is a reducer function that itself takes two arguments: acc and item.
The argument item is the item emitted by the source stream.
Whatever we return from the function will be the next item in the stream that flows out of the operator.
In a way, scan is similar to map here.
The key difference, however, is that the reducer also receives the result of its previous call as an argument (also called the accumulator, hence the argument name acc).
So whenever the source fires, we get the following arguments to the reducer:
- the new source item (item)
- the previously calculated result (acc)
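On the first emission there is no previous result yet. If we do nothing about it, scan passes the first source item through unchanged and uses it as the initial acc, without calling the reducer.
This is why we can provide a seed value as the second argument to scan, which is then used as the first value of acc.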
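The effect of the seed can be imitated on a plain array. The helper scanLike below is a hypothetical stand-in written for this illustration, not the RxJS implementation. It assumes the seeding rule of scan as I understand it: without a seed, the first item is passed through and becomes the accumulator. For simplicity it treats an undefined seed as "no seed".

```typescript
// Array imitation of scan's seeding rules (not RxJS itself).
function scanLike<T>(items: T[], reducer: (acc: T, item: T) => T, seed?: T): T[] {
  const results: T[] = [];
  let acc: T | undefined = seed;
  let hasAcc = seed !== undefined;
  for (const item of items) {
    // Without an accumulator, the first item is passed through
    // and the reducer is not called for it.
    const next: T = hasAcc ? reducer(acc as T, item) : item;
    acc = next;
    hasAcc = true;
    results.push(next);
  }
  return results;
}

const add = (acc: number, item: number) => acc + item;

const unseeded = scanLike([1, 2, 3], add);
// [1, 3, 6]

const seeded = scanLike([1, 2, 3], add, 10);
// [11, 13, 16]
```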
The following code listing shows the general structure of how we use scan:
function reducer(acc, item) {
  // calculate next value
  return nextValue;
}
scan(reducer, seed);
How does all this help us with the train window now?
Scanning train windows
Back to our train window problem: We want to produce a stream of arrays where each contains the last n items from the source.
This array is the list we render in the UI to display the items.
When the source stream looks like this (one item per line)...
π
π¦
π
π
π
π
β½οΈ
π³
...and n is 4, the result should look like this:
[π]
[π¦, π]
[π, π¦, π]
[π, π, π¦, π]
[π, π, π, π¦]
[π, π, π, π]
[β½οΈ, π, π, π]
[π³, β½οΈ, π, π]
Let's go ahead and scan over the source stream. For every emission, we want to output an array that contains
- the new source item at the beginning
- the first n - 1 items from the last result array
const n = 4;
const result$ = source$.pipe(
  scan((acc, item) => [item, ...acc.slice(0, n - 1)], [])
);
We need to provide an empty array as the seed here so that acc is an array from the very first call. Without a seed, the first source item itself would become acc, and calling .slice() on it would not work as intended.
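To see the window move step by step, the reducer can be tried on a plain array without any stream. This is only a sketch: letters stand in for the things passing by, and Array.prototype.reduce plus a side list of intermediate results imitates what scan would emit.

```typescript
const n = 4;

// Same logic as the reducer passed to scan:
// newest item first, followed by at most n - 1 older ones.
const windowReducer = (acc: string[], item: string): string[] =>
  [item, ...acc.slice(0, n - 1)];

// Letters stand in for the things passing by the train window.
const source = ['A', 'B', 'C', 'D', 'E', 'F'];

// Collect every intermediate result, like scan would emit them.
const emitted: string[][] = [];
source.reduce((acc, item) => {
  const next = windowReducer(acc, item);
  emitted.push(next);
  return next;
}, [] as string[]);

emitted.forEach(w => console.log(w.join(', ')));
// A
// B, A
// C, B, A
// D, C, B, A
// E, D, C, B
// F, E, D, C
```

Once the window is full, every new item pushes the oldest one out at the end.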
So what can we achieve with this pattern? There are many possible practical cases: displaying the latest logging entries, the latest stock prices or just the latest tweets about a topic like #Angular.
Think of a stream of events that you want to display in real time, but truncated: then this example is for you.
Extra: Build a custom operator
To hide the complexity we can wrap this thing into a custom operator. I wrote a dedicated blog post about this topic here: Build your own RxJS logging operator.
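import { scan } from 'rxjs/operators';

// Emits an array of the last n source items, newest first.
export function shiftWindow<T>(n: number = 4) {
  return scan<T, T[]>((acc, item) => [item, ...acc.slice(0, n - 1)], []);
}

const result$ = source$.pipe(
  shiftWindow(4)
);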
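One edge case is worth a thought: the reducer always puts the newest item into the array, so even with n = 0 the window would still contain one item. A possible way to deal with this is to validate n up front. The sketch below does this in a separate factory, createWindowReducer, which is a hypothetical helper and not part of RxJS. Keeping the reducer separate also means it can be tested without a stream.

```typescript
// Hypothetical helper: builds the window reducer and validates n.
function createWindowReducer<T>(n: number): (acc: T[], item: T) => T[] {
  if (!Number.isInteger(n) || n < 1) {
    throw new RangeError(`window size must be a positive integer, got ${n}`);
  }
  return (acc, item) => [item, ...acc.slice(0, n - 1)];
}

// With RxJS this could be wired up as:
//   scan(createWindowReducer<T>(n), [] as T[])

// Exercising the reducer without a stream:
const step = createWindowReducer<number>(2);
let current: number[] = [];
const seen: number[][] = [];
for (const value of [1, 2, 3]) {
  current = step(current, value);
  seen.push(current);
}
// seen: [[1], [2, 1], [3, 2]]
```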
Demo
You can find a working demo on Stackblitz:
Special thanks to Jan-Niklas Wortmann and Johannes Hoppe for review and feedback.
Header image: Photo by "Free-Photos" on Pixabay, modified. The RxJS logo is under Apache License 2.0.