《 从没用过的有趣 rxjs》

前言

一直听过 rxjs 的大名, 但是真没用过,今天看了「珠峰架构」 的关于 rxjs 的公开课,发现 rxjs 还挺有趣的😁

使用

1. 观测普通数据

const observable = new Observable(subscriber => {
 subscriber.next(1);
 subscriber.next(2);
 subscriber.next(3);
 subscriber.complete();
});

// 接收一个对象
observable.subscribe({
 next: value => console.log('next value:', value),
 complete: () => {
   console.log('complete')
 }
})
// 接受一个函数
observable.subscribe(value => console.log('next value:', value))

《 从没用过的有趣 rxjs》

2. 数组管道操作

let r = [1, 2, 3];
const subscriber = from(r)
  .pipe(map(val => val * 2)) // [2,4,6]
  .pipe(filter(val => val > 3)) //[4,6]
  .pipe(map(data => data + 1)); //[5,7]

// subscriber.subscribe(val => console.log(val, "abcd"));

subscriber.subscribe({
  next: val => console.log(val),
});

《 从没用过的有趣 rxjs》

3. 异步调用

function task(state) {
  console.log("state: ", state);
  if (state < 5) {
    this.schedule(state + 1, 2000);
  }
}
// 最后一个参数从哪开始
asyncScheduler.schedule(task, 10000, 2);

《 从没用过的有趣 rxjs》

4. 定时任务

interval(500).pipe(take(3)).subscribe(console.log);

《 从没用过的有趣 rxjs》

5. 发布订阅数据

const subject = new Subject();

subject.subscribe({ next: data => console.log("observerA: ", data) });
subject.subscribe({ next: data => console.log("observerB: ", data) });

subject.next(1); // observerA:1  observerA:  2

subject.next(2); // observerB:1  observerA:  2

《 从没用过的有趣 rxjs》

感觉还是方便的,如果想要了解更多 Api 可以点击这里 👉 rxjs.dev/guide/overv…

Api

简单的源码实现

1. Observable

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

observable.subscribe(value => console.log("next value:", value));

observable.subscribe({
  next: value => console.log("next value:", value),
  complete: () => {
    console.log("complete");
  },
});

可以看出 Observable 是一个 类,接收一个 函数,同时它的实例上有一个叫做 subscribe 的方法,subscribe 可以接收 一个 函数 / 一个对象

所以我们可以写出这样的代码

class Observable {
  constructor(subscribe) {
    if (subscribe) {
      this._subscribe = subscribe;
     }
   }
  
   subscribe(observerOrNext){
    let subscriber = null;
    // 构造出一个对象出来
    if (isFunction(observerOrNext)) {
      subscriber = {
        next: observerOrNext,
      };
    } else {
      subscriber = observerOrNext;
    }
    
    // 执行 complate 
    subscriber.complete ??= () => {};
    this._subscribe(subscriber);
    
    return subscriber;
   } 
 }

《 从没用过的有趣 rxjs》

虽然可以完成基本需要,但是有些丑陋,而且功能杂糅到一起,让我们来进行一些抽离

通过 Subscriber类 对传入的数据进行包装,增加对应的功能

  subscribe(observerOrNext) {

    const subscriber = new Subscriber(observerOrNext);
    this._subscribe(subscriber);
    return subscriber;
  }

Subscriber类 中增加 next / complate 方法,通过一个标志位 isStopped 来控制函数的执行

class Subscriber {
  isStopped = false;
  
  constructor(observerOrNext) {
   
    let observer = null;
    if (isFunction(observerOrNext)) {
      observer = {
        next: observerOrNext
      }
    } else {
      observer = observerOrNext
    }
    //把观察者对象存到了订阅者对象的destination属性上
    this.destination = observer;
  }
  next(value) {
    if (!this.isStopped) {
      this.destination.next(value);
    }
  }
  //如果调用了complete方法,就表示生产完毕了
  complete() {
    if (!this.isStopped) {
      this.isStopped = true;
      this.destination.complete?.();
    }
  }
}

