函数式编程中的并发

我心飞翔 分类:javascript

序言

当我们需要同时发起多个请求,并且需要等待所有请求都返回的时候处理一段逻辑,即为并发。
当我们需要同时发起多个请求,并且需要处理第一个最快返回的请求逻辑,即为竟态。
相信优秀的读者们或多或少都处理过此类问题。
但是在函数式编程中如何处理此类问题却鲜有人提起,本文将简单介绍如何通过函数式编程以及函子之间的作用来处理这两类问题:

// 先批量造出一些异步请求
const createTaskByNumber = n => new Task((rej, res) => {
      console.log(n);
      setTimeout(() => res(n), n * 1000);
});
const [http1, http3, http5] = [1, 3, 5].map(n => createTaskByNumber(n));
 

这样我们同时得到了需要等待响应1/3/5秒的请求。此处的Task来自data.task,笔者之前的文章都提到过他。

现在

在讲解之前,先来对比一下远古时期的处理方法。

// 处理并发
const collect: any[] = [];
[http1, http3, http5].forEach((http, index) => {
    http().then(res => {
        collect[index] = res;

        if (collect.length === 3) {
            const [x, y, z] = collect;
            console.log(x + y + z);
        }
    });
});

// 处理竟态
const collect: any[] = [];
[http1, http3, http5].forEach((http, index) => {
    http().then(res => {
        collect[index] = res;

        if (collect.length === 1) {
            console.log(res);
        }
    });
});
 

原理都很简单 通过维护一个数组判断数组内容是否都请求完毕来执行业务逻辑,以此可以处理并发和竟态。然而不足之处我们需要维护更多的变量与以此带来的维护成本。

幸运的是,es6给我们带来了最珍贵的礼物Promise.借助Promise.all&Promise.race。处理此类问题变得更加得心应手。

// 处理并发
Promise.all([http1, http3, http5]).then(res => {
    const [x, y, z] = res;
    console.log(x + y + z)
});

// 处理竟态
Promise.race([http1, http3, http5]).then(res => {
     console.log(res);
});
 

好了,介绍完当前的常用解法,也该来看看函数式编程的方案,可能会有点扫兴,如果要用函数式和函子去处理以上2种需求,所需要进行的操作和理解成本都不低,不过没关系,笔者会仔细讲解,仔细到源码级别。

Task.prototype.ap

先来讲讲ap,这是实现函数式编程处理并发的核心函数。
data.task对于ap函数的注解如下:

/**
 * Applys the successful value of the `Task[α, (β → γ)]` to the successful
 * value of the `Task[α, β]`
 *
 * @summary @Task[α, (β → γ)] => Task[α, β] → Task[α, γ]
 */
 

简单翻译一下,也就是说可以把一个函子中的函数ap(apply: 应用)于另外一个函子中resolve的值。这么讲可能有点抽象,我们先来看看ap用起来是什么样子把。

const {error, log} = console;

// 需要ap的函数放进函子里
const add = new Task((rej, res) => {
    res(x => x + 1);
});

// 需要被ap的值放进函子里
const one = new Task((rej, res) => {
    res(2);
});

add.ap(one).fork(error, log) // 3
 

咋一看好像平平无奇,不足称奇,笔者当初也是这么以为的。别急,我们再来点更复杂的场景。给他加上异步试试。

const add = new Task(function (reject, resolve) {
    setTimeout(function () {
        resolve(function (x) {
            return x + 1;
        });
    }, 1000);
});

const one = new Task(function (reject, resolve) {
    setTimeout(function () {
        resolve(2);
    }, 2000);
});

add.ap(one).fork(error, log); // 3
 

咿呀,好像开始有点意思了,无论add中的逻辑多么复杂即使有异步有延迟脚本,只要最后能resolve到一个函数作为successful value,one也是一样,能有个resolve值,那么add.ap(one) 最后执行的结果都是一样的。
有点像add只要ap(爱❤️)上了one,不过跨越千山万水,终会相见的浪漫。

来看看ap函数有何魔法,即使所爱(ap)隔山海,也要ap到你的魔力

