记录一次Rust中流的尝试
LI Rui

这是一次失败的尝试,仅作为经验分享。

在编写一个文件分享服务后端的时候,遇到了这样一个需求:后端需要提供一个POST API,这个API会直接接收文件内容(Binary)并以流的形式传输到S3中。传入的文件并不会在硬盘中中转,后端需要利用内存缓冲区中转数据。

Rust之前的尝试

为了实现相关的功能,我们尝试了多种语言,或多或少遇到了一些问题。首先我们尝试了Python的FastAPI,在Python中,流作为内置支持实现,能够成功在后端框架和S3库间传递,但是可能是Python的内存缓冲区存在问题,对于大型文件会存在传输速度逐渐变慢的问题。

于是我们考虑使用Presigned URL来进行传输,考虑到透传的问题,有以下两种方案:

  • 透传文件信息,必要字段进行加密,这样做感觉不太理想。
  • 使用Redis作为临时存储。

这样一来S3服务发现文件上传成功后会调用后端的Webhook,从而入库。后来我们参考了Firefox Send的代码,决定还是要以流的形式来进行传输。

Rust下的尝试

我们分别尝试了下面的库:

  • Rocket
  • Actix Web
  • rust-s3
  • Rusoto

前两个是后端框架,后两个是S3支持库。

第一次尝试

第一次尝试用的是Rocket + Rusoto的方案,在Rocket中,直接读取Body流的官方示例长这样:

1
2
3
4
5
6
use rocket::Data;

#[post("/upload", format = "plain", data = "<data>")]
fn upload(data: Data) -> Result<String, std::io::Error> {
data.stream_to_file("/tmp/upload.txt").map(|n| n.to_string())
}

Rocket提供了直接将流读到文件的方法,以及一次取指定大小数据的peek方法。当我们尝试将Rocket的流交给Rusoto处理时,发现Rusoto的流使用的是自己的ByteStream,内部数据是bytes::bytes::Bytes。很遗憾,我并没有找到实现转换的方法。

第二次尝试

第二次尝试使用的是Actix Web + Rusoto的方案,在Actix Web中,流的实现比Rocket感觉容易一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use actix_web::{get, web, Error, HttpResponse};
use futures::StreamExt;

#[get("/")]
async fn index(mut body: web::Payload) -> Result<HttpResponse, Error> {
let mut bytes = web::BytesMut::new();
while let Some(item) = body.next().await {
let item = item?;
println!("Chunk: {:?}", &item);
bytes.extend_from_slice(&item);
}

Ok(HttpResponse::Ok().finish())
}

但是我们发现Actix的bytes类型是自己实现的actix_web::web::Bytes,仍然存在一个转换的问题。于是我们想自己写一个转换函数,参考了GitHub上的部分代码,得到了下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let (mut body_sender, body_receiver) = futures::channel::mpsc::channel(512);
actix_rt::spawn(async move {
while let Some(chunk) = body.next().await {
match chunk {
Ok(bytes) => {
let bytes = Bytes::from(bytes.to_vec());
body_sender.send(Ok(bytes)).await.unwrap();
}
Err(_) => {
body_sender
.send(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Error when receiving body",
)))
.await
.unwrap();
}
}
}
});
let byte_stream = ByteStream::new(body_receiver);

同时我们将actix_rt的版本降到了1。这段代码编译能够通过,但在执行的时候会报错:there is no reactor running, must be called from the context of a Tokio 1.x runtime.。这个方法只好放弃。

第三次尝试

第三次尝试我们使用的库和第二次一样,只不过我们将接收文件内容换成了multipart/form-data的格式。这次我们使用actix_multipart来处理流。Actix Multipart会每次从流中读入64Kb的chunk,允许分次读取。不过我们找不到一个好的办法将每一个chunk连起来以流的形式交给S3支持库进行处理。

第四次尝试

最后我们换用Rust-S3库来处理数据,很遗憾,似乎其提供的put_object_stream仅接收文件路径作为参数,我们仍然需要在本地中转增加负担。不过看这个库的源码,有私有的方法来处理流。

总结

实际上非常感谢heymind老哥提供的帮助,他利用hyper + Rusoto + Multer成功实现了API。最后考虑到开发的难度,我们选择转为Node.js来编写后端,Node.js的流由语言底层实现,能直接传给S3。

  • 本文标题:记录一次Rust中流的尝试
  • 本文作者:LI Rui
  • 创建时间:2021-03-31 10:04:10
  • 本文链接:https://www.lirui.tech/post/2021/28baa1981e76.html
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-SA 许可协议。转载请注明出处!