Subscriber类 的初始化中,我们格式化用户传递的数据,无论传入的是对象还是函数,都使其变成一个对象形式;用户在Observable传入的回调函数中传递的 next中传入的值,都被 Subscribernext 收到

那么在 new Observable 接受的函数 就是 一个 Subscriber 实例

graph TD
Observable类 -->接收一个函数
接收一个函数--> 存入Observable类的this._subscribe
Observable类 -->实例执行subscribe
实例执行subscribe-->传入Subscriber类并返回实例
传入Subscriber类并返回实例-->格式化参数为对象形式
格式化参数为对象形式-->执行Subscriber实例上的next/complate方法

2. 数组调用

使用 from 接收一个数组,返回值可以使用 pipe 管道函数接收 map 或者 filter 对数组进行处理,并且可以监听转化过程

let r = [1, 2, 3];
const subscriber = from(r)
  .pipe(map(val => val * 2)) // [2,4,6]
  .pipe(filter(val => val > 3)) //[4,6]
  .pipe(map(data => data + 1)); //[5,7]

// subscriber.subscribe(val => console.log(val, "abcd"));

subscriber.subscribe({
  next: val => console.log(val),
});

from 的这个函数接收一个数组

function from(arrayLike) {
  return new Observable(subscriber => {
    for (let i = 0; i < arrayLike.length; i++) {
      subscriber.next(arrayLike[i]);
    }
    subscriber.complete();
  });
}

哦,那么它又用到了 Observable,返回一个 Observable 实例,遍历执行 next 方法;

Observable 需要加上方法 pipe
由于 pipe 可以进行链式调用,并且可以进行值传递,所以简单来说,可以写成下面这种,把当前实例当做参数传递

class Observable{
    // ...
     pipe(operations){
         return operations(this)
     }
}

那么 mapfilter 应该满足什么条件呢?

首先能够返回值能够调用 Observable类 中的 pipe 方法,所以必须返回一个 Observable实例

其次还有返回值能够传递给下一个 pipe

🚩所以 在 map 中 的 source 是一个老 的 Observable,同时需要返回一个转化过的新的 Observable ,这个新的 Observable 交由 pipe 返回,这样可以不断的链式调用

function map(project) {
  //operation 传入老的Observable,返回新的Observable
  return source => {
    return new Observable(function (subscriber) {
      return source.subscribe({
        ...subscriber,
        //最关键的是要重写next方法,此value是老的Observable传过来的老值
        next: (value) => {
          subscriber.next(project(value));
        }
      })
    });
  }
}

同样的 ,filter

function filter(predicate) {
  //operation 传入老的 Observable,返回新的Observable
  return source => {
    return new Observable(function (subscriber) {
      return source.subscribe({
        ...subscriber,
        //最关键的是要重写next方法,此value是老的Observable传过来的老值
        next: (value) => {
          predicate(value) && subscriber.next(value);
        }
      })
    });
  }
}

mapfilter 都是高阶函数
通过不断的返回新的 Observable让链式调用不断的执行,并且source 都是上一次 this

3. 异步处理

function task(state) { 
    console.log("state: ", state);
    if (state < 5) { 
        this.schedule(state + 1, 2000);
    }
} 
// 最后一个参数从哪开始 
asyncScheduler.schedule(task, 10000, 2);

asyncScheduler.schedule 接收3个参数,一个函数,一个执行间隔,一个初始值

其实很简单,就是一个定时器不断的执行

我们来看看

const asyncScheduler = new Scheduler(AsyncAction)

asyncScheduler 是 Scheduler 的一个实例, 接收 AsyncAction类 作为参数

执行this.schedulerActionCtor = schedulerActionCtorAsyncAction保存在自己身上

class Scheduler {
  constructor(schedulerActionCtor) {
    this.schedulerActionCtor = schedulerActionCtor;
  }
  /**
   * setTimeout
   * @param {*} work 要执行的工作
   * @param {*} delay 延迟的时间
   * @param {*} state 初始状态
   * @returns 
   */
  schedule(work, delay = 0, state) {
    return new this.schedulerActionCtor(work).schedule(state, delay);
  }
}

