命令车道队列(Command Lane Queue)¶
进程级最底层队列,负责序列化所有 agent 执行。通过 lane(车道) 隔离不同来源的任务。
源文件:
src/process/command-queue.ts(527 行)
架构¶
graph LR
A[enqueueCommandInLane] --> B{gatewayDraining?}
B -->|是| C[GatewayDrainingError]
B -->|否| D[push 到 LaneState.queue]
D --> E[drainLane]
E --> F[pump 循环]
F --> G{active < max && queue > 0?}
G -->|是| H[shift QueueEntry]
H --> I[runQueueEntryTask]
I --> J{超时?}
J -->|否| K[entry.resolve]
J -->|是| L[CommandLaneTaskTimeoutError]
K --> G
L --> G
G -->|否| M[draining = false]
核心数据结构¶
src/process/command-queue.ts L49-66:
type QueueEntry = {
task: () => Promise<unknown>;
resolve: (value: unknown) => void;
reject: (reason?: unknown) => void;
enqueuedAt: number;
warnAfterMs: number; // 默认 2000ms,超时打警告
taskTimeoutMs?: number; // 任务级超时
onWait?: (waitMs: number, queuedAhead: number) => void;
};
type LaneState = {
lane: string;
queue: QueueEntry[];
activeTaskIds: Set<number>; // 当前活跃任务 id 集合
maxConcurrent: number; // 并发上限
draining: boolean;
generation: number; // 每次 reset 递增,防旧任务污染
};
进程级单例¶
src/process/command-queue.ts L91-99:
const COMMAND_QUEUE_STATE_KEY = Symbol.for("openclaw.commandQueueState");
function getQueueState() {
const state = resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({
gatewayDraining: false,
lanes: new Map<string, LaneState>(),
activeTaskWaiters: new Set<ActiveTaskWaiter>(),
nextTaskId: 1,
}));
// ...
}
Drain 核心循环¶
src/process/command-queue.ts L231-298:
function drainLane(lane: string) {
const state = getLaneState(lane);
if (state.draining) { return; }
state.draining = true;
const pump = () => {
try {
while (state.activeTaskIds.size < state.maxConcurrent && state.queue.length > 0) {
const entry = state.queue.shift() as QueueEntry;
const waitedMs = Date.now() - entry.enqueuedAt;
// ... warn if waited too long
const taskId = getQueueState().nextTaskId++;
state.activeTaskIds.add(taskId);
void (async () => {
try {
const result = await runQueueEntryTask(lane, entry);
completeTask(state, taskId, taskGeneration);
pump(); // 递归消费
entry.resolve(result);
} catch (err) {
completeTask(state, taskId, taskGeneration);
pump();
entry.reject(err);
}
})();
}
} finally {
state.draining = false;
}
};
}
任务超时机制¶
src/process/command-queue.ts L196-229:
async function runQueueEntryTask(lane: string, entry: QueueEntry): Promise<unknown> {
const taskPromise = Promise.resolve().then(entry.task);
const taskTimeoutMs = normalizeTaskTimeoutMs(entry.taskTimeoutMs);
if (taskTimeoutMs === undefined) {
return await taskPromise;
}
// Promise.race 实现超时
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutHandle = setTimeout(() => {
timedOut = true;
reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs));
}, taskTimeoutMs);
timeoutHandle.unref?.();
});
return await Promise.race([taskPromise, timeoutPromise]);
}
车道并发配置¶
| 车道名 | 默认并发 | 用途 |
|---|---|---|
main |
4 | 入站 auto-reply + 主心跳 |
subagent |
8 | 子 agent 执行 |
cron |
按 cron.maxConcurrentRuns |
定时任务外层 |
cron-nested |
同上 | 定时任务内层 agent run |
nested |
独立 | 嵌套非 cron 流程 |
session:<key> |
1(固定) | 保证单 session 串行 |
auth-probe:<x> |
1(固定) | 鉴权探测,静默错误 |
错误类型¶
src/process/command-queue.ts L14-42:
export class CommandLaneClearedError extends Error {
// clearCommandLane() 时 reject 所有排队条目
}
export class CommandLaneTaskTimeoutError extends Error {
// 任务超时,释放车道但底层任务继续异步收尾
}
export class GatewayDrainingError extends Error {
// 网关正在重启,拒绝新任务入队
}
恢复机制¶
resetCommandLane(lane):递增 generation,清空 activeTaskIds,重抽队列resetAllLanes():SIGUSR1 in-process 重启后调用,保留队列条目、清空活跃状态