
Outbound Persistent Queue

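A disk-backed queue for outbound messages that ensures messages are not lost. A two-phase ack protects against crashes, and failed deliveries are retried with exponential backoff.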

Source file: src/infra/outbound/delivery-queue-storage.ts (278 lines)


Architecture

graph TB
    A[Outbound message] --> B[enqueueDelivery]
    B --> C[Write .json<br>atomic tmp → rename]
    C --> D[Deliver immediately]

    D -->|success| E[ackDelivery]
    E --> E1[Phase 1: rename .json → .delivered]
    E1 --> E2[Phase 2: unlink .delivered]

    D -->|failure| F[failDelivery]
    F --> G{retryCount >= 5?}
    G -->|yes| H[moveToFailed]
    G -->|no| I[Exponential backoff]

    I --> J{Backoff elapsed?}
    J -->|no| J1[Skip until next recovery]
    J -->|yes| D

    K[Startup / reconnect] --> L[recoverPendingDeliveries]
    L --> M[loadPendingDeliveries]
    M --> N[Sort by enqueuedAt]
    N --> G
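The recovery branch of the diagram (sort by enqueuedAt, then the retry cap, then the backoff check) can be condensed into one pure function. This is a minimal sketch, not the module's code: `planRecovery`, `PendingEntry` and `Decision` are hypothetical names, the backoff schedule is passed in by the caller, and it assumes the wait is measured from `lastAttemptAt`, falling back to `enqueuedAt` for entries that were never attempted.

```typescript
// Hypothetical names; this mirrors the diagram, not the module's actual code.
interface PendingEntry {
  id: string;
  enqueuedAt: number;
  retryCount: number;
  lastAttemptAt?: number;
}

type Decision = "deliver" | "skip" | "moveToFailed";

const MAX_RETRIES = 5; // retryCount >= 5 → moveToFailed, per the diagram

// One recovery pass: oldest entries first, then the same checks as the diagram.
function planRecovery(
  entries: PendingEntry[],
  now: number,
  backoffMs: (retryCount: number) => number, // schedule supplied by the caller
): Array<[string, Decision]> {
  const sorted = [...entries].sort((a, b) => a.enqueuedAt - b.enqueuedAt);
  return sorted.map((e): [string, Decision] => {
    if (e.retryCount >= MAX_RETRIES) return [e.id, "moveToFailed"];
    // Assumption: backoff counts from the last attempt, else from enqueue time.
    const since = now - (e.lastAttemptAt ?? e.enqueuedAt);
    return [e.id, since >= backoffMs(e.retryCount) ? "deliver" : "skip"];
  });
}
```

Keeping the decision pure makes the ordering and cap behavior testable without touching the file system; the caller then performs the actual deliver or move for each id.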

Core Data Structure

src/infra/outbound/delivery-queue-storage.ts L40-46:

export interface QueuedDelivery extends QueuedDeliveryPayload {
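  id: string;              // UUID; also the file name stem (<uuid>.json)
  enqueuedAt: number;      // enqueue timestamp; recovery sorts by it
  retryCount: number;      // failed delivery attempts so far
  lastAttemptAt?: number;  // time of the most recent attempt
  lastError?: string;      // error message from the most recent failure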
}
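A fresh entry starts with a new UUID, the enqueue time, and no attempts. The sketch below assumes `enqueuedAt` is a `Date.now()`-style millisecond timestamp; `ExamplePayload` is a hypothetical stand-in, since the fields of `QueuedDeliveryPayload` are not shown here, and `createQueuedDelivery` is an illustrative helper rather than the module's function.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical payload shape; the real QueuedDeliveryPayload fields are not shown.
interface ExamplePayload {
  channel: string;
  to: string;
  text: string;
}

interface QueuedDelivery extends ExamplePayload {
  id: string;
  enqueuedAt: number;
  retryCount: number;
  lastAttemptAt?: number;
  lastError?: string;
}

// Builds a new queue entry: fresh UUID, enqueue timestamp, zero attempts.
// lastAttemptAt / lastError stay unset until the first failure.
function createQueuedDelivery(payload: ExamplePayload, now: number = Date.now()): QueuedDelivery {
  return { ...payload, id: randomUUID(), enqueuedAt: now, retryCount: 0 };
}
```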

Two-Phase Ack (Crash Safety)

src/infra/outbound/delivery-queue-storage.ts L163-176:

/**
 * Uses a two-phase approach so that a crash between delivery and cleanup
 * does not cause the message to be replayed on the next recovery scan:
 *   Phase 1: atomic rename  {id}.json → {id}.delivered
 *   Phase 2: unlink the .delivered marker
 */
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
  const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir);
  try {
    await fs.promises.rename(jsonPath, deliveredPath);  // Phase 1
  } catch (err) {
    // ...
  }
  // Phase 2: unlink
}
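Why the rename comes first: the recovery scan only replays `*.json` files, so once Phase 1 succeeds a crash can no longer cause a resend. The sketch below uses synchronous `fs` calls for brevity and simulates a crash between the two phases. `entryPaths`, `ackPhase1`, `ackPhase2`, `ackDeliverySync` and `recoverPendingIds` are hypothetical; in particular, deleting stale `.delivered` markers during recovery is an assumption, not something this page confirms.

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// Hypothetical helper; the module's own resolveQueueEntryPaths is not shown in full.
function entryPaths(dir: string, id: string): { jsonPath: string; deliveredPath: string } {
  return {
    jsonPath: path.join(dir, `${id}.json`),
    deliveredPath: path.join(dir, `${id}.delivered`),
  };
}

// Phase 1: once renamed, the entry is no longer a *.json file, so a
// recovery scan will not replay it even if the process dies right here.
function ackPhase1(dir: string, id: string): void {
  const { jsonPath, deliveredPath } = entryPaths(dir, id);
  fs.renameSync(jsonPath, deliveredPath);
}

// Phase 2: delete the marker; ENOENT means it is already gone.
function ackPhase2(dir: string, id: string): void {
  try {
    fs.unlinkSync(entryPaths(dir, id).deliveredPath);
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code !== "ENOENT") throw err;
  }
}

function ackDeliverySync(dir: string, id: string): void {
  ackPhase1(dir, id);
  ackPhase2(dir, id);
}

// Recovery replays only *.json; leftover *.delivered markers are deleted
// (assumed cleanup behavior, for illustration).
function recoverPendingIds(dir: string): string[] {
  const pending: string[] = [];
  for (const name of fs.readdirSync(dir)) {
    if (name.endsWith(".delivered")) {
      fs.unlinkSync(path.join(dir, name));
    } else if (name.endsWith(".json")) {
      pending.push(name.slice(0, -".json".length));
    }
  }
  return pending;
}
```

Making Phase 2 tolerate `ENOENT` keeps it idempotent, so it is safe to repeat after a restart.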

Atomic Write

src/infra/outbound/delivery-queue-storage.ts L85-92:

async function writeQueueEntry(filePath: string, entry: QueuedDelivery): Promise<void> {
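  const tmp = `${filePath}.${process.pid}.tmp`;  // temp file in the same directory as the target
  await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
    encoding: "utf-8",
    mode: 0o600,  // owner read/write only
  });
  await fs.promises.rename(tmp, filePath);  // atomic rename (POSIX): readers see the old file or the complete new one, never a partial write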
}
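The payoff of the tmp → rename pattern shows up on the read side: a crash mid-write leaves only a `*.tmp` file, which a loader that reads `*.json` never sees. This is a minimal synchronous sketch; `Entry`, `writeEntrySync` and `loadPendingSync` are hypothetical names, and the loader only approximates what `loadPendingDeliveries` is described as doing (load pending entries, sort by `enqueuedAt`).

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// Minimal entry shape for this sketch (subset of QueuedDelivery).
interface Entry {
  id: string;
  enqueuedAt: number;
  retryCount: number;
}

// Synchronous version of writeQueueEntry: write the full JSON to a temp
// file beside the target, then rename it over the target in one step.
function writeEntrySync(filePath: string, entry: Entry): void {
  const tmp = `${filePath}.${process.pid}.tmp`;
  fs.writeFileSync(tmp, JSON.stringify(entry, null, 2), { encoding: "utf-8", mode: 0o600 });
  fs.renameSync(tmp, filePath);
}

// Hypothetical loader: only complete *.json files are pending. A torn write
// leaves "<name>.json.<pid>.tmp", which does not end in .json and is skipped.
function loadPendingSync(dir: string): Entry[] {
  return fs
    .readdirSync(dir)
    .filter((name) => name.endsWith(".json"))
    .map((name) => JSON.parse(fs.readFileSync(path.join(dir, name), "utf-8")) as Entry)
    .sort((a, b) => a.enqueuedAt - b.enqueuedAt);
}
```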

Exponential Backoff Strategy

retryCount   Wait before next attempt
1            5s
2            25s
3            2m
4            10m
5+           10m (cap), or moveToFailed
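Each step in the table is roughly five times the previous one (5s → 25s → ~2m → ~10m). A lookup table is the simplest way to encode it. This sketch assumes, following the architecture diagram, that `retryCount >= 5` sends the entry to `moveToFailed` rather than waiting at the 10m cap; `nextBackoffMs` and `isDue` are hypothetical helpers.

```typescript
// Delays from the table above, in milliseconds (index = retryCount - 1).
const BACKOFF_SCHEDULE_MS: number[] = [5_000, 25_000, 2 * 60_000, 10 * 60_000];
const MAX_RETRIES = 5;

// Returns the wait before the next attempt, or null when the entry
// should be moved to failed/ instead of retried.
function nextBackoffMs(retryCount: number): number | null {
  if (retryCount >= MAX_RETRIES) return null; // moveToFailed
  if (retryCount < 1) return 0; // never failed: no wait
  return BACKOFF_SCHEDULE_MS[Math.min(retryCount, BACKOFF_SCHEDULE_MS.length) - 1];
}

// Recovery-time check: skip entries whose backoff has not elapsed yet.
function isDue(retryCount: number, lastAttemptAt: number, now: number): boolean {
  const wait = nextBackoffMs(retryCount);
  return wait !== null && now - lastAttemptAt >= wait;
}
```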

Permanent Error Patterns

Errors whose message matches one of these regular expressions go straight to moveToFailed with no further retries:
- chat not found
- bot was blocked
- user not found
- etc.
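A sketch of the classification step. Only the three patterns listed above are included; the real list is longer, and case-insensitive matching is an assumption. `PERMANENT_ERROR_PATTERNS`, `isPermanentError` and `classifyError` are hypothetical names.

```typescript
// Patterns from the list above; the module's full list and exact regexes are not shown.
const PERMANENT_ERROR_PATTERNS: RegExp[] = [
  /chat not found/i,
  /bot was blocked/i,
  /user not found/i,
];

function isPermanentError(message: string): boolean {
  return PERMANENT_ERROR_PATTERNS.some((re) => re.test(message));
}

// Permanent errors bypass the backoff loop entirely; everything else is retried.
function classifyError(message: string): "permanent" | "transient" {
  return isPermanentError(message) ? "permanent" : "transient";
}
```

Retrying a permanent error would only burn through the backoff schedule (about 12 minutes of waits in total) before failing anyway, so short-circuiting saves both time and API calls.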

Storage Layout

<stateDir>/delivery-queue/
├── <uuid>.json           # pending delivery
├── <uuid>.delivered      # acknowledged (transient, about to be deleted)
└── failed/
    └── <uuid>.json       # permanently failed
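Because `failed/` is a subdirectory of the queue directory, moving an entry there is a single rename, normally within one filesystem, and a non-recursive scan of `delivery-queue/*.json` no longer sees it. A minimal sketch, assuming the layout above; `queuePaths` and `moveToFailedSync` are hypothetical and do not reproduce the module's `moveToFailed` signature.

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// Hypothetical path resolver for the layout above.
function queuePaths(stateDir: string, id: string) {
  const queueDir = path.join(stateDir, "delivery-queue");
  return {
    queueDir,
    jsonPath: path.join(queueDir, `${id}.json`),
    deliveredPath: path.join(queueDir, `${id}.delivered`),
    failedPath: path.join(queueDir, "failed", `${id}.json`),
  };
}

// Moves a pending entry into failed/, creating the directory on first use.
function moveToFailedSync(stateDir: string, id: string): void {
  const { jsonPath, failedPath } = queuePaths(stateDir, id);
  fs.mkdirSync(path.dirname(failedPath), { recursive: true });
  fs.renameSync(jsonPath, failedPath);
}
```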