From ade11ec892ddedf0241ab6f7b14ef7c636edd8c8 Mon Sep 17 00:00:00 2001 From: Marcus Widing Date: Sun, 15 Feb 2026 16:34:34 +0100 Subject: [PATCH] fix(announce): use deterministic idempotency keys to prevent duplicate subagent announces (#17150) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 54bba3cea1bcb74e9048aeb9c4968cb2629530c7 Co-authored-by: widingmarcus-cyber <245375637+widingmarcus-cyber@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- CHANGELOG.md | 1 + src/agents/announce-idempotency.ts | 25 ++++++ src/agents/subagent-announce-queue.ts | 3 + .../subagent-announce.format.e2e.test.ts | 76 +++++++++++++++++++ src/agents/subagent-announce.ts | 30 +++++++- 5 files changed, 132 insertions(+), 3 deletions(-) create mode 100644 src/agents/announce-idempotency.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 910a00ad2..9021f6596 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - Telegram: retry inbound media `getFile` calls (3 attempts with backoff) and gracefully fall back to placeholder-only processing when retries fail, preventing dropped voice/media messages on transient Telegram network errors. (#16154) Thanks @yinghaosang. - Telegram: finalize streaming preview replies in place instead of sending a second final message, preventing duplicate Telegram assistant outputs at stream completion. (#17218) Thanks @obviyus. - Cron: infer `payload.kind="agentTurn"` for model-only `cron.update` payload patches, so partial agent-turn updates do not fail validation when `kind` is omitted. (#15664) Thanks @rodrigouroz. +- Subagents: use child-run-based deterministic announce idempotency keys across direct and queued delivery paths (with legacy queued-item fallback) to prevent duplicate announce retries without collapsing distinct same-millisecond announces. (#17150) Thanks @widingmarcus-cyber. ## 2026.2.14 diff --git a/src/agents/announce-idempotency.ts b/src/agents/announce-idempotency.ts new file mode 100644 index 000000000..e792b2627 --- /dev/null +++ b/src/agents/announce-idempotency.ts @@ -0,0 +1,25 @@ +export type AnnounceIdFromChildRunParams = { + childSessionKey: string; + childRunId: string; +}; + +export function buildAnnounceIdFromChildRun(params: AnnounceIdFromChildRunParams): string { + return `v1:${params.childSessionKey}:${params.childRunId}`; +} + +export function buildAnnounceIdempotencyKey(announceId: string): string { + return `announce:${announceId}`; +} + +export function resolveQueueAnnounceId(params: { + announceId?: string; + sessionKey: string; + enqueuedAt: number; +}): string { + const announceId = params.announceId?.trim(); + if (announceId) { + return announceId; + } + // Backward-compatible fallback for queue items that predate announceId. + return `legacy:${params.sessionKey}:${params.enqueuedAt}`; +} diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index 468cd02aa..eca237c66 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -14,6 +14,9 @@ import { } from "../utils/queue-helpers.js"; export type AnnounceQueueItem = { + // Stable announce identity shared by direct + queued delivery paths. + // Optional for backward compatibility with previously queued items. + announceId?: string; prompt: string; summaryLine?: string; enqueuedAt: number; diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index 894a80eb3..752b2a07d 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -149,6 +149,28 @@ describe("subagent announce formatting", () => { expect(msg).toContain("completed successfully"); }); + it("uses child-run announce identity for direct idempotency", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:worker", + childRunId: "run-direct-idem", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "do thing", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.idempotencyKey).toBe( + "announce:v1:agent:main:subagent:worker:run-direct-idem", + ); + }); + it("keeps full findings and includes compact stats", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); sessionStore = { @@ -266,6 +288,60 @@ describe("subagent announce formatting", () => { expect(call?.params?.accountId).toBe("kev"); }); + it("keeps queued idempotency unique for same-ms distinct child runs", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + sessionStore = { + "agent:main:main": { + sessionId: "session-followup", + lastChannel: "whatsapp", + lastTo: "+1555", + queueMode: "followup", + queueDebounceMs: 0, + }, + }; + const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_700_000_000_000); + try { + await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:worker", + childRunId: "run-1", + requesterSessionKey: "main", + requesterDisplayKey: "main", + task: "first task", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:worker", + childRunId: "run-2", + requesterSessionKey: "main", + requesterDisplayKey: "main", + task: "second task", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + } finally { + nowSpy.mockRestore(); + } + + await expect.poll(() => agentSpy.mock.calls.length).toBe(2); + const idempotencyKeys = agentSpy.mock.calls + .map((call) => (call[0] as { params?: Record })?.params?.idempotencyKey) + .filter((value): value is string => typeof value === "string"); + expect(idempotencyKeys).toContain("announce:v1:agent:main:subagent:worker:run-1"); + expect(idempotencyKeys).toContain("announce:v1:agent:main:subagent:worker:run-2"); + expect(new Set(idempotencyKeys).size).toBe(2); + }); + it("queues announce delivery back into requester subagent session", async () => { const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 4d454a04b..5293d9c45 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -1,4 +1,3 @@ -import crypto from "node:crypto"; import { resolveQueueSettings } from "../auto-reply/reply/queue.js"; import { loadConfig } from "../config/config.js"; import { @@ -16,6 +15,11 @@ import { mergeDeliveryContext, normalizeDeliveryContext, } from "../utils/delivery-context.js"; +import { + buildAnnounceIdFromChildRun, + buildAnnounceIdempotencyKey, + resolveQueueAnnounceId, +} from "./announce-idempotency.js"; import { isEmbeddedPiRunActive, queueEmbeddedPiMessage, @@ -113,6 +117,15 @@ async function sendAnnounce(item: AnnounceQueueItem) { const origin = item.origin; const threadId = origin?.threadId != null && origin.threadId !== "" ? String(origin.threadId) : undefined; + // Share one announce identity across direct and queued delivery paths so + // gateway dedupe suppresses true retries without collapsing distinct events. + const idempotencyKey = buildAnnounceIdempotencyKey( + resolveQueueAnnounceId({ + announceId: item.announceId, + sessionKey: item.sessionKey, + enqueuedAt: item.enqueuedAt, + }), + ); await callGateway({ method: "agent", params: { @@ -123,7 +136,7 @@ async function sendAnnounce(item: AnnounceQueueItem) { to: requesterIsSubagent ? undefined : origin?.to, threadId: requesterIsSubagent ? undefined : threadId, deliver: !requesterIsSubagent, - idempotencyKey: crypto.randomUUID(), + idempotencyKey, }, timeoutMs: 15_000, }); @@ -163,6 +176,7 @@ function loadRequesterSessionEntry(requesterSessionKey: string) { async function maybeQueueSubagentAnnounce(params: { requesterSessionKey: string; + announceId?: string; triggerMessage: string; summaryLine?: string; requesterOrigin?: DeliveryContext; @@ -199,6 +213,7 @@ async function maybeQueueSubagentAnnounce(params: { enqueueAnnounce({ key: canonicalKey, item: { + announceId: params.announceId, prompt: params.triggerMessage, summaryLine: params.summaryLine, enqueuedAt: Date.now(), @@ -543,8 +558,13 @@ export async function runSubagentAnnounceFlow(params: { replyInstruction, ].join("\n"); + const announceId = buildAnnounceIdFromChildRun({ + childSessionKey: params.childSessionKey, + childRunId: params.childRunId, + }); const queued = await maybeQueueSubagentAnnounce({ requesterSessionKey: targetRequesterSessionKey, + announceId, triggerMessage, summaryLine: taskLabel, requesterOrigin: targetRequesterOrigin, @@ -565,6 +585,10 @@ export async function runSubagentAnnounceFlow(params: { const { entry } = loadRequesterSessionEntry(targetRequesterSessionKey); directOrigin = deliveryContextFromSession(entry); } + // Use a deterministic idempotency key so the gateway dedup cache + // catches duplicates if this announce is also queued by the gateway- + // level message queue while the main session is busy (#17122). + const directIdempotencyKey = buildAnnounceIdempotencyKey(announceId); await callGateway({ method: "agent", params: { @@ -578,7 +602,7 @@ export async function runSubagentAnnounceFlow(params: { !requesterIsSubagent && directOrigin?.threadId != null && directOrigin.threadId !== "" ? String(directOrigin.threadId) : undefined, - idempotencyKey: crypto.randomUUID(), + idempotencyKey: directIdempotencyKey, }, expectFinal: true, timeoutMs: 15_000,