入站 Followup 队列¶
核心消息调度层,支持 6 种队列模式,决定新消息如何与当前活跃 Run 交互。
源文件目录:
src/auto-reply/reply/queue/
6 种队列模式¶
graph TB
A[新消息到达] --> B{resolveActiveRunQueueAction}
B -->|run-now| C[立即执行]
B -->|enqueue-followup| D[入队]
B -->|drop| E[丢弃]
D --> F{QueueMode?}
F -->|steer| G[注入活跃 Run<br>model boundary 后]
F -->|followup| H[等 Run 结束再起新 Run]
F -->|collect| I[合并多条消息<br>为一个 followup turn]
F -->|steer-backlog| J[steer + 保留 followup]
F -->|interrupt| K[中止当前 Run<br>立即执行]
F -->|queue| L[legacy: 一次只 steer 一条]
核心数据结构¶
src/auto-reply/reply/queue/types.ts L12-21:
export type QueueMode = "steer" | "followup" | "collect" | "steer-backlog" | "interrupt" | "queue";
export type QueueDropPolicy = "old" | "new" | "summarize";
export type QueueSettings = {
mode: QueueMode;
debounceMs?: number;
cap?: number;
dropPolicy?: QueueDropPolicy;
};
src/auto-reply/reply/queue/state.ts L6-17:
export type FollowupQueueState = {
items: FollowupRun[];
draining: boolean;
lastEnqueuedAt: number;
mode: QueueMode;
debounceMs: number; // 默认 500ms
cap: number; // 默认 20
dropPolicy: QueueDropPolicy; // 默认 "summarize"
droppedCount: number;
summaryLines: string[];
lastRun?: FollowupRun["run"];
};
全局单例¶
src/auto-reply/reply/queue/state.ts L27-29:
const FOLLOWUP_QUEUES_KEY = Symbol.for("openclaw.followupQueues");
export const FOLLOWUP_QUEUES = resolveGlobalMap<string, FollowupQueueState>(FOLLOWUP_QUEUES_KEY);
入队与去重¶
src/auto-reply/reply/queue/enqueue.ts L13-18:
// 消息去重缓存:TTL 5min,上限 10000
const RECENT_QUEUE_MESSAGE_IDS = resolveGlobalDedupeCache(
Symbol.for("openclaw.recentQueueMessageIds"),
{ ttlMs: 5 * 60 * 1000, maxSize: 10_000 }
);
src/auto-reply/reply/queue/enqueue.ts L59-80:
export function enqueueFollowupRun(
key: string, run: FollowupRun, settings: QueueSettings,
dedupeMode: QueueDedupeMode = "message-id",
runFollowup?: (run: FollowupRun) => Promise<void>,
restartIfIdle = true,
): boolean {
const queue = getFollowupQueue(key, settings);
// 检查 message-id 去重
if (recentMessageIdKey && RECENT_QUEUE_MESSAGE_IDS.peek(recentMessageIdKey)) {
return false;
}
// 检查 prompt 去重
if (shouldSkipQueueItem({ item: run, items: queue.items, dedupe })) {
return false;
}
// applyQueueDropPolicy → cap 满时按策略处理
}
Drain 循环¶
src/auto-reply/reply/queue/drain.ts L142-180:
export function scheduleFollowupDrain(
key: string, runFollowup: (run: FollowupRun) => Promise<void>,
): void {
const queue = beginQueueDrain(FOLLOWUP_QUEUES, key);
if (!queue) { return; }
void (async () => {
while (queue.items.length > 0 || queue.droppedCount > 0) {
await waitForQueueDebounce(queue); // 等 500ms 安静窗口
if (queue.mode === "collect") {
// 跨 channel 检查 → 拆分 → 合并 prompt → runFollowup
const isCrossChannel = hasCrossChannelItems(queue.items, resolveCrossChannelKey);
await drainCollectQueueStep({ ... });
} else {
await drainNextQueueItem({ ... });
}
}
})();
}
Drop 策略¶
| 策略 | 行为 |
|---|---|
summarize(默认) |
丢弃最老条目,保留文本摘要,后续作为 synthetic prompt 注入 |
old |
丢弃最老条目,不保留摘要 |
new |
拒绝新条目入队 |
Collect 模式合并逻辑¶
src/auto-reply/reply/queue/drain.ts L84-111:
// 按 sender 权限分组,不同权限的消息不能合并
function splitCollectItemsByAuthorization(items: FollowupRun[]): FollowupRun[][] {
const groups: FollowupRun[][] = [];
let currentGroup: FollowupRun[] = [];
let currentKey: string | undefined;
for (const item of items) {
const itemKey = resolveFollowupAuthorizationKey(item.run);
if (currentGroup.length === 0 || itemKey === currentKey) {
currentGroup.push(item);
currentKey = itemKey;
} else {
groups.push(currentGroup);
currentGroup = [item];
currentKey = itemKey;
}
}
return groups;
}