fix(announce): use deterministic idempotency keys to prevent duplicate subagent announces (#17150)
Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 54bba3cea1bcb74e9048aeb9c4968cb2629530c7 Co-authored-by: widingmarcus-cyber <245375637+widingmarcus-cyber@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras
This commit is contained in:
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Telegram: retry inbound media `getFile` calls (3 attempts with backoff) and gracefully fall back to placeholder-only processing when retries fail, preventing dropped voice/media messages on transient Telegram network errors. (#16154) Thanks @yinghaosang.
|
||||
- Telegram: finalize streaming preview replies in place instead of sending a second final message, preventing duplicate Telegram assistant outputs at stream completion. (#17218) Thanks @obviyus.
|
||||
- Cron: infer `payload.kind="agentTurn"` for model-only `cron.update` payload patches, so partial agent-turn updates do not fail validation when `kind` is omitted. (#15664) Thanks @rodrigouroz.
|
||||
- Subagents: use child-run-based deterministic announce idempotency keys across direct and queued delivery paths (with legacy queued-item fallback) to prevent duplicate announce retries without collapsing distinct same-millisecond announces. (#17150) Thanks @widingmarcus-cyber.
|
||||
|
||||
## 2026.2.14
|
||||
|
||||
|
||||
25
src/agents/announce-idempotency.ts
Normal file
25
src/agents/announce-idempotency.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
export type AnnounceIdFromChildRunParams = {
|
||||
childSessionKey: string;
|
||||
childRunId: string;
|
||||
};
|
||||
|
||||
export function buildAnnounceIdFromChildRun(params: AnnounceIdFromChildRunParams): string {
|
||||
return `v1:${params.childSessionKey}:${params.childRunId}`;
|
||||
}
|
||||
|
||||
export function buildAnnounceIdempotencyKey(announceId: string): string {
|
||||
return `announce:${announceId}`;
|
||||
}
|
||||
|
||||
export function resolveQueueAnnounceId(params: {
|
||||
announceId?: string;
|
||||
sessionKey: string;
|
||||
enqueuedAt: number;
|
||||
}): string {
|
||||
const announceId = params.announceId?.trim();
|
||||
if (announceId) {
|
||||
return announceId;
|
||||
}
|
||||
// Backward-compatible fallback for queue items that predate announceId.
|
||||
return `legacy:${params.sessionKey}:${params.enqueuedAt}`;
|
||||
}
|
||||
@@ -14,6 +14,9 @@ import {
|
||||
} from "../utils/queue-helpers.js";
|
||||
|
||||
export type AnnounceQueueItem = {
|
||||
// Stable announce identity shared by direct + queued delivery paths.
|
||||
// Optional for backward compatibility with previously queued items.
|
||||
announceId?: string;
|
||||
prompt: string;
|
||||
summaryLine?: string;
|
||||
enqueuedAt: number;
|
||||
|
||||
@@ -149,6 +149,28 @@ describe("subagent announce formatting", () => {
|
||||
expect(msg).toContain("completed successfully");
|
||||
});
|
||||
|
||||
it("uses child-run announce identity for direct idempotency", async () => {
|
||||
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
|
||||
await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:worker",
|
||||
childRunId: "run-direct-idem",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "do thing",
|
||||
timeoutMs: 1000,
|
||||
cleanup: "keep",
|
||||
waitForCompletion: false,
|
||||
startedAt: 10,
|
||||
endedAt: 20,
|
||||
outcome: { status: "ok" },
|
||||
});
|
||||
|
||||
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
|
||||
expect(call?.params?.idempotencyKey).toBe(
|
||||
"announce:v1:agent:main:subagent:worker:run-direct-idem",
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps full findings and includes compact stats", async () => {
|
||||
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
|
||||
sessionStore = {
|
||||
@@ -266,6 +288,60 @@ describe("subagent announce formatting", () => {
|
||||
expect(call?.params?.accountId).toBe("kev");
|
||||
});
|
||||
|
||||
it("keeps queued idempotency unique for same-ms distinct child runs", async () => {
|
||||
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
|
||||
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
|
||||
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
|
||||
sessionStore = {
|
||||
"agent:main:main": {
|
||||
sessionId: "session-followup",
|
||||
lastChannel: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
queueMode: "followup",
|
||||
queueDebounceMs: 0,
|
||||
},
|
||||
};
|
||||
const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_700_000_000_000);
|
||||
try {
|
||||
await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:worker",
|
||||
childRunId: "run-1",
|
||||
requesterSessionKey: "main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "first task",
|
||||
timeoutMs: 1000,
|
||||
cleanup: "keep",
|
||||
waitForCompletion: false,
|
||||
startedAt: 10,
|
||||
endedAt: 20,
|
||||
outcome: { status: "ok" },
|
||||
});
|
||||
await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:worker",
|
||||
childRunId: "run-2",
|
||||
requesterSessionKey: "main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "second task",
|
||||
timeoutMs: 1000,
|
||||
cleanup: "keep",
|
||||
waitForCompletion: false,
|
||||
startedAt: 10,
|
||||
endedAt: 20,
|
||||
outcome: { status: "ok" },
|
||||
});
|
||||
} finally {
|
||||
nowSpy.mockRestore();
|
||||
}
|
||||
|
||||
await expect.poll(() => agentSpy.mock.calls.length).toBe(2);
|
||||
const idempotencyKeys = agentSpy.mock.calls
|
||||
.map((call) => (call[0] as { params?: Record<string, unknown> })?.params?.idempotencyKey)
|
||||
.filter((value): value is string => typeof value === "string");
|
||||
expect(idempotencyKeys).toContain("announce:v1:agent:main:subagent:worker:run-1");
|
||||
expect(idempotencyKeys).toContain("announce:v1:agent:main:subagent:worker:run-2");
|
||||
expect(new Set(idempotencyKeys).size).toBe(2);
|
||||
});
|
||||
|
||||
it("queues announce delivery back into requester subagent session", async () => {
|
||||
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
|
||||
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import crypto from "node:crypto";
|
||||
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import {
|
||||
@@ -16,6 +15,11 @@ import {
|
||||
mergeDeliveryContext,
|
||||
normalizeDeliveryContext,
|
||||
} from "../utils/delivery-context.js";
|
||||
import {
|
||||
buildAnnounceIdFromChildRun,
|
||||
buildAnnounceIdempotencyKey,
|
||||
resolveQueueAnnounceId,
|
||||
} from "./announce-idempotency.js";
|
||||
import {
|
||||
isEmbeddedPiRunActive,
|
||||
queueEmbeddedPiMessage,
|
||||
@@ -113,6 +117,15 @@ async function sendAnnounce(item: AnnounceQueueItem) {
|
||||
const origin = item.origin;
|
||||
const threadId =
|
||||
origin?.threadId != null && origin.threadId !== "" ? String(origin.threadId) : undefined;
|
||||
// Share one announce identity across direct and queued delivery paths so
|
||||
// gateway dedupe suppresses true retries without collapsing distinct events.
|
||||
const idempotencyKey = buildAnnounceIdempotencyKey(
|
||||
resolveQueueAnnounceId({
|
||||
announceId: item.announceId,
|
||||
sessionKey: item.sessionKey,
|
||||
enqueuedAt: item.enqueuedAt,
|
||||
}),
|
||||
);
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
@@ -123,7 +136,7 @@ async function sendAnnounce(item: AnnounceQueueItem) {
|
||||
to: requesterIsSubagent ? undefined : origin?.to,
|
||||
threadId: requesterIsSubagent ? undefined : threadId,
|
||||
deliver: !requesterIsSubagent,
|
||||
idempotencyKey: crypto.randomUUID(),
|
||||
idempotencyKey,
|
||||
},
|
||||
timeoutMs: 15_000,
|
||||
});
|
||||
@@ -163,6 +176,7 @@ function loadRequesterSessionEntry(requesterSessionKey: string) {
|
||||
|
||||
async function maybeQueueSubagentAnnounce(params: {
|
||||
requesterSessionKey: string;
|
||||
announceId?: string;
|
||||
triggerMessage: string;
|
||||
summaryLine?: string;
|
||||
requesterOrigin?: DeliveryContext;
|
||||
@@ -199,6 +213,7 @@ async function maybeQueueSubagentAnnounce(params: {
|
||||
enqueueAnnounce({
|
||||
key: canonicalKey,
|
||||
item: {
|
||||
announceId: params.announceId,
|
||||
prompt: params.triggerMessage,
|
||||
summaryLine: params.summaryLine,
|
||||
enqueuedAt: Date.now(),
|
||||
@@ -543,8 +558,13 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
replyInstruction,
|
||||
].join("\n");
|
||||
|
||||
const announceId = buildAnnounceIdFromChildRun({
|
||||
childSessionKey: params.childSessionKey,
|
||||
childRunId: params.childRunId,
|
||||
});
|
||||
const queued = await maybeQueueSubagentAnnounce({
|
||||
requesterSessionKey: targetRequesterSessionKey,
|
||||
announceId,
|
||||
triggerMessage,
|
||||
summaryLine: taskLabel,
|
||||
requesterOrigin: targetRequesterOrigin,
|
||||
@@ -565,6 +585,10 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
const { entry } = loadRequesterSessionEntry(targetRequesterSessionKey);
|
||||
directOrigin = deliveryContextFromSession(entry);
|
||||
}
|
||||
// Use a deterministic idempotency key so the gateway dedup cache
|
||||
// catches duplicates if this announce is also queued by the gateway-
|
||||
// level message queue while the main session is busy (#17122).
|
||||
const directIdempotencyKey = buildAnnounceIdempotencyKey(announceId);
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
@@ -578,7 +602,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
!requesterIsSubagent && directOrigin?.threadId != null && directOrigin.threadId !== ""
|
||||
? String(directOrigin.threadId)
|
||||
: undefined,
|
||||
idempotencyKey: crypto.randomUUID(),
|
||||
idempotencyKey: directIdempotencyKey,
|
||||
},
|
||||
expectFinal: true,
|
||||
timeoutMs: 15_000,
|
||||
|
||||
Reference in New Issue
Block a user