实现一个Observable | 刷题打卡
分类:javascript
题目描述
你用过RxJS吗?其中最重要的概念是 Observable
和 Observer
。Observable
定义了如何将值传递给 Observer
。Observer
只是一组回调函数,如下:
const observer = {
next: (value) => {
console.log('we got a value', value);
},
error: (error) => {
console.log('we got an error', error);
},
complete: () => {
console.log('ok, no more values');
}
}
我们需要把上述Observer
和 Observable
做关联,Observer
向观察者提供值或者错误消息。
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
setTimeout(() => {
subscriber.next(3);
subscriber.complete();
subscriber.next(4)
}, 100);
})
当有一个 订阅者
订阅的时候:
const sub = observable.subscribe(subscriber);
输出的结果是:
1
2
timeout 100ms
3
请你实现一个基本的Observable,使得上述描述的内容成为可能。
一些额外的要求列在了这里
- error 和 complete只能触发一次。其后的next/error/complete 需要被忽略。
- 在订阅的时候next/error/complete需要都不是必须。如果传入的是一个函数,这个函数需要被默认为next。
- 需要支持多个订阅。
题目链接
解题思路
不了解 rxjs
的前提下可能会有点绕,从 发布订阅
的角度去理解会好点,我们会定义一个 Publisher
类,这个 Publisher
相当于一个管理者,有管理订阅者并通知一些事情的能力:
/**
* 1. 添加观察者
* 2. 移除观察者
* 3. 通知观察者触发更新操作
*/
class Publisher {
observers: Observer[] = [];
name = '';
add(observer: Observer) {
this.observers.push(observer);
}
remove(observer: Observer) {
this.observers.forEach(
(item, index) => item === observer && this.observers.splice(index, 1)
);
}
watch() {
this.observers.forEach(observer => observer.update.call(this));
}
// -----------
getName() {
return this.name;
}
setName(name: string) {
this.name = name;
this.watch();
}
}
而 订阅者
负责听 发布者
的消息通知:
class Observer {
pubParam = {};
update(publisher: Publisher) {
this.pubParam = publisher.getName();
this.work();
}
work() {
console.log('这里可以拿发布者的一些参数...', this.pubParam);
}
}
const pub = new Publisher();
const ob1 = new Observer();
const ob2 = new Observer();
pub.add(ob1);
pub.add(ob2);
// 通知 observer
pub.setName('djmughal');
回到正题...我们需要定义 Observable
类,接收一个函数,函数内部定义数据,通过 subscribe
来 订阅
数据的变化,先完成这一步:
class Observable {
constructor(setup) { // 定义 setup 函数用于接收 订阅者
this.setup = setup;
}
subscribe(observer) {
return this.setup(observer);
}
}
const observable = Observable(fn);
observable.subscribe(observer);
Observable
首先会缓存一个执行函数,通过 subscribe
方法执行,所以 observer
是 Observable
构造函数的参数,observer
中包含 next
error
complete
,入参定义如下:
// 定义 observer 类 + 实例化
class Observer {
constructor() {
// 定义状态,isComplete isError 的时候取消订阅
this.isComplete = false;
this.isError = false;
}
next() {}
complete() {}
error() {}
unsubscribe() {
this.next = () => null;
this.complete = () => null;
this.error = () => null;
}
}
有了 observer
我们可以实现 observable
并且完成对其的订阅
const observable = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete("it's over");
observer.next(5);
}, 2000);
observer.next(6);
});
const sub = observable.subscribe(observer);
// 1 2 3 6 -> timeout 2000ms -> 4
AC代码
class Observable {
constructor(setup) {
this.setup = setup;
}
subscribe(observer) {
// const observer = new Observer(subscriber);
this.setup(observer);
return {
unsubscribe: observer.__proto__.unsubscribe,
};
}
}
class Observer {
constructor(callback) {
this.isComplete = false;
this.isError = false;
}
next(value) {
if (!this.isComplete && !this.isError) {
console.log('status: next', value);
}
}
complete(value) {
if (!this.isComplete && !this.isError) {
console.log('status: complete', value);
}
this.isComplete = true;
this.unsubscribe();
}
error(err) {
if (!this.isComplete && !this.isError) {
console.log('status: error', err);
}
this.isError = true;
this.unsubscribe();
}
unsubscribe() {
this.next = () => null;
this.complete = () => null;
this.error = () => null;
}
}
const observable = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete("it's over");
observer.next(5);
}, 2000);
observer.next(6);
});
const observer = new Observer();
const sub = observable.subscribe(observer);
setTimeout(() => {
sub.unsubscribe();
}, 1000);
本文正在参与「掘金 2021 春招闯关活动」, 点击查看 活动详情