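Outbound Persistent Queue
Outbound messages are written to an on-disk queue before delivery so that no message is lost. A two-phase ack keeps a crash during cleanup from replaying an already-delivered message, and failed deliveries are retried with exponential backoff.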
Source file:
src/infra/outbound/delivery-queue-storage.ts (278 lines)
Architecture
```mermaid
graph TB
    A[Outbound message] --> B[enqueueDelivery]
    B --> C[Write .json<br>atomic tmp → rename]
    C --> D[Deliver immediately]
    D -->|success| E[ackDelivery]
    E --> E1[Phase 1: rename .json → .delivered]
    E1 --> E2[Phase 2: unlink .delivered]
    D -->|failure| F[failDelivery]
    F --> G{retryCount >= 5?}
    G -->|yes| H[moveToFailed]
    G -->|no| I[Exponential backoff]
    I --> J{Backoff elapsed?}
    J -->|no| J1[Skip, wait for next recovery]
    J -->|yes| D
    K[Startup / reconnect] --> L[recoverPendingDeliveries]
    L --> M[loadPendingDeliveries]
    M --> N[Sort by enqueuedAt]
    N --> G
```
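The decision part of the recovery path in the diagram (sort by `enqueuedAt`, check the retry limit, check whether the backoff has elapsed) can be sketched as a pure function. This is a minimal sketch, not the code in `delivery-queue-storage.ts`: `PendingEntry`, `RecoveryPlan`, `planRecovery`, and the `delayFor` parameter are hypothetical, and measuring the backoff from `lastAttemptAt` (falling back to `enqueuedAt`) is an assumption.

```typescript
// Hypothetical minimal entry shape; the real QueuedDelivery also carries the payload.
interface PendingEntry {
  id: string;
  enqueuedAt: number;
  retryCount: number;
  lastAttemptAt?: number;
}

interface RecoveryPlan {
  ready: PendingEntry[]; // backoff elapsed: deliver now
  waiting: PendingEntry[]; // still backing off: skip until the next recovery pass
  failed: PendingEntry[]; // retry budget exhausted: moveToFailed
}

const MAX_RETRIES = 5; // mirrors the "retryCount >= 5?" node in the diagram

function planRecovery(
  entries: PendingEntry[],
  now: number,
  delayFor: (retryCount: number) => number, // hypothetical backoff lookup
): RecoveryPlan {
  const plan: RecoveryPlan = { ready: [], waiting: [], failed: [] };
  // Oldest first, so entries are replayed in enqueue order.
  const sorted = [...entries].sort((a, b) => a.enqueuedAt - b.enqueuedAt);
  for (const entry of sorted) {
    if (entry.retryCount >= MAX_RETRIES) {
      plan.failed.push(entry);
      continue;
    }
    // Assumption: the backoff clock starts at the last attempt, else at enqueue time.
    const since = entry.lastAttemptAt ?? entry.enqueuedAt;
    if (now - since >= delayFor(entry.retryCount)) {
      plan.ready.push(entry);
    } else {
      plan.waiting.push(entry);
    }
  }
  return plan;
}
```

Keeping the planning pure makes the recovery rules easy to unit-test; the actual deliver, ack, and moveToFailed side effects would then run over the three buckets.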
Core data structure
src/infra/outbound/delivery-queue-storage.ts L40-46:
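```typescript
export interface QueuedDelivery extends QueuedDeliveryPayload {
  id: string; // UUID
  enqueuedAt: number; // enqueue time; recovery sorts entries by this field
  retryCount: number; // failed delivery attempts so far
  lastAttemptAt?: number; // time of the most recent attempt
  lastError?: string; // error message from the most recent failure
}
```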
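To illustrate how these fields change over an entry's life, here is a sketch of creating an entry and recording one failed attempt. The real fields of `QueuedDeliveryPayload` are not shown in this excerpt, so the `channel`/`to`/`text` payload is hypothetical, as are `newQueuedDelivery` and `withFailure`; only the `QueuedDelivery` field names come from the source.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical payload; the real QueuedDeliveryPayload is not shown in the excerpt.
interface QueuedDeliveryPayload {
  channel: string;
  to: string;
  text: string;
}

interface QueuedDelivery extends QueuedDeliveryPayload {
  id: string; // UUID
  enqueuedAt: number;
  retryCount: number;
  lastAttemptAt?: number;
  lastError?: string;
}

// Build a fresh entry: new UUID, no attempts recorded yet.
function newQueuedDelivery(payload: QueuedDeliveryPayload, now = Date.now()): QueuedDelivery {
  return { ...payload, id: randomUUID(), enqueuedAt: now, retryCount: 0 };
}

// Record one failed attempt, returning a new object instead of mutating the input.
function withFailure(entry: QueuedDelivery, err: unknown, now = Date.now()): QueuedDelivery {
  return {
    ...entry,
    retryCount: entry.retryCount + 1,
    lastAttemptAt: now,
    lastError: err instanceof Error ? err.message : String(err),
  };
}
```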
Two-phase ack (crash safety)
src/infra/outbound/delivery-queue-storage.ts L163-176:
```typescript
/**
 * Uses a two-phase approach so that a crash between delivery and cleanup
 * does not cause the message to be replayed on the next recovery scan:
 *   Phase 1: atomic rename {id}.json → {id}.delivered
 *   Phase 2: unlink the .delivered marker
 */
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
  const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir);
  try {
    await fs.promises.rename(jsonPath, deliveredPath); // Phase 1
  } catch (err) {
    // ...
  }
  // Phase 2: unlink (omitted in this excerpt)
}
```
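A self-contained version of both phases, plus a matching recovery-side scan, might look like the sketch below. It assumes (the excerpt does not show this) that a missing `.json` during Phase 1 means the entry was already acked, and that recovery treats only `.json` files as pending and deletes leftover `.delivered` markers. `entryPaths`, `ackSketch`, and `scanPending` are hypothetical names.

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// Hypothetical path helper; the real resolveQueueEntryPaths is not shown.
function entryPaths(dir: string, id: string) {
  return {
    jsonPath: path.join(dir, `${id}.json`),
    deliveredPath: path.join(dir, `${id}.delivered`),
  };
}

async function ackSketch(dir: string, id: string): Promise<void> {
  const { jsonPath, deliveredPath } = entryPaths(dir, id);
  try {
    // Phase 1: once renamed, a recovery scan no longer sees a pending .json.
    await fs.promises.rename(jsonPath, deliveredPath);
  } catch (err) {
    // Assumption: a missing .json means the entry was already acked.
    if ((err as NodeJS.ErrnoException).code !== "ENOENT") throw err;
  }
  // Phase 2: remove the marker; force ignores a marker that is already gone.
  await fs.promises.rm(deliveredPath, { force: true });
}

// Recovery side: only .json files are pending; stray .delivered markers are deleted, not resent.
async function scanPending(dir: string): Promise<string[]> {
  const pending: string[] = [];
  for (const name of await fs.promises.readdir(dir)) {
    if (name.endsWith(".json")) {
      pending.push(name.slice(0, -".json".length));
    } else if (name.endsWith(".delivered")) {
      await fs.promises.rm(path.join(dir, name), { force: true });
    }
  }
  return pending.sort();
}
```

The ordering is what makes this crash-safe: if the process dies after Phase 1, the only trace is a `.delivered` marker, which the scan removes instead of redelivering.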
Atomic writes
src/infra/outbound/delivery-queue-storage.ts L85-92:
```typescript
async function writeQueueEntry(filePath: string, entry: QueuedDelivery): Promise<void> {
  // Write to a per-process temp file first so a crash never leaves a half-written .json.
  const tmp = `${filePath}.${process.pid}.tmp`;
  await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
    encoding: "utf-8",
    mode: 0o600, // owner read/write only
  });
  await fs.promises.rename(tmp, filePath); // atomic rename
}
```
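Atomic rename matters mostly to readers: an entry becomes visible under its final `.json` name only once it is fully written, and an interrupted write leaves just a `{id}.json.<pid>.tmp` orphan. The sketch below pairs the same write pattern with a hypothetical loader that matches the exact `.json` suffix and sorts oldest first, as the diagram's `loadPendingDeliveries` step describes. `writeEntry`, `loadPending`, and `StoredEntry` are illustrative names, not the file's API, and whether the real loader also cleans up `.tmp` orphans is not shown.

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// Hypothetical reduced entry shape for this sketch.
interface StoredEntry {
  id: string;
  enqueuedAt: number;
  retryCount: number;
}

// Same tmp-then-rename pattern as writeQueueEntry.
async function writeEntry(filePath: string, entry: StoredEntry): Promise<void> {
  const tmp = `${filePath}.${process.pid}.tmp`;
  await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), { encoding: "utf-8", mode: 0o600 });
  await fs.promises.rename(tmp, filePath);
}

// Only exact *.json names count, so "{id}.json.<pid>.tmp" leftovers are never parsed.
async function loadPending(dir: string): Promise<StoredEntry[]> {
  const entries: StoredEntry[] = [];
  for (const name of await fs.promises.readdir(dir)) {
    if (!name.endsWith(".json")) continue;
    const raw = await fs.promises.readFile(path.join(dir, name), "utf-8");
    entries.push(JSON.parse(raw) as StoredEntry);
  }
  // Oldest first, matching the "sort by enqueuedAt" step.
  return entries.sort((a, b) => a.enqueuedAt - b.enqueuedAt);
}
```

Because the temp file sits next to its target, the rename stays within one filesystem, where POSIX `rename` replaces the target atomically.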
Exponential backoff policy
| retryCount | Delay before next attempt |
|---|---|
| 1 | 5s |
| 2 | 25s |
| 3 | 2m |
| 4 | 10m |
| 5+ | 10m (cap), or moveToFailed |
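The table can be expressed as a lookup with a cap. This sketch treats the values as a fixed table and assumes `retryCount` 0 (no failure yet) means no delay; `BACKOFF_MS` and `backoffMs` are hypothetical names, and the source does not say whether the real code uses a table or a formula.

```typescript
// Delays from the table, indexed by retryCount - 1.
const BACKOFF_MS: readonly number[] = [5_000, 25_000, 120_000, 600_000];

// Assumption: an entry that has never failed is sent immediately.
function backoffMs(retryCount: number): number {
  if (retryCount <= 0) return 0;
  // retryCount 5 and above stay at the 10-minute cap.
  return BACKOFF_MS[Math.min(retryCount, BACKOFF_MS.length) - 1];
}
```

The values grow roughly fivefold per step, so a `5s × 5^(n-1)` formula capped at 10 minutes would give almost the same schedule (5s, 25s, 125s, 10m).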
Permanent error patterns
Errors that match one of these regular expressions go straight to moveToFailed and are never retried:
- chat not found
- bot was blocked
- user not found
- etc.
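A matcher for these messages can be a short list of case-insensitive regexes checked before the retry counter. The three patterns below come from the list above; the real list is longer, and the exact regexes, their flags, and the names `PERMANENT_ERROR_PATTERNS`, `isPermanentDeliveryError`, and `classifyFailure` are assumptions.

```typescript
// Hypothetical patterns built from the listed messages; the real list is longer.
const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
  /chat not found/i,
  /bot was blocked/i,
  /user not found/i,
];

function isPermanentDeliveryError(err: unknown): boolean {
  const message = err instanceof Error ? err.message : String(err);
  return PERMANENT_ERROR_PATTERNS.some((re) => re.test(message));
}

// What a failDelivery-style handler could do with an error.
function classifyFailure(err: unknown): "moveToFailed" | "retry" {
  return isPermanentDeliveryError(err) ? "moveToFailed" : "retry";
}
```

Checking permanence first means an unreachable recipient costs one attempt instead of the full five-retry backoff schedule.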