ChatGPT 给我的「万能的」控制 Promise 并发的方法

背景

现在开发的项目需要调用平台的 OpenAPI,众所周知 OpenAPI 是有并发限制的。正好最近就发现了一个并发的问题:Promise.all 并发了 20 多个请求,有 5、6 个失败了

本来这个上网搜一下就有的,但是灵机一动,为什么不问一下 C 老师呢。于是,笔者被教会了一个新方法(这个方法网上应该也能搜到,但是 C 老师有讲解啊),聊天放在最后。

代码解析

Decorator

首先,C 老师上来就给了我一个 Decorator – limitConcurrency

// 自定义装饰器函数,用于限制并发请求数量
function limitConcurrency(fn, limit) {
  const semaphore = new Semaphore(limit);
  
  return async function(...args) {
    await semaphore.acquire();
    
    try {
      return await fn.apply(this, args);
    } finally {
      semaphore.release();
    }
  };
}

// 使用装饰器包裹 fetch 请求
const limitedFetch = limitConcurrency(fetch, 5);

// 并发发起 20 个请求
const urls = ['url1', 'url2', 'url3', ...]; // 假设有 20 个请求
const promises = urls.map(url => limitedFetch(url));

Promise.all(promises)
  .then(responses => {
    // 处理所有请求的响应
  })
  .catch(error => {
    // 处理错误
  });

嗯~~ 很好,我就是要这种 Decorator,可以最低限度的降低代码的修改,侵入性非常小。

limitConcurrency 也跟我想的一样,就是个普通的闭包。等等……这个 Semaphore 是什么?代码呢?看来看去,这才是最关键的吧!

Semaphore

于是继续问 C 老师代码。嚯~~ 怎么说呢,就感觉真的是一个老程序员在给我讲解,聊天细节放在后面了,这里先解析代码:

export class Semaphore {
  private limit: number;
  private count: number;
  private queue: (() => void)[];

  constructor(limit: number) {
    this.limit = limit;
    this.count = 0;
    this.queue = [];
  }

  public acquire() {
    return new Promise<void>((resolve) => {
      if (this.count < this.limit) {
        this.count++;
        resolve();
      } else {
        this.queue.push(resolve);
      }
    });
  }

  public release() {
    if (this.queue.length > 0) {
      const resolve = this.queue.shift();
      resolve?.();
    } else {
      this.count--;
    }
  }
}

C 老师管这个对象叫「信号量」,笔者研究了半天才理解了里面的玄机,不禁拍案叫绝。这解耦真的是太彻底了,这个「信号量」可以说是万能的。我们具体来看下它的用法:

function limitConcurrency(fn, limit) {
  const semaphore = new Semaphore(limit);
  
  return async function(...args) {
    await semaphore.acquire();
    
    try {
      return await fn.apply(this, args);
    } finally {
      semaphore.release();
    }
  };
}

闭包里面 new 一个实例这个就不解释了,主要是为了注入 limit(并发次数限制)这个数据,然后我们重点来看其 acquirerelease 调用的时机,分别是 fn 执行前和 fn 完成后。

我们先跳出来想一下,如果要实现并发控制,大概的逻辑就是:

  1. 如果新增的请求数量小于 limit,那就即刻发起请求;
  2. 如果新增的请求数量等于 limit,那就等着,一直等着,等到有请求完成之后你再执行;

OK,毫无疑问,release 是在请求完成后调用的,这个好理解,感觉也比较好实现。那么关键就是:

怎么才能让新请求一直等着

我们来看 acquire 的实现:

  public acquire() {
    return new Promise<void>((resolve) => {
      if (this.count < this.limit) {
        this.count++;
        resolve(); // 立即执行
      } else {
        this.queue.push(resolve); // 存起来
      }
    });
  }

关键就在 this.queue.push(resolve) 这一句,笔者也是反应了好久才恍然大悟的。Promise 只有执行 resolve 才算完成,它这里直接把 resolve 放到队列里了,就是不执行。你就在这挂着吧,一直等到我主动执行你你才能完成

再看 release 的代码,就很好理解了,就是从队列里拿 resolve,然后执行,也就是主动完成 Promise。

好了,现在可以回来看闭包了,如果请求满了,await semaphore.acquire() 这句代码会一直「挂着」,不会往下执行 fn,直到队列里有 Promise 完成为止。

总结

我们可以看到,Semaphore 非常的独立,C 老师称它为「信号量」还是很贴切的。它就是一个信号控制,与业务逻辑完全解耦,理论上它可以运用到各种频控场景下,可以成为「万能」了。

笔者因为想减少侵入性,所以用闭包实现了个 Decorator,对于用得少的同学来说,估计需要反应一阵子。实际上如果不考虑减少侵入性,直接把 Semaphore 用到业务代码里也是可以的。比如:

const semaphore = new Semaphore(5);

const request = async (args) => {
  await semaphore.acquire();
  try {
    const res = await fetch(args);
    const data = res.json();
    return data;
  } finally {
    semaphore.release();
  }
};

所以关键还是 Semaphore 的设计,真是精彩,学到了!

什么是教育? 教,指引方向;育,陪伴成长。——安妮爸

聊天记录

ChatGPT 给我的「万能的」控制 Promise 并发的方法

PS:最近在寻找有没有适合自己的副业。有没有什么远程、海外的项目机会?欢迎给我留言啊

原文链接:https://juejin.cn/post/7265210393047056424 作者:DylanlZhao

(0)
上一篇 2023年8月10日 上午10:00
下一篇 2023年8月10日 上午10:10

相关推荐

发表回复

登录后才能评论