TaskPool 与 Worker 多线程详解
·
鸿蒙开发:TaskPool 与 Worker 多线程详解
HarmonyOS NEXT 提供了两种多线程并发机制:TaskPool 和 Worker。两者都基于 Actor 并发模型,线程间不共享内存,通过消息传递通信。本文详细介绍两者的用法与区别。
一、TaskPool(任务池)
1.1 概念
TaskPool 是系统管理的线程池,开发者只需提交任务,无需关心线程的创建与销毁。系统会根据负载自动调度,适合短时、独立、高频的并发任务。
1.2 基本用法
import taskpool from '@ohos.taskpool';
// 定义一个可并发执行的函数(必须用 @Concurrent 装饰)
@Concurrent
function heavyCompute(input: number): number {
let result = 0;
for (let i = 0; i < input; i++) {
result += i;
}
return result;
}
// 创建任务并执行
async function runTask() {
const task = new taskpool.Task(heavyCompute, 1000000);
const result = await taskpool.execute(task);
console.log('计算结果:', result);
}
1.3 设置任务优先级
import taskpool from '@ohos.taskpool';
@Concurrent
function parseData(data: string): object {
return JSON.parse(data);
}
async function runWithPriority() {
const task = new taskpool.Task(parseData, '{"key":"value"}');
// 优先级:HIGH / MEDIUM / LOW
const result = await taskpool.execute(task, taskpool.Priority.HIGH);
console.log('解析结果:', result);
}
1.4 任务取消
import taskpool from '@ohos.taskpool';
@Concurrent
function longTask(n: number): number {
let sum = 0;
for (let i = 0; i < n; i++) sum += i;
return sum;
}
async function cancelDemo() {
const task = new taskpool.Task(longTask, 999999999);
// 异步执行,不等待结果
taskpool.execute(task).catch((e: Error) => {
console.log('任务被取消或出错:', e.message);
});
// 取消任务
taskpool.cancel(task);
}
1.5 任务组(TaskGroup)
将多个任务打包,全部完成后统一返回结果:
import taskpool from '@ohos.taskpool';
@Concurrent
function multiply(a: number, b: number): number {
return a * b;
}
async function runGroup() {
const group = new taskpool.TaskGroup();
group.addTask(multiply, 2, 3);
group.addTask(multiply, 4, 5);
group.addTask(multiply, 6, 7);
// 返回所有任务结果的数组
const results = await taskpool.execute(group) as number[];
console.log('任务组结果:', results); // [6, 20, 42]
}
二、Worker(工作线程)
2.1 概念
Worker 是开发者手动创建和管理的独立线程,适合长时间运行、需要持续通信的任务,例如文件处理、持续监听、复杂状态维护等。
2.2 创建 Worker 文件
Worker 逻辑需要写在独立文件中,路径通常为 entry/src/main/ets/workers/MyWorker.ts:
// entry/src/main/ets/workers/MyWorker.ts
import worker, { MessageEvents } from '@ohos.worker';
const workerPort = worker.workerPort;
// 接收主线程消息
workerPort.onmessage = (e: MessageEvents) => {
const input = e.data as number;
console.log('Worker 收到:', input);
// 执行耗时操作
let result = 0;
for (let i = 0; i < input; i++) {
result += i;
}
// 将结果发回主线程
workerPort.postMessage(result);
};
// 错误处理
workerPort.onmessageerror = (e: MessageEvents) => {
console.error('Worker 消息错误:', e);
};
2.3 主线程使用 Worker
import worker from '@ohos.worker';
function startWorker() {
// 创建 Worker 实例,指向 Worker 文件路径
const myWorker = new worker.ThreadWorker(
'entry/ets/workers/MyWorker.ts'
);
// 接收 Worker 返回的消息
myWorker.onmessage = (e) => {
console.log('主线程收到结果:', e.data);
// 任务完成后销毁 Worker
myWorker.terminate();
};
// 错误处理
myWorker.onerror = (e) => {
console.error('Worker 出错:', e.message);
};
// 向 Worker 发送任务数据
myWorker.postMessage(1000000);
}
2.4 主线程与 Worker 双向通信
// Worker 文件:持续接收并响应消息
workerPort.onmessage = (e: MessageEvents) => {
const { type, data } = e.data;
if (type === 'compute') {
const result = data * data;
workerPort.postMessage({ type: 'result', data: result });
} else if (type === 'stop') {
workerPort.close(); // Worker 主动关闭
}
};
// 主线程:多次与 Worker 通信
const myWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
myWorker.onmessage = (e) => {
console.log('收到结果:', e.data);
};
// 发送多次消息
myWorker.postMessage({ type: 'compute', data: 5 });
myWorker.postMessage({ type: 'compute', data: 10 });
myWorker.postMessage({ type: 'stop', data: null });
2.5 传递可转移对象(ArrayBuffer)
通过转移所有权避免内存拷贝,提升大数据传输性能:
const buffer = new ArrayBuffer(1024 * 1024); // 1MB
// 第二个参数指定转移的对象,转移后主线程不可再访问该 buffer
myWorker.postMessage(buffer, [buffer]);
三、TaskPool vs Worker 对比
| 对比项 | TaskPool | Worker |
|---|---|---|
| 线程管理 | 系统自动管理 | 开发者手动创建/销毁 |
| 适用场景 | 短时、独立、高频任务 | 长时运行、持续通信任务 |
| 任务粒度 | 单个函数 | 独立线程文件 |
| 通信方式 | async/await 返回值 | postMessage 消息传递 |
| 优先级控制 | 支持(HIGH/MEDIUM/LOW) | 不支持 |
| 任务取消 | 支持 cancel | 支持 terminate |
| 代码复杂度 | 低 | 较高 |
| 线程数量 | 系统自动伸缩 | 每次 new 创建一个 |
| 状态维护 | 不适合(无状态) | 适合(有状态) |
| 数据传递 | 支持序列化对象 | 支持序列化 + 可转移对象 |
四、如何选择
任务是否需要长期运行或维护状态?
├── 是 → 使用 Worker
└── 否
├── 任务是否需要频繁提交?
│ ├── 是 → 使用 TaskPool(系统自动复用线程)
│ └── 否
│ ├── 是否需要任务优先级控制?
│ │ ├── 是 → 使用 TaskPool
│ │ └── 否 → TaskPool 或 Worker 均可
简单原则:
- 能用 TaskPool 就用 TaskPool,开销小、使用简单
- 需要持续运行或复杂双向通信时,选 Worker
五、注意事项
- @Concurrent 装饰器:TaskPool 中执行的函数必须加此装饰器,且不能引用外部闭包变量。
- 数据隔离:两种方式线程间均不共享内存,传递的对象会被序列化(深拷贝),修改不会影响原对象。
- UI 操作:Worker 和 TaskPool 子线程中均不能操作 UI,需将结果传回主线程再更新。
- Worker 文件路径:创建 Worker 时路径需与实际文件位置一致,否则运行时报错。
- 及时销毁:Worker 使用完毕后应调用
terminate()释放资源,避免内存泄漏。
六、总结
| 场景示例 | 推荐方案 |
|---|---|
| 图片压缩、JSON 解析 | TaskPool |
| 批量数学计算 | TaskPool(TaskGroup) |
| 文件持续读写监听 | Worker |
| WebSocket 长连接维护 | Worker |
| 多个独立请求并发处理 | TaskPool |
| 需要与子线程反复交互 | Worker |
TaskPool 轻量易用,适合大多数短任务场景;Worker 灵活强大,适合需要精细控制的复杂场景。根据实际业务选择合适的方案,才能在鸿蒙平台上写出高性能的应用。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)