RxJS 笔记 06

参考 简书原文 作者:readilen

1 Observable

1-1 create

// Hand-built observable: synchronously pushes two names, then signals completion.
var source = Rx.Observable.create((observer) => {
    ['Jerry', 'Anna'].forEach((name) => observer.next(name));
    observer.complete();
});

// Subscribe with an observer object that logs each kind of notification.
source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log(error),
    complete: () => console.log('complete!'),
});

// Jerry
// Anna
// complete!

1-2 of

// `of` emits each argument in order and then completes.
var source = Rx.Observable.of(...['Jerry', 'Anna']);

// Log every value, any error, and the completion signal.
source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log(error),
    complete: () => console.log('complete!'),
});

// Jerry
// Anna
// complete!

1-3 from

// `from` turns an array into an observable that emits its elements one by one.
var arr = [
    'Jerry',
    'Anna',
    2016,
    2017,
    '30 days',
];
var source = Rx.Observable.from(arr);

// Log every value, any error, and the completion signal.
source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log(error),
    complete: () => console.log('complete!'),
});

// Jerry
// Anna
// 2016
// 2017
// 30 days
// complete!
// `from` also accepts a string; the expected output listed below this snippet
// shows one emission per character.
var source = Rx.Observable.from('铁人赛');

// Log every value, any error, and the completion signal.
source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log(error),
    complete: () => console.log('complete!'),
});
// 铁
// 人
// 赛
// complete!

1-4 fromPromise(下面的例子直接用 from,from 也可以接收 Promise)

// Wrap a Promise that resolves after 3 seconds; the observable emits the
// resolved value and then completes.
var source = Rx.Observable.from(
    new Promise((resolve) => {
        setTimeout(() => resolve('Hello RxJS!'), 3000);
    })
);

// Log the resolved value, any rejection, and the completion signal.
source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log(error),
    complete: () => console.log('complete!'),
});

// Hello RxJS!
// complete!

1-5 fromEvent (Event)

取得 DOM 物件的常用方法:
document.getElementById()
document.querySelector()
document.getElementsByTagName()
document.getElementsByClassName()

// Observable of click events on <body>; each emission is the DOM event object.
var source = Rx.Observable.fromEvent(document.body, 'click');

// Log every event, any error, and the completion signal.
source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log(error),
    complete: () => console.log('complete!'),
});

// MouseEvent {...}
/**
 * Minimal observer-pattern subject: keeps a list of listener functions and
 * broadcasts messages to all of them.
 */
class Producer {
    constructor() {
        // Registered listener callbacks, in registration order.
        this.listeners = [];
    }

    /**
     * Register a listener.
     * @param {Function} listener - Called with each message passed to notify().
     * @throws {Error} If `listener` is not a function.
     */
    addListener(listener) {
        if (typeof listener !== 'function') {
            throw new Error('listener 必须是 function');
        }
        this.listeners.push(listener);
    }

    /**
     * Unregister a previously added listener. Does nothing when the listener
     * is not currently registered.
     * @param {Function} listener - The listener to remove.
     */
    removeListener(listener) {
        const index = this.listeners.indexOf(listener);
        // indexOf returns -1 for an unknown listener; splice(-1, 1) would
        // then silently drop the LAST registered listener, so guard first.
        if (index !== -1) {
            this.listeners.splice(index, 1);
        }
    }

    /**
     * Call every registered listener with `message`.
     * @param {*} message - Value forwarded to each listener.
     */
    notify(message) {
        this.listeners.forEach((listener) => {
            listener(message);
        });
    }
}
// ------- 以上 观察者模式 -------- //

// `egghead` exposes both a register-listener and a remove-listener method,
// which is the pair of hooks fromEventPattern needs.
var egghead = new Producer();

var source = Rx.Observable.fromEventPattern(
    function register(handler) {
        return egghead.addListener(handler);
    },
    function unregister(handler) {
        return egghead.removeListener(handler);
    }
);

