线程(thread)的问题:
- 每个线程都有固定大小的栈内存占用,成千上万个线程会消耗大量的内存;
- 内核调度线程运行的上下文切换开销大;
- 线程在执行 read/write 系统调用时,尤其是网络通信时,会有大量的时间被 block 等待,此时该线程不能再执行其它事情,效率低;
异步(async)通过创建大量异步 task,然后使用一个 thread pool 来执行它们,在某个 task 被阻塞时 async executor 自动调 度其它 task 到 thread 上运行,从而执行效率和并发更高。task 相比 thread 更轻量化,没有了大量 thread stack 开销,切换执行 速度也更快,所以一个程序内部可以创建大量的异步 task。
参考:https://rust-lang.github.io/async-book/part-guide/concurrency.html
Future #
async 的核心是 Future trait, 定义如下:
trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
它的 poll() 方法的 self 类型是 Pin<&mut Self> ,表示一旦第一次 poll 后,Self 对象的内存地址必须是固定的,
不能被转移, 这是为了确保 Future 对象内部保存的栈变量地址指针继续有效(Pin 的原理参考后文)。
poll() 方法的第二个参数 Context 封装了 Waker ,当 poll Pending 时,Future 的实现可以保存该
waker(一般是在一个辅助线程中), 后续当条件满足时调用 waker 来唤醒 async executor 来重新 poll 自己。
Future 对象只有被 poll 时才开始执行, 一般使用 .await 来进行 poll,它返回 Ready 后的值(await 的原理参考后文)。
创建 Future 对象 #
有多种方式来创建实现 Future trait 的对象:
-
为自定义类型实现
Future trait; -
使用
async block语法:
-
async {xxx}返回一个impl Future<Output=XX>的匿名类型对象; -
async move {xxx}返回一个impl Future<Output=XX> + 'static的匿名类型对象;
- 使用
async fn语法:
async fn(xx) {xxx}返回一个fn(xx) -> impl Future<Output=XX>的匿名函数类型对象;
- 使用返回
async block的 closure 语法:
-
|xx| async {xx}返回一个impl FnXX(xx) -> impl Future<Output=XX>的匿名闭包类型对象; -
|xx| async move {xx}返回一个impl FnXX(xx) -> impl Future<Output=XX> + 'static的匿名闭包类型对象;
- 使用
async closure语法(Rust 1.75 开始原生支持):
async |xx| {}或async move |xx| {}返回impl FnXX(xx) -> impl Future<Output=XX>的匿名闭包类型对象;
- 返回
impl Future类型对象的函数;
fn read_to_string(&mut self, buf: &mut String) -> impl Future<Output = Result<usize>> {
async move {
Ok::<Result(usize)>(2)
}
}
实现 Future trait #
// https://mazhen.tech/p/pinboxdyn-future%E8%A7%A3%E6%9E%90/
pub struct HttpRequest {
url: String,
}
pub struct HttpResponse {
code: u32,
}
pub struct ResponseFuture {
request: HttpRequest,
}
impl Future for ResponseFuture {
type Output = Result<HttpResponse, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
println!("process url:{}", &self.as_ref().get_ref().request.url);
Poll::Ready(Ok(HttpResponse { code: 200 }))
}
}
// 在实现 Service 时,关联类型 type Future 设置为我们手工实现的 Future:
struct RequestHandler;
impl Service<HttpRequest> for RequestHandler {
type Response = HttpResponse;
type Error = Error;
type Future = ResponseFuture;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: HttpRequest) -> Self::Future {
ResponseFuture { request: req }
}
}
// 然后就可以像正常的 Future 一样,使用 Service:
#[tokio::main]
async fn main() {
let mut service = RequestHandler {};
match service.call(HttpRequest { url: "/user/mazhen".to_owned(),}) .await // .await 对 call() 返回的 future 进行 poll
{
Ok(r) => println!("Response code: {}", r.code),
Err(e) => println!("process failed. {:?}", e),
}
}
async block #
async block 类似于闭包,提供了一个异步上下文,内部可以使用 .await, 返回一个 impl Future 的匿名类型对象
(编译时生成的匿名对象)。
注:和同步闭包 move || {} 类似,async move 不保证生成的 Future 对象一定是 'static 的。
use std::io;
use std::future::Future;
fn cheapo_request<'a>(host: &'a str, port: u16, path: &'a str)
-> impl Future<Output = io::Result<String>> + 'a
{
// async move 也不能保证返回的 Future 对象一定是 'static 的, 特别是这里捕获的 host、path 本身是借用,而且不是
// 'static, 所以返回的 Future 也是具有 'a 而非 'static
async move {
//... function body ...
}
}
// 解决办法:捕获对象而非借用类型
fn cheapo_request(host: &str, port: u16, path: &str) -> impl Future<Output = io::Result<String>> + 'static
{
let host = host.to_string();
let path = path.to_string();
async move {
// 捕获了 host、path 的所有权
//... use &*host, port, and path ...
}
}
// 下面是 OK 的:
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
// 将 host、port、path 所有权转移到 async block 中
handles.push(task::spawn(async move {
cheapo_request(&host, port, &path).await
}));
}
在 async block 中使用 ? 来传播错误或 return 值都是 async block 返回,而不是它所在的函数返回;
- 作为对比,在函数的普通 block、match branch 中使用 ? 时,是所在的函数返回;
- async block 中不能使用 break/continue,如果要提前返回需要使用 return。
// https://doc.rust-lang.org/std/keyword.async.html#control-flow
async fn example() -> i32 {
let x = async {
return 5; // async block 提前返回,而不是所在的函数返回,返回返回值
};
x.await
}
//https://rust-lang.github.io/async-book/part-guide/more-async-await.html
async {
let x = foo()?; // ? 引起所在的 async block 返回
consume(x);
// Ok(()) // 错误,需要指定 Ok 所属的 Result 完整类型。
Ok::<(), std::io::Error>(()) // 正确,指定 Ok 所属的 Result 类型
// 或者
// std::result::Result::<(), std::io::Error>::Ok(());
}.await?
async block 可以和闭包一样捕获环境中的对象(借用或 move),可以指定 async move 来获得对象的所有权,这时 async block 返回的 Future 可能具有 'static lifetime。
由于 async block 返回实现 impl Future<Output=XX> 匿名类型对象,所以:
- 不能对返回的对象施加额外的限界,如
+Send+Unpin,但编译器会自动判断它是否实现Send/Unpin。 - 也不能指定 Future 中的 Output 关联类型,当 Output 为 Result 时可能出错。
- 解决办法:为 Ok 指定它所属的 Enum Result 类型。
let input = async_std::io::stdin();
// future 是 Future<Output=Result>
let future = async {
let mut line = String::new();
// ? 是 async block 返回,结果类型 std::io::Result<usize>
input.read_line(&mut line).await?;
println!("Read line: {}", line);
// Ok(()) // 错误
// 正确,指定 Ok 所属的 Result 类型
Ok::<(), std::io::Error>(())
// 或者
// std::result::Result::<(), std::io::Error>::Ok(());
};
async {} 和 async move {} 都是表达式,故可以作为闭包函数的返回值。
use async_std::net;
use async_std::task;
// serve_one 是一个 impl Future<Output=()> 的匿名类型对象(静态派发)
let serve_one = async {
// ? 是 async block 返回(而非 block 所在的函数),类型是 Result
let listener = net::TcpListener::bind("localhost:8087").await?;
let (mut socket, _addr) = listener.accept().await?;
};
pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>>
{
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(
task::spawn_local(
// 使用 async move 来获取 host、path 的所有权。
async move {
cheapo_request(&host, port, &path).await
}
));
}
//...
}
loop {
async move {
break; // error[E0267]: `break` inside of an `async` block
}
}
async closure #
async closure 有两种类型:
- Rust 1.75 之前:只能使用返回
async {}的同步闭包来实现,如|| async {}和|| async move {}; - Rust 1.75 及之后:原生支持
async closure,如async || {}和async move || {};
旧的 async block 实现 #
使用 || async move {} 来定义异步闭包,等效于生成 impl FnXX() -> impl Future<Output=YY> 类型的匿名类型对象:
let app = Router::new()
.route(
"/",
any_service(service_fn(
// 等效于生成一个 impl Fn() -> impl Future<Output=Result<Response, Infallible> 匿名类型对象
|_: Request| async {
let res = Response::new(Body::from("Hi from `GET /`"));
Ok::<_, Infallible>(res) // service_fn 的闭包必须返回 Result
}))
)
但是这种通过同步闭包返回异步对象的实现,有如下问题:
- 输出或捕获的的对象包含借用的情况,带来的问题和解法参考:“9-rust-lang-function-closure.md”
- 不能以
&mut T的方式捕获上下文对象 (因为 FnMut 闭包有捕获的 &mut T 不能转移出闭包的语义限制,而闭包返回的 Future 会转 移出持有的 &mut T,所以报错)
// 错误:不支持 &mut T 捕获上下文对象
let mut vec: Vec<String> = vec![];
// closure 类型是:impl FnMut() -> impl Future<Output=()>
let closure = || async {
// error: captured variable cannot escape `FnMut` closure body
vec.push(ready(String::from("")).await);
};
/*
error: captured variable cannot escape `FnMut` closure body
--> src/main.rs:69:22
|
68 | let mut vec: Vec<String> = vec![];
| ------- variable defined here
69 | let closure = || async {
| ____________________-_^
| | |
| | inferred to be a `FnMut` closure
70 | | vec.push(ready(String::from("")).await);
| | --- variable captured here
71 | | };
| |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
|
= note: `FnMut` closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape
*/
这个错误的核心是:|| async { ... } 不是“异步闭包”,而是“普通闭包返回一个 Future”。
也就是说它大致等价于:
let closure = || {
async {
vec.push(ready(String::from("")).await);
}
};
关键问题在这里:vec.push(...) 需要 &mut vec,而这个 &mut vec 会被保存进 async block 生成的 Future 里。
因为 async block 不是马上执行完的,它会生成一个状态机。只要里面有 .await,这个 Future 就可能被暂停、移动、稍后再继续执行。所以 Future 必须把它需要的变量保存起来,包括对 vec 的可变借用。
于是编译器看到的是:
let closure = || -> Future {
// 返回的 Future 里面持有 &mut vec
}
但这个 closure 被推断为 FnMut。FnMut 的语义是:闭包执行期间可以临时可变访问捕获变量,但不能把对捕获变量的引用“带出”闭包调用本身。
而你这里返回的 Future 会在闭包调用结束后继续存在:
let fut = closure();
// closure 调用已经结束了
// 但 fut 里面还想持有 &mut vec
这就违反了 FnMut 的约束,所以报:captured variable cannot escape FnMut closure body
换句话说,问题不是 Vec,也不是 push,而是:
FnMut closure 返回的 Future 不能持有对闭包捕获变量的 &mut 借用
一个更直观的等价错误模型是:
let mut vec = vec![];
let closure = || {
// 试图返回一个持有 &mut vec 的对象
async {
vec.push(String::new());
}
};
这个 Future 离开了 closure body,但它里面还借着 vec。
对于异步闭包不支持捕获 &mut T,常见的解决办法是使用 move 或 Rc/Arc 来转移对象,或者 Rust 1.75+ 的 async closure 来解决。 :
- 如果只调用一次,可以直接用 async move 把所有权移进去:
let mut vec: Vec<String> = vec![];
let fut = async move {
vec.push(ready(String::from("")).await);
};
-
如果需要多次调用,并共享可变状态,通常用
Arc<Mutex<_>> 或 Rc<RefCell<_>>,取决于是否跨线程:use std::sync::{Arc, Mutex}; use std::future::ready; let vec = Arc::new(Mutex::new(Vec::<String>::new())); let closure = { let vec = Arc::clone(&vec); move || { let vec = Arc::clone(&vec); async move { let value = ready(String::from("")).await; vec.lock().unwrap().push(value); } } }; -
如果是单线程 async 环境,也可以用:
use std::cell::RefCell;
use std::future::ready;
use std::rc::Rc;
let vec = Rc::new(RefCell::new(Vec::<String>::new()));
let closure = {
let vec = Rc::clone(&vec);
move || {
let vec = Rc::clone(&vec);
async move {
let value = ready(String::from("")).await;
vec.borrow_mut().push(value);
}
}
};
```
核心判断规则是:
1. async 里跨 .await 使用的东西,会变成 Future 状态的一部分。
2. 如果它是借用,借用就会被 Future 保存。
3. 如果这个 Future 是从 FnMut closure 返回的,那么对 closure 捕获变量的借用就“逃逸”了,所以不允许。
## 新的 async closure 语法
Rust 1.75+ 开始正式支持 `async closure`: 使用 `async || {}` 语法(而非 `|| async {}`)。
新的 `async closure` 带来了两个新的语义:
1. 支持 `&mut T` 捕获上下文对象(它修改了异步闭包场景下 `&mut T` 的语义);
2. 支持 HRTB(编译器自动添加,但不能显式添加);
这两个语义都是伴随着 `async closure` 新的 `AsyncFn/AsyncFnMut/AsyncFnOnce trait` 来实现的。
```rust
// OK:move 捕获
let string: String = "Hello, world".into();
let closure = async move || {
// 闭包获得了 string 的所有权
ready(&string).await;
};
普通 FnMut 闭包只允许在闭包调用期间临时借用捕获变量,它不能返回一个继续持有 &mut vec 的 Future,所以报:captured
variable cannot escape FnMut closure body
|| -> impl Future<Output = ()> {
async { ... }
}
// 错误:通过返回 async block 的异步闭包不支持 &mut T 捕获上下文对象(&T 捕获 OK)
let vec: Vec<String> = vec![];
let closure = || async {
vec.push(ready(String::from("")).await);
};
这是因为普通闭包返回 async block 时,大概是:
let mut vec = Vec::new();
let closure = || async {
vec.push(String::new());
};
它的形状是:FnMut::call_mut(&mut self) -> Fut, 但 FnMut 的 Output 是固定类型:
trait FnMut<Args> {
// 虽然 Rust 支持关联类型带 lifetime 标记,但是这里的 Output 没有带 lifetime 标记,所以它是编译器推导的固定
// lifetime 类型。
type Output;
fn call_mut(&mut self, args: Args) -> Self::Output;
}
这里没有表达:返回的 Output 可以借用这次 call_mut 的 &mut self。所以如果返回的 Future 里保存了 &mut vec,就等于把对 closure 捕获状态的可变借用从 call_mut 里带出去了。普通的 FnMut 不允许这个,编译器报: captured
variable cannot escape FnMut closure body
同理,对于 Fn 闭包,输出也是固定的、无 lifetime 的 Output 类型,所以如果返回值包含借用,也会失败:
trait Fn<Args> {
type Output;
fn call(&self, args: Args) -> Self::Output;
}
// 错误:Fn 返回值不能包含借用
let s = String::from("hello");
let f = || -> &str {
&s
};
而 async closure 实现的不是普通 FnMut -> Future 模型,而是 AsyncFn* 模型。它的关键能力是返回
Future 的类型带有调用生命周期。
// OK: async cloure 支持 &mut T 捕获上下文对象
let mut vec: Vec<String> = vec![];
// 注意返回的 closure 也必须是可变的。
let mut closure = async || {
// async closure 使用 &'closure mut Vec<String> 方式捕获了上下文对象 vec,直到这个闭包被 drop(如被 .await)。
vec.push(ready(String::from("")).await);
};
closure().await;
AsyncFnMut trait 定义:
pub trait AsyncFnMut<Args>: AsyncFnOnce<Args> where Args: Tuple,{
// 关联类型也可以带 lifetime 标记
type CallRefFuture<'a>: Future<Output = Self::Output> where Self: 'a;
// Required method
extern "rust-call" fn async_call_mut(
&mut self,
args: Args,
) -> Self::CallRefFuture<'_>;
}
重点是:type CallRefFuture<'a>, 这个 Future 类型带着 ‘a,而 ‘a 就是这次调用时 &'a mut self 的生命周期。
所以 async closure 可以表达:
- 调用 closure 时,返回一个 Future;
- 这个 Future 可以借用 closure 的捕获状态; <=== 也就是返回的 Future 可以带有声明周期(来源于输入、或捕获的对象)
- 借用持续到 Future 完成或被 drop。
通过自动实现 AsyncFnMut/AsyncFn/AsyncFnOnce trait, 编译器给 async closure 赋予了新的调用语义:允许
返回的 Future 从 closure 本体里借用捕获变量。也就是说,这个 Future 可 以 持有类似: &'closure mut Vec<String> 这样的借用。
所以 async || { vec.push(...await...) } 可以成立,因为 async closure 的新 trait 体系表达了
“返回的 Future 可以借用 closure 捕获的状态”。
一句话:
- 普通 FnMut 的返回值不能借用 &mut self;
- AsyncFnMut 的返回 Future 可以借用 &mut self;
- 所以 async closure 能捕获并跨 .await 使用 &mut T。
async closure 也支持 HTRB
#
由于 async closure 实现的 AsyncFn* trait 的 Output 包含 lifetime 泛型参数,所以 async closure 返回的匿名对象支持输入参数包含引用的 HRTB。
// OK, 异步闭包继续支持 move
let s = String::from("hello, world");
let c = async move || { do_something(&s).await };
let arg = async |x: i32| { async_add(x, 1).await };
let ret = async || -> Vec<Id> { async_iterator.collect().await };
// 错误:同步闭包不支持 HRTB
let mut closure = |a: &i32, b: &str| {Combined { num: a, text: b }}
// OK: 异步闭包支持 HRTB(自动添加)
let hr = async |x: &str| { do_something(x).await };
// 编译器为 hr 生成的匿名类型对象和实现的 AsyncFu trait 如下:
struct ClosureEnv;
impl<'a> AsyncFn<(&'a str,)> for ClosureEnv {
// 由于返回的对象包含 'a lifetime 标记,所以可以满足输入参数 x 的任意 lifetime。
type CallRefFuture<'a>: Future<Output = Self::Output> where Self: 'a;
extern "rust-call" fn async_call(
&self,
args: (&'a str,),
) -> Self::CallRefFuture<'_>;
}
由于 async closure 的定义和同步闭包 closure 的定义一样,都不支持 liftime 声明,所以在输入、输出
同时包含借用时,因缺少显式的 lifetime 关系约束,也会编译失败。
解决办法:将异步闭包转换为 fn 函数实现,因为 fn 函数支持显式的 lifetime 标记和约束定义。
async fn #
async fn 分为两种类型: 1. 普通异步函数; 2. 位于 trait 中的异步函数,它们在实现上有一些差异。
Rust 的 async fn/block/closure 提供了一种使用同步代码风格来写异步代码的机制,它们都是通过编译器创建的包含状态机的匿名
(枚举)类型来实现的。
终态机的状态和代码块中的 .await 有关,每个 .await 位置对应一个状态,因为这些位置是 async 运行时(executor)的 yield 点,生 成一个新的 Future 对象,可能会调度到不同线程运行。
// https://www.eventhelix.com/rust/rust-to-assembly-async-await/
// 普通异步函数
async fn goto(unit: UnitRef, pos: i32) {
UnitGotoFuture {
unit,
target_pos: pos,
}
.await;
}
goto(unit.clone(), 10).await;
async fn/block/closure 的编译器实现 #
对于 async fn/block/closure, 编译器将它糖化为返回 impl Future + '_ 的普通函 数('_ 表示编译器会为其捕
获函数的范型参数和 lifetime), 并且在函数内部创建一个实现该 trait 的隐式类型和对象,如果函数参数包含借用,则生成的 Future 对象不一
定满足 'static 要求:
- 返回的
impl Future是编译时静态派发&生成匿名类型读写,后续不会再变化。
async fn foo(x: &i32) -> i32 {
*x
}
// 等效为:一个返回类型为 impl Future 匿名具体类型的普通函数。
fn foo(x: Rc<i32>) -> impl std::future::Future<Output=i32> {
// 编译器生成的一个具体的、匿名类型,其中 state 是跨 .await 的状态(对于复杂场景,如有多个 await 时,一般是 enum 类型,每
// 个 enum variant 都包含一个实现 Future 的匿名类型)
struct FooFuture { x: Rc<i32>, state: bool };
// 为具体类型实现 Future trait
impl Future for FooFuture { /* poll 实现 */ }
FooFuture { x, state: false }
}
// 另一个例子:如果函数的参数包含借用类型,则返回的实现 impl Future 的隐式类型对象也具有 lifetime。
async fn foo(x: &str) -> usize {
let len = x.len();
len
}
// 等价为:这里 foo 返回一个匿名 Future 类型,生命周期 'a 来自参数 &'a str。
fn foo<'a>(x: &'a str) -> impl std::future::Future<Output = usize> + 'a {
struct FooFuture<'b> {x: &'b str, state: bool };
impl<'b> Future for FooFuture<'b> {/* pool 实现*/ }
FooFuture{x, state: false} // FooFuture 的 lifetime 和 x 一致,都是 'a
}
// 后续调用 foo 返回的 fut 类型为:impl Future<Output = i32> + '_,推导的生命周期和 v 一致。
let fut = foo(&mut v);
// 错误:fut 不是 'static 类型,不满足 spawn() 参数的限界要求
tokio::spawn(fut).await?;
虽然它们都没有声明实现 Send,但编译器在编译时可以根据跨 .await 的对象是否都实现 Send 来判断整体是否实现 Send:
-
如果局部变量(输入参数、内部定义变量、捕获的对象)都是 Send(如 i32、Arc、String 等),那么编译器生成的匿名 Future 类型自动实现 Send。
-
如果捕获了非 Send 类型(如 Rc、RefCell),Future 就不会实现 Send。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move { // 这里的 async block 返回的 impl Future 对象由编译器自动推导实现了 Send
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => {
if socket.write_all(&buf[..n]).await.is_err() {
return;
}
}
Err(_) => {
return;
}
}
}
});
}
}
一个带 .await 和 &mut T 输入的 async fn 编译器实现的复杂例子:
- 每个 .await 都会让编译器拆分出一个状态,对应一个状态枚举值。
- 局部变量(包括函数输入、内部创建的变量)在跨 .await 时会被保存到状态机结构体里。
- 由于状态机里可能持有栈变量的地址,如
&mut i32,必须通过 Pin 保证 .await 的 Future 对象不被移动导致地址失效。&mut T并不能保证对象被转移,例如使用std::mem::swap()等函数可以&mut T来移动对象位置。- Pin<Box
> 或 Pin<&mut T> 类型可以确保不会将 Box 或 &mut T 暴露出来,从而不可能被移动。 - 但是 Pin 的设计保留了后门,它仅保证 T:!Unpin 时将对象固定住,而 T:Unpin 时还是可以返回 &mut T 来方便修改和使用。
- 典型的是编译器为 async fn/block/closure 自动实现的 impl Future 匿名类型 没有实现 Unpin, 所以会被 Pin 住。
- Rust 编译器自动生成这个状态机和 poll 方法,用户只需写 async/await。
- poll 方法是一个大 match,根据不同状态推进计算。
- Pin 和 Context 保证 Future 安全地暂停和恢复。
async fn foo(x: &mut i32) -> i32 {
*x += 1;
tokio::task::yield_now().await; // 第一个 await
*x *= 2;
tokio::task::yield_now().await; // 第二个 await
*x + 3
}
// 去语法糖,返回 impl Future + '_
fn foo<'a>(x: &'a mut i32) -> impl std::future::Future<Output = i32> + 'a {
async move {
*x += 1;
tokio::task::yield_now().await; // 第一个 await
*x *= 2;
tokio::task::yield_now().await; // 第二个 await
*x + 3
}
}
// 进一步去语法糖,编译器生成一个实现 Future 的匿名类型 FooFuture,内部的状态机保存 .await 点的 Future 状态
fn foo<'a>(x: &'a mut i32) -> impl std::future::Future<Output = i32> + 'a {
FooFuture{x, state: State::Start{x}}
}
// 编译器为 async fn() { ... } 生成一个匿名类型,包含各 .await 状态机对象。
// 如果 async {} 通过 &T/&mut T 借用捕获了上下文对象,则生成的匿名类型是有 lifetime 标记。
struct FooFuture<'a> {
state: State<'a>,
}
// 编译器会为每个 .await 会生成一个状态枚举的分支。
enum State<'a> {
// 枚举值的 field 包含跨 .await 的对象引用,所以在这些 .await 点间进行状态切换前,需要确保栈变量 x 的地址不变。
// 所以 poll() 的 self 参数是 Pin<&mut Self> 类型,Pin 可以确保 Self 不被移动,进而确保这些栈变量地址有效。
Start { x: &'a mut i32 },
Awaiting1 { x: &'a mut i32, fut: tokio::task::YieldNow }, // 每个状态都持有一个 future 对象
Awaiting2 { x: &'a mut i32, fut: tokio::task::YieldNow },
Done,
}
// Start:刚开始执行,捕获 &mut x。
// Awaiting1:第一个 await 处挂起,持有内部 future。
// Awaiting2:第二个 await 处挂起,持有内部 future。
// Done:完成状态。
// 为匿名类型实现 Future
use std::pin::Pin;
use std::task::{Context, Poll};
impl<'a> std::future::Future for FooFuture<'a> {
type Output = i32;
fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Self::Output> {
unsafe {
let me = self.get_unchecked_mut();
loop {
match &mut me.state {
// 进入第一个 await
State::Start { x } => {
*x += 1;
let fut = tokio::task::yield_now();
me.state = State::Awaiting1 { x: *x, fut };
return Poll::Pending;
}
State::Awaiting1 { x, fut } => {
// poll 内部 Future
match Pin::new(fut).poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {
*x *= 2;
let fut = tokio::task::yield_now();
me.state = State::Awaiting2 { x: *x, fut };
}
}
}
State::Awaiting2 { x, fut } => {
match Pin::new(fut).poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {
let result = *x + 3;
me.state = State::Done;
return Poll::Ready(result);
}
}
}
State::Done => panic!("polled after completion"),
}
}
}
}
}
// 执行流程示意
// Start:执行 *x += 1 → 创建第一个 future → 切换 Awaiting1 → Poll::Pending
// Awaiting1:poll 第一个 future,完成后执行 *x *= 2 → 创建第二个 future → 切换 Awaiting2 → Poll::Pending
// Awaiting2:poll 第二个 future,完成后计算 *x + 3 → 切换 Done → 返回结果
// Done:Future 已完成,再 poll 会 panic
总结:
async fn语法糖 = 普通函数 +impl Future返回值。async {}= 被编译器转换成匿名 struct + 状态机;async move会移动捕获变量,使 Future ‘static- .await = 状态机的“中断点”,会把局部变量保存到状态机里,必须通过 Pin 来确保 Self 内存地址不变,这样栈变量的地址才一直有效。
Pin + Future::poll= 保证状态机在内存中不会移动,局部变量借用安全。- poll() 推进状态机,编译器为状态机自动实现 Future poll 方法;
Future 的 poll 接口要求 Pin<&mut Self> 类型,原因是:异步块内部的局部变量可能包含自引用,如 &self.x 指向同一个
Future 内部的变量,而 Pin 保证 Future 内部状态的地址不会移动,从而保证自引用安全。
&mut Self只能保证仅能通过 self 变量修改自身,但是通过 std:mem::replace() 之类函数还是能移动的。
参考:
trait 中的 async fn 编译器实现 #
async fn 的第二种类型是 trait 中的 async fn ,它返回的 impl Future 匿名类型对象实现方式和普通 async fn 不同,它是通过一个 trait 匿名关联类型来实现 ,而且编译器为每个实现该 Trait 的类型都生成不同的关联类型,所以编译器不能判断它们是否
都实现 Send,进而导致默认是未实现 Send。所以,Rust 不建议在将 async fn 做为 trait 的 public 接口对外暴露。
trait MyTrait {
async fn do_work(&self) -> i32;
}
// 编译器在 trait 内部生成一个匿名关联类型:
trait MyTrait {
type DoWorkFuture<'a>: Future<Output = i32> + 'a where Self: 'a;
fn do_work(&self) -> Self::DoWorkFuture<'_>;
}
这会导致后续调用 trait 中定义的 async fn 函数时会因未实现 Send trait 而报错:
warning: use of `async fn` in public traits is discouraged as auto trait bounds cannot be specified
--> src/lib.rs:7:5
|
7 | async fn fetch(&self, url: Url) -> HtmlBody;
| ^^^^^
|
help: you can desugar to a normal `fn` that returns `impl Future` and add any desired bounds such as `Send`, but these cannot be relaxed without a breaking API change
|
7 - async fn fetch(&self, url: Url) -> HtmlBody;
7 + fn fetch(&self, url: Url) -> impl std::future::Future<Output = HtmlBody> + Send;
|
解决方案:
-
将
async fn去糖,转换为返回impl Trait + Send或Pin<Box<dyn Future<Output=XX>+Send+'_>>的普通函数:// 示例 1: // async fn fetch(&self, url: Url) -> HtmlBody; // // impl Trait 无论是在函数输入还是输出位置都支持限界 fn fetch(&self, url: Url) -> impl std::future::Future<Output = HtmlBody> + Send; // 示例 2:使用 Box::pin + dyn Future + Send 是 trait async fn 的常见做法。 use std::future::Future; trait MyTrait { // 注意返回的对象 lifetime 和 self 一致。 fn do_work(&self) -> Pin<Box<dyn Future<Output = i32> + Send + '_>>; } // 然后在实现里用 Box::pin(async move { ... }) 返回: impl MyTrait for MyStruct { fn do_work(&self) -> Pin<Box<dyn Future<Output = i32> + Send + '_>> { Box::pin(async move { // 捕获的变量必须实现 Send 42 }) } } -
使用
async-trait宏:它在 trait 层面隐藏这些复杂性,自动把async fn返回值去糖为Pin<Box<dyn Future> + Send + 'lifetime>,从而满足多线程环境的限界要求。#[async_trait::async_trait] trait Service { async fn call(&self, req: String) -> String; } //大致展开成这个: trait Service { fn call<'async_trait>( &'async_trait self, req: String, ) -> Pin<Box<dyn Future<Output = String> + Send + 'async_trait>>; } // 示例: use async_trait::async_trait; // 在定义 trait 和实现 trait 时都使用 #[async_trait] 标注 #[async_trait] trait MyTrait { async fn do_work(&self) -> i32; } struct MyStruct; #[async_trait] impl MyTrait for MyStruct { async fn do_work(&self) -> i32 { 42 } } -
或者使用 Rust 官方提供的
trait-variant crate(cargo add trait-variant)来为包含 async fn 的 trait 生成多个版本:
#[trait_variant::make(HttpService: Send)]
pub trait LocalHttpService {
async fn fetch(&self, url: Url) -> HtmlBody;
}
// This creates two versions of your trait: LocalHttpService for single-threaded
// executors and HttpService for multithreaded work-stealing executors
// 生成的 HttpService 版本:
pub trait HttpService: Send {
fn fetch(
&self,
url: Url,
) -> impl Future<Output = HtmlBody> + Send;
}
但是 trait-variant crate 并不能完全取代 async-trait crate,因为前者为 trait 中 async fn 返回
的还是 impl Trait 对象,还是不满足 object-safe/dyn-compatibility 的要求,不支持动态派发。
由于有上面的问题, async fn 一般不建议出现在 trait 的 Public APIs 中,而应该使用泛型类型、或关联类型来解决上面的问题。
// 示例:引入泛型类型 Fut,并对 Fut 进行限界:
impl<F, Fut, Res, S> Handler<((),), S> for F
where
// 闭包整体要实现 FnOnce() -> Fut、Clone、Send、Sync 和 'static
F: FnOnce() -> Fut + Clone + Send + Sync + 'static,
// 闭包的返回值需要实现 Future<Output = Res> 和 Send
Fut: Future<Output = Res> + Send,
Res: IntoResponse,
参考:
trait 的 async fn 的局限性:dyn compatible 和 Send #
有两个局限性:
- trait 定义一旦包含
async fn, 就不再object safe(dyn compatible),也就不能再为它创建trait object: async fn的参数限界如果是函数类型,则不支持 HRTB,即输入和输出参数不能有借用;
pub trait Trait {
async fn f(&self);
}
// 编译报错:error[E0038]: the trait `main::Trait` cannot be made into an object
pub fn make() -> Box<dyn Trait> {
unimplemented!()
}
另外,trait 中 async fn 返回值是 impl Future 匿名类型,而不能指定限界,如 impl Future + Send + 'static,不满足多线程场景的要求。
上面两个问题可以使用 async-trait crate 来解决(见后文)。
async fn 的局限性:Fn* 函数限界不支持 HRTB #
async fn 的参数限界如果使用 Fn/FnMut/FnOnce trait 类型,则不支持 HRTB,即输入或输出参数不能有借用:
- 这里本质原因和同步闭包定义不支持 HRTB 的原因一致,即
Fn/FnMut/FnOnce trait只能返回一个固定 lifetime 的 Output,而不能随着输入参数的 lifetime 而变(HRTB)。
async fn for_each_city<F, Fut>(mut f: F)
where
// 错误:不支持 HRTB,这个 HRTB 是 Rust 为包含引用的限界自动添加的,等效于:F: for<'c> FnMut(&'c str) -> Fut,
//
// 注意:这里的 Fut 是一个泛型参数,不支持 & 也没有 lifetime 标识,所以语义是:对任意生命周期 'c,f(&'c str) 都返回同一个 Fut 类型。
F: FnMut(&str) -> Fut,
Fut: Future<Output = ()>,
{
for x in ["New York", "London", "Tokyo"] {
f(x).await;
}
}
// do_something2 概念上接近于:
// fn do_something2<'c>(city_name: &'c str) -> impl Future<Output = ()> + 'c
// 注意返回的 Future 依赖 'c。也就是说它实际更像:
// for<'c> Fn(&'c str) -> DoSomething2Future<'c>
// 而不是每次都返回同一个类型的 Fut:
// for<'c> Fn(&'c str) -> Fut
async fn do_something2(city_name: &str) { todo!() }
async fn main() {
for_each_city(do_something2).await;
}
// 区别在于:
// for_each_city() 的要求是:输入的闭包函数返回一个固定 Fut
// for<'c> FnMut(&'c str) -> Fut
// 但是传入的 async fn do_something2() 返回类型是随输入参数变化的 'c 类型:
// for<'c> FnMut(&'c str) -> Fut<'c>
//
// Rust 里普通泛型参数 Fut 不能表达这种“返回类型族”。这就是报错里说的:
// expected opaque type `impl for<'c> Future<Output = ()>`
// found opaque type `impl Future<Output = ()>`
//
// 更直白地说:
// do_something2(&'short str) 返回的 Future 可能借用了 'short 的参数;
// do_something2(&'long str) 返回的 Future 又借用了 'long 的参数。
// 它们不是一个生命周期无关的固定 Fut,而是一类不同 lifetime 的函数族。
/*
error[E0308]: mismatched types
--> src/main.rs:101:5
|
101 | for_each_city(do_something2).await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ one type is more general than the other
|
= note: expected opaque type `impl for<'c> Future<Output = ()>`
found opaque type `impl Future<Output = ()>`
= note: distinct uses of `impl Trait` result in different opaque types
note: the lifetime requirement is introduced here
--> src/main.rs:89:34
|
89 | F: for<'c> FnMut(&'c str) -> Fut,
| ^^^
error: implementation of `FnMut` is not general enough
--> src/main.rs:101:5
|
86 | / async fn for_each_city<F, Fut>(mut f: F)
87 | | where
88 | | // 错误:不支持
89 | | F: for<'c> FnMut(&'c str) -> Fut,
| | ----------------------------- doesn't satisfy where-clause
90 | | Fut: Future<Output = ()>,
| |_____________________________- due to a where-clause on `for_each_city`...
...
101 | for_each_city(do_something2).await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: ...`for<'a> fn(&'a str) -> impl Future<Output = ()> {do_something2}` must implement `FnMut<(&'c str,)>`
= note: ...but it actually implements `FnMut<(&'0 str,)>`, for some specific lifetime `'0`
*/
// 另一个例子 2:
async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut) where Fut: Future<Output = ()>, { todo!() }
async fn main() {
async fn g(_: &u8) { todo!() }
// error[E0308]: mismatched types
// error: implementation of `Fn` is not general enough
f1(g).await;
}
/*
error[E0308]: mismatched types
--> src/main.rs:57:9
|
57 | f1(g).await;
| ^^^^^ one type is more general than the other
|
= note: expected opaque type `impl for<'a> Future<Output = ()>`
found opaque type `impl Future<Output = ()>`
= note: distinct uses of `impl Trait` result in different opaque types
note: the lifetime requirement is introduced here
--> src/main.rs:44:48
|
44 | async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
| ^^^
error: implementation of `Fn` is not general enough
--> src/main.rs:57:9
|
44 | async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
| - ------------------------- doesn't satisfy where-clause
| _|
| |
45 | | where
46 | | Fut: Future<Output = ()>,
| |_____________________________- due to a where-clause on `f1`...
...
57 | f1(g).await;
| ^^^^^
|
= note: ...`for<'a> fn(&'a u8) -> impl Future<Output = ()> {main::g}` must implement `Fn<(&'a u8,)>`
= note: ...but it actually implements `Fn<(&'0 u8,)>`, for some specific lifetime `'0`
error: implementation of `Fn` is not general enough
--> src/main.rs:57:15
|
44 | async fn f1<Fut>(_: impl for<'a> Fn(&'a u8) -> Fut)
| - ------------------------- doesn't satisfy where-clause
| _|
| |
45 | | where
46 | | Fut: Future<Output = ()>,
| |_____________________________- due to a where-clause on `f1`...
...
57 | f1(g).await;
| ^^^^^
|
= note: ...`for<'a> fn(&'a u8) -> impl Future<Output = ()> {main::g}` must implement `Fn<(&'a u8,)>`
= note: ...but it actually implements `Fn<(&'0 u8,)>`, for some specific lifetime `'0`
*/
为了对 async fn 的函数参数支持 HRTB:
-
传统的解法是使用
trait object的Pin<Box<dyn Future<Output=xx> + Send + '_>>范式,这也 是async-trait crate的实现方式。(带来的影响:trait object 存在额外的一层 heap allocation,带来额外的性能开销。而 async fn 返回的 impl Future 是静态装箱,性能会更好些)。 -
Rust 1.75+ 开始,
async fn可以使用支持 HRTB 的AsyncFnXX限界: https://smallcultfollowing.com/babysteps/blog/2023/02/01/async-trait-send-bounds-part-1-intro/
以 Pin<Box<dyn Future<Output=xx> + Send + '_>> 范式 为例:
- https://docs.rs/futures/latest/futures/future/type.BoxFuture.html 定义了
BoxFuture类型, 可以用于多线程环境。
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
// 把待返回的 Future 装箱到带有 lifetime 的 trait object 中:
use std::future::Future;
use std::pin::Pin;
async fn for_each_city<F>(mut f: F)
where
F: FnMut(&str) -> Pin<Box<dyn Future<Output = ()> + '_>>,
// 等效为:
// F: for<'c> FnMut(&'c str) -> Pin<Box<dyn Future<Output = ()> + 'c>>,
// 这样返回的 Pin<Box<dyn Future>> 对象的 lifetime 和输入参数一致,而不是如下所示的强迫所有 'c 都共用一个固定 Fut 类型。
// F: FnMut(&str) -> Fut,
// Fut: Future<Output=()>>
{
for x in ["New York", "London", "Tokyo"] {
f(x).await;
}
}
// 等效于:fn do_something2(city_name: &'c str) -> impl Future<Output=()> + 'c
async fn do_something2(city_name: &str) {
todo!()
}
async fn main() {
for_each_city(|city| Box::pin(do_something2(city))).await;
}
// 另一个例子:
// https://blog.rust-lang.org/inside-rust/2024/08/09/async-closures-call-for-testing/
fn async_callback<F, Fut>(callback: F)
where
// F 闭包输入、输出没有借用,没有 HRTB 的问题,故可以返回 Future 对象
F: FnOnce() -> Fut,
Fut: Future<Output = String>;
fn async_callback<F>(callback: F)
where
// F 闭包输入包含借用时,需要返回 Pin<Box<...>> 类型
F: FnOnce(&str) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
async fn do_something(name: &str) {}
// OK
async_callback(|name| Box::pin(async {
do_something(name).await;
}));
// OK: 强制将返回的 Pin<Box<impl Future<Output=()>>> 转换为 Pin<Box<dyn Future<Output=()>>>
// 这里将 impl Future<Output=()> 转换为 dyn Future<Output=()> 是 type coerce 支持的。
let ac = |name| {
let b: Pin<Box<dyn Future<Output=()>>> = Box::pin(async {
do_something(name).await;
});
b
};
async_callback(ac);
// ERROR: 这是由于 ac 的类型是 impl Fn(&str) -> Pin<Box<impl Future<Output=()>>>, 与 F 的限界不匹配
let ac = |name| {
Box::pin(async {
do_something(name).await;
})
};
async_callback(ac);
/*
error[E0271]: expected `{closure@main.rs:83:14}` to return `Pin<Box<dyn Future<Output = ()>>>`, but it returns `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
--> src/main.rs:84:9
|
83 | let ac = |name| {
| ------ this closure
84 | / Box::pin(async {
85 | | do_something(name).await;
86 | | })
| |__________^ expected `Pin<Box<dyn Future<Output = ()>>>`, found `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
87 | };
88 | async_callback(ac);
| -------------- -- closure used here
| |
| required by a bound introduced by this call
|
= note: expected struct `Pin<Box<(dyn Future<Output = ()> + 'static)>>`
found struct `Pin<Box<{async block@src/main.rs:84:18: 84:23}>>`
note: required by a bound in `async_callback`
--> src/main.rs:62:24
|
60 | fn async_callback<F>(callback: F)
| -------------- required by a bound in this function
61 | where
62 | F: FnOnce(&str) -> Pin<Box<dyn Future<Output = ()>>>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `async_callback`
For more information about this error, try `rustc --explain E0271`.
error: could not compile `my-demo` (bin "my-demo") due to 1 previous error
*/
最佳实践:只使用 Pin<Box<dyn Future<Output=()> + Send + '_>>。
async-trait crate #
async_trait proc macro 对 trait 中的 async fn 的返回值使用了另一种语法糖:它不返回静态分发的 impl Future 对象,而是返回动态装箱的 Pin<Box<dyn Future<Output=xx> + Send + 'static>> 类型对象
(带来的影响是性能会大些):
- 满足多线程环境,如各种 spawn() 提交的 async block 中各 .await 返回的 Future 对象需要满足
Send + 'static的限界要求; - 它的
as_mut()方法返回Pin<&mut dyn Future<Output=xx>>类型,它实现了Future,满足poll()方法的签名要求。 - 该 trait 是
trait object,即 dyn-safe;
async-trait 的核心原理是:把 trait 里的 async fn 改写成“返回 boxed Future 的普通函数”。
也就是把这个:
use async_trait::async_trait;
// 在定义和实现 trait 时都需要添加 #[async_trait]
#[async_trait::async_trait]
trait Service {
async fn call(&self, req: String) -> String;
}
// 大致变成:
trait Service {
fn call<'async_trait>(
&'async_trait self,
req: String,
) -> Pin<Box<dyn Future<Output = String> + Send + 'async_trait>>;
}
实现这个 trait 的代码也会被改写。比如:
// 在定义和实现 trait 时都需要添加 #[async_trait]
#[async_trait::async_trait]
impl Service for MyService {
async fn call(&self, req: String) -> String {
format!("hello {req}")
}
}
// 大致变成:
impl Service for MyService {
fn call<'async_trait>(
&'async_trait self,
req: String,
) -> Pin<Box<dyn Future<Output = String> + Send + 'async_trait>> {
Box::pin(async move { // 使用 async move 的目的是让 'async_trait 为 'static,满足 tokio::spawn() 等函数签名要求。
format!("hello {req}")
})
}
}
所以 async-trait 并没有让 trait 真正“原生支持 async”。它做的是宏展开,把 async fn 语法糖转换成 trait 能表达的形式。
关键点有三个。
第一,async fn 本质上返回一个匿名 Future 类型。
普通函数:
async fn foo() -> u32 {
1
}
近似等价于:
fn foo() -> impl Future<Output = u32> {
async move { 1 }
}
但 trait 里很长一段时间不能直接写这种“每个 impl 自己返回不同匿名 Future 类型”的接口,尤其是涉及 dyn Trait 时更麻烦。
第二,async-trait 用 dyn Future 做类型擦除。
不同实现的 async 函数会生成不同 Future 类型:
impl Service for A {
async fn call(&self, req: String) -> String { ... }
}
impl Service for B {
async fn call(&self, req: String) -> String { ... }
}
A::call 和 B::call 返回的 Future 类型不同。async-trait 把它们统一擦成:
Pin<Box<dyn Future<Output = String> + Send + 'async_trait>>
这样 trait 方法签名就有了一个固定返回类型。
第三,为什么要 Pin<Box<…»?
async block 生成的是状态机。这个状态机在 .await 之间可能保存对自己内部字段的引用,因此 Future 通常需要被 pin 住,不能随便移动。
Box 负责把 Future 放到堆上,得到稳定地址;Pin 负责表达“这个 Future 不应再被移动”。
所以:Box::pin(async move { ... }),就是 async-trait 的核心动作。
默认情况下,async-trait 会加 Send:Pin<Box<dyn Future<Output = T> + Send + 'async_trait>>
- 如果是返回的是
Box::pin(async move { ... }),则 ‘async_trait 一般是 ‘static 。
这意味着 async 方法返回的 Future 必须能在线程间移动。比如你在 async 方法里跨 .await 持有了 Rc 或 RefCell,可能就会报 future cannot be sent between threads safely。
如果不需要 Send,可以写:
#[async_trait::async_trait(?Send)]
trait Service {
async fn call(&self);
}
这会展开成近似: Pin<Box<dyn Future<Output = ()> + 'async_trait>>
少了 Send 约束。
它的代价也很明确:
- 每次调用 async trait 方法都会分配一次 Box。
- Future 通过 dyn Future 动态分发,少了一些静态优化机会。
- 类型和生命周期错误有时会变得绕,因为真实代码是宏展开后的 boxed future。
- 默认 Send 约束可能比你原本直觉里更严格。
一句话总结:
async-trait 的原理就是把: async fn method(...) -> T
改写成: fn method(...) -> Pin<Box<dyn Future<Output = T> + Send + '_>>
然后在函数体里用:Box::pin(async move { ... }) <=== 通过使用 async move 可以将 ‘_ 对应为 ‘static。
把原本的 async 逻辑塞进 boxed future。
另外,新版 Rust 的 unstable 特性 RTN 可以为 trait 的 async fn 返回值进行限界,可以弥补 async trait 的局限性
AsyncFn* 函数限界 #
async || { ... } 异步闭包和 AsyncFn / AsyncFnMut / AsyncFnOnce 是 Rust 1.85 稳定的。官方
1.85 发布说明里明确说 async closures 同时带来了这三个标准库 trait。
这些 trait 让用户能够为异步可调用类型(async callable types)表达 higher-ranked 的限界 (也即闭包参数包含借用),并允许异步闭包返回从闭包捕获对象中借用数据的 Future。
- 参考:https://rust-lang.github.io/rfcs/3668-async-closures.html
async closure 生成的匿名类型对象实现的是 AsyncFn / AsyncFnMut / AsyncFnOnce trait,而非
Fn/FnMut/FnOnce trait 。
AsyncFn* 可以理解成异步版的 Fn*:
- FnOnce -> AsyncFnOnce
- FnMut -> AsyncFnMut
- Fn -> AsyncFn
同步闭包是:F: FnMut(&str) -> R
异步闭包则是:F: AsyncFnMut(&str) -> R
注意异步闭包返回的 R 不是 Future 类型,而是 Future 的 Output 类型。也就是说:F: AsyncFnMut(&str) -> (),表达的是:调用 f(&str) 会返回一个 Future<Output = ()>
use std::ops::AsyncFnMut;
async fn for_each_city<F>(mut f: F)
where
F: for<'c> AsyncFnMut(&'c str),
{
for x in ["New York", "London", "Tokyo"] {
f(x).await;
}
}
async fn do_something2(city_name: &str) {
println!("{city_name}");
}
async fn main() {
for_each_city(do_something2).await;
}
这解决的正是之前 FnMut(&str) -> Fut 表达不了的问题。原来的写法:
F: for<'c> FnMut(&'c str) -> Fut,
Fut: Future<Output = ()>,
要求所有 ‘c 都返回同一个固定 Fut 类型:
&'short str -> Fut
&'long str -> Fut
但 async fn do_something2(city_name: &str) 更像: for<'c> fn(&'c str) -> Future<'c>
也就是返回 Future 的类型族:
&'short str -> DoSomethingFuture<'short>
&'long str -> DoSomethingFuture<'long>
AsyncFnMut 的关键能力就是把这个“返回类型族”编码进 trait 里。
标准库里 AsyncFnMut 的核心形状大概是:
trait AsyncFnMut<Args>: AsyncFnOnce<Args> {
type CallRefFuture<'a>: Future<Output = Self::Output>
where
Self: 'a;
fn async_call_mut(&mut self, args: Args) -> Self::CallRefFuture<'_>;
}
重点是这里:type CallRefFuture<'a>,它不是一个固定 Fut,而是一个带生命周期参数的关联类型。也就是说它能表达:返回的
Future 可以借用这次调用里的 self / 参数,借用活多久,Future 就活多久。
这就是 AsyncFn* 比:FnMut(...) -> Fut 更强的地方。
三个 trait 的区别和同步闭包一致:
-
AsyncFn: 表示可以通过共享引用反复调用,类似:
&self -> Future, 适合不修改捕获状态,或者只通过内部可变性修改状态的 async callback。 -
AsyncFnMut: 表示调用时需要可变访问闭包状态,类似:
&mut self -> Future, 适合这种:let mut vec = Vec::new(); let mut f = async || { vec.push(String::from("x")); }; f().await; f().await;但不能同时启动两个还没完成的调用:
let a = f(); let b = f(); // 不行:a 还持有对 f 捕获状态的可变借用 -
AsyncFnOnce: 表示调用会消费闭包,类似: self -> Future, 适合把捕获值 move 进 async 任务里,只调用一次。
一个实用判断规则:
- 如果 API 会多次调用 callback,且 callback 可能修改捕获状态,用:
F: AsyncFnMut(Args...) -> Output - 如果只调用一次,用:
F: AsyncFnOnce(Args...) -> Output - 如果需要 HRTB,也就是参数引用生命周期由调用点决定,用:
F: for<'a> AsyncFnMut(&'a T) -> Output
如果你还需要 Send,通常约束外层 async fn 返回的 future 或任务边界;AsyncFn* 本身并不等价于“返回的 Future 是 Send”。这点和 async fn in trait 类似,Send 是另一个维度,不能默认假设。
一句话总结:
- FnMut(&T) -> Fut 要求所有输入生命周期共用一个固定 Future 类型;
- AsyncFnMut(&T) 能表达“每次调用返回的 Future 可以借用这次调用的参数和闭包状态”;
- 所以它是 Rust 原生支持 lending async callback 的 trait bound。
Rust 1.75+ 开始,开始支持 AsyncFn* 函数限界,它支持 HRTB:
// https://blog.rust-lang.org/inside-rust/2024/08/09/async-closures-call-for-testing/
fn async_callback<F>(callback: F)
where
// 等效为:F: for<'a> FnOnce(&'a str) -> Pin<Box<dyn Future<Output=()> + 'a>>;
F: FnOnce(&str) -> Pin<Box<dyn Future<Output=()> + '_>>;
async fn do_something(name: &str) {}
// OK
async_callback(|name| Box::pin(async {
do_something(name).await;
}));
// Rust 1.75+ 异步闭包 AsyncFn** 限界直接支持 HRTB
// Instead of writing:
fn higher_ranked<F>(callback: F)
where
F: Fn(&Arg) -> Pin<Box<dyn Future<Output = ()> + '_>>
{ todo!() }
// Write this:
fn higher_ranked<F: AsyncFn(&Arg)> { todo!() }
async fn for_each_city<F>(mut f: F)
where
F: AsyncFnMut(&str),
// 等效为:
// F: for<'a> AsyncFnMut(&'a str),
{
for x in ["New York", "London", "Tokyo"] {
f(x).await;
}
}
async fn increment_city_population_db_query(city_name: &str) { todo!() }
async fn main() {
// Works for `async fn` that is higher-ranked.
for_each_city(increment_city_population_db_query).await;
}
Rust 自动为实现 Fn*() -> Fut 且 Fut 实现 Future<Output = T> 的类型(闭包、异步函数、 dyn Fn*
trait object 等)实现 AsyncFn*() -> T。
-
编译器为新的
async cloure实现了AsyncFn* trait。
// 以下类型自动实现了 AsyncFn*:
// Async functions:
async fn foo() {}
// Functions that return a concrete future type:
fn foo() -> Pin<Box<dyn Future<Output = ()>>> { Box::pin(async move {}) }
// Closures that return an async block:
let c = || async {};
当前只为直接可调用的具体类型实现了上述自动转换,对于函数参数位置的 impl Trait 类型,如 impl Fn() -> impl Future<Output = ()>,没有实现 AsyncFn();
fn is_async_fn(_: impl AsyncFn(&str)) {}
async fn async_fn_item(s: &str) {
is_async_fn(s);
// ^^^ This works.
}
fn generic(f: impl Fn() -> impl Future<Output = ()>) {
is_async_fn(f);
// ^^^ This does not work (yet).
}
在 trait bound、函数返回值场景,使用 AsyncFn{,Mut,Once} trait(而不是 async Fn{,Mut,Once}),也支持 HRTB。
// 不再使用老的 同步闭包返回 Future 的方式
// Instead of writing:
takes_async_callback(|arg| async {
// Do things here...
});
// 而是使用新支持的 async cloure 语法,编译器为 `async closure` 自动实现了 `AsyncFn{,Mut,Once} trait`
// Write this:
takes_async_callback(async |arg| {
// Do things here...
});
// 异步闭包限界场景:
// 不再使用返回 Fut 的限界。
// Instead of writing:
fn doesnt_exactly_take_an_async_closure<F, Fut>(callback: F)
where
F: FnOnce() -> Fut,
Fut: Future<Output = String>
{ todo!() }
// 而是使用 AsyncFnOnce 限界
// Write this:
fn takes_an_async_closure<F: AsyncFnOnce() -> String>(callback: F) { todo!() }
// 异步闭包限界支持 HRTB
// Instead of writing:
fn higher_ranked<F>(callback: F)
where
F: Fn(&Arg) -> Pin<Box<dyn Future<Output = ()> + '_>>
{ todo!() }
// Write this:
fn higher_ranked<F: AsyncFn(&Arg)> { todo!() }
参考:
- Stabilize async closures (RFC 3668)
- rfc-3668
- 注意:关于
async Fn* vs AsyncFn*语法问题,正式版本采用了后者: https://github.com/rust-lang/rust/issues/128129
- 注意:关于
let s = String::from("hello, world");
// 编译器为 async cloure 自动实现了 AsyncFn* trait
// Implements `AsyncFn()` along with `FnMut` and `Fn` because it can copy the `&String` that it captures.
let _ = async || {
println!("{s}");
};
let s = String::from("hello, world");
// Implements `AsyncFn()` but not `FnMut` or `Fn` because it moves and owns a value of type
// `String`, and therefore the future it returns needs to take a pointer to data owned by the
// closure.
let _ = async move || {
println!("{s}");
};
let mut s = String::from("hello, world");
// Implements `AsyncFnMut()` but not `FnMut` or `Fn` because it needs to reborrow a mutable pointer to `s`.
let _ = async move || {
s.push('!');
};
// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
let s = String::from("hello, world");
let closure = async move || {
ready(&s);
};
// At this point, `s` is moved out of. However, the allocation for `s` is still live. It just lives as a captured field in `closure`.
// Manually call `AsyncFnOnce` -- this isn't stable since `AsyncFnOnce` isn't stable, but it's useful for the demo.
let fut = AsyncFnOnce::call_once(closure, ());
// At this point, `closure` is dropped. However, the allocation for `s` is still live. It now lives sas a captured field in `fut`.
fut.await;
// After the future is awaited, it's dropped. At that point, the allocation for `s` is dropped.
// They can have arguments annotated with types:
let _ = async |_: u8| { todo!() };
// They can have their return types annotated:
let _ = async || -> u8 { todo!() };
// async closure 支持 HRTB
let _ = async |_: &str| { todo!() };
// They can capture values by move:
let x = String::from("hello, world");
let _ = async move || do_something(&x).await };
takes_an_async_fn(async |s| { other_fn(s).await }).await;
trait bound 场景和返回值:使用 AsyncFn{,Once,Mut} 语法:
// AsyncFn{,Once,Mut} 可以用于返回值、泛型参数限界 也就是:The AsyncFn* trait can be used anywhere a Fn* trait bound is allowed
//
/// In return-position impl trait:
fn closure() -> impl AsyncFn() { async || {} }
/// In trait bounds:
trait Foo<F>: Sized where F: AsyncFn()
{
fn new(f: F) -> Self;
}
/// in GATs:
trait Gat {
type AsyncHasher<T>: AsyncFn(T) -> i32;
}
// 用于限界时支持 HRTB
async fn takes_an_async_fn(f: impl AsyncFn(&str)) {
futures::join(f("hello"), f("world")).await;
}
// 作为限界:
// 老的写法,不建议:闭包和返回值分别限界:
fn doesnt_exactly_take_an_async_closure<F, Fut>(callback: F)
where
F: FnOnce() -> Fut,
Fut: Future<Output = String>
{ todo!() }
// 建议: 新的写法
fn takes_an_async_closure<F: AsyncFnOnce() -> String>(callback: F) { todo!() }
AsyncFn{,Once,Mut} 的局限性:不支持为返回值指定 trait bound,也就是不能为上面 AsyncFn* 的关联类型如
Output、CallOnceFuture 进行限界,但是老的 Fn* 是支持对内部的关联类型进行限界:
// AsyncFnOnce 的内部实现:
// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
/// An async-aware version of the [`FnOnce`](crate::ops::FnOnce) trait.
///
/// All `async fn` and functions returning futures implement this trait.
pub trait AsyncFnOnce<Args> {
/// Future returned by [`AsyncFnOnce::async_call_once`].
type CallOnceFuture: Future<Output = Self::Output>;
/// Output type of the called closure's future.
type Output;
/// Call the [`AsyncFnOnce`], returning a future which may move out of the called closure.
fn async_call_once(self, args: Args) -> Self::CallOnceFuture;
}
原因:https://rust-lang.github.io/rfcs/3668-async-closures.html#why-do-we-recommend-the-asyncfnonceoutput-type-remains-unstable-unlike-fnonceoutput
// 老的写法 OK:
fn foo<F, T>()
where
F: FnOnce() -> T,
F::Output: Send, // 支持为 FnOnce 的内部关联类型 Output 进行限界
// 等效于
//T: Send
{}
// 或者:
fn async_callback<F, Fut>(callback: F)
where
F: FnOnce() -> Fut,
Fut: Future<Output = String> + Send + 'static
{ todo!() }
// 新的写法不行:
fn foo<F, T>()
where
F: async FnOnce() -> T,
F::Output: Send, // 错误:不支持为 AsyncFnOnce 的内部关联类型 Output、CallOnceFuture 进行限界
F::CallOnceFuture: Send,
//~^ ERROR use of unstable library feature
{
}
// x 的返回值是 Future<Output=Result<()> 类型,而不是 spawn() 要求的 Future<Output=Result<()>> + Send + 'static 类型:
async fn foo(x: impl AsyncFn(&str)) -> Result<()> {
tokio::spawn(x("hello, world")).await
}
解决办法:
- 继续使用老的写法;
- 如果是 trait 中的 async 方法,则使用
async-trait crate; - 等待
Return Type Notation(RTN)特性稳定:
实验特性:RTN #
RTN:A way to name “the type returned by a function”
-
RFC: https://github.com/rust-lang/rfcs/pull/3654, https://github.com/rust-lang/rfcs/blob/master/text/3654-return-type-notation.md
// https://smallcultfollowing.com/babysteps/blog/2023/02/13/return-type-notation-send-bounds-part-2/
fn start_health_check<H>(health_check: H, server: Server)
where
H: HealthCheck + Send + 'static,
H::check(..): Send, // <— return type notation
// Here the where clause H::check(..): Send means “the type(s) returned when you call H::check must
// be Send. Since async functions return a future, this means that future must implement Send.
// 完整例子:
#![feature(return_type_notation)]
#![allow(dead_code)]
use std::time::Duration;
struct Server;
trait HealthCheck {
async fn check(&mut self, server: &Server) -> bool;
}
fn start_health_check<H>(mut health_check: H, server: Server)
where
H: HealthCheck + Send + 'static,
H::check(..): Send + 'static, // 加 'static 也 OK
{
tokio::spawn(async move {
while health_check.check(&server).await {
tokio::time::sleep(Duration::from_secs(1)).await;
}
emit_failure_log(&server).await;
});
}
async fn emit_failure_log(_server: &Server) { }
RTN 的各种语法(where 限界 或 关联类型限界):
// 注意:下面的 method 需要被替换为 Trait 实际的方法名称,例如上面的 check(...)
fn foo<T, U>()
where
// Associated type bound
T: Trait<method(..): Send + 'static>,
// Path bound
U: Trait,
U::method(..): Send + 'static,
{}
trait Trait {
// In GAT bounds.
type Item: Trait<method(..): Send + 'static>;
}
// In opaque item bounds too.
fn rpit() -> impl Foo<method(..): Send + 'static>;
使用 RTN 的示例:
// https://rust-lang.github.io/rfcs/3668-async-closures.html#interaction-with-return-type-notation-naming-the-future-returned-by-calling
async fn foo(x: F) -> Result<()>
where
F: AsyncFn(&str) -> Result<()>,
// The future from calling `F` is `Send` and `'static`.
F(..): Send + 'static,
// Which expands to two bounds:
// `for<'a> <F as AsyncFnMut>::CallRefFuture<'a>: Send`
// `<F as AsyncFnOnce>::CallOnceFuture: Send`
// the latter is only if `F` is bounded with `async Fn` or `async FnMut`.
{
tokio::spawn(x("hello, world")).await
}
// https://github.com/rust-lang/rust/pull/138424#issuecomment-2766836658
#![feature(return_type_notation)]
trait Trait {
async fn foo() {}
}
trait Other {}
impl<T> Other for T
where
T: Trait,
T::foo(..): Send,
{}
impl Other for u32 {}
// https://github.com/rust-lang/rust/pull/138424#issuecomment-2766947426
#![feature(return_type_notation)]
trait Trait {
fn foo() -> impl Sized;
}
trait Id {
type This: ?Sized;
}
impl<T: ?Sized> Id for T {
type This = T;
}
trait GetOpaque {
type Opaque: ?Sized;
}
impl<T: ?Sized, U> GetOpaque for T
where
T: Trait,
T::foo(..): Id<This = U>,
{
type Opaque = U;
}
type FooTypePos<T> = <T as GetOpaque>::Opaque;
impl Trait for u8 {
fn foo() -> impl Sized {
|| ()
}
}
struct ContainsOpaque(FooTypePos<u8>);
// 另一个例子:https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=9691749c3f46ed13e459ee1d6382f7e8
#![feature(return_type_notation)]
async fn spawn_call<S>(service: S) -> S::Response
where
S: Service<(), Response: Send, call(..): Send> + Send + 'static,
{
tokio::spawn(async move {
service.call(()).await
}).await.unwrap()
}
trait Service<Request> {
type Response;
// Invoke the service.
async fn call(&self, req: Request) -> Self::Response;
}
// 另一个例子:https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=6d45f55355188001ea6499314ce30b4b
#![feature(return_type_notation)]
trait Factory {
fn widgets(&self) -> impl Iterator<Item = Widget>;
}
struct ReverseWidgets<F: Factory<widgets(..): DoubleEndedIterator>> {
factory: F,
}
impl<F> Factory for ReverseWidgets<F>
where
F: Factory<widgets(..): DoubleEndedIterator>,
{
fn widgets(&self) -> impl Iterator<Item = Widget> {
self.factory.widgets().rev()
// 👆 requires that the iterator be double-ended
}
}
struct Widget;
Future 对象的 ‘static + Send 问题 #
在多线程异步场景,tokio::spawn*(future) 函数提交的 Future 对象需要实现 Send+'static:
spawn(future):多线程执行,future 需要实现Future+Send+'staticspawn_blocking(closure): 多线程执行,closure 是实现Send+'static的同步闭包;
// tokio::task::spawn() 函数的 future 对象和返回值也需要实现 Send 和 'static
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static, // 对 Future 整体的要求:Send + 'static
F::Output: Send + 'static,
‘static 问题 #
spawn() 对提交的 Future 对象可能在后续任意时刻被异步运行时执行,甚至可能超出了创建该 Future 的 block 作用域,所以要求
Future 对象是 'static 类型,即在程序运行期间可以一直存在。同时由于是多线程环境来执行该 Future 对象,所以它也需要实现 Send。
和同步闭包 不一样,async fn/async closure 的输入参数包含 &T/&mut T 时 会影响 'static 的
达成,这是因为编译器在 async fn/async closure 的定义阶段创建一个 impl Future + '_ 的匿名类型对象,它会捕获输入
参数借用的生命周期:
- 若
async fn中没有任何非'static引用被捕获或返回,且输入参数也没有引用,则编译器会自动推导返回的 Future 为'static。
// 异步函数 foo 返回的实现 Future 的对象 lifetime 不是 'static
async fn foo(x: &u32) -> u32 {
*x
}
// 等效于:
fn foo<'a>(x: &'a u32) -> impl Future<Output = u32> + 'a {
async move { *x }
}
// 错误的例子:
pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>>
{
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
// cheapo_request() 返回的 Future 对象不满足 'static 要求
handles.push(task::spawn_local(cheapo_request(&host, port, &path)));
}
let mut results = vec![];
for handle in handles {
results.push(handle.await);
}
results
}
// 解决办法:定义一个转移对象所有权的辅助函数:
async fn cheapo_owning_request(host: String, port: u16, path: String) -> std::io::Result<String> {
cheapo_request(&host, port, &path).await
}
如果 feature 是通过 async block/async closure 创建的,和同步闭包一样, async block/closure 也是
闭包,也优先使用 &T/&mut T 方式捕获上下文中的对象,而这些借用很难满足 'static 的要求,故一般使用 async move 来将对象
所有权转移到 future 中。
注意:move 会把捕获的变量转移进闭包,但如果捕获的是 &T, 则闭包内部获得的还是一个非 'static 的借用,则还是有生命周期问题:
use std::thread;
let people = vec![
"Alice".to_string(),
"Bob".to_string(),
"Carol".to_string(),
];
let mut threads = Vec::new();
for person in &people {
threads.push(thread::spawn(move || {
// person 是 &String 类型,所以 move 捕获的是 &String 而非 String
println!("Hello, {}!", person);
}));
}
for thread in threads {
thread.join().unwrap();
}
// 报错:
/*
error[E0597]: `people` does not live long enough
--> src/main.rs:12:20
|
12 | for person in &people {
| ^^^^^^ borrowed value does not live long enough
...
21 | }
| - borrowed value only lives until here
|
= note: borrowed value must be valid for the static lifetime...
*/
解决办法:
// 方法 1:直接转移所有权
for person in people {
threads.push(thread::spawn(move || {
println!("Hello, {}!", person);
}));
}
// 方法 2:克隆每个元素
for person in &people {
let person = person.clone();
threads.push(thread::spawn(move || {
println!("Hello, {}!", person);
}));
}
Send 问题 #
同步闭包和异步闭包实现 Send 的判断方式有区别:同步闭包不看内部定义的对象、也不看传入的参数是否实现 Send,但是对于异步的 Future 对象,对象内任 意 .await 点返回的 Future 对象都实现 Send 时该 Future 对象才实现 Send,
async fn/async block/async closure 内部处于异步上下文,可以使用 .await 来 poll future 对象,所以
async executor 在执行异步上下文时可能会多次暂停,暂停的位置是各 .await 表达式(这些 .await 位置也是异步运行时的 yield point)。每次暂停都会返回一个新的 Future 对象,它封装了暂停位置依赖的上下文对象:栈变量、函数参数等,后续可能被调度到其他线程上运行,所以要求
异步上下文中的所 有 .await 的 future 都必须实现 Send。
- 因为 future 可能持有栈变量的地址,所以要求在 poll 时必须使用
Pin<&mut Self>类型,从而确保不能对没有实现 Unpin 的 future 对象进行移动。
只有这些 .await 位置返回的 Future 对象都满足 Send 时,async fn/async block/async closure 才是 Send 的。
所以需要看下面三种场景的跨 .await 对象都是否实现了 Send(典型的例外是 trait 中的 async fn 函数返回值没有实现 Send):
- 输入的参数对象;
- 内部定义的对象;
- 捕获的对象
对于普通 async fn、async block、async closure 返回的实现 impl Future 的匿名类型对象,编译器会检查它是否实现 Send。
但是对于 trait 中的 async fn 返回的 impl Future 匿名类型对象,它无论如何都没有实现
Send(原因参考后文,实际上不能对它进行任何的限界,如 Send、Unpin、'static), 所以如果在 async block/async closure 中 await 该对象会编译失败。
另外,trait object 默认也没有实现 Send/Sync/Unpin,在异步上下文中如果跨 await 使用它们则可能编译出错:
但 trait object 可以显式的声明实现 Send/Sync/Unpine:
- 定义 trait 时,通过 super trait 语法来定义它实现
Send/Sync/Unpin:trait MyTrait: Send + Sync + Unpin; - 或者使用
trait object时,显式进行限界:Box<dyn Debug + Send + UnwindSafe + Unpin>;
// Not recommended!
type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;
fn some_fallible_thing() -> GenericResult<i32> {
//...
}
// This function's future is not `Send`...
async fn unfortunate() {
// Result<T, Box<dyn std::error::Error>> 的 Box<dyn std::error::Error> 没有实现 Send + ‘static
match some_fallible_thing() {
Err(error) => {
report_error(error);
}
Ok(output) => {
// ... is alive across this await ...
use_output(output).await;
}
}
}
// ... and thus this `spawn` is an error.
async_std::task::spawn(unfortunate());
// 解决办法:
type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
type GenericResult<T> = Result<T, GenericError>;
对于单线程的异步执行环境,提交的异步闭包可以不实现 Send,但是还是要求 'static,如:
tokio::spawn_local(future)tokio::task::LocalSettokio::select(以及futures::future::select, futures::future::select_all)
impl Future 对象没有实现 Send 的情况举例:
-
trait 内定义的 async fn:
// https://smallcultfollowing.com/babysteps/blog/2023/02/01/async-trait-send-bounds-part-1-intro/ trait HealthCheck { async fn check(&mut self, server: &Server) -> bool; // 等效于 // fn check(&mut self, server: &Server) -> impl Future<Output = bool>; // ^ Problem is here! This returns a future, but not necessarily a `Send` future. // The problem is that check returns an impl Future, but the trait doesn’t say whether this future is Send or not. } fn start_health_check<H>(health_check: H, server: Server) where H: HealthCheck + Send + 'static, // 对 H 泛型类型整体的限界,Send 和 'static 是对实现 HealthCheck 对象整体而言 { tokio::spawn(async move { // 错误:health_check.check(&server) 返回的 Future 未实现 Send while health_check.check(&server).await { tokio::time::sleep(Duration::from_secs(1)).await; } emit_failure_log(&server).await; }); } -
异步函数输入参数 &T 没有实现 Send:
// Context 包含 Cell 成员,所以 Context 没有实现 Sync,进而 &Context 没有实现 Send #[derive(Default)] pub struct Context { counter: Cell<i32> } impl Context { fn increment(&self) { self.counter.set(self.counter.get() + 1); } } async fn f(context: &Context) { // context 跨 .await,但未实现 Send,所以报错 g(context).await; context.increment(); } async fn g(_context: &Context) { } async fn task_main() { let context = Context::default(); // f(&context) 返回一个 Future 对象,而它内部的 &context 没有实现 Send,所以不能被 .await f(&context).await; } #[tokio::main] async fn main() { tokio::spawn(task_main()); } /* * error: future cannot be sent between threads safely --> src/main.rs:254:18 | 254 | tokio::spawn(task_main()); | ^^^^^^^^^^^ future returned by `task_main` is not `Send` | = help: within `Context`, the trait `Sync` is not implemented for `Cell<i32>` = note: if you want to do aliasing and mutation between multiple threads, use `std::sync::RwLock` or `std::sync::atomic::AtomicI32` instead note: future is not `Send` as this value is used across an await * */这是因为当编译普通的
async fn时,编译器生成一个匿名类型,包含它 stack 对象的引用:struct FStackFrame<'a> { context: &'a Context, await_state: usize }这个结构包含一个 context 的借用,而
Context: !Sync意味着&Context: !Send意味着FStackFrame<'_>: !Send意味着返回的 Future 对象不满足 Send 要求:pub fn spawn<F>(future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, // <- note this Send F::Output: Send + 'static,Tokio 的 Runtime 默认是多线程的 work-stealing 模式,所以后续可能在其它线程 poll 该 Future 对象,所以它要求它 Send 类型。同时调用 Future 对象时,可能已经脱离了创建它的 block,所以要求该对象捕获的上下文满足 ‘static。
-
异步函数内部创建的栈对象,如果跨 await 且没有实现 Send 则报错;
async fn not_send_future() { // Rc 不是 Send let _rc = Rc::new(42); // 模拟异步点 tokio::time::sleep(std::time::Duration::from_secs(1)).await; } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error + Send + 'static>> { tokio::task::spawn(not_send_future()).await.unwrap(); } /* * error: future cannot be sent between threads safely --> src/main.rs:261:24 | 261 | tokio::task::spawn(not_send_future()).await.unwrap(); | ^^^^^^^^^^^^^^^^^ future returned by `not_send_future` is not `Send` | = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<i32>` note: future is not `Send` as this value is used across an await */ // 解决办法:将 _rc 在 await 前 drop,这样可以确保它不跨 await 未实现 Send 而报错: async fn not_send_future() { // Rc 不是 Send let _rc = Rc::new(42); drop(_rc); // 模拟异步点 tokio::time::sleep(std::time::Duration::from_secs(1)).await; } -
异步函数返回值没有实现 Send 报错:
// OK 的情况: use tokio::task; use std::error::Error; async fn ok_future() -> Result<(), Box<dyn Error>> { // await 在前,返回值在最后才构造 tokio::time::sleep(std::time::Duration::from_secs(1)).await; // 返回时才构造 Box<dyn Error> Err("some error".into()) } #[tokio::main] async fn main() { // OK:因为 Future 没有跨 await 捕获非 Send 类型 task::spawn(ok_future()).await.unwrap(); } // 错误的情况:在 .await 之前持有(报错) async fn returns_not_send_error() -> Result<(), Box<dyn std::error::Error>> { let err: Box<dyn std::error::Error> = "some error".into(); // 👇 err 在 await 之前被捕获,编译器必须把它放进 Future 的状态里 tokio::time::sleep(std::time::Duration::from_secs(1)).await; Err(err) } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error + Send + 'static>> { tokio::task::spawn(returns_not_send_error()).await.unwrap(); } /* * error[E0277]: `(dyn std::error::Error + 'static)` cannot be sent between threads safely --> src/main.rs:274:24 | 274 | tokio::task::spawn(returns_not_send_error()).await.unwrap(); | ------------------ ^^^^^^^^^^^^^^^^^^^^^^^^ `(dyn std::error::Error + 'static)` cannot be sent between threads safely | | | required by a bound introduced by this call | = help: the trait `Send` is not implemented for `(dyn std::error::Error + 'static)` = note: required for `Unique<(dyn std::error::Error + 'static)>` to implement `Send` */解决办法:将 async fn 返回的
Box<dyn std::error::Error>改成Box<dyn std::error::Error + Send+ Sync>就能满足tokio::spawn的要求(另外,Rust 为Box<dyn std::error::Error + Send + Sync>类型提供了多种方法和函数支持。)use tokio::task; use std::error::Error; async fn fixed_future() -> Result<(), Box<dyn Error + Send + Sync>> { // 这里的 Box 是 Send + Sync 的 let err: Box<dyn Error + Send + Sync> = "some error".into(); // await 之前就创建了,但类型满足 Send + Sync tokio::time::sleep(std::time::Duration::from_secs(1)).await; Err(err) } #[tokio::main] async fn main() { // ✔ 可以编译,因为 Future 是 Send task::spawn(fixed_future()).await.unwrap(); }
参考:
Pin/Unpin #
Pin/Unpin trait 参考:10-rust-lang-generic-trait.md
pin!() 和 Box::pin() #
async fn/block/closure 返回的 impl Future 匿名类型对象都是 Future+!Unpin 的, 不能用于
Pin::new(&mut fut) (因为签名要求 fut 实现 Unpin)。
- 是否实现
Send,编译器会进行自动判断。但trait中async fn返回的impl Future一定没有实现Send。
async fn add_one(x: u32) -> u32 {
x + 1
}
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
let mut future = add_one(42); // fut 类型:impl Future<Output=u32> + !Unpin
//let mut future = std::pin::Pin:new(&mut future); // 错误:future 没有实现 Unpin
pin!(future) // OK:返回一个 Pin<&mut impl Future> 类型对象,满足 poll 的签名要求
let _ = future.poll(&mut context)
Box<T> 有单独的 Future trait 实现定义:T 必须是 Future + Unpin 的,所以上面 async fn/async block/async closure 返回的 impl Future 也不满足要求:
impl<F, A> Future for Box<F, A>
where
F: Future + Unpin + ?Sized,
A: Allocator,
为了对 async fn/async block/async closure 返回的 impl Future + !Unpin 匿名类型对象进行
poll,需要创建一个 Pin<&mut impl Future> 类型对象。(future.await 不要求 future 实现
Unpin,原因见后文 .await 原理一节)。
解决办法:使用 Box::pin() 或 pin!() 来创建满足 Future::poll() 函数签名要求的 Pin<&mut impl Future> 对象:
pin!(future):传入一个 future 对象,但不要求它实现 Unpin,返回一个同名的Pin<&mut impl Future>类型对象。
-
T 实现 Future,则 Pin
也实现了 Future,所以可以对返回的 future 对象执行: future.await或(&mut future).await -
或者对返回的 future 对象执行:
future.poll()或(&mut future).poll() -
或者调用返回 future 的
as_mut()方法,返回一个新的Pin<&mut impl Future>对象,再调用它的poll()方法; -
Pin<&mut impl Future>实现了Future但未实现Unpin
impl<P> Future for Pin<P> where P: DerefMut, <P as Deref>::Target: Future,
type Output = <<P as Deref>::Target as Future>::Output
fn poll(self: Pin<&mut Pin<P>>, cx: &mut Context<'_>,) -> Poll<<Pin<P> as Future>::Output>
// Ptr 是 Unpin 时,Pin<Ptr> 才是 Unpin
impl<Ptr> Unpin for Pin<Ptr> where Ptr: Unpin,
// Box<T> 无条件实现了 Unpin
impl<T, A> Unpin for Box<T, A> where A: Allocator, T: ?Sized,
pin!(T) 宏返回 Pin<&mut impl Future<T>> 类型对象:如果 T 没有实现 Unpin(如 async fn/block/closure 返回的 impl Future 对象),则该宏将 T 值 pin 到堆内存中(不能被通过 &mut T 进行
move)。
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
pin!(future); // 返回一个同名的 Pin<&mut impl Future> 类型对象,满足 poll() 方法签名要求
let _ = future.poll(&mut context)
pin!() 参数必须是 Future 对象,而不是能是表达式:
async fn my_async_fn() {
// async logic here
}
#[tokio::main]
async fn main() {
// 错误
// let mut future = pin!(my_async_fn());
// OK
let mut future = my_async_fn()
pin!(future);
// 如果 Pin<Ptr> 的 Ptr 实现了 Future,则 Pin<Ptr> 也实现了 Future,所以可以对 Pin<Ptr> 进行 .await;
// future.await;
//
// 或者对 &mut Pin<&mut impl Future> 对象进行 .await
(&mut future).await;
}
pin!{} 支持同时创建多个 Future 对象并 Pin 住:
use tokio::{pin, select};
async fn my_async_fn() {
// async logic here
}
#[tokio::main]
async fn main() {
pin! {
let future1 = my_async_fn();
let future2 = my_async_fn();
}
select! {
_ = &mut future1 => {}
_ = &mut future2 => {}
}
}
使用 pin!() 来包装 async block/closure/fn 返回的结果,将获得 Pin<&mut impl Future> 类型对象,
从而满足 poll 方法签名要求。
-
Box::pin(future): 不要求 future 实现 Unpin,返回一个Pin<Box<impl Future>>类型对象,它的Pin::as_mut()方法返回新的Pin<&mut impl Future>对象,满足 poll() 方法签名要求;-
将对象 Pin 在堆上;
-
Pin<Box<impl Future>>实现了Future和Unpin
-
async fn add_one(x: u32) -> u32 {
x + 1
}
let fut = add_one(42); // fut 类型:impl Future<Output=u32> + !Unpin
fut.await; // OK,.await 不要求 fut 实现 Unpin
// Pin 的 as_mut() 方法签名:pub fn as_mut(&mut self) -> Pin<&mut <Ptr as Deref>::Target>
// 所以,future.as_mut() 返回 Pin<&mut impl Future> 对象,满足 Future 的 poll() 方法的函数签名要求。
let pinned_fut: Pin<Box<_>> = Box::pin(fut); // pinned_fut 类型:Pin<Box<impl Future>>, 被 Pin 在堆上,可以被安全 poll
pinned_fut.as_mut().poll(&mut cx); // 安全,且不转移 pinned_fut 的所有权
另外,由于 poll() 会消耗传入的 Pin 对象,如果对 future 进行重复 poll(如 loop + select!),则需要使用
Box::pin(T),它返回的 Pin<Box<impl Future>> 类型对象的 as_mut() 方法返回一个新的 Pin<&impl Future> 类型对象,可以进行 poll。
对于已有的 Box<T> 类型值, 可以使用 Box::into_pin(value) 创建 Pin<Box<T>> 对象:
async fn add_one(x: u32) -> u32 {
x + 1
}
fn boxed_add_one(x: u32) -> Box<dyn Future<Output = u32>> {
Box::new(add_one(x))
}
// boxed_fut 类型是 Box<dyn Future<Output = u32>>
let boxed_fut = boxed_add_one(42);
// 返回 Pin<Box<dyn Future> 对象
let pinned_fut: Pin<Box<_>> = Box::into_pin(boxed_fut);
为什么需要 pin!? #
pin! 用来把一个对象(而非它的引用)放在当前栈帧里,并拿到一个 Pin<&mut T>,同时让编译器保证这个值在 Pin 有效期间不会再被移动。
为什么不能直接写?
let mut x = some_not_unpin_value;
// p 是 x 的 &mut 借用。在它有效时,x 不能被移动和借用。
// 但是由于 x 变量还可用,所以还有可能通过 x 来对变量来操作值的可能。
let p = unsafe { Pin::new_unchecked(&mut x) };
因为 Pin<&mut T> 的语义承诺是:只要这个 Pin 还有效,里面的 T 就不能被移动。但 x 仍然是一个普通局部变量,理论上之后还可能被移动、
替换、mem::swap、mem::replace 等。对 !Unpin 类型来说,这会破坏 pinning 约束。所以直接对栈变量构造
Pin<&mut T> 很容易写出不安全代码。
pin! 的实现大概如下:
let mut p = pin!(future);
// 大致可以理解为:
let mut hidden_temp = future; // future 对象被移动进 栈上的 隐藏临时变量, future 变量本身不能再使用
let mut p = Pin<&mut hidden_temp>; // 之后只能通过 p 访问
关键效果有三个:
- future 被吃掉,原变量不能再直接使用。 《=== 原来的 future 对象不能再被使用。
- 栈上的真实对象被隐藏(转移)起来,只暴露
Pin<&mut T>。 - 这个隐藏对象的生命周期和返回的 Pin 绑定,Pin 不会悬空。
所以 pin! 的价值是: 在不分配堆内存的情况下,为 !Unpin 类型安全地创建栈上 pin 。常见场景就是手动 poll 一个 !Unpin future,而不想用 Box::pin。
由于 future 对象被转移到了栈上的隐藏临时对象中(以及包含它的借用的 Pin<&impl Future> 中),当所在的 block 结束时,
future 对象会被隐式 drop,Pin 对象 也会被 drop。由于隐藏对象在 block 结束时被 drop,所以该 Pin 对象不能被转移到
block 之外(因为它 &mut 借用的对象被 drop),所以说,**pin!(future) 是 future 对象 Pin 到所在的 block 或栈上
**。作 为对比,Pin<Box
pin!() 和 Box::pin() 的差异 #
因为 pin!(future) 的实现方式是在栈上创建一个隐藏零时对象,然后再用它的 &mut T 创建一个 Pin<&mut T> 对象,所以该
Pin 对象 的生命周期受限于隐藏临时对象所在的作用域,不能安全地“逃逸”到外部,,返回 std::pin::Pin<&mut impl Future>,
不允许被 move,所以 pin!() 不适合后续可能 move 返回 pin 值的情 况:
use core::pin::{pin, Pin};
// pin!() 错误的用法:
let x: Pin<&mut Foo> = {
let x: Pin<&mut Foo> = pin!(Foo { /* … */ });
x
}; // <- Foo is dropped
stuff(x); // Error: use of dropped value
而 Box::pin() 是将值分配到堆上,所以返回的 Pin<Box<T>> 可以被
move,但内存地址仍然不变,生命周期不受栈作用域限制,可以安全地传出,甚至跨线程。
另外异步任务调度的参数常要求是 Future+Send+'static,而 Pin<Box<impl Future>> 满足要求,所以也常用于
tokio::spawn(Box::pin(fut))。(这个场景下,pin!() 返回的 Pin<&mut impl Future> 不满足要求)。
Box::pin() 缺点:在堆上分配对象,开销比 pin!() 更大。
总结:
pin!():轻量,栈上,局部 pin,用于短期/局部逻辑。Box::pin():稳定,堆上,全局 pin,用于长期存储/跨线程场景。
.await 原理 #
对 Future 对象进行 .await 时,编译器内部自动使用 Pin::new_unchecked(&mut future) 来将 future 对
象 Pin 住,这个函数不检查 future 是否实现 Unpin(但是 Pin::new(&mut fut) 函数要求 fut 必须实现
Unpin),然后在一个 loop 中 poll,直到返回 Ready 的数据(消耗 Future 对象)。
async fn foo() -> i32 { 1 }
#[tokio::main]
async fn main() {
let mut f = foo(); // impl Future<Output=i32> + !Unpin,也即 f 没有实现 Unpin
let v = f.await; // Ok,但是还是可以被 await 或 poll。
// 这是因为 .await 的内部实现是使用 Pin::new_unchecked(&mut fut) 来将 fut Pin 住,等效于:
// loop {
// //
// match Pin::new_unchecked(&mut f).poll(&mut cx) {
// Poll::Ready(val) => break val,
// Poll::Pending => yield,
// }
// }
println!("{v}");
}
同理,tokio runtime 的 spawn() 也会对传入的 future 自动进行 Pin 操作,而且使用的是不检查 fut 是否实现
Unpin 的 Pin::new_unchecked(&mut fut) 函数:
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static, // 不需要 future 实现 Unpin
let fut = foo();
// OK
tokio::spawn(fut);
// 例子:
use futures::executor::block_on;
async fn hello_world() {
println!("hello, world!");
}
fn main() {
let future = hello_world();
// block_on() 自动对 future 对象进行 Pin 操作。
block_on(future);
}
// block_on() 的一种简化实现方式:
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};
fn block_on<F: Future>(future: F) -> F::Output {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
// pin!(T) 宏返回 Pin<&mut T> 类型对象;
// 为 future 创建一个 Pin<&mut dyn Future> 对象,
pin!(future);
loop {
// future 对象是 Pin<&mut dyn Future> 类型,满足 poll() 方法前面要求,但是会消耗 future 自身。
// 所以在 loop 循环中,需要使用 Pin::as_mut() 返回一个新的 Pin<&mut dyn Future> 对象。
//
// future.as_mut() 实际是 Pin<&mut dyn Future>.as_mut(), 而 &mut dyn Future Deref 返回的是 dyn Future 对象,
// 所以 as_mut() 返回的是 Pin<&mut dyn Future> 对象
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
}
}
.await 不支持对 &mut Future 进行 poll,因为 poll() 方法的签名是 self: Pin<&mut Self>:
async fn my_async_fn() {
// async logic here
}
#[tokio::main]
async fn main() {
let mut future = my_async_fn();
future.await // OK
// 错误:.await 不支持 &mut Future 自动 Pin
(&mut future).await;
}
解决办法:对 future 对象使用 pin!(future) 返回 Pin<&mut impl Future> 对象,然后对该对象或它的 &mut Pin<&mut impl Future> 进行 .await:
.await 的本质是对 Future 进行 poll,而 poll 需要 Pin<&mut Future>,所以只要能得到 Pin<&mut Future>,无论是 Pin<&mut Future>.await 还是 (&mut Pin<&mut Future>).await 都可以 (但 &mut impl Future 是不能 .await 的)。
#[tokio::main]
async fn main() {
let future = my_async_fn();
pin!(future); // 返回一个同名(future)的 Pin<&mut impl Future> 对象;
// 必须先将 Future 对象 pin 住,获得同名的 Pin<&mut impl Future> 类型对象,然后才能使用 &mut 进行 .await (因为 &mut impl Future 不支持 .await)。
// 以下 3 种方式均 OK:
// 1. 消耗 future 对象
future.await;
// 2. 不消耗 future 对象
(&mut future).await;
// 3. 不消耗 future 对象,as_mut() 返回一个新的 Pin<&mut impl Future> 类型对象
future.as_mut().await;
}
(&mut Pin<&mut Future>).await 在 loop+select!{} 中得到应用:要对 Future 对象进行重复 poll,而
select!{} 在并发 .await 时,如果有返回值,则会 drop 其它 branch 的 Future 对象,所以在 select branch
中不能直接使用 future 对象,以避免它被 drop 后下一次 loop 失败 select 失败。
解决办法是:
- 创建一个 mut 类型的 Future 对象;
- 使用
pin!(future)来创建一个同名,但类型是Pin<&mut impl Future<Output=xx>>的 Pin 对象, 它也实现了 Future; - 在 select branch 中使用
&mut future来进行轮询。
async fn action() {
// Some asynchronous logic
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
let operation = action(); // 返回一个 Future 对象, 没有对它 .await
// 必须先 Pin 该 Future 对象,创建一个同名的类型为 Pin<&mut impl Future<Output=i32> 类型对象 ;
tokio::pin!(operation);
loop {
tokio::select! {
// &mut impl Future 是不被 .await 支持的。
//
// 但是 &mut operation 类型是 &mut Pin<&mut impl Future<Output=i32> 被 .await 表达式所支持, 所以可以被 poll 轮询。
//
// 当该 branch 因不匹配或无返回值而被 drop/cancel 时,operation 任然有效,下一次 select!{} 可以接续使用 &mut operation
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}
除了 &mut pinned_future 外,还可以调用 pinned_future.as_mut() 方法来创建一个新的 Pin<&mut impl Future> 类型对象,它也实现了 Future,可以被 poll。
避免大的 stack buffer #
尽量避免 stack buffer, 因为跨 .await 的上下文变量会随着 task Future 对象一起被保存, 如果使用较大的 stack
buffer 变量, 则自动生成的 task Future 对象就比较大, buffer size 一般是 page sized 对齐的, 这导致 task 的
大小大概是 $page-size + a-few-bytes, 浪费内存。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// 在堆中分配 buff 内存, 而不使用 stack 上的 array
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => {
if socket.write_all(&buf[..n]).await.is_err() {
return;
}
}
Err(_) => {
return;
}
}
}
});
}
}
spawn_blocking() 简单实现 #
传入的闭包是同步函数,必须实现 Send + 'static。内部创建一个独立线程(tokio 实际是一个单独的 blocking 线程池
)来执行闭包对应的业务逻辑,结束后通过调用 waker 来通知 executor 完成。
pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
// Arc 确保 inner 和它 clone 后的对象都使用同一个 Mutex 和内部的 Shared 对象。
let inner = Arc::new(Mutex::new(Shared { value: None, waker: None, }));
std::thread::spawn({
let inner = inner.clone();
move || {
// 执行传入的同步闭包
let value = closure();
let maybe_waker = {
let mut guard = inner.lock().unwrap();
guard.value = Some(value);
guard.waker.take()
};
if let Some(waker) = maybe_waker {
// 通知 executor 指向完成
waker.wake();
}
} });
SpawnBlocking(inner)
}
// 为自定义 SpawnBlocking 实现 Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
impl<T: Send> Future for SpawnBlocking<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut guard = self.0.lock().unwrap();
if let Some(value) = guard.value.take() {
return Poll::Ready(value);
}
// 更新 SpawnBlocking 内部的 waker
guard.waker = Some(cx.waker().clone());
Poll::Pending
}
}
block_on() 简单实现 #
使用 Parker 机制来实现 waker,传入的 Future 在 loop poll 前,必须使用 pin!(future) 来转换为同名但类型为
Pin<&mut impl Future> 的对象。
future.poll() 会获得 Pin 对象的所有权,这样不能在 loop 中使用,使用 future.as_mut() 返回一个新的 Pin 对象来解决该问题。
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};
fn block_on<F: Future>(future: F) -> F::Output {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
// pin!(T) 宏返回 Pin<&mut T> 类型对象, 为 future 创建一个同名的 Pin<&mut dyn Future> 对象,
pin!(future);
loop {
// Pin<&mut dyn Future> 的 as_mut 方法返回一个新的 Pin<&mut dyn Future> 对象
// 满足 poll() 方法要求同时不消耗 future。
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
} }
std::task::Poll #
对于返回 Poll 类型对象的函数/方法,不能使用 .await 来 poll 它,因为 Poll 类型本身没有实现 Future,但是可以调用它的 poll() 方法来返回 Ready 或 Pending 结果,或使用 std::task::ready!() 宏:
use std::task::{ready, Context, Poll};
use std::future::{self, Future};
use std::pin::Pin;
pub fn do_poll(cx: &mut Context<'_>) -> Poll<()> {
let mut fut = future::ready(42);
let fut = Pin::new(&mut fut);
let num = ready!(fut.poll(cx));
// ... use num
Poll::Ready(())
// 其中的 ready!(fut.poll(cx)) 等效于:
let num = match fut.poll(cx) {
Poll::Ready(t) => t,
Poll::Pending => return Poll::Pending,
};