跳转至

入站 Followup 队列

核心消息调度层,支持 6 种队列模式,决定新消息如何与当前活跃 Run 交互。

源文件目录: src/auto-reply/reply/queue/


6 种队列模式

graph TB
    A[新消息到达] --> B{resolveActiveRunQueueAction}
    B -->|run-now| C[立即执行]
    B -->|enqueue-followup| D[入队]
    B -->|drop| E[丢弃]

    D --> F{QueueMode?}
    F -->|steer| G[注入活跃 Run<br>model boundary 后]
    F -->|followup| H[等 Run 结束再起新 Run]
    F -->|collect| I[合并多条消息<br>为一个 followup turn]
    F -->|steer-backlog| J[steer + 保留 followup]
    F -->|interrupt| K[中止当前 Run<br>立即执行]
    F -->|queue| L[legacy: 一次只 steer 一条]

核心数据结构

src/auto-reply/reply/queue/types.ts L12-21:

export type QueueMode = "steer" | "followup" | "collect" | "steer-backlog" | "interrupt" | "queue";

export type QueueDropPolicy = "old" | "new" | "summarize";

export type QueueSettings = {
  mode: QueueMode;
  debounceMs?: number;
  cap?: number;
  dropPolicy?: QueueDropPolicy;
};

src/auto-reply/reply/queue/state.ts L6-17:

export type FollowupQueueState = {
  items: FollowupRun[];
  draining: boolean;
  lastEnqueuedAt: number;
  mode: QueueMode;
  debounceMs: number;          // 默认 500ms
  cap: number;                 // 默认 20
  dropPolicy: QueueDropPolicy; // 默认 "summarize"
  droppedCount: number;
  summaryLines: string[];
  lastRun?: FollowupRun["run"];
};

全局单例

src/auto-reply/reply/queue/state.ts L27-29:

const FOLLOWUP_QUEUES_KEY = Symbol.for("openclaw.followupQueues");
export const FOLLOWUP_QUEUES = resolveGlobalMap<string, FollowupQueueState>(FOLLOWUP_QUEUES_KEY);

入队与去重

src/auto-reply/reply/queue/enqueue.ts L13-18:

// 消息去重缓存:TTL 5min,上限 10000
const RECENT_QUEUE_MESSAGE_IDS = resolveGlobalDedupeCache(
  Symbol.for("openclaw.recentQueueMessageIds"),
  { ttlMs: 5 * 60 * 1000, maxSize: 10_000 }
);

src/auto-reply/reply/queue/enqueue.ts L59-80:

export function enqueueFollowupRun(
  key: string, run: FollowupRun, settings: QueueSettings,
  dedupeMode: QueueDedupeMode = "message-id",
  runFollowup?: (run: FollowupRun) => Promise<void>,
  restartIfIdle = true,
): boolean {
  const queue = getFollowupQueue(key, settings);
  // 检查 message-id 去重
  if (recentMessageIdKey && RECENT_QUEUE_MESSAGE_IDS.peek(recentMessageIdKey)) {
    return false;
  }
  // 检查 prompt 去重
  if (shouldSkipQueueItem({ item: run, items: queue.items, dedupe })) {
    return false;
  }
  // applyQueueDropPolicy → cap 满时按策略处理
}

Drain 循环

src/auto-reply/reply/queue/drain.ts L142-180:

export function scheduleFollowupDrain(
  key: string, runFollowup: (run: FollowupRun) => Promise<void>,
): void {
  const queue = beginQueueDrain(FOLLOWUP_QUEUES, key);
  if (!queue) { return; }
  void (async () => {
    while (queue.items.length > 0 || queue.droppedCount > 0) {
      await waitForQueueDebounce(queue);  // 等 500ms 安静窗口
      if (queue.mode === "collect") {
        // 跨 channel 检查 → 拆分 → 合并 prompt → runFollowup
        const isCrossChannel = hasCrossChannelItems(queue.items, resolveCrossChannelKey);
        await drainCollectQueueStep({ ... });
      } else {
        await drainNextQueueItem({ ... });
      }
    }
  })();
}

Drop 策略

策略 行为
summarize(默认) 丢弃最老条目,保留文本摘要,后续作为 synthetic prompt 注入
old 丢弃最老条目,不保留摘要
new 拒绝新条目入队

Collect 模式合并逻辑

src/auto-reply/reply/queue/drain.ts L84-111:

// 按 sender 权限分组,不同权限的消息不能合并
function splitCollectItemsByAuthorization(items: FollowupRun[]): FollowupRun[][] {
  const groups: FollowupRun[][] = [];
  let currentGroup: FollowupRun[] = [];
  let currentKey: string | undefined;
  for (const item of items) {
    const itemKey = resolveFollowupAuthorizationKey(item.run);
    if (currentGroup.length === 0 || itemKey === currentKey) {
      currentGroup.push(item);
      currentKey = itemKey;
    } else {
      groups.push(currentGroup);
      currentGroup = [item];
      currentKey = itemKey;
    }
  }
  return groups;
}