函数式编程中的竟态

吐槽君 分类:javascript

序言

当我们需要同时发起多个请求,并且需要等待所有请求都返回的时候处理一段逻辑,即为并发。
当我们需要同时发起多个请求,并且需要处理第一个最快返回的请求逻辑,即为竟态。
相信优秀的读者们或多或少都处理过此类问题。
但是在函数式编程中如何处理此类问题却鲜有人提起,本文将简单介绍如何通过函数式编程以及函子之间的作用来处理这两类问题
处理并发的部分写在了另外一篇函数式编程中的并发,里面注重介绍了ap函数的作用和原理。

// 先批量造出一些异步请求
const createTaskByNumber = n => new Task((rej, res) => {
      console.log(n);
      setTimeout(() => res(n), n * 1000);
});
const [http1, http3, http5] = [1, 3, 5].map(n => createTaskByNumber(n));
 

这样我们同时得到了需要等待响应1/3/5秒的请求。此处的Task来自data.task,笔者之前的文章都提到过他。

现在

在讲解之前,先来对比一下远古时期的处理方法。

// 处理并发
const collect: any[] = [];
[http1, http3, http5].forEach((http, index) => {
    http().then(res => {
        collect[index] = res;

        if (collect.length === 3) {
            const [x, y, z] = collect;
            console.log(x + y + z);
        }
    });
});

// 处理竟态
const collect: any[] = [];
[http1, http3, http5].forEach((http, index) => {
    http().then(res => {
        collect[index] = res;

        if (collect.length === 1) {
            console.log(res);
        }
    });
});
 

原理都很简单 通过维护一个数组判断数组内容是否都请求完毕来执行业务逻辑,以此可以处理并发和竟态。然而不足之处我们需要维护更多的变量与以此带来的维护成本。

幸运的是,es6给我们带来了最珍贵的礼物Promise.借助Promise.all&Promise.race。处理此类问题变得更加得心应手。

// 处理并发
Promise.all([http1, http3, http5]).then(res => {
    const [x, y, z] = res;
    console.log(x + y + z)
});

// 处理竟态
Promise.race([http1, http3, http5]).then(res => {
     console.log(res);
});
 

好了,介绍完当前的常用解法,也该来看看函数式编程的方案,可能会有点扫兴,如果要用函数式和函子去处理以上2种需求,所需要进行的操作和理解成本都不低,不过没关系,笔者会仔细讲解,仔细到源码级别。

Task.prototype.concat

相对于处理并发的ap,concat显得相对简单一些。
data.task对于concat函数的注解如下:

/**
 * Selects the earlier of the two tasks `Task[α, β]`
 *
 * @summary @Task[α, β] => Task[α, β] → Task[α, β]
 */
 

简单翻译一下,处理两个函子中resolve最早的那一个,先看看以下的用例。

const {error, log} = console;

http1.concat(http3).concat(http5).fork(error, log); // 1
 

这种链式的调用有点不雅,第二点是concat这个名字有点难以理解,咋一听以为是数组或者字符方法,而且我们也希望向标准靠拢,向Promise.race看齐。封装一个race函数

function race(arr: any[]) {
    return new Task((rej, res) => {
        const task = arr.reduce((a, b) => {
            return a.concat(b);
        });
        task.fork(rej, res);
    });
}
const handleAllHttp = curry((f, https) => compose(fork(error.f), race, () => https);

// 演变各种处理竟态的函数
const add = handleAllHttp(x => x + 1);
const multiply= handleAllHttp(x => x * x);

add([http1, http3, http5]); // 2
multiply([http1, http3, http5]); // 1
 

concat的原理相对于ap简单很多,理解了ap再来理解concat不是什么难事

// 源码
Task.prototype.concat = function _concat(that) {
  var forkThis = this.fork; // 保存当前task
  var forkThat = that.fork; // 保存被concat的task
  var cleanupThis = this.cleanup;
  var cleanupThat = that.cleanup;

  function cleanupBoth(state) {
    cleanupThis(state[0]);
    cleanupThat(state[1]);
  }
    
    // concat过后返回一个新task
  return new Task(function(reject, resolve) {
    var done = false; // 是否有task被resolve
    var allState;
    
    // 执行当前task的函数 并把reject, resolve传入
    var thisState = forkThis(guard(reject), guard(resolve));
    // 同上
    var thatState = forkThat(guard(reject), guard(resolve));

    return allState = [thisState, thatState];
    
    // 传入新task的resolve
    // 并返回一个函数 依赖于上层的闭包
    // 当这个函数被函子中resolve调用时更新done
    // 并返回新task resolve的结果
    function guard(f) {
      return function(x) {
        if (!done) {
          // 这里很重要
          // 当出现链式调用或者concat多个函子时
          // 由于这里的函数始终维持着定定义时的闭包关系
          // 本函数会guard传入各个函子中 各函子通过的resolve都会调用本函数
          // 所以竟态的条件done都是唯一的,以此来控制
          done = true;
          delayed(function(){ cleanupBoth(allState) })
          return f(x);
        }
      };
    }
  }, cleanupBoth);

};
 

总结

little function, large magic~

回复

我来回复
  • 暂无回复内容