Skip to content
Advertisement

RxJs: distinctUntilChanged still emits duplicate values

I have a stream of values that I limit using a lower and upper bound, and to decrease the amount of logging I would like to only emit values if they change. The problem is that the second distinctUntilChanged() in the following snippet still produces duplicates:

Observable // emits i.e. [2, 2, 2, 5, 5, 10, 20]
.distinctUntilChanged() // becomes [2, 5, 10, 20]
.map(target => {
  const correctedTarget = Math.min(Math.max(target, MINIMUM), MAXIMUM); // Let's say min: 5, max: 10
  if(correctedTarget != target) {
    logger.warn(`Prediction wants to scale out-of-bounds to ${target}, limiting to ${correctedTarget}`);
  }
  return correctedTarget;
}) // becomes [5, 5, 10, 10]
.distinctUntilChanged() // should be [5, 10], but is [5, 5, 10, 10]

The RxJs docs state that the filter defaults to a simple equality comparison, so I’d expect that this should Just Work™.

Advertisement

Answer

One of my colleagues (once again) identified the problem (also thanks for the help Matt). It turns out that the initial assumption was wrong – the outline of the code was as follows:

Observable // emits credentials on some interval
.flatmap(credentials => {
  return Observable2.doSomething()
         .distinctUntilChanged()
         ...
})

As you can see, the distinctUntilChanged was chained to Observable2, which is a new observable stream every time the credentials get emitted. This also explains why the comparator function I made doesn’t get called at all: there is just one value every time so there is nothing to compare to.

User contributions licensed under: CC BY-SA
2 People found this is helpful
Advertisement