实现一个Observable | 刷题打卡

我心飞翔 分类:javascript

题目描述

你用过RxJS吗?其中最重要的概念是 ObservableObserverObservable 定义了如何将值传递给 ObserverObserver只是一组回调函数,如下:

const observer = {
  next: (value) => {
     console.log('we got a value', value);
  },
  error: (error) => {
    console.log('we got an error', error);
  },
  complete: () => {
    console.log('ok, no more values');
  }
}
 

我们需要把上述ObserverObservable 做关联,Observer 向观察者提供值或者错误消息。

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
    subscriber.next(4)
  }, 100);
})
 

当有一个 订阅者 订阅的时候:

const sub = observable.subscribe(subscriber);
 

输出的结果是:

1
2
timeout 100ms
3
 

请你实现一个基本的Observable,使得上述描述的内容成为可能。
一些额外的要求列在了这里

  • error 和 complete只能触发一次。其后的next/error/complete 需要被忽略。
  • 在订阅的时候next/error/complete需要都不是必须。如果传入的是一个函数,这个函数需要被默认为next。
  • 需要支持多个订阅。

题目链接

解题思路

不了解 rxjs 的前提下可能会有点绕,从 发布订阅 的角度去理解会好点,我们会定义一个 Publisher 类,这个 Publisher 相当于一个管理者,有管理订阅者并通知一些事情的能力:

/**
 * 1. 添加观察者
 * 2. 移除观察者
 * 3. 通知观察者触发更新操作
 */
class Publisher {
  observers: Observer[] = [];
  name = '';

  add(observer: Observer) {
    this.observers.push(observer);
  }

  remove(observer: Observer) {
    this.observers.forEach(
      (item, index) => item === observer && this.observers.splice(index, 1)
    );
  }

  watch() {
    this.observers.forEach(observer => observer.update.call(this));
  }
  
  // -----------
  getName() {
    return this.name;
  }
  
  setName(name: string) {
    this.name = name;
    this.watch();
  }
}
 

订阅者 负责听 发布者 的消息通知:

class Observer {
  pubParam = {};

  update(publisher: Publisher) {
    this.pubParam = publisher.getName();
    this.work();
  }

  work() {
    console.log('这里可以拿发布者的一些参数...', this.pubParam);
  }
}
 
const pub = new Publisher();
const ob1 = new Observer();
const ob2 = new Observer();

pub.add(ob1);
pub.add(ob2);

// 通知 observer
pub.setName('djmughal');
 

回到正题...我们需要定义 Observable 类,接收一个函数,函数内部定义数据,通过 subscribe订阅 数据的变化,先完成这一步:

class Observable {
  constructor(setup) { // 定义 setup 函数用于接收 订阅者
    this.setup = setup;
  }

  subscribe(observer) {
    return this.setup(observer);
  }
}

const observable = Observable(fn);
observable.subscribe(observer);
 

Observable 首先会缓存一个执行函数,通过 subscribe 方法执行,所以 observerObservable 构造函数的参数,observer中包含 next error complete,入参定义如下:

// 定义 observer 类 + 实例化
class Observer {
  constructor() {
    // 定义状态,isComplete isError 的时候取消订阅
    this.isComplete = false;
    this.isError = false;
  }
  next() {}
  complete() {}
  error() {}
  unsubscribe() {
    this.next = () => null;
    this.complete = () => null;
    this.error = () => null;
  }
}
 

有了 observer 我们可以实现 observable 并且完成对其的订阅

const observable = new Observable(observer => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete("it's over");
        observer.next(5);
    }, 2000);
    observer.next(6);
});
const sub = observable.subscribe(observer);

// 1 2 3 6 -> timeout 2000ms -> 4
 

AC代码

class Observable {
constructor(setup) {
this.setup = setup;
}
subscribe(observer) {
// const observer = new Observer(subscriber);
this.setup(observer);
return {
unsubscribe: observer.__proto__.unsubscribe,
};
}
}
class Observer {
constructor(callback) {
this.isComplete = false;
this.isError = false;
}
next(value) {
if (!this.isComplete && !this.isError) {
console.log('status: next', value);
}
}
complete(value) {
if (!this.isComplete && !this.isError) {
console.log('status: complete', value);
}
this.isComplete = true;
this.unsubscribe();
}
error(err) {
if (!this.isComplete && !this.isError) {
console.log('status: error', err);
}
this.isError = true;
this.unsubscribe();
}
unsubscribe() {
this.next = () => null;
this.complete = () => null;
this.error = () => null;
}
}
const observable = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete("it's over");
observer.next(5);
}, 2000);
observer.next(6);
});
const observer = new Observer();
const sub = observable.subscribe(observer);
setTimeout(() => {
sub.unsubscribe();
}, 1000);

本文正在参与「掘金 2021 春招闯关活动」, 点击查看 活动详情

回复

我来回复
  • 暂无回复内容