I have a stream of values that I limit using a lower and upper bound, and to reduce the amount of logging I would like to emit values only when they change. The problem is that the second distinctUntilChanged() in the following snippet still produces duplicates:
Observable // emits e.g. [2, 2, 2, 5, 5, 10, 20]
  .distinctUntilChanged() // becomes [2, 5, 10, 20]
  .map(target => {
    const correctedTarget = Math.min(Math.max(target, MINIMUM), MAXIMUM); // Let's say min: 5, max: 10
    if (correctedTarget !== target) {
      logger.warn(`Prediction wants to scale out-of-bounds to ${target}, limiting to ${correctedTarget}`);
    }
    return correctedTarget;
  }) // becomes [5, 5, 10, 10]
  .distinctUntilChanged() // should be [5, 10], but is [5, 5, 10, 10]
The RxJS docs state that the filter defaults to a simple equality comparison, so I’d expect that this should Just Work™.
Answer
One of my colleagues (once again) identified the problem (thanks also to Matt for the help). It turns out that my initial assumption about the structure of the code was wrong; the actual outline was as follows:
Observable // emits credentials on some interval
  .flatMap(credentials => {
    return Observable2.doSomething()
      .distinctUntilChanged();
  })
As you can see, the distinctUntilChanged was chained to Observable2, which is a new observable stream every time the credentials get emitted. This also explains why the comparator function I made doesn’t get called at all: there is just one value every time so there is nothing to compare to.
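To make the difference concrete, here is a minimal sketch that models the two operators with plain arrays instead of RxJS. The helper functions, the `ticks` input and the bounds of 5 and 10 are assumptions chosen for illustration; they are not the real code from the question, and the array model ignores timing entirely.

```javascript
// Plain-array model of the two operators involved; this is NOT RxJS itself.
// distinctUntilChanged: drop a value when it equals the previous one.
function distinctUntilChanged(values) {
  return values.filter((value, index) => index === 0 || value !== values[index - 1]);
}

// flatMap: map each outer value to an inner "stream" (array) and flatten the results.
function flatMap(values, project) {
  return values.reduce((out, value) => out.concat(project(value)), []);
}

// Assumed bounds, matching the example in the question.
const MINIMUM = 5;
const MAXIMUM = 10;
const clamp = target => Math.min(Math.max(target, MINIMUM), MAXIMUM);

// Hypothetical outer stream: each emission leads to one predicted target.
const ticks = [2, 5, 10, 20];

// Filter inside the projection: every inner stream holds a single value,
// so there is never a previous value to compare against.
const filteredInside = flatMap(ticks, target => distinctUntilChanged([clamp(target)]));

// Filter after flattening: the comparison spans the whole merged stream.
const filteredOutside = distinctUntilChanged(flatMap(ticks, target => [clamp(target)]));

console.log(filteredInside);  // [5, 5, 10, 10]
console.log(filteredOutside); // [5, 10]
```

In the real RxJS chain, the equivalent change is to move `.distinctUntilChanged()` out of the projection function so that it follows the closing parenthesis of `.flatMap(...)` and therefore sees the merged output rather than each short-lived inner stream.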