可以看出 schedule 方法也是执行的是 AsyncAction 中的 schedule 方法

来到 AsyncAction类

AsyncAction 初始化接受一个存储 传入的函数,然后在 schedule方法 中接收 statedelay

class AsyncAction {
  pending = false
  constructor(work) {
    this.work = work;
  }
  
  //当调用此方法的时候,我要在delay时间后,以state作为参数调用work方法
  schedule(state, delay = 0) {
    this.state = state;
    this.delay = delay;
    
    // 清除定时器把上次的清除掉,避免多个定时器执行
    if (this.timerID !== null) {
      this.timerID = this.recycleAsyncId(this.timerID)
    }
    
    //表示有任务等待执行
    this.pending = true;
    this.timerID = this.requestAsyncId(delay);
  }
  
  recycleAsyncId(timerID) {
    if (timerID !== null) {
      clearInterval(timerID)
    }
    return null;
  }
  
  requestAsyncId(delay = 0) {
    return setInterval(this.execute, delay)
  }
  
  execute = () => {
    this.pending = false;
    this.work(this.state);
    //如果在 work中没有调度新的任务的话,那就把定时器也清掉
    if (this.pending === false && this.timerID !== null) {
      this.timerID = this.recycleAsyncId(this.timerID);
    }
  }
}

因为是在 setInterval 中执行的 execute,为了保证 this 指向,所以使用 箭头函数 保证 execute 的指向

4. 定时任务

interval(1000).subscribe(v => console.log(v));
interval(500).pipe(take(3)).subscribe(console.log);

interval 接收一个执行 delayTime,其实内部利用了 setInterval

function interval(
  dueTime = 0,
  scheduler = asyncScheduler
) {
  return new Observable(subscriber => {
    let n = 0;
    return scheduler.schedule(function () {
      subscriber.next(n++);
      if (dueTime > 0) {
        this.schedule(undefined, dueTime);
      } else {
        subscriber.complete();
      }
    }, dueTime);
  });
}

没错,使用了 asyncScheduler 那个 异步类 – const asyncScheduler = new Scheduler(AsyncAction);; 此时scheduler.schedule指代的是Scheduler类中的schedule

所以 work 是一个匿名函数,delay 是 dueTime,state 是 undefined

class Scheduler {
  constructor(schedulerActionCtor) {
    this.schedulerActionCtor = schedulerActionCtor;
  }
  /**
   * setTimeout
   * @param {*} work 要执行的工作
   * @param {*} delay 延迟的时间
   * @param {*} state 初始状态
   * @returns 
   */
  schedule(work, delay = 0, state) {
    return new this.schedulerActionCtor(work).schedule(state, delay);
  }
}

🚀到了在 this.schedule(undefined, dueTime) 中 this 指向的是 schedulerActionCtor也就是 AsyncAction,因为 scheduler.schedule的调用本质上是 new this.schedulerActionCtor 的调用

5. 发布订阅数据

const subject = new Subject();

subject.subscribe({ next: data => console.log("observerA: ", data) });
subject.subscribe({ next: data => console.log("observerB: ", data) });

subject.next(1);
subject.next(2);

可以简单理解为

class Subject {
  observers = [];
  subscribe(subscriber) {
    this.observers.push(subscriber);
  }
  next(value) {
    for (const subscriber of this.observers) {
      subscriber.next(value);
    }
  }
  complete() {
    for (const subscriber of this.observers) {
      subscriber.complete();
    }
  }
}

使用内部变量 observers 收集 传入实例 subjectsubscribe 的函数,当实例 subject 执行 next 时,拿出所有的收集到的函数,依次执行

总结

rxjs 有很多的 Api, 只列取了很少一部分,有兴趣可以去看看rxjs 中文文档

原文链接:https://juejin.cn/post/7258555175495958584 作者:好大猫

(0)
上一篇 2023年7月23日 上午10:13
下一篇 2023年7月23日 上午10:24

相关推荐

发表回复

登录后才能评论