[tokio实战] 如何在rust中调用腾讯&百度的翻译服务

如何在rust中调用腾讯&百度的翻译服务

阅读本篇博客后,您将掌握关于Tokio库实现批量任务处理的实用技巧,并了解如何有效地进行并发限制。

背景

在开发ComfyUI Startup插件管理和模型管理功能的过程中,我发现对模型数据和插件数据的需求。在研究ComfyUI Manager如何实现将模型下载至特定目录时,我注意到其仓库中包含的数据文件仅提供英文版本。为了满足国际化需求,我决定着手翻译这些数据文件。然而,显然逐一手动翻译并不现实。因此,我选择利用tokioreqwest库调用百度或腾讯的翻译API,以自动化的方式进行翻译工作。

先实现一个翻译请求的调用

use crate::CLIENT;
use anyhow::{anyhow, Result};
use once_cell::sync::Lazy;
use serde_json::Value;

static APP_ID: Lazy<String> = Lazy::new(|| std::env::var("APP_ID").unwrap());
static APP_KEY: Lazy<String> = Lazy::new(|| std::env::var("APP_KEY").unwrap());
static URL: &str = "http://api.fanyi.baidu.com/api/trans/vip/translate";

// 对请求进行封装,只获取想要的数据
pub async fn trans(src: &str, from: &str, to: &str) -> Result<Value> {
    let res: Value = CLIENT
        .get(URL)
        .query(&[
            ("q", src),
            ("appid", &APP_ID),
            ("from", from),
            ("to", to),
            ("salt", "10086"),
            ("sign", &sign(src)),
        ])
        .send()
        .await?
        .json()
        .await?;

    let trans_result = res["trans_result"]
        .as_array()
        .ok_or(anyhow!("{res}"))?
        .first()
        .unwrap()["dst"]
        .clone();

    Ok(trans_result)
}

// 签名方法,参考百度翻译API文档
fn sign(q: &str) -> String {
    let str = format!("{}{}10086{}", &*APP_ID, q, &*APP_KEY);
    let digest = md5::compute(str);
    format!("{:x}", digest)
}

然后再实现批量调用

起初,我设想采用for循环逐个调用的方式实现功能,但这种方式未能充分利用Tokio框架提供的高并发特性。这一思考受到了TypeScript编程经验的影响,在TypeScript中,Promise会在定义后立即执行,而Rust语言中的Future则不同,它们仅在遇到await表达式时才会真正执行其异步操作。因此,为了适应Rust及Tokio的异步编程模型,我们需要采取更恰当的方式来调度并发任务以发挥其优势。

pub async fn run<P: AsRef<Path>>(
    url: &str,
    target_field: &str,
    path: P,
    from: &str,
    to: &str,
    field: Option<&str>,
) -> Result<()> {
    let custom_nodes: Value = CLIENT.get(url).send().await?.json().await?;
    let nodes = if field.is_some() {
        &custom_nodes[field.unwrap()]
    } else {
        &custom_nodes
    };
    // 前面都不重要,只需要知道nodes就是要翻译的数据数组
    let nodes = nodes.as_array().ok_or(anyhow!("{nodes:#?}"))?;

    let mut new_nodes = nodes.clone();
    let time = std::time::Instant::now();

    for node in new_nodes.iter_mut() {
        let src = node[target_field]
            .as_str()
            .ok_or(anyhow!("no description"))?;

        let zh = match trans(src, from, to).await {
            Ok(zh) => zh,
            Err(e) => {
                error!("{}", e);
                return Err(e);
            }
        };

        info!("{} -> {}", src, zh);
        node[format!("{to}_{target_field}")] = zh;
        warn!("one task time:{:?}", time.elapsed());
    }

    // 将数据写入文件中
    info!("all time: {:?}", time.elapsed());
    let file = std::fs::File::create(&path)?;
    serde_json::to_writer_pretty(file, &new_nodes)?;
    info!("write to {}", path.as_ref().display().to_string());
    Ok(())
}

第二版本,使用join_all来批量执行

