I have a stream of values that I limit using a lower and upper bound, and to decrease the amount of logging I would like to only emit values if they change. The problem is that the second distinctUntilChanged() in the following snippet still produces duplicates:
    Observable // emits i.e. [2, 2, 2, 5, 5, 10, 20]
      .distinctUntilChanged() // becomes [2, 5, 10, 20]
      .map(target => {
        const correctedTarget = Math.min(Math.max(target, MINIMUM), MAXIMUM); // Let's say min: 5, max: 10
        if(correctedTarget != target) {
          logger.warn(`Prediction wants to scale out-of-bounds to ${target}, limiting to ${correctedTarget}`);
        }
        return correctedTarget;
      }) // becomes [5, 5, 10, 10]
      .distinctUntilChanged() // should be [5, 10], but is [5, 5, 10, 10]
The RxJS docs state that the filter defaults to a simple equality comparison, so I’d expect that this should Just Work™.
Answer
One of my colleagues (once again) identified the problem (also thanks for the help Matt). It turns out that the initial assumption was wrong – the outline of the code was as follows:
    Observable // emits credentials on some interval
      .flatMap(credentials => {
        return Observable2.doSomething()
          .distinctUntilChanged()
          ...
      })
As you can see, the distinctUntilChanged was chained to Observable2, which is a new observable stream every time the credentials get emitted. This also explains why the comparator function I made doesn’t get called at all: there is just one value every time so there is nothing to compare to.
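To make the difference concrete, here is a minimal sketch that uses plain arrays as a stand-in for the observables, so it runs without RxJS. The `distinctUntilChanged` function below is a hypothetical array helper that assumes strict equality, not the RxJS operator itself, and `innerStreams` is an assumed model of the single-value streams that Observable2 produces for each credentials emission.

```javascript
// Plain-array stand-in for distinctUntilChanged(): drop a value when it
// strictly equals the previous value that was let through.
function distinctUntilChanged(values) {
  const result = [];
  for (const value of values) {
    if (result.length === 0 || result[result.length - 1] !== value) {
      result.push(value);
    }
  }
  return result;
}

const MINIMUM = 5;
const MAXIMUM = 10;
const clamp = target => Math.min(Math.max(target, MINIMUM), MAXIMUM);

// Each inner array models one new stream created per credentials emission;
// it carries a single value (assumption for illustration).
const innerStreams = [[2], [5], [10], [20]];

// Filter applied inside the callback: every inner stream is deduplicated
// on its own, so a lone value never has a predecessor to be compared with.
const dedupedInside = innerStreams.flatMap(inner =>
  distinctUntilChanged(inner.map(clamp))
);

// Filter applied after flattening: the merged stream is deduplicated once.
const dedupedOutside = distinctUntilChanged(
  innerStreams.flatMap(inner => inner.map(clamp))
);

console.log(dedupedInside);  // [5, 5, 10, 10]
console.log(dedupedOutside); // [5, 10]
```

In the real code, the equivalent change would presumably be to chain the filter onto the outer stream, after the callback returns, rather than onto the stream created inside it.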