如何在rust中调用腾讯&百度的翻译服务
阅读本篇博客后,您将掌握关于Tokio库实现批量任务处理的实用技巧,并了解如何有效地进行并发限制。
背景
在开发ComfyUI Startup插件管理和模型管理功能的过程中,我发现对模型数据和插件数据的需求。在研究ComfyUI Manager如何实现将模型下载至特定目录时,我注意到其仓库中包含的数据文件仅提供英文版本。为了满足国际化需求,我决定着手翻译这些数据文件。然而,显然逐一手动翻译并不现实。因此,我选择利用tokio
和reqwest
库调用百度或腾讯的翻译API,以自动化的方式进行翻译工作。
先实现一个翻译请求的调用
use crate::CLIENT;
use anyhow::{anyhow, Result};
use once_cell::sync::Lazy;
use serde_json::Value;
static APP_ID: Lazy<String> = Lazy::new(|| std::env::var("APP_ID").unwrap());
static APP_KEY: Lazy<String> = Lazy::new(|| std::env::var("APP_KEY").unwrap());
static URL: &str = "http://api.fanyi.baidu.com/api/trans/vip/translate";
// 对请求进行封装,只获取想要的数据
pub async fn trans(src: &str, from: &str, to: &str) -> Result<Value> {
let res: Value = CLIENT
.get(URL)
.query(&[
("q", src),
("appid", &APP_ID),
("from", from),
("to", to),
("salt", "10086"),
("sign", &sign(src)),
])
.send()
.await?
.json()
.await?;
let trans_result = res["trans_result"]
.as_array()
.ok_or(anyhow!("{res}"))?
.first()
.unwrap()["dst"]
.clone();
Ok(trans_result)
}
// 签名方法,参考百度翻译API文档
fn sign(q: &str) -> String {
let str = format!("{}{}10086{}", &*APP_ID, q, &*APP_KEY);
let digest = md5::compute(str);
format!("{:x}", digest)
}
然后再实现批量调用
起初,我设想采用for循环逐个调用的方式实现功能,但这种方式未能充分利用Tokio框架提供的高并发特性。这一思考受到了TypeScript编程经验的影响,在TypeScript中,Promise会在定义后立即执行,而Rust语言中的Future则不同,它们仅在遇到await表达式时才会真正执行其异步操作。因此,为了适应Rust及Tokio的异步编程模型,我们需要采取更恰当的方式来调度并发任务以发挥其优势。
pub async fn run<P: AsRef<Path>>(
url: &str,
target_field: &str,
path: P,
from: &str,
to: &str,
field: Option<&str>,
) -> Result<()> {
let custom_nodes: Value = CLIENT.get(url).send().await?.json().await?;
let nodes = if field.is_some() {
&custom_nodes[field.unwrap()]
} else {
&custom_nodes
};
// 前面都不重要,只需要知道nodes就是要翻译的数据数组
let nodes = nodes.as_array().ok_or(anyhow!("{nodes:#?}"))?;
let mut new_nodes = nodes.clone();
let time = std::time::Instant::now();
for node in new_nodes.iter_mut() {
let src = node[target_field]
.as_str()
.ok_or(anyhow!("no description"))?;
let zh = match trans(src, from, to).await {
Ok(zh) => zh,
Err(e) => {
error!("{}", e);
return Err(e);
}
};
info!("{} -> {}", src, zh);
node[format!("{to}_{target_field}")] = zh;
warn!("one task time:{:?}", time.elapsed());
}
// 将数据写入文件中
info!("all time: {:?}", time.elapsed());
let file = std::fs::File::create(&path)?;
serde_json::to_writer_pretty(file, &new_nodes)?;
info!("write to {}", path.as_ref().display().to_string());
Ok(())
}
第二版本,使用join_all来批量执行
pub async fn run2<P: AsRef<Path>>(
url: &str,
target_field: &str,
path: P,
from: &str,
to: &str,
field: Option<&str>,
) -> Result<()> {
let custom_nodes: Value = CLIENT.get(url).send().await?.json().await?;
let nodes = if field.is_some() {
&custom_nodes[field.unwrap()]
} else {
&custom_nodes
};
// 前面都不重要,只需要知道nodes就是要翻译的数据数组
let nodes = nodes.as_array().ok_or(anyhow!("{nodes:#?}"))?;
let mut new_nodes = nodes.clone();
let time = std::time::Instant::now();
// 这里使用map转换成异步任务,但还没执行
let tasks = new_nodes.iter_mut().map(|node| async {
let src = node[target_field]
.as_str()
.ok_or(anyhow!("no description"))?;
let zh = match trans(src, from, to).await {
Ok(zh) => zh,
Err(e) => {
error!("{}", e);
return Err(e);
}
};
info!("{} -> {}", src, zh);
node[format!("{to}_{target_field}")] = zh;
warn!("one task time:{:?}", time.elapsed());
Ok(())
});
// 使用futures::future::join_all来批量执行这些任务
join_all(tasks).await;
// 将数据写入文件中
info!("all time: {:?}", time.elapsed());
let file = std::fs::File::create(&path)?;
serde_json::to_writer_pretty(file, &new_nodes)?;
info!("write to {}", path.as_ref().display().to_string());
Ok(())
}
这里将数据转换成批量异步任务,再使用join_all来等待他们所有执行完成。
其实这里有更好的方法就是使用futures::stream::FuturesUnordered
,这里为了简便就使用了join_all。
并发限制
在实践高并发场景时,我发现百度API存在一定的调用频率限制,即每秒允许的请求次数有限制。因此,为了避免超出限制,我不得不对并发任务进行控制。鉴于这种情况下的时间敏感性,我没有选择使用tokio::sync::Semaphore
作为并发限制工具,而是采用了批量分时执行策略:将任务划分为多个批次,每一批次之间的执行间隔为3秒,确保每批任务在前一批次执行满3秒后才开始执行,从而符合接口调用频率要求。
pub async fn run<P: AsRef<Path>>(
url: &str,
target_field: &str,
path: P,
from: &str,
to: &str,
field: Option<&str>,
) -> Result<()> {
let custom_nodes: Value = CLIENT.get(url).send().await?.json().await?;
let nodes = if field.is_some() {
&custom_nodes[field.unwrap()]
} else {
&custom_nodes
};
let nodes = nodes.as_array().ok_or(anyhow!("{nodes:#?}"))?;
let mut new_nodes = nodes.clone();
let time = std::time::Instant::now();
// 按5个进行分批
for i in (0..new_nodes.len()).step_by(5) {
let time = std::time::Instant::now();
// 对边界进行限制
let last = std::cmp::min(i + 5, new_nodes.len());
// 将这5个数据转换成批量任务
let tasks = &mut new_nodes[i..last];
let tasks = tasks.iter_mut().map(|node| async move {
let src = node[target_field]
.as_str()
.ok_or(anyhow!("no description"))?;
let zh = match trans(src, from, to).await {
Ok(zh) => zh,
Err(e) => {
error!("{}", e);
return Err(e);
}
};
info!("{} -> {}", src, zh);
node[format!("{to}_{target_field}")] = zh;
Ok::<(), anyhow::Error>(())
});
// 使用join等待两个异步任务一起执行完成
// 设置时间为3秒
// 注意下面注释的实际上是等待任务执行完后再等5秒。
// join_all(tasks).await;
// sleep(Duration::from_secs(3);
join!(join_all(tasks), sleep(Duration::from_secs(3)));
warn!("one task time:{:?}", time.elapsed());
}
info!("all time: {:?}", time.elapsed());
let file = std::fs::File::create(&path)?;
serde_json::to_writer_pretty(file, &new_nodes)?;
info!("write to {}", path.as_ref().display().to_string());
Ok(())
}
上面代码在我的GitHub translate
目录中,如果觉得有用,请给我点个赞吧,另外也恳请您为我的仓库点个Star,谢谢客官老爷。
原文链接:https://juejin.cn/post/7359084920596103206 作者:红尘散仙