pub async fn run2<P: AsRef<Path>>(
    url: &str,
    target_field: &str,
    path: P,
    from: &str,
    to: &str,
    field: Option<&str>,
) -> Result<()> {
    let custom_nodes: Value = CLIENT.get(url).send().await?.json().await?;
    let nodes = if field.is_some() {
        &custom_nodes[field.unwrap()]
    } else {
        &custom_nodes
    };
    // 前面都不重要,只需要知道nodes就是要翻译的数据数组
    let nodes = nodes.as_array().ok_or(anyhow!("{nodes:#?}"))?;

    let mut new_nodes = nodes.clone();
    let time = std::time::Instant::now();
		
  	// 这里使用map转换成异步任务,但还没执行
    let tasks = new_nodes.iter_mut().map(|node| async {
        let src = node[target_field]
            .as_str()
            .ok_or(anyhow!("no description"))?;

        let zh = match trans(src, from, to).await {
            Ok(zh) => zh,
            Err(e) => {
                error!("{}", e);
                return Err(e);
            }
        };

        info!("{} -> {}", src, zh);
        node[format!("{to}_{target_field}")] = zh;
        warn!("one task time:{:?}", time.elapsed());
        Ok(())
    });
  	
  	// 使用futures::future::join_all来批量执行这些任务  	
    join_all(tasks).await;
    // 将数据写入文件中
    info!("all time: {:?}", time.elapsed());
    let file = std::fs::File::create(&path)?;
    serde_json::to_writer_pretty(file, &new_nodes)?;
    info!("write to {}", path.as_ref().display().to_string());
    Ok(())
}

这里将数据转换成批量异步任务,再使用join_all来等待他们所有执行完成。

其实这里有更好的方法就是使用futures::stream::FuturesUnordered,这里为了简便就使用了join_all。

并发限制

在实践高并发场景时,我发现百度API存在一定的调用频率限制,即每秒允许的请求次数有限制。因此,为了避免超出限制,我不得不对并发任务进行控制。鉴于这种情况下的时间敏感性,我没有选择使用tokio::sync::Semaphore作为并发限制工具,而是采用了批量分时执行策略:将任务划分为多个批次,每一批次之间的执行间隔为3秒,确保每批任务在前一批次执行满3秒后才开始执行,从而符合接口调用频率要求。

pub async fn run<P: AsRef<Path>>(
    url: &str,
    target_field: &str,
    path: P,
    from: &str,
    to: &str,
    field: Option<&str>,
) -> Result<()> {
    let custom_nodes: Value = CLIENT.get(url).send().await?.json().await?;
    let nodes = if field.is_some() {
        &custom_nodes[field.unwrap()]
    } else {
        &custom_nodes
    };
    let nodes = nodes.as_array().ok_or(anyhow!("{nodes:#?}"))?;

    let mut new_nodes = nodes.clone();
    let time = std::time::Instant::now();
		// 按5个进行分批
    for i in (0..new_nodes.len()).step_by(5) {
        let time = std::time::Instant::now();
      	// 对边界进行限制
        let last = std::cmp::min(i + 5, new_nodes.len());
      
      	// 将这5个数据转换成批量任务
        let tasks = &mut new_nodes[i..last];
        let tasks = tasks.iter_mut().map(|node| async move {
            let src = node[target_field]
                .as_str()
                .ok_or(anyhow!("no description"))?;

            let zh = match trans(src, from, to).await {
                Ok(zh) => zh,
                Err(e) => {
                    error!("{}", e);
                    return Err(e);
                }
            };
            info!("{} -> {}", src, zh);
            node[format!("{to}_{target_field}")] = zh;
            Ok::<(), anyhow::Error>(())
        });
				
      	// 使用join等待两个异步任务一起执行完成
      	// 设置时间为3秒
      	// 注意下面注释的实际上是等待任务执行完后再等5秒。
      	// join_all(tasks).await;
      	// sleep(Duration::from_secs(3);
        join!(join_all(tasks), sleep(Duration::from_secs(3)));
        warn!("one task time:{:?}", time.elapsed());
    }
    info!("all time: {:?}", time.elapsed());
    let file = std::fs::File::create(&path)?;
    serde_json::to_writer_pretty(file, &new_nodes)?;
    info!("write to {}", path.as_ref().display().to_string());
    Ok(())
}

上面代码在我的GitHub translate目录中,如果觉得有用,请给我点个赞吧,另外也恳请您为我的仓库点个Star,谢谢客官老爷。

原文链接:https://juejin.cn/post/7359084920596103206 作者:红尘散仙

(0)
上一篇 2024年4月19日 上午9:51
下一篇 2024年4月19日 上午10:00

相关推荐

发表回复

登录后才能评论