深色模式
Dart 并发编程
Dart并发
此页面从概念上概述了 Dart 中并发编程的工作原理。它从高层次解释了事件循环、异步语言特性和隔离区(isolates)。若要获取更多在 Dart 中使用并发的实际代码示例,请阅读“异步支持”页面和“隔离区”页面。
Dart 中的并发编程既指像 Future
和 Stream
这样的异步 API,也指隔离区,隔离区能让你将进程迁移到不同的核心上运行。
所有 Dart 代码都在隔离区中运行,从默认的主隔离区开始,还可以根据需要显式创建其他隔离区。当你创建一个新的隔离区时,它有自己独立的内存和自己的事件循环。事件循环是 Dart 实现异步和并发编程的基础。
事件循环
Dart 的运行时模型基于事件循环。事件循环负责执行程序代码、收集和处理事件等。
在应用程序运行时,所有事件都会被添加到一个名为事件队列的队列中。事件可以是各种类型,从重绘 UI 的请求、用户的点击和按键操作,到磁盘的 I/O 操作等。由于应用程序无法预测事件发生的顺序,事件循环会按照事件入队的顺序依次处理它们,一次处理一个事件。
事件循环的工作方式类似于以下代码:
dart
while (eventQueue.waitForEvent()) {
eventQueue.processNextEvent();
}
这个示例中的事件循环是同步的,并且在单线程上运行。然而,大多数 Dart 应用程序需要同时处理多个任务。例如,一个客户端应用程序可能需要执行一个 HTTP 请求,同时还要监听用户的按钮点击操作。为了处理这种情况,Dart 提供了许多异步 API,如 Future
、Stream
和 async-await
。这些 API 都是围绕这个事件循环构建的。
例如,考虑发起一个网络请求:
dart
http.get('https://example.com').then((response) {
if (response.statusCode == 200) {
print('Success!');
}
}
当这段代码到达事件循环时,它会立即调用 http.get
方法,并返回一个 Future
对象。它还会告诉事件循环保留 then()
子句中的回调函数,直到 HTTP 请求完成。当请求完成时,事件循环会执行该回调函数,并将请求的结果作为参数传递给它。
通常,事件循环就是以这种方式处理 Dart 中的所有其他异步事件的,比如 Stream
对象。
异步编程
本节总结了 Dart 中不同类型和语法的异步编程。如果你已经熟悉 Future
、Stream
和 async-await
,可以直接跳到隔离区部分。
Future
Future
表示一个异步操作的结果,该操作最终会以一个值或一个错误结束。
在下面的示例代码中,Future<String>
返回类型表示最终会提供一个 String
类型的值(或错误)的承诺。
dart
Future<String> _readFileAsync(String filename) {
final file = File(filename);
// .readAsString() 返回一个 Future。
// .then() 注册一个回调函数,当 `readAsString` 完成时执行。
return file.readAsString().then((contents) {
return contents.trim();
});
}
async-await
语法
async
和 await
关键字提供了一种声明式的方式来定义异步函数并使用它们的结果。
以下是一段同步代码的示例,它在等待文件 I/O 操作时会阻塞:
dart
const String filename = 'with_keys.json';
void main() {
// 读取一些数据。
final fileData = _readFileSync();
final jsonData = jsonDecode(fileData);
// 使用这些数据。
print('Number of JSON keys: ${jsonData.length}');
}
String _readFileSync() {
final file = File(filename);
final contents = file.readAsStringSync();
return contents.trim();
}
以下是一段类似的代码,但进行了修改(高亮部分)以使其变为异步代码:
dart
const String filename = 'with_keys.json';
void main() async {
// 读取一些数据。
final fileData = await _readFileAsync();
final jsonData = jsonDecode(fileData);
// 使用这些数据。
print('Number of JSON keys: ${jsonData.length}');
}
Future<String> _readFileAsync() async {
final file = File(filename);
final contents = await file.readAsString();
return contents.trim();
}
main()
函数在 _readFileAsync()
前面使用了 await
关键字,这样在执行原生代码(文件 I/O)时,其他 Dart 代码(如事件处理程序)可以使用 CPU。使用 await
还会将 _readFileAsync()
返回的 Future<String>
转换为 String
类型。因此,contents
变量的隐式类型为 String
。
注意:await
关键字只能在函数体前面带有 async
的函数中使用。
如下图所示,当 readAsString()
执行非 Dart 代码(无论是在 Dart 运行时还是操作系统中)时,Dart 代码会暂停。一旦 readAsString()
返回一个值,Dart 代码的执行就会继续。
Stream
Dart 还支持以流的形式编写异步代码。流会在未来多次提供值。承诺在一段时间内提供一系列 int
类型的值的类型为 Stream<int>
。
在下面的示例中,使用 Stream.periodic
创建的流每秒会重复发出一个新的 int
类型的值。
dart
Stream<int> stream = Stream.periodic(const Duration(seconds: 1), (i) => i * i);
await-for
和 yield
await-for
是一种 for
循环,当有新值提供时,它会执行循环的下一次迭代。换句话说,它用于“遍历”流。在这个示例中,当作为参数提供的流发出新值时,sumStream
函数也会发出一个新值。在返回值流的函数中,使用 yield
关键字而不是 return
。
dart
Stream<int> sumStream(Stream<int> stream) async* {
var sum = 0;
await for (final value in stream) {
yield sum += value;
}
}
如果你想了解更多关于使用 async
、await
、Stream
和 Future
的信息,请查看异步编程教程。
隔离区
除了异步 API 之外,Dart 还通过隔离区支持并发。大多数现代设备都有多核 CPU。为了利用多个核心,开发人员有时会使用共享内存线程进行并发运行。然而,共享状态的并发容易出错,并且会导致代码变得复杂。
Dart 代码不是在传统线程中运行,而是在隔离区中运行。使用隔离区,你的 Dart 代码可以同时执行多个独立的任务,如果有可用的额外处理器核心,还可以利用它们。隔离区类似于线程或进程,但每个隔离区都有自己的内存和一个运行事件循环的单线程。
每个隔离区都有自己的全局字段,这确保了一个隔离区中的状态无法被其他隔离区访问。隔离区之间只能通过消息传递进行通信。隔离区之间没有共享状态意味着像互斥锁、锁和数据竞争这样的并发复杂性问题不会在 Dart 中出现。不过,隔离区并不能完全防止竞态条件。若要了解更多关于这种并发模型的信息,请阅读有关 Actor 模型的内容。
平台说明:只有 Dart 原生平台实现了隔离区。若要了解更多关于 Dart Web 平台的信息,请参阅“Web 上的并发”部分。
主隔离区
在大多数情况下,你根本不需要考虑隔离区。Dart 程序默认在主隔离区中运行。主隔离区是程序开始运行和执行的线程,如下图所示:
即使是单隔离区程序也能平稳运行。在继续执行下一行代码之前,这些应用程序会使用 async-await
等待异步操作完成。一个表现良好的应用程序会快速启动,尽快进入事件循环。然后,应用程序会及时响应每个排队的事件,必要时使用异步操作。
隔离区的生命周期
如下图所示,每个隔离区都从运行一些 Dart 代码开始,比如 main()
函数。这段 Dart 代码可能会注册一些事件监听器,例如用于响应用户输入或文件 I/O 操作。当隔离区的初始函数返回时,如果它需要处理事件,隔离区会继续存在。处理完事件后,隔离区会退出。
事件处理
在客户端应用程序中,主隔离区的事件队列可能包含重绘请求以及点击和其他 UI 事件的通知。例如,下图展示了一个重绘事件,接着是一个点击事件,然后是两个重绘事件。事件循环按照先进先出的顺序从队列中取出事件。
事件处理在 main()
函数退出后在主隔离区上进行。在下图中,main()
函数退出后,主隔离区处理第一个重绘事件。之后,主隔离区处理点击事件,接着是一个重绘事件。
如果一个同步操作占用了太多的处理时间,应用程序可能会变得无响应。在下图中,点击处理代码执行时间过长,导致后续事件处理过晚。应用程序可能会出现冻结的情况,并且它执行的任何动画可能会变得卡顿。
在客户端应用程序中,长时间的同步操作通常会导致 UI 动画不流畅。更糟糕的是,UI 可能会完全无响应。
后台工作隔离区
如果你的应用程序的 UI 由于耗时的计算(例如解析一个大的 JSON 文件)而变得无响应,可以考虑将该计算任务卸载到一个工作隔离区,通常称为后台工作隔离区。常见的情况是,如下图所示,创建一个简单的工作隔离区来执行计算,然后退出。工作隔离区在退出时会通过消息返回其计算结果。
工作隔离区可以执行 I/O 操作(例如读写文件)、设置定时器等。它有自己的内存,并且不与主隔离区共享任何状态。工作隔离区可以阻塞而不会影响其他隔离区。
使用隔离区
在 Dart 中,根据不同的使用场景,有两种使用隔离区的方式:
- 使用
Isolate.run()
在一个单独的线程上执行单个计算任务。 - 使用
Isolate.spawn()
创建一个可以长时间处理多个消息的隔离区,或者创建一个后台工作隔离区。若要了解更多关于使用长时间运行的隔离区的信息,请阅读“隔离区”页面。
在大多数情况下,建议使用 Isolate.run
这个 API 在后台运行进程。
Isolate.run()
静态方法 Isolate.run()
需要一个参数:一个回调函数,该函数将在新创建的隔离区上运行。
dart
int slowFib(int n) => n <= 1 ? 1 : slowFib(n - 1) + slowFib(n - 2);
// 在不阻塞当前隔离区的情况下进行计算。
void fib40() async {
var result = await Isolate.run(() => slowFib(40));
print('Fib(40) = $result');
}
性能和隔离区组
当一个隔离区调用 Isolate.spawn()
时,这两个隔离区拥有相同的可执行代码,并且属于同一个隔离区组。隔离区组可以实现性能优化,例如共享代码;新的隔离区会立即运行隔离区组拥有的代码。此外,Isolate.exit()
只有在隔离区属于同一个隔离区组时才有效。
在某些特殊情况下,你可能需要使用 Isolate.spawnUri()
,它会使用指定 URI 处的代码副本设置新的隔离区。然而,spawnUri()
比 spawn()
慢得多,并且新的隔离区不属于创建它的隔离区的隔离区组。另一个性能影响是,当隔离区属于不同的组时,消息传递会变慢。
隔离区的局限性
隔离区不是线程
如果你是从支持多线程的语言转到 Dart 的,可能会期望隔离区的行为像线程一样,但事实并非如此。每个隔离区都有自己的状态,这确保了一个隔离区中的状态无法被其他隔离区访问。因此,隔离区只能访问自己的内存。
例如,如果你有一个包含全局可变变量的应用程序,该变量在你创建的隔离区中会是一个独立的变量。如果你在创建的隔离区中修改了这个变量,主隔离区中的该变量不会受到影响。这就是隔离区的工作方式,在考虑使用隔离区时,记住这一点很重要。
消息类型
通过 SendPort
发送的消息几乎可以是任何类型的 Dart 对象,但有一些例外:
- 具有原生资源的对象,如
Socket
。 ReceivePort
DynamicLibrary
Finalizable
Finalizer
NativeFinalizer
Pointer
UserTag
- 标记有
@pragma('vm:isolate-unsendable')
的类的实例
除了这些例外情况,任何对象都可以发送。有关更多信息,请查看 SendPort.send
的文档。
请注意,Isolate.spawn()
和 Isolate.exit()
对 SendPort
对象进行了抽象,因此它们也受到相同的限制。
隔离区之间的同步阻塞通信
可以并行运行的隔离区数量是有限制的。这个限制不会影响 Dart 中隔离区之间通过消息进行的标准异步通信。你可以让数百个隔离区并发运行并取得进展。隔离区会以轮询的方式在 CPU 上进行调度,并且经常会相互让出控制权。
隔离区只能在纯 Dart 之外进行同步通信,需要通过 FFI 使用 C 代码来实现。如果在 FFI 调用中通过同步阻塞来尝试在隔离区之间进行同步通信,当隔离区数量超过限制时,可能会导致死锁,除非采取特殊措施。这个限制不是硬编码为某个特定的数字,而是根据 Dart 应用程序可用的 Dart VM 堆大小来计算的。
为了避免这种情况,执行同步阻塞的 C 代码需要在执行阻塞操作之前离开当前隔离区,并在从 FFI 调用返回 Dart 之前重新进入该隔离区。阅读有关 Dart_EnterIsolate
和 Dart_ExitIsolate
的内容以了解更多信息。
Web 上的并发
所有 Dart 应用程序都可以使用 async-await
、Future
和 Stream
进行非阻塞、交错的计算。然而,Dart Web 平台不支持隔离区。Dart Web 应用程序可以使用 Web Worker 在后台线程中运行脚本,类似于隔离区的功能。不过,Web Worker 的功能和能力与隔离区有所不同。
例如,当 Web Worker 在不同线程之间发送数据时,它们会来回复制数据。不过,数据复制可能会非常慢,尤其是对于大消息。隔离区也会复制数据,但同时还提供了一些 API 可以更高效地传输保存消息的内存。
创建 Web Worker 和隔离区的方式也不同。你只能通过声明一个单独的程序入口点并单独编译它来创建 Web Worker。启动一个 Web Worker 类似于使用 Isolate.spawnUri
启动一个隔离区。你还可以使用 Isolate.spawn
启动一个隔离区,这种方式需要的资源更少,因为它会重用创建它的隔离区的一些代码和数据。Web Worker 没有等效的 API。
额外资源
- 如果你使用了多个隔离区,可以考虑使用 Flutter 中的
IsolateNameServer
,或者使用package:isolate_name_server
,它为非 Flutter 的 Dart 应用程序提供了类似的功能。 - 阅读更多关于 Actor 模型的内容,Dart 的隔离区就是基于该模型的。
- 有关隔离区 API 的更多文档:
Isolate.exit()
Isolate.spawn()
ReceivePort
SendPort
异步支持
Dart 库中充满了返回 Future
或 Stream
对象的函数。这些函数是异步的:它们在设置可能耗时的操作(如 I/O)后就会返回,而不会等待该操作完成。
async
和 await
关键字支持异步编程,让你可以编写看起来类似于同步代码的异步代码。
处理 Future
当你需要已完成的 Future
的结果时,有两种选择:
- 按照这里以及异步编程教程中的描述,使用
async
和await
。 - 按照
dart:async
文档中的描述,使用Future
API。
使用 async
和 await
的代码是异步的,但看起来很像同步代码。例如,以下是一段使用 await
等待异步函数结果的代码:
dart
await lookUpVersion();
要使用 await
,代码必须位于一个异步函数中,即标记为 async
的函数:
dart
Future<void> checkVersion() async {
var version = await lookUpVersion();
// 对版本号进行一些操作
}
注意:尽管异步函数可能会执行耗时的操作,但它不会等待这些操作完成。相反,异步函数会一直执行,直到遇到第一个 await
表达式。然后它会返回一个 Future
对象,只有在 await
表达式完成后才会继续执行。
在使用 await
的代码中,可以使用 try
、catch
和 finally
来处理错误和进行清理操作:
dart
try {
version = await lookUpVersion();
} catch (e) {
// 处理无法查询版本号的情况
}
你可以在异步函数中多次使用 await
。例如,以下代码三次等待函数的结果:
dart
var entrypoint = await findEntryPoint();
var exitCode = await runExecutable(entrypoint, args);
await flushThenExit(exitCode);
在 await
表达式中,表达式的值通常是一个 Future
;如果不是,该值会自动被包装在一个 Future
中。这个 Future
对象表示一个返回对象的承诺。await
表达式的值就是那个返回的对象。await
表达式会使执行暂停,直到该对象可用。
如果你在使用 await
时遇到编译时错误,请确保 await
位于异步函数中。例如,要在应用程序的 main()
函数中使用 await
,main()
函数体必须标记为 async
:
dart
void main() async {
checkVersion();
print('In main: version is ${await lookUpVersion()}');
}
注意:前面的示例使用了一个异步函数(checkVersion()
),但没有等待结果。如果代码假设该函数已经执行完毕,这种做法可能会导致问题。为避免此问题,请使用 unawaited_futures
代码检查规则。
若要交互式地了解如何使用 Future
、async
和 await
,请参阅异步编程教程。
声明异步函数
异步函数是其函数体标记有 async
修饰符的函数。
在函数中添加 async
关键字会使其返回一个 Future
。例如,考虑这个返回 String
类型的同步函数:
dart
String lookUpVersion() => '1.0.0';
如果你将其改为异步函数(例如,因为未来的实现会很耗时),返回值将是一个 Future
:
dart
Future<String> lookUpVersion() async => '1.0.0';
请注意,函数体不需要使用 Future
API。必要时,Dart 会创建 Future
对象。如果你的函数不返回有用的值,将其返回类型设为 Future<void>
。
若要交互式地了解如何使用 Future
、async
和 await
,请参阅异步编程教程。
处理 Stream
当你需要从 Stream
获取值时,有两种选择:
- 使用
async
和异步for
循环(await for
)。 - 按照
dart:async
文档中的描述,使用Stream
API。
注意:在使用 await for
之前,请确保它能使代码更清晰,并且你确实想等待流的所有结果。例如,通常不应该对 UI 事件监听器使用 await for
,因为 UI 框架会发送源源不断的事件。
异步 for
循环的形式如下:
dart
await for (varOrType identifier in expression) {
// 每次流发出一个值时执行。
}
表达式的值必须是 Stream
类型。执行过程如下:
- 等待流发出一个值。
- 执行
for
循环体,将变量设置为发出的值。 - 重复步骤 1 和 2,直到流关闭。
若要停止监听流,可以使用 break
或 return
语句,它们会跳出 for
循环并取消订阅流。
如果你在实现异步 for
循环时遇到编译时错误,请确保 await for
位于异步函数中。例如,要在应用程序的 main()
函数中使用异步 for
循环,main()
函数体必须标记为 async
:
dart
void main() async {
// ...
await for (final request in requestServer) {
handleRequest(request);
}
// ...
}
有关 Dart 异步编程支持的更多信息,请查看 dart:async
库文档。
Isolate
本页面将讨论一些使用隔离区(Isolate)API 实现隔离区的示例。
当应用程序需要处理可能会暂时阻塞其他计算的大量计算任务时,就应该使用隔离区。最常见的例子是在 Flutter 应用程序中,当你需要执行大量计算时,这些计算可能会导致 UI 无响应。
虽然没有严格规定何时必须使用隔离区,但以下是一些使用隔离区会很有用的场景:
- 解析和解码特别大的 JSON 数据块。
- 处理和压缩照片、音频和视频。
- 转换音频和视频文件。
- 对大型列表或文件系统进行复杂的搜索和过滤操作。
- 执行 I/O 操作,例如与数据库通信。
- 处理大量的网络请求。
实现一个简单的工作隔离区
这些示例实现了一个主隔离区,该主隔离区会创建一个简单的工作隔离区。Isolate.run()
简化了设置和管理工作隔离区的步骤:
- 创建(启动并实例化)一个隔离区。
- 在新创建的隔离区上运行一个函数。
- 捕获结果。
- 将结果返回给主隔离区。
- 工作完成后终止隔离区。
- 检查、捕获异常和错误,并将其抛回主隔离区。
Flutter 注意事项:如果你使用的是 Flutter,可以使用 Flutter 的 compute
函数来替代 Isolate.run()
。
在新隔离区中运行现有方法
在主隔离区中直接调用 run()
来创建一个新的隔离区(后台工作隔离区),同时 main()
函数会等待结果:
dart
const String filename = 'with_keys.json';
void main() async {
// 读取一些数据
final jsonData = await Isolate.run(_readAndParseJson);
// 使用这些数据
print('Number of JSON keys: ${jsonData.length}');
}
将你希望工作隔离区执行的函数作为第一个参数传递给它。在这个例子中,是现有的 _readAndParseJson()
函数:
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
final fileData = await File(filename).readAsString();
final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
return jsonData;
}
Isolate.run()
获取 _readAndParseJson()
返回的结果,并将该值发送回主隔离区,然后关闭工作隔离区。
工作隔离区会将保存结果的内存转移到主隔离区,而不是复制数据。工作隔离区会进行一次验证,以确保对象可以被转移。
_readAndParseJson()
是一个现有的异步函数,它也可以直接在主隔离区中运行。使用 Isolate.run()
来运行它可以实现并发。工作隔离区完全将 _readAndParseJson()
的计算操作隔离出来,它可以在不阻塞主隔离区的情况下完成计算。
Isolate.run()
的结果总是一个 Future
,因为主隔离区中的代码会继续运行。工作隔离区执行的计算是同步还是异步并不影响主隔离区,因为无论哪种情况,它都是并发运行的。
完整的程序请查看 send_and_receive.dart
示例。
使用隔离区发送闭包
你还可以在主隔离区中直接使用函数字面量(即闭包),通过 run()
创建一个简单的工作隔离区。
dart
const String filename = 'with_keys.json';
void main() async {
// 读取一些数据
final jsonData = await Isolate.run(() async {
final fileData = await File(filename).readAsString();
final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
return jsonData;
});
// 使用这些数据
print('Number of JSON keys: ${jsonData.length}');
}
这个示例和前面的示例实现的功能相同。它创建了一个新的隔离区,进行计算,并将结果发送回来。
不过,这次隔离区发送的是一个闭包。闭包在功能和代码编写方式上比普通的命名函数限制更少。在这个例子中,Isolate.run()
并发地执行看起来像是本地代码的代码。从这个意义上说,你可以把 run()
想象成一个用于“并行运行”的控制流操作符。
使用端口在隔离区之间发送多条消息
短期存在的隔离区使用起来很方便,但创建新的隔离区以及将对象从一个隔离区复制到另一个隔离区会带来性能开销。如果你的代码需要反复使用 Isolate.run
来运行相同的计算,那么创建不会立即退出的长期存在的隔离区可能会提高性能。
为此,你可以使用一些 Isolate.run
封装的底层隔离区 API:
Isolate.spawn()
和Isolate.exit()
ReceivePort
和SendPort
SendPort.send()
方法
本节将介绍在新创建的隔离区和主隔离区之间建立双向通信所需的步骤。第一个示例“基本端口”从高层次介绍了这个过程。第二个示例“健壮端口”在第一个示例的基础上逐步添加了更多实用的、实际场景中的功能。
ReceivePort
和 SendPort
在隔离区之间建立长期通信需要两个类(除了 Isolate
之外):ReceivePort
和 SendPort
。这些端口是隔离区之间相互通信的唯一方式。
ReceivePort
是一个处理从其他隔离区发送过来的消息的对象。这些消息是通过 SendPort
发送的。
注意:一个 SendPort
对象精确关联一个 ReceivePort
,但一个 ReceivePort
可以有多个 SendPort
。当你创建一个 ReceivePort
时,它会为自己创建一个 SendPort
。你也可以创建额外的 SendPort
来向现有的 ReceivePort
发送消息。
端口的行为与 Stream
对象类似(实际上,接收端口实现了 Stream
接口)。你可以分别将 SendPort
和 ReceivePort
想象成 Stream
的 StreamController
和监听器。SendPort
类似于 StreamController
,因为你可以使用 SendPort.send()
方法向它们“添加”消息,而这些消息会由监听器(在这种情况下是 ReceivePort
)处理。然后,ReceivePort
会将接收到的消息作为参数传递给你提供的回调函数来进行处理。
设置端口
新创建的隔离区只能通过 Isolate.spawn
调用接收信息。如果你需要主隔离区在新隔离区创建后继续与其通信,就必须设置一个通信通道,让新隔离区可以向主隔离区发送消息。隔离区之间只能通过消息传递进行通信,它们无法“查看”彼此的内存,这也是“隔离区”这个名称的由来。
要设置这种双向通信,首先在主隔离区中创建一个 ReceivePort
,然后在使用 Isolate.spawn
创建新隔离区时,将其 SendPort
作为参数传递给新隔离区。新隔离区随后创建自己的 ReceivePort
,并通过主隔离区传递给它的 SendPort
将自己的 SendPort
发送回去。主隔离区接收到这个 SendPort
后,双方就都有了一个可以发送和接收消息的开放通道。
注意:本节中的图表是高层次的,旨在传达为隔离区使用端口的概念。实际实现需要更多的代码,你可以在本页面后面找到这些代码。
- 在主隔离区中创建一个
ReceivePort
。SendPort
会作为ReceivePort
的一个属性自动创建。 - 使用
Isolate.spawn()
创建工作隔离区。 - 将
ReceivePort.sendPort
的引用作为第一条消息传递给工作隔离区。 - 在工作隔离区中创建另一个新的
ReceivePort
。 - 将工作隔离区的
ReceivePort.sendPort
的引用作为第一条消息发送回主隔离区。
除了创建端口并设置通信之外,你还需要告诉端口在接收到消息时要做什么。这可以通过在每个 ReceivePort
上使用 listen
方法来实现。
- 通过主隔离区对工作隔离区的
SendPort
的引用发送一条消息。 - 通过工作隔离区的
ReceivePort
上的监听器接收并处理该消息。这里将执行你想要从主隔离区移开的计算任务。 - 通过工作隔离区对主隔离区的
SendPort
的引用发送一条返回消息。 - 通过主隔离区的
ReceivePort
上的监听器接收该消息。
基本端口示例
这个示例展示了如何设置一个长期存在的工作隔离区,并在它和主隔离区之间建立双向通信。代码以向新隔离区发送 JSON 文本为例,在新隔离区中对 JSON 进行解析和解码,然后将结果发送回主隔离区。
警告:这个示例旨在教授创建一个能够随时间发送和接收多条消息的新隔离区所需的最低限度知识。
它没有涵盖生产软件中预期的重要功能,如错误处理、关闭端口和消息排序。
下一节的“健壮端口”示例将涵盖这些功能,并讨论如果没有这些功能可能会出现的一些问题。
步骤 1:定义工作类
首先,为你的后台工作隔离区创建一个类。这个类包含了你需要的所有功能:
- 创建一个隔离区。
- 向该隔离区发送消息。
- 让隔离区解码一些 JSON 数据。
- 将解码后的 JSON 数据发送回主隔离区。
该类公开了两个公共方法:一个用于创建工作隔离区,另一个用于处理向该工作隔离区发送消息。
这个示例的其余部分将逐步展示如何填充类的方法。
dart
class Worker {
Future<void> spawn() async {
// 待办事项:添加创建工作隔离区的功能
}
void _handleResponsesFromIsolate(dynamic message) {
// 待办事项:处理从工作隔离区发送回来的消息
}
static void _startRemoteIsolate(SendPort port) {
// 待办事项:定义应在工作隔离区上执行的代码
}
Future<void> parseJson(String message) async {
// 待办事项:定义一个公共方法,用于向工作隔离区发送消息
}
}
步骤 2:创建工作隔离区
Worker.spawn
方法是你集中创建工作隔离区并确保它能够接收和发送消息的代码的地方。
- 首先,创建一个
ReceivePort
。这允许主隔离区接收新创建的工作隔离区发送的消息。 - 接下来,为接收端口添加一个监听器,以处理工作隔离区将发送回来的消息。传递给监听器的回调函数
_handleResponsesFromIsolate
将在步骤 4 中介绍。 - 最后,使用
Isolate.spawn
创建工作隔离区。它需要两个参数:一个要在工作隔离区上执行的函数(步骤 3 中介绍),以及接收端口的sendPort
属性。
dart
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
receivePort.sendPort
参数将在工作隔离区上调用回调函数(_startRemoteIsolate
)时作为参数传递给它。这是确保工作隔离区有办法向主隔离区发送消息的第一步。
步骤 3:在工作隔离区上执行代码
在这一步中,你定义了 _startRemoteIsolate
方法,该方法会被发送到工作隔离区,并在其创建时执行。这个方法就像是工作隔离区的“主”方法。
- 首先,创建另一个新的
ReceivePort
。这个端口用于接收来自主隔离区的未来消息。 - 接下来,将该端口的
SendPort
发送回主隔离区。 - 最后,为新的
ReceivePort
添加一个监听器。这个监听器处理主隔离区发送给工作隔离区的消息。
dart
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);
receivePort.listen((dynamic message) async {
if (message is String) {
final transformed = jsonDecode(message);
port.send(transformed);
}
});
}
工作隔离区的 ReceivePort
上的监听器对从主隔离区传递过来的 JSON 数据进行解码,然后将解码后的 JSON 数据发送回主隔离区。
这个监听器是主隔离区向工作隔离区发送消息的入口点。这是你告诉工作隔离区未来要执行什么代码的唯一机会。
步骤 4:在主隔离区上处理消息
最后,你需要告诉主隔离区如何处理从工作隔离区发送回主隔离区的消息。为此,你需要填充 _handleResponsesFromIsolate
方法。回想一下,在步骤 2 中,这个方法被传递给了 receivePort.listen
方法:
dart
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
还要回想一下,在步骤 3 中,你向主隔离区发送了一个 SendPort
。这个方法处理该 SendPort
的接收,以及处理未来的消息(即解码后的 JSON 数据)。
- 首先,检查消息是否为
SendPort
类型。如果是,则将该端口分配给类的_sendPort
属性,以便稍后可以使用它来发送消息。 - 接下来,检查消息是否为
Map<String, dynamic>
类型,这是解码后的 JSON 数据的预期类型。如果是,则使用你的应用程序特定逻辑处理该消息。在这个例子中,只是打印该消息。
dart
void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
_isolateReady.complete();
} else if (message is Map<String, dynamic>) {
print(message);
}
}
步骤 5:添加一个 Completer
以确保隔离区已设置好
为了完善这个类,定义一个名为 parseJson
的公共方法,该方法负责向工作隔离区发送消息。它还需要确保在隔离区完全设置好之前不会发送消息。为了处理这个问题,使用一个 Completer
。
- 首先,添加一个名为
_isolateReady
的类级别的Completer
属性。 - 接下来,如果消息是一个
SendPort
,在_handleResponsesFromIsolate
方法(在步骤 4 中创建)中调用completer
的complete()
方法。 - 最后,在
parseJson
方法中,在调用_sendPort.send
之前添加await _isolateReady.future
。这确保了在工作隔离区创建并将其SendPort
发送回主隔离区之前,不会向其发送任何消息。
dart
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
完整示例
展开查看完整示例
健壮端口示例
前面的示例解释了设置一个具有双向通信的长期存在的隔离区所需的基本构建块。如前所述,该示例缺少一些重要的功能,例如错误处理、在端口不再使用时关闭端口的能力,以及在某些情况下消息排序的不一致性。
这个示例在第一个示例的基础上进行了扩展,创建了一个具有这些额外功能以及更多功能的长期存在的工作隔离区,并遵循了更好的设计模式。虽然这段代码与第一个示例有相似之处,但它并不是第一个示例的扩展。
注意:这个示例假设你已经熟悉使用 Isolate.spawn
和端口在隔离区之间建立通信,这在前面的示例中已经介绍过。
步骤 1:定义工作类
首先,为你的后台工作隔离区创建一个类。这个类包含了你需要的所有功能:
- 创建一个隔离区。
- 向该隔离区发送消息。
- 让隔离区解码一些 JSON 数据。
- 将解码后的 JSON 数据发送回主隔离区。
该类公开了三个公共方法:一个用于创建工作隔离区,一个用于处理向该工作隔离区发送消息,还有一个用于在端口不再使用时关闭它们。
dart
class Worker {
final SendPort _commands;
final ReceivePort _responses;
Future<Object?> parseJson(String message) async {
// 待办事项:确保端口仍然开放
_commands.send(message);
}
static Future<Worker> spawn() async {
// 待办事项:添加功能以创建一个与新创建的隔离区建立连接的新 Worker 对象
throw UnimplementedError();
}
Worker._(this._responses, this._commands) {
// 待办事项:初始化主隔离区接收端口的监听器
}
void _handleResponsesFromIsolate(dynamic message) {
// 待办事项:处理从工作隔离区发送回来的消息
}
static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
// 待办事项:处理从工作隔离区发送回来的消息
}
static void _startRemoteIsolate(SendPort sp) {
// // 待办事项:初始化工作隔离区的端口
}
}
注意:在这个示例中,SendPort
和 ReceivePort
实例遵循了一个最佳实践命名约定,即它们的命名与主隔离区相关。从主隔离区通过 SendPort
发送到工作隔离区的消息称为命令,而发送回主隔离区的消息称为响应。
步骤 2:在 Worker.spawn
方法中创建一个 RawReceivePort
在创建隔离区之前,你需要创建一个 RawReceivePort
,它是一个底层的 ReceivePort
。使用 RawReceivePort
是一种推荐的模式,因为它允许你将隔离区的启动逻辑与处理隔离区上消息传递的逻辑分开。
在 Worker.spawn
方法中:
- 首先,创建
RawReceivePort
。这个ReceivePort
仅负责接收来自工作隔离区的初始消息,该消息将是一个SendPort
。 - 接下来,创建一个
Completer
,它将指示隔离区何时准备好接收消息。当这个Completer
完成时,它将返回一个包含ReceivePort
和SendPort
的记录。 - 然后,定义
RawReceivePort.handler
属性。这个属性是一个Function?
,其行为类似于ReceivePort.listener
。当这个端口接收到消息时,会调用这个函数。 - 在处理函数中,调用
connection.complete()
。这个方法期望一个包含ReceivePort
和SendPort
的记录作为参数。SendPort
是从工作隔离区发送的初始消息,将在下一步中分配给类级别的SendPort
,名为_commands
。 - 最后,使用
ReceivePort.fromRawReceivePort
构造函数创建一个新的ReceivePort
,并传入initPort
。
dart
class Worker {
final SendPort _commands;
final ReceivePort _responses;
static Future<Worker> spawn() async {
// 创建一个接收端口并添加其初始消息处理程序
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
}
}
先创建 RawReceivePort
,然后再创建 ReceivePort
,这样你就可以在之后为 ReceivePort.listen
添加新的回调函数。相反,如果你直接创建 ReceivePort
,由于 ReceivePort
实现的是 Stream
而不是 BroadcastStream
,你将只能添加一个监听器。
实际上,这允许你将隔离区的启动逻辑与完成通信设置后处理接收消息的逻辑分开。随着其他方法中的逻辑增多,这种好处会变得更加明显。
步骤 3:使用 Isolate.spawn
创建工作隔离区
这一步将继续完善 Worker.spawn
方法。你将添加创建隔离区所需的代码,并从这个类返回一个 Worker
实例。在这个示例中,对 Isolate.spawn
的调用被包装在一个 try/catch
块中,以确保如果隔离区启动失败,initPort
将被关闭,并且不会创建 Worker
对象。
- 首先,在
try/catch
块中尝试创建工作隔离区。如果创建工作隔离区失败,关闭上一步中创建的接收端口。传递给Isolate.spawn
的方法将在后面的步骤中介绍。 - 接下来,等待
connection.future
,并从它返回的记录中解构出sendPort
和receivePort
。 - 最后,通过调用其私有构造函数返回一个
Worker
实例,并传入completer
中的端口。
dart
class Worker {
final SendPort _commands;
final ReceivePort _responses;
static Future<Worker> spawn() async {
// 创建一个接收端口并添加其初始消息处理程序
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// 创建隔离区
try {
await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return Worker._(receivePort, sendPort);
}
}
请注意,与前面的示例相比,在这个示例中,Worker.spawn
充当了这个类的异步静态构造函数,并且是创建 Worker
实例的唯一方式。这简化了 API,使创建 Worker
实例的代码更加简洁。
步骤 4:完成隔离区的设置过程
在这一步中,你将完成基本的隔离区设置过程。这几乎与前面的示例完全对应,没有新的概念。唯一的细微变化是代码被拆分成了更多的方法,这是一种设计实践,为在本示例的其余部分添加更多功能做好了准备。要深入了解设置隔离区的基本过程,请参阅基本端口示例。
- 首先,创建从
Worker.spawn
方法返回的私有构造函数。在构造函数体中,为主隔离区使用的接收端口添加一个监听器,并将一个尚未定义的方法_handleResponsesFromIsolate
传递给该监听器。
dart
class Worker {
final SendPort _commands;
final ReceivePort _responses;
Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
}
- 接下来,在
_startRemoteIsolate
中添加负责初始化工作隔离区上端口的代码。回想一下,这个方法在Worker.spawn
方法中被传递给了Isolate.spawn
,并且它将接收主隔离区的SendPort
作为参数。- 创建一个新的
ReceivePort
。 - 将该端口的
SendPort
发送回主隔离区。 - 调用一个名为
_handleCommandsToIsolate
的新方法,并将新的ReceivePort
和主隔离区的SendPort
作为参数传递。
- 创建一个新的
dart
static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}
- 然后,添加
_handleCommandsToIsolate
方法,该方法负责接收来自主隔离区的消息,在工作隔离区上解码 JSON 数据,并将解码后的 JSON 数据作为响应发送回去。- 首先,在工作隔离区的
ReceivePort
上声明一个监听器。 - 在添加到监听器的回调函数中,在
try/catch
块中尝试对从主隔离区传递过来的 JSON 数据进行解码。如果解码成功,将解码后的 JSON 数据发送回主隔离区。 - 如果发生错误,发送一个
RemoteError
。
- 首先,在工作隔离区的
dart
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
try {
final jsonData = jsonDecode(message as String);
sendPort.send(jsonData);
} catch (e) {
sendPort.send(RemoteError(e.toString(), ''));
}
});
}
- 接着,添加
_handleResponsesFromIsolate
方法的代码。- 首先,检查消息是否为
RemoteError
类型,如果是,则抛出该错误。 - 否则,打印该消息。在后续步骤中,你将更新此代码以返回消息而不是打印它们。
- 首先,检查消息是否为
dart
void _handleResponsesFromIsolate(dynamic message) {
if (message is RemoteError) {
throw message;
} else {
print(message);
}
}
- 最后,添加
parseJson
方法,这是一个公共方法,允许外部代码将 JSON 数据发送到工作隔离区进行解码。
dart
Future<Object?> parseJson(String message) async {
_commands.send(message);
}
你将在下一步中更新此方法。
步骤 5:同时处理多条消息
目前,如果你快速向工作隔离区发送消息,隔离区将按照完成的顺序发送解码后的 JSON 响应,而不是按照发送的顺序。你无法确定哪个响应对应哪个消息。
在这一步中,你将通过为每条消息分配一个 ID,并使用 Completer
对象来解决这个问题,以确保当外部代码调用 parseJson
时,返回给调用者的响应是正确的响应。
- 首先,为
Worker
类添加两个类级别的属性:Map<int, Completer<Object?>> _activeRequests
int _idCounter
dart
class Worker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
// ···
}
_activeRequests
映射将发送到工作隔离区的消息与一个 Completer
关联起来。_activeRequests
中使用的键来自 _idCounter
,随着发送的消息增多,_idCounter
会递增。 2. 接下来,更新 parseJson
方法,在向工作隔离区发送消息之前创建 Completer
。 - 首先创建一个 Completer
。 - 然后,递增 _idCounter
,以便每个 Completer
都与一个唯一的数字关联。 - 在 _activeRequests
映射中添加一个条目,其中键是 _idCounter
的当前值,值是 Completer
。 - 将消息和 ID 一起发送到工作隔离区。由于你只能通过 SendPort
发送一个值,因此将 ID 和消息包装在一个记录中。 - 最后,返回 Completer
的 future
,它最终将包含来自工作隔离区的响应。
dart
Future<Object?> parseJson(String message) async {
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
你还需要更新 _handleResponsesFromIsolate
和 _handleCommandsToIsolate
以处理这个系统。 3. 在 _handleCommandsToIsolate
中,你需要考虑消息是一个包含两个值的记录,而不仅仅是 JSON 文本。通过从 message
中解构出这些值来实现。 然后,在解码 JSON 数据后,更新对 sendPort.send
的调用,再次使用记录将 ID 和解码后的 JSON 数据一起发送回主隔离区。
dart
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
final (int id, String jsonText) = message as (int, String); // 新增
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData)); // 更新
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
- 最后,更新
_handleResponsesFromIsolate
。- 首先,再次从
message
参数中解构出 ID 和响应。 - 然后,从
_activeRequests
映射中移除与该请求对应的Completer
。 - 最后,不是抛出错误或打印解码后的 JSON 数据,而是完成
Completer
,并传入响应。当完成时,响应将被返回给在主隔离区调用parseJson
的代码。
- 首先,再次从
dart
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?); // 新增
final completer = _activeRequests.remove(id)!; // 新增
if (response is RemoteError) {
completer.completeError(response); // 更新
} else {
completer.complete(response); // 更新
}
}
步骤 6:添加关闭端口的功能
当你的代码不再使用隔离区时,你应该关闭主隔离区和工作隔离区上的端口。
- 首先,添加一个类级别的布尔值,用于跟踪端口是否已关闭。
- 然后,添加
Worker.close
方法。在这个方法中:- 将
_closed
更新为true
。 - 向工作隔离区发送一条最终消息。这条消息是一个字符串 “shutdown”,但你可以使用任何你想要的对象。你将在接下来的代码片段中使用它。
- 最后,检查
_activeRequests
是否为空。如果为空,则关闭主隔离区的ReceivePort
,名为_responses
。
- 将
dart
class Worker {
bool _closed = false;
// ···
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
}
- 接下来,你需要在工作隔离区中处理 “shutdown” 消息。在
_handleCommandsToIsolate
方法中添加以下代码。这段代码将检查消息是否是字符串 “shutdown”。如果是,它将关闭工作隔离区的ReceivePort
并返回。
dart
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
// 新增 if 块
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
- 最后,你应该添加代码,在尝试发送消息之前检查端口是否已关闭。在
Worker.parseJson
方法中添加一行代码。
dart
Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed'); // 新增
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}