// Log every message, any error, and the completion signal.
source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log(error),
    complete: () => console.log('complete!'),
});

egghead.notify('Hello! Can you hear me?');
// Hello! Can you hear me?

// Same pattern, forwarding to the producer's methods through wrappers that
// keep `this` pointing at `egghead`.
Rx.Observable
    .fromEventPattern(
        (...args) => egghead.addListener(...args),
        (...args) => egghead.removeListener(...args)
    )
    .subscribe(console.log);

1-6 never(无穷的 observable), empty(空的 observable) ,throw(异常)

never:它是一个一直存在、但什么都不做的 observable(既不送出值,也不会结束)。

// `never` yields an observable that stays open but emits nothing, so none of
// the callbacks below are expected to run.
var source = Rx.Observable.never();

source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log(error),
    complete: () => console.log('complete!'),
});

empty 它会立即送出 complete 的讯息!

// `empty` yields an observable that completes immediately without emitting.
var source = Rx.Observable.empty();

source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log(error),
    complete: () => console.log('complete!'),
});
// complete!

throw 只做一件事就是抛出错误

// `throw` yields an observable that immediately errors with the given value.
var source = Rx.Observable.throw('Oop!');

source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log('Throw Error: ' + error),
    complete: () => console.log('complete!'),
});
// Throw Error: Oop!

1-7 interval 持续的行为 , timer

interval 有一个参数,必须是数值(Number),这个数值代表发出讯号的间隔时间(ms)。

// Hand-rolled equivalent of `interval(1000)`: emits 0, 1, 2, ... once per second.
var source = Rx.Observable.create(function(observer) {
    var i = 0;
    var timerId = setInterval(() => {
        observer.next(i++);
    }, 1000);

    // Teardown: without this the timer keeps firing after the subscriber
    // unsubscribes, leaking the interval for the lifetime of the page.
    return function() {
        clearInterval(timerId);
    };
});

// Log every tick; error/complete are wired up but this source never ends on its own.
source.subscribe({
    next: function(value) {
        console.log(value);
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log('Throw Error: ' + error);
    }
});
// 0
// 1
// 2
// .....

// Built-in counterpart: emits an incrementing counter every 1000 ms.
var source = Rx.Observable.interval(1000);

source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log('Throw Error: ' + error),
    complete: () => console.log('complete!'),
});
// 0
// 1
// 2
// ...

timer 只有一个参数时,代表等待这段时间(ms)后送出一个值(0),然后立即结束(complete)。timer 有两个参数时,第一个参数代表要发出第一个值的等待时间(ms),第二个参数代表第一次之后发送值的间隔时间(ms)。

// Single-argument timer: wait 1000 ms, emit once, then complete
// (see the expected output listed below this snippet).
var source = Rx.Observable.timer(1000);

source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log('Throw Error: ' + error),
    complete: () => console.log('complete!'),
});
// 0
// complete!

// Two-argument timer: first value after 1000 ms, then one value every 5000 ms.
var source = Rx.Observable.timer(1000, 5000);

source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log('Throw Error: ' + error),
    complete: () => console.log('complete!'),
});
// 0
// 1
// 2 ...

2 Subscription

我们可能会在某些行为后不需要这些资源,要做到这件事最简单的方式就是 unsubscribe。
其实在订阅 observable 后,会回传一个 subscription 物件,这个物件具有释放资源的 unsubscribe 方法。

Events observable 尽量不要用 unsubscribe

// Emit a counter every second, starting after one second.
var source = Rx.Observable.timer(1000, 1000);

// subscribe() hands back the subscription, which is kept so it can be cancelled later.
var subscription = source.subscribe({
    next: (value) => console.log(value),
    error: (error) => console.log('Throw Error: ' + error),
    complete: () => console.log('complete!'),
});

// After 5 seconds, stop receiving values.
// (RxJS 4.x and earlier used dispose() instead of unsubscribe().)
setTimeout(() => subscription.unsubscribe(), 5000);
// 0
// 1
// 2
// 3
// 4