RxJS 笔记 10

参考 简书原文 作者:readilen

1 Operators

1-1 combineLatest

取得各个 observable 最后送出的值,再输出成一个值

// source : ----0----1----2|
// newest : --0--1--2--3--4--5|
//
//     combineLatest(newest, (x, y) => x + y);
//
// example: ----01--23-4--(56)--7|
var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.combineLatest(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete

一定会等两个 observable 都曾有送值出来才会呼叫我们传入的 callback,所以这段程式是这样运行的

  • newest 送出了 0,但此时 source 并没有送出过任何值,所以不会执行 callback
  • source 送出了 0,此时 newest 最后一次送出的值为 0,把这两个数传入 callback 得到 0。
  • newest 送出了 1,此时 source 最后一次送出的值为 0,把这两个数传入 callback 得到 1。
  • newest 送出了 2,此时 source 最后一次送出的值为 0,把这两个数传入 callback 得到 2。
  • source 送出了 1,此时 newest 最后一次送出的值为 2,把这两个数传入 callback 得到 3。
  • newest 送出了 3,此时 source 最后一次送出的值为 1,把这两个数传入 callback 得到 4。
  • source 送出了 2,此时 newest 最后一次送出的值为 3,把这两个数传入 callback 得到 5。
  • source 结束,但 newest 还没结束,所以 example 还不会结束。
  • newest 送出了 4,此时 source 最后一次送出的值为 2,把这两个数传入 callback 得到 6。
  • newest 送出了 5,此时 source 最后一次送出的值为 2,把这两个数传入 callback 得到 7。
  • newest 结束,因为 source 也结束了,所以 example 结束。
  • 不管是 source 还是 newest 送出值来,只要另一方曾有送出过值(有最后的值),就会执行 callback 并送出新的值,这就是 combineLatest。

1-2 zip

会取每个 observable 相同顺位的元素并传入 callback,也就是说每个 observable 的第 n 个元素会一起被传入 callback

// source : ----0----1----2|
// newest : --0--1--2--3--4--5|
//     zip(newest, (x, y) => x + y)
// example: ----0----2----4|
var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.zip(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete

zip 会等到 source 跟 newest 都送出了第一个元素,再传入 callback,下次则等到 source 跟 newest 都送出了第二个元素再一起传入 callback,所以运行的步骤如下:

  • newest 送出了第一个值 0,但此时 source 并没有送出第一个值,所以不会执行 callback。
  • source 送出了第一个值 0,newest 之前送出的第一个值为 0,把这两个数传入 callback 得到 0。
  • newest 送出了第二个值 1,但此时 source 并没有送出第二个值,所以不会执行 callback。
  • newest 送出了第三个值 2,但此时 source 并没有送出第三个值,所以不会执行 callback。
  • source 送出了第二个值 1,newest 之前送出的第二个值为 1,把这两个数传入 callback 得到 2。
  • newest 送出了第四个值 3,但此时 source 并没有送出第四个值,所以不会执行 callback。
  • source 送出了第三个值 2,newest 之前送出的第三个值为 2,把这两个数传入 callback 得到 4。
  • source 结束 example 就直接结束,因为 source 跟 newest 不会再有对应顺位的值
// source : (hello)|
// source2: -0-1-2-3-4-...
//         zip(source2, (x, y) => x)
// example: -h-e-l-l-o|
var source = Rx.Observable.from('hello');
var source2 = Rx.Observable.interval(100);

var example = source.zip(source2, (x, y) => x);

1-3 withLatestFrom

// main   : ----h----e----l----l----o|
// some   : --0--1--0--0--0--1|
// 
// withLatestFrom(some, (x, y) =>  y === 1 ? x.toUpperCase() : x);
// 
// example: ----h----e----l----L----O|
var main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x);
var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);

var example = main.withLatestFrom(some, (x, y) => {
    return y === 1 ? x.toUpperCase() : x;
});

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

withLatestFrom 会在 main 送出值的时候执行 callback,但请注意如果 main 送出值时 some 之前没有送出过任何值 callback 仍然不会执行!
这裡我们在 main 送出值时,去判断 some 最后一次送的值是不是 1 来决定是否要切换大小写,执行步骤如下

  • main 送出了 h,此时 some 上一次送出的值为 0,把这两个参数传入 callback 得到 h。
  • main 送出了 e,此时 some 上一次送出的值为 0,把这两个参数传入 callback 得到 e。
  • main 送出了 l,此时 some 上一次送出的值为 0,把这两个参数传入 callback 得到 l。
  • main 送出了 l,此时 some 上一次送出的值为 1,把这两个参数传入 callback 得到 L。
  • main 送出了 o,此时 some 上一次送出的值为 1,把这两个参数传入 callback 得到 O。