// ap源码
Task.prototype.ap = function _ap(that) {
  var forkThis = this.fork; // 保存add
  var forkThat = that.fork; // 保存one
  var cleanupThis = this.cleanup;
  var cleanupThat = that.cleanup;

  function cleanupBoth(state) {
    cleanupThis(state[0]);
    cleanupThat(state[1]);
  }
  
  // add.ap(one)返回一个新的task
  return new Task(function(reject, resolve) {
  
    // 新的task被fork之后调用本函数
    
    var func, funcLoaded = false; // 保存add中resolve的函数
    var val, valLoaded = false; // 保存one中resolve的值
    var rejected = false;
    var allState;
    
    // 此处调用forkThis 也就是调用了add函子中的function (reject, resolve) {
    //    setTimeout(function () {
    //        resolve(function (x) {
    //            return x + 1;
    //        });
    //    }, 1000);
    // }
    // 同时把guardReject传入reject guardResolve(xxx)返回的一个函数传入resolve
    var thisState = forkThis(guardReject, guardResolve(function(x) {
      funcLoaded = true;
      func = x;
    }));
    
    // 此处调用forkThis 也就是调用one函子中的函数 均是同理
    var thatState = forkThat(guardReject, guardResolve(function(x) {
      valLoaded = true;
      val = x;
    }));

    // 生成resolve函数传入调用的add/one的函数的resolve参数
    function guardResolve(setter) {
      return function(x) {
          
        // 其中任意一方reject掉都会return
        if (rejected) {
          return;
        }
           
        // 这里是关键,setter会拿到函子中最终resolve的值
        // add函子的setter会设置func funcLoaded
        // one函子的setter会设置val valLoaded
        setter(x); 
        
        // 当两个函子都被resolved掉
        // 这也就是为什么两函子内部多复杂 即使有异步也无妨的原因
        if (funcLoaded && valLoaded) {
          delayed(function(){ cleanupBoth(allState) });
          
          // 这里就会执行 func(val) 也就是add(1)
          // resolve也就是fork函数中传入的log
          return resolve(func(val));
        } else {
          return x;
        }
      }
    }

    function guardReject(x) {
      if (!rejected) {
        rejected = true;
        return reject(x);
      }
    }

    return allState = [thisState, thatState];
  }, cleanupBoth);
};
 

见识到ap的魔力之后 那么我们如何利用ap去处理并发呢?其实距离真相不远了。假设add中的逻辑没有异步,one中的逻辑有异步,再来多几个one two three函子,并把add同时ap于多个异步函子,那么这不就实现了处理并发吗?来试试看把

// 为了代码简洁引入Task.of
const of = Task.of;
const add = (x,y,z) => x + y + z; // 这里是处理3个并发请求完之后的逻辑

// 这一步非常重要,不是直接把add传入,而是传入一个柯里化的add
const addTask = Task.of(curry(add));

// 还是3个老伙计
const [http1, http3, http5] = [1, 3, 5].map(n => createTaskByNumber(n));

// show time
addTask.ap(http1).ap(http3).ap(http5).fork(error, log); // 9
 

惊奇把~每一次ap都会返回一个新的task,新的task内部都会执行一次func(val)
每一次func(val)后都会又返回一个新的函数,交给下一次ap的func,以此实现了并发处理逻辑。

但是这不够优雅,也不够好用,需要一种类似Promise.all这样的api,让并发处理更“傻子透明”点。动手封装一个all把

const all = curry((arr: any[], f) => {
    return new Task((rej, res) => {
        const task = arr.reduce((a, b) => {
            return a.ap(b);
        }, of(curry(f)));
        task.fork(rej, res);
    });
});

// 借助all 轻松实现处理并发
const handleAllHttp = curry((https, f) => compose(fork(error, log), all(https), () => f));

const handleHttp135 = handleAllHttp([http1, http3, http5]);

// 各种逻辑的处理并发
const addHttp135 = handleHttp135((x, y, z) => x + y + z);
const minusHttp135 = handleHttp135((x, y, z) => x - y - z);
const multiplyHttp135 = handleHttp135((x, y, z) => x * y * z);


addHttp135(); // 9
minusHttp135(); // -7
multiplyHttp135(); // 15
 

总结

函数式编程就是如此的富有魅力,时至今日,依然痴迷于ap函数的浪漫~

回复

我来回复
  • 暂无回复内容