fix(cron): prevent duplicate proactive delivery on transient retry (#40646)
* fix(cron): prevent duplicate proactive delivery on transient retry * refactor: scope skipQueue to retryTransient path only Non-retrying direct delivery (structured content / thread) keeps the write-ahead queue so recoverPendingDeliveries can replay after a crash. Addresses review feedback from codex-connector. * fix: preserve write-ahead queue on initial delivery attempt The first call through retryTransientDirectCronDelivery now keeps the write-ahead queue entry so recoverPendingDeliveries can replay after a crash. Only subsequent retry attempts set skipQueue to prevent duplicate sends. Addresses second codex-connector review on ea5ae5c. * ci: retrigger checks * Cron: bypass write-ahead queue for direct isolated delivery * Tests: assert isolated cron skipQueue invariants * Changelog: add cron duplicate-delivery fix entry --------- Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
@@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Cron/proactive delivery: keep isolated direct cron sends out of the write-ahead resend queue so transient-send retries do not replay duplicate proactive messages after restart. (#40646) Thanks @openperf and @vincentkoc.
|
||||
- TUI/chat log: reuse the active assistant message component for the same streaming run so `openclaw tui` no longer renders duplicate assistant replies. (#35364) Thanks @lisitan.
|
||||
- macOS/Reminders: add the missing `NSRemindersUsageDescription` to the bundled app so `apple-reminders` can trigger the system permission prompt from OpenClaw.app. (#8559) Thanks @dinakars777.
|
||||
- iMessage/self-chat echo dedupe: drop reflected duplicate copies only when a matching `is_from_me` event was just seen for the same chat, text, and `created_at`, preventing self-chat loops without broad text-only suppression. Related to #32166. (#38440) Thanks @vincentkoc.
|
||||
|
||||
@@ -217,6 +217,9 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
payloads: [{ text: "Detailed child result, everything finished successfully." }],
|
||||
}),
|
||||
);
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ skipQueue: true }),
|
||||
);
|
||||
});
|
||||
|
||||
it("normal text delivery sends exactly once and sets deliveryAttempted=true", async () => {
|
||||
@@ -304,4 +307,69 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
expect(deliverOutboundPayloads).not.toHaveBeenCalled();
|
||||
expect(state.deliveryAttempted).toBe(false);
|
||||
});
|
||||
|
||||
it("text delivery always bypasses the write-ahead queue", async () => {
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
|
||||
|
||||
const params = makeBaseParams({ synthesizedText: "Daily digest ready." });
|
||||
const state = await dispatchCronDelivery(params);
|
||||
|
||||
expect(state.delivered).toBe(true);
|
||||
expect(state.deliveryAttempted).toBe(true);
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "telegram",
|
||||
to: "123456",
|
||||
payloads: [{ text: "Daily digest ready." }],
|
||||
skipQueue: true,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("structured/thread delivery also bypasses the write-ahead queue", async () => {
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
|
||||
|
||||
const params = makeBaseParams({ synthesizedText: "Report attached." });
|
||||
// Simulate structured content so useDirectDelivery path is taken (no retryTransient)
|
||||
(params as Record<string, unknown>).deliveryPayloadHasStructuredContent = true;
|
||||
await dispatchCronDelivery(params);
|
||||
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ skipQueue: true }),
|
||||
);
|
||||
});
|
||||
|
||||
it("transient retry delivers exactly once with skipQueue on both attempts", async () => {
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
|
||||
// First call throws a transient error, second call succeeds.
|
||||
vi.mocked(deliverOutboundPayloads)
|
||||
.mockRejectedValueOnce(new Error("gateway timeout"))
|
||||
.mockResolvedValueOnce([{ ok: true } as never]);
|
||||
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
try {
|
||||
const params = makeBaseParams({ synthesizedText: "Retry test." });
|
||||
const state = await dispatchCronDelivery(params);
|
||||
|
||||
expect(state.delivered).toBe(true);
|
||||
expect(state.deliveryAttempted).toBe(true);
|
||||
// Two calls total: first failed transiently, second succeeded.
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2);
|
||||
|
||||
const calls = vi.mocked(deliverOutboundPayloads).mock.calls;
|
||||
expect(calls[0][0]).toEqual(expect.objectContaining({ skipQueue: true }));
|
||||
expect(calls[1][0]).toEqual(expect.objectContaining({ skipQueue: true }));
|
||||
} finally {
|
||||
vi.unstubAllEnvs();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -157,7 +157,9 @@ function isTransientDirectCronDeliveryError(error: unknown): boolean {
|
||||
}
|
||||
|
||||
function resolveDirectCronRetryDelaysMs(): readonly number[] {
|
||||
return process.env.OPENCLAW_TEST_FAST === "1" ? [8, 16, 32] : [5_000, 10_000, 20_000];
|
||||
return process.env.NODE_ENV === "test" && process.env.OPENCLAW_TEST_FAST === "1"
|
||||
? [8, 16, 32]
|
||||
: [5_000, 10_000, 20_000];
|
||||
}
|
||||
|
||||
async function retryTransientDirectCronDelivery<T>(params: {
|
||||
@@ -256,6 +258,12 @@ export async function dispatchCronDelivery(
|
||||
bestEffort: params.deliveryBestEffort,
|
||||
deps: createOutboundSendDeps(params.deps),
|
||||
abortSignal: params.abortSignal,
|
||||
// Isolated cron direct delivery uses its own transient retry loop.
|
||||
// Keep all attempts out of the write-ahead delivery queue so a
|
||||
// late-successful first send cannot leave behind a failed queue
|
||||
// entry that replays on the next restart.
|
||||
// See: https://github.com/openclaw/openclaw/issues/40545
|
||||
skipQueue: true,
|
||||
});
|
||||
const deliveryResults = options?.retryTransient
|
||||
? await retryTransientDirectCronDelivery({
|
||||
|
||||
Reference in New Issue
Block a user