这篇文档描述了如何在使用 Rust 以及 Tokio 编写一个健壮的任务管理器 TaskManager。它具有如下特征:
- ✅ 异步任务管理:基于
Tokio的异步运行时 - ✅ 优雅关闭:支持取消令牌(
CancellationToken)和超时控制 - ✅ 自动重启:配置化的重试机制与指数退避策略
- ✅ 任务依赖:支持任务间依赖关系管理
- ✅ 优先级调度:基于优先级的任务启动顺序
- ✅ 状态追踪:完整的任务状态机和执行指标
- ✅ 并发安全:使用
Arc + RwLock保证线程安全
结构设计

初始化项目
在开始之前,你需要先初始化一个项目:
cargo init task_manager_exampleCargo.toml 如下:
[package]
name = "task-manager-example"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.34.0", features = ["full"] }然后你会得到一个 hello world 的示例程序,src/main.rs 内容如下:
fn main() {
println!("Hello, world!");
}编写 Task Manager
千里之行,始于足下。首先,我们来编写 TaskManager 这个最基础的组件:
use std::collections::HashMap;
use tokio::task::JoinHandle;
struct TaskManager {
pub tasks: HashMap<String, JoinHandle<()>>,
}TaskManager 就一个字段 HashMap 类型的 tasks, 包含了多个 task。
重点来说 tokio::task::JoinHandle, 注意不要引入成 std::thread::JoinHandle , 后者是标准库中用来创建 OS 原生线程的,而 tokio 提供的是协程模型。

JoinHandle 是对 Future 的包装,对其生命周期进行管理,并且提供了跨任务通信机制。
实现 new()
接着,我们来实现 new(), 初始化 tasks 字段:
impl TaskManager {
pub fn new() -> Self {
Self {
tasks: HashMap::new(),
}
}
}这里我特意留了一个坑,你知道是什么吗?
实现 add_task()
上面我们已经初始化了 tasks 这个字段,但它只是一个 HashMap 类型的容器,我们需要通过 add_task() 方法,将任务加入进去,填充它:
impl TaskManager {
// ...省略...
pub fn add_task(&mut self, name: String, handle: JoinHandle<()>) {
self.tasks.insert(name, handle);
}
}实现 run()
接着,我们就可以来实现 run 方法了,代码如下:
pub async fn run(self) {
for (name, handle) in self.tasks {
match handle.await {
Ok(_) => println!("Task '{}' completed successfully", name),
Err(e) => eprintln!("Task '{}' failed: {:?}", name, e),
}
}
}代码比较简单,就是遍历 tasks ,逐个执行。因为 JoinHandle 是一个 Future 的包装器,别忘了加上 async 和 await 。
测试
最后,我们来测试这个最基础版本的 TaskManager, 在 main 函数中 add_task 后调用 run 方法:
#[tokio::main]
async fn main() {
let mut manager = TaskManager::new();
manager.add_task(
"task1".to_string(),
tokio::spawn(async {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
println!("Task 1 executing...");
}),
);
manager.add_task(
"task2".to_string(),
tokio::spawn(async {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
println!("Task 2 executing...");
}),
);
manager.add_task(
"task3".to_string(),
tokio::spawn(async {
println!("Task 3 executing immediately...");
}),
);
println!("Starting all tasks...");
manager.run().await;
println!("All tasks completed!");
}猜一个,println! 的输出的顺序是什么?其实是不固定的。
另外还有一个问题,TaskManager 中的 tasks 没有 Lock 的保护,是否存在并发问题?答案是不存在的,原因如下:
- 单一所有权:
TaskManager在main函数中只有一个可变引用,所有操作都是顺序的; - 没有跨任务共享:
HashMap本身没有在多个任务之间共享; - 消费式设计:
run()方法消费self, 之后无法再访问HashMap了;
实现 TaskHandle
在 tokio 生态中,一个任务就是 JoinHandle。但是在生产中, JoinHandle 只是最基础的结构,并不能满足我们更多的需求,比如状态管理、优雅关机等。
所以我们需要在 JoinHandle 的基础上,再包装一层,实现 TaskHandle , 核心作用是**实现 Task 完整的生命周期管理:发送取消信号 -> 等待优雅关闭 -> 超时强制中止,以及实现任务状态的查询和监控。
实现基础结构
根据上面的结构图,我们来新增 TaskHandle 的 struct:
struct TaskHandle {
join_handle: JoinHandle<()>,
}然后将 JoinHandle 全部替换为 TaskHandle:
struct TaskManager {
pub tasks: HashMap<String, TaskHandle>,
}
pub fn add_task(&mut self, name: String, handle: TaskHandle) {}
pub async fn run(self) {
for (name, handle) in self.tasks {
match handle.join_handle.await {} // 这里要改为 `handle.join_handle`
}
}
manager.add_task(
"task1".to_string(),
TaskHandle { // 在 join_handle 的外面包一层
join_handle: tokio::spawn(async {}),
},
);至此,我们只是用 TaskHandle 简单地替换了 JoinHandle ,并没有实现更多的能力。但是为后面扩展奠定了基础。这在计算机中很常见,也很重要,有一句话是这么说的:
All problems in computer science can be solved by another level of indirection。
所有的问题,如果加一层还不能解决,那就再加上一层(除了性能问题哦~)。当然这也不是真理:
All non-trivial abstractions, to some degree, are leaky.(所有非微不足道的抽象,到最后都是漏洞百出)。
Passive Manager Pattern
这样实现对吗?如果你仔细看上面的类图,你会发现 TaskManager 并没有一个 run 的方法,它采用的是 Passive Manager Pattern (被动管理器模式)。
所以正确的实现方式是,在 add_task 后,立即启动这个任务,而不是通过一个 run 方法来统一启动所有任务。