dart异步实现机制
参考资料 Flutter 单线程的Dart为何能够流程运行UI
总结:异步IO+事件循环
IO模型
- 阻塞式调用
- 非阻塞式调用
思路发展
传统的多线程模型虽然可以提升并发能力,但受限于CPU核心数量,并且线程间同步带来复杂性和性能开销。
为了解决CPU与IO速度不匹配的问题,出现了非阻塞IO和轮询机制,但这种方式依然会造成CPU资源浪费。
IO多路复用技术进一步优化,通过一个线程监听多个IO事件,提高了资源利用率。
最终,异步IO模型应运而生,通过事件驱动和回调机制,使得单线程也能高效处理大量并发IO任务,极大提升了程序的响应能力和扩展性。
事件循环
Dart在单线程中以消息循环机制来运行的,包含两个任务队列,一个是微任务队列 microTask queue
,另一个叫做事件队列 event queue
。
- 从main函数开始运行,待main函数执行完毕后,
event looper
开始工作 event looper
先遍历执行微任务队列所有事件,直到微任务队列为空event looper
再遍历执行事件队列中的所有事件,直到事件队列为空- 视情况退出循环
/// 伪代码
while (true) {
// 1. 先执行所有微任务队列里的任务
while (microtaskQueue 不空) {
取出一个任务执行();
}
// 2. 取出一个事件任务(event)执行
if (eventQueue 不空) {
取出一个事件执行();
}
}
测试代码
当Flutter应用启动后,消息循环机制便启动了。
首先会按照先进先出的顺序逐个执行微任务队列中的任务,当所有微任务队列执行完后便开始执行事件队列中的任务,事件任务执行完毕后再去执行微任务,如此循环往复。
- 微任务:由scheduleMicroTask建立的
- 事件任务:封装一层Future,未来会完成的任务
import 'dart:async';
void main() {
// A E C D B
print('A');
Future(() => print('B')); // 事件任务
Future.microtask(() => print('C')); // 微任务
scheduleMicrotask(() => print('D')); // 微任务
print('E');
}
Isolate
隔离
每个Isolate
都有自己的Event Loop
与Queue
。
Isolate
之间不共享任何资源,只能依靠消息机制通信,因此也就没有资源抢占问题。
假如不同的Isolate
需要通信(单向/双向),就只能通过向对方的事件循环队列里写入任务,并且它们之间的通讯方式是通过port(端口)
实现的,其中,Port又分为receivePort(接收端口)
和sendPort(发送端口)
,它们是成对出现的。
测试代码
void main() {
isolate1DoSomeThing();
}
void isolate1DoSomeThing() async {
// 1. 当前Isolate创建一个ReceivePort对象,并获得对应的SendPort对象;
var receivePort = ReceivePort();
var sendPort = receivePort.sendPort;
// 2. 创建一个新的Isolate,并实现新Isolate要执行的异步任务。
// 同时,将当前Isolate的SendPort对象传递给新的Isolate,以便新Isolate使用这个SendPort对象向原来的Isolate发送事件
// 调用Isolate.spawn创建一个新的Isolate,这是一个异步操作,因此使用await等待执行完毕
// 函数签名: static Future<FlutterIsolate> spawn<T>(void entryPoint(T message), T message) async{...}
Isolate? anotherIsolate =
await Isolate.spawn<SendPort>(otherIsolateInit, sendPort);
// 3. 调用当前Isolate#receivePort的listen方法监听新的Isolate传递过来的数据。Isolate之间什么数据类型(T)都可以传递,不必做任何标记
receivePort.listen((date) {
print('Isolate 1 接受消息:data = $date');
// 4. 消息传递完毕,关闭新创建的Isolate。
anotherIsolate?.kill(priority: Isolate.immediate);
anotherIsolate = null;
receivePort.close();
});
}
// 新Isolate要执行的异步任务
// 即调用当前Isolate的sendPort向其receivePort发送消息
void otherIsolateInit(SendPort sendPort) async {
print('Other Thread!');
sendPort.send('88');
}
Future
异步
Future
,中文意思未来,Dart为实现异步计算得到结果的封装。
构造方法
Future(FutureOr<T> computation())
,computation函数
会被添加到event队列
中执行Future.microtask(FutureOr<T> computation())
,通过scheduleMicrotask
方法将computation函数
添加到microtask队列
中,优先于event队列
执行Future.sync(FutureOr<T> computation())
,将会在当前task执行computation函数
,而不是将计算过程添加到任务队列Future.value([FutureOr<T>? value])
,创建一个返回指定value
的Future
Future.error(Object error, [StackTrace? stackTrace])
,通过error对象
和可选的stackTrace
创建Future
,使用该方法可创建个一个状态为failed的Future
Future.delayed(Duration duration, [FutureOr<T> computation()?])
,创建一个延迟执行的Future
静态方法
Future.wait<T>(Iterable<Future<T>> futures)
,等待所有futures
完成,返回一个包含所有结果的Future
,如果有任何一个Future
失败,则返回的Future
也会失败Future.forEach<T>(Iterable<T> input, FutureOr<void> f(T element))
,依次对input
中的每个元素执行异步操作f
,每次等待上一个操作完成后再执行下一个Future.any<T>(Iterable<Future<T>> futures)
,返回第一个完成的Future
,无论其是成功还是失败Future.doWhile(FutureOr<bool> f())
,重复执行异步操作f
,直到f
返回false
,每次等待上一次操作完成后再判断是否继续
处理结果
Future
支持的链式方法来处理异步结果:
then
:在Future
完成(成功)时执行回调函数,可以链式调用,实现异步流程控制。catchError
:在Future
发生异常时捕获错误并处理,通常与then
配合使用。whenComplete
:无论Future
成功或失败,都会执行的回调,适合做收尾清理工作。asStream
:将Future
转换为单值的Stream
,便于与流式 API 结合。timeout
:为Future
设置超时时间,超时后可执行指定回调或抛出异常。
async/await
async
函数本质上会返回一个Future
。在await
关键字处,Dart会把后续代码注册为一个微任务,等Future
完成时再执行。await
后的代码是在下一轮微任务里跑的。
async/await
本质是Future + 微任务调度。
测试代码
Future<void> main() async {
print(1);
var value = await getNum();
// 有 await
// 1 2 3 4
// 无 await
// 1 3 4 2
Future.delayed(Duration(seconds: 2), () => {print(2)});
print(value);
print(4);
}
Future<int> getNum() async {
await Future.delayed(Duration(seconds: 1));
return 3;
}