跳转至

命令车道队列(Command Lane Queue)

进程级最底层队列,负责序列化所有 agent 执行。通过 lane(车道) 隔离不同来源的任务。

源文件: src/process/command-queue.ts (527 行)


架构

graph LR
    A[enqueueCommandInLane] --> B{gatewayDraining?}
    B -->|是| C[GatewayDrainingError]
    B -->|否| D[push 到 LaneState.queue]
    D --> E[drainLane]
    E --> F[pump 循环]
    F --> G{active < max && queue > 0?}
    G -->|是| H[shift QueueEntry]
    H --> I[runQueueEntryTask]
    I --> J{超时?}
    J -->|否| K[entry.resolve]
    J -->|是| L[CommandLaneTaskTimeoutError]
    K --> G
    L --> G
    G -->|否| M[draining = false]

核心数据结构

src/process/command-queue.ts L49-66:

type QueueEntry = {
  task: () => Promise<unknown>;
  resolve: (value: unknown) => void;
  reject: (reason?: unknown) => void;
  enqueuedAt: number;
  warnAfterMs: number;       // 默认 2000ms,超时打警告
  taskTimeoutMs?: number;    // 任务级超时
  onWait?: (waitMs: number, queuedAhead: number) => void;
};

type LaneState = {
  lane: string;
  queue: QueueEntry[];
  activeTaskIds: Set<number>;   // 当前活跃任务 id 集合
  maxConcurrent: number;        // 并发上限
  draining: boolean;
  generation: number;           // 每次 reset 递增,防旧任务污染
};

进程级单例

src/process/command-queue.ts L91-99:

const COMMAND_QUEUE_STATE_KEY = Symbol.for("openclaw.commandQueueState");

function getQueueState() {
  const state = resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({
    gatewayDraining: false,
    lanes: new Map<string, LaneState>(),
    activeTaskWaiters: new Set<ActiveTaskWaiter>(),
    nextTaskId: 1,
  }));
  // ...
}

Drain 核心循环

src/process/command-queue.ts L231-298:

function drainLane(lane: string) {
  const state = getLaneState(lane);
  if (state.draining) { return; }
  state.draining = true;

  const pump = () => {
    try {
      while (state.activeTaskIds.size < state.maxConcurrent && state.queue.length > 0) {
        const entry = state.queue.shift() as QueueEntry;
        const waitedMs = Date.now() - entry.enqueuedAt;
        // ... warn if waited too long
        const taskId = getQueueState().nextTaskId++;
        state.activeTaskIds.add(taskId);
        void (async () => {
          try {
            const result = await runQueueEntryTask(lane, entry);
            completeTask(state, taskId, taskGeneration);
            pump();  // 递归消费
            entry.resolve(result);
          } catch (err) {
            completeTask(state, taskId, taskGeneration);
            pump();
            entry.reject(err);
          }
        })();
      }
    } finally {
      state.draining = false;
    }
  };
}

任务超时机制

src/process/command-queue.ts L196-229:

async function runQueueEntryTask(lane: string, entry: QueueEntry): Promise<unknown> {
  const taskPromise = Promise.resolve().then(entry.task);
  const taskTimeoutMs = normalizeTaskTimeoutMs(entry.taskTimeoutMs);
  if (taskTimeoutMs === undefined) {
    return await taskPromise;
  }
  // Promise.race 实现超时
  const timeoutPromise = new Promise<never>((_, reject) => {
    timeoutHandle = setTimeout(() => {
      timedOut = true;
      reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs));
    }, taskTimeoutMs);
    timeoutHandle.unref?.();
  });
  return await Promise.race([taskPromise, timeoutPromise]);
}

车道并发配置

车道名 默认并发 用途
main 4 入站 auto-reply + 主心跳
subagent 8 子 agent 执行
cron cron.maxConcurrentRuns 定时任务外层
cron-nested 同上 定时任务内层 agent run
nested 独立 嵌套非 cron 流程
session:<key> 1(固定) 保证单 session 串行
auth-probe:<x> 1(固定) 鉴权探测,静默错误

错误类型

src/process/command-queue.ts L14-42:

export class CommandLaneClearedError extends Error {
  // clearCommandLane() 时 reject 所有排队条目
}

export class CommandLaneTaskTimeoutError extends Error {
  // 任务超时,释放车道但底层任务继续异步收尾
}

export class GatewayDrainingError extends Error {
  // 网关正在重启,拒绝新任务入队
}

恢复机制

  • resetCommandLane(lane):递增 generation,清空 activeTaskIds,重抽队列
  • resetAllLanes():SIGUSR1 in-process 重启后调用,保留队列条目、清空活跃状态