Rust 编写任务管理器

这篇文档描述了如何在使用 Rust 以及 Tokio 编写一个健壮的任务管理器 TaskManager。它具有如下特征:

  • 异步任务管理:基于 Tokio 的异步运行时
  • 优雅关闭:支持取消令牌(CancellationToken)和超时控制
  • 自动重启:配置化的重试机制与指数退避策略
  • 任务依赖:支持任务间依赖关系管理
  • 优先级调度:基于优先级的任务启动顺序
  • 状态追踪:完整的任务状态机和执行指标
  • 并发安全:使用 Arc + RwLock 保证线程安全

结构设计

初始化项目

在开始之前,你需要先初始化一个项目:

cargo init task_manager_example

Cargo.toml 如下:

[package]
name = "task-manager-example"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.34.0", features = ["full"] }

然后你会得到一个 hello world 的示例程序,src/main.rs 内容如下:

fn main() {
    println!("Hello, world!");
}

编写 Task Manager

千里之行,始于足下。首先,我们来编写 TaskManager 这个最基础的组件:

use std::collections::HashMap;
use tokio::task::JoinHandle;

struct TaskManager {
    pub tasks: HashMap<String, JoinHandle<()>>,
}

TaskManager 就一个字段 HashMap 类型的 tasks, 包含了多个 task

重点来说 tokio::task::JoinHandle, 注意不要引入成 std::thread::JoinHandle , 后者是标准库中用来创建 OS 原生线程的,而 tokio 提供的是协程模型。

JoinHandle 是对 Future 的包装,对其生命周期进行管理,并且提供了跨任务通信机制。

实现 new()

接着,我们来实现 new(), 初始化 tasks 字段:

impl TaskManager {
    pub fn new() -> Self {
        Self {
            tasks: HashMap::new(),
        }
    }
}

这里我特意留了一个坑,你知道是什么吗?

实现 add_task()

上面我们已经初始化了 tasks 这个字段,但它只是一个 HashMap 类型的容器,我们需要通过 add_task() 方法,将任务加入进去,填充它:

impl TaskManager {
    // ...省略...

    pub fn add_task(&mut self, name: String, handle: JoinHandle<()>) {
        self.tasks.insert(name, handle);
    }
}

实现 run()

接着,我们就可以来实现 run 方法了,代码如下:

pub async fn run(self) {
  for (name, handle) in self.tasks {
    match handle.await {
      Ok(_) => println!("Task '{}' completed successfully", name),
      Err(e) => eprintln!("Task '{}' failed: {:?}", name, e),
    }
  }
}

代码比较简单,就是遍历 tasks ,逐个执行。因为 JoinHandle 是一个 Future 的包装器,别忘了加上 asyncawait

测试

最后,我们来测试这个最基础版本的 TaskManager, 在 main 函数中 add_task 后调用 run 方法:

#[tokio::main]
async fn main() {
    let mut manager = TaskManager::new();

    manager.add_task(
        "task1".to_string(),
        tokio::spawn(async {
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            println!("Task 1 executing...");
        }),
    );

    manager.add_task(
        "task2".to_string(),
        tokio::spawn(async {
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
            println!("Task 2 executing...");
        }),
    );

    manager.add_task(
        "task3".to_string(),
        tokio::spawn(async {
            println!("Task 3 executing immediately...");
        }),
    );

    println!("Starting all tasks...");
    manager.run().await;
    println!("All tasks completed!");
}

猜一个,println! 的输出的顺序是什么?其实是不固定的。

另外还有一个问题,TaskManager 中的 tasks 没有 Lock 的保护,是否存在并发问题?答案是不存在的,原因如下:

  • 单一所有权:TaskManagermain 函数中只有一个可变引用,所有操作都是顺序的;
  • 没有跨任务共享:HashMap 本身没有在多个任务之间共享;
  • 消费式设计:run() 方法消费 self, 之后无法再访问 HashMap 了;

实现 TaskHandle

在 tokio 生态中,一个任务就是 JoinHandle。但是在生产中, JoinHandle 只是最基础的结构,并不能满足我们更多的需求,比如状态管理、优雅关机等。

所以我们需要在 JoinHandle 的基础上,再包装一层,实现 TaskHandle , 核心作用是**实现 Task 完整的生命周期管理:发送取消信号 -> 等待优雅关闭 -> 超时强制中止,以及实现任务状态的查询和监控。

实现基础结构

根据上面的结构图,我们来新增 TaskHandlestruct

struct TaskHandle {
    join_handle: JoinHandle<()>,
}

然后将 JoinHandle 全部替换为 TaskHandle:

struct TaskManager {
    pub tasks: HashMap<String, TaskHandle>,
}

pub fn add_task(&mut self, name: String, handle: TaskHandle) {}

pub async fn run(self) {
    for (name, handle) in self.tasks {
        match handle.join_handle.await {}  // 这里要改为 `handle.join_handle`
    }
}

manager.add_task(
    "task1".to_string(),
    TaskHandle {    // 在 join_handle 的外面包一层
        join_handle: tokio::spawn(async {}),
    },
);

至此,我们只是用 TaskHandle 简单地替换了 JoinHandle ,并没有实现更多的能力。但是为后面扩展奠定了基础。这在计算机中很常见,也很重要,有一句话是这么说的:

All problems in computer science can be solved by another level of indirection。

所有的问题,如果加一层还不能解决,那就再加上一层(除了性能问题哦~)。当然这也不是真理:

All non-trivial abstractions, to some degree, are leaky.(所有非微不足道的抽象,到最后都是漏洞百出)。

Passive Manager Pattern

这样实现对吗?如果你仔细看上面的类图,你会发现 TaskManager 并没有一个 run 的方法,它采用的是 Passive Manager Pattern (被动管理器模式)。

所以正确的实现方式是,在 add_task 后,立即启动这个任务,而不是通过一个 run 方法来统一启动所有任务。