Learn Claude Code
s08

后台任务

并发

Fire and Forget

193 LOC6 个工具BackgroundManager + notification queue
Non-blocking daemon threads + notification queue

BackgroundManager 在独立线程中运行命令, 在每次 LLM 调用前排空通知队列, 使智能体永远不会因长时间运行的操作而阻塞。

问题

有些命令需要几分钟: npm installpytestdocker build。在阻塞式的 agent loop 中, 模型只能干等子进程结束, 什么也做不了。如果用户要求 "安装依赖, 同时创建配置文件", 智能体会先安装, 然后才创建配置 -- 串行执行, 而非并行。

智能体需要并发能力。不是将 agent loop 本身完全多线程化, 而是能够发起一个长时间命令然后继续工作。当命令完成时, 结果自然地出现在对话中。

解决方案是一个 BackgroundManager, 它在守护线程中运行命令, 将结果收集到通知队列中。每次 LLM 调用前, 队列被排空, 结果注入到消息中。

解决方案

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | task executes   |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- notification queue --+
                                      |
                           [results injected before
                            next LLM call]

工作原理

  1. BackgroundManager 追踪任务并维护一个线程安全的通知队列。
class BackgroundManager:
    def __init__(self):
        self.tasks = {}
        self._notification_queue = []
        self._lock = threading.Lock()
  1. run() 启动一个守护线程并立即返回 task_id。
def run(self, command: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    self.tasks[task_id] = {
        "status": "running",
        "result": None,
        "command": command,
    }
    thread = threading.Thread(
        target=self._execute,
        args=(task_id, command),
        daemon=True,
    )
    thread.start()
    return f"Background task {task_id} started"
  1. 线程目标函数 _execute 运行子进程并将结果推入通知队列。
def _execute(self, task_id: str, command: str):
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300)
        output = (r.stdout + r.stderr).strip()[:50000]
        status = "completed"
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
        status = "timeout"
    self.tasks[task_id]["status"] = status
    self.tasks[task_id]["result"] = output
    with self._lock:
        self._notification_queue.append({
            "task_id": task_id,
            "status": status,
            "result": output[:500],
        })
  1. drain_notifications() 返回并清空待处理的结果。
def drain_notifications(self) -> list:
    with self._lock:
        notifs = list(self._notification_queue)
        self._notification_queue.clear()
    return notifs
  1. Agent loop 在每次 LLM 调用前排空通知。
def agent_loop(messages: list):
    while True:
        notifs = BG.drain_notifications()
        if notifs and messages:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['status']}: "
                f"{n['result']}" for n in notifs
            )
            messages.append({"role": "user",
                "content": f"<background-results>"
                           f"\n{notif_text}\n"
                           f"</background-results>"})
            messages.append({"role": "assistant",
                "content": "Noted background results."})
        response = client.messages.create(...)

核心代码

BackgroundManager (来自 agents/s08_background_tasks.py, 第 49-107 行):

class BackgroundManager:
    def __init__(self):
        self.tasks = {}
        self._notification_queue = []
        self._lock = threading.Lock()

    def run(self, command: str) -> str:
        task_id = str(uuid.uuid4())[:8]
        self.tasks[task_id] = {"status": "running",
                               "result": None,
                               "command": command}
        thread = threading.Thread(
            target=self._execute,
            args=(task_id, command), daemon=True)
        thread.start()
        return f"Background task {task_id} started"

    def _execute(self, task_id, command):
        # run subprocess, push to queue
        ...

    def drain_notifications(self) -> list:
        with self._lock:
            notifs = list(self._notification_queue)
            self._notification_queue.clear()
        return notifs

相对 s07 的变更

组件之前 (s07)之后 (s08)
Tools86 (基础 + background_run + check)
执行方式仅阻塞阻塞 + 后台线程
通知机制每轮排空的队列
并发守护线程

设计原理

智能体循环本质上是单线程的 (一次一个 LLM 调用)。后台线程为 I/O 密集型工作 (测试、构建、安装) 打破了这个限制。通知队列模式 ("在下一次 LLM 调用前排空") 确保结果在对话的自然间断点到达, 而不是打断模型的推理过程。这是一个最小化的并发模型: 智能体循环保持单线程和确定性, 只有 I/O 密集型的子进程执行被并行化。

试一试

cd learn-claude-code
python agents/s08_background_tasks.py

可以尝试的提示:

  1. Run "sleep 5 && echo done" in the background, then create a file while it runs
  2. Start 3 background tasks: "sleep 2", "sleep 4", "sleep 6". Check their status.
  3. Run pytest in the background and keep working on other things