From 2649c03cdb52ec5b81d57cd46792b93b1d4c16d2 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Thu, 12 Mar 2026 20:43:38 -0400 Subject: [PATCH] fix(hooks): dedupe repeated agent deliveries by idempotency key (#44438) * Hooks: add hook idempotency key resolution * Hooks: dedupe repeated agent deliveries by idempotency key * Tests: cover hook idempotency dedupe * Changelog: note hook idempotency dedupe * Hooks: cap hook idempotency key length * Gateway: hash hook replay cache keys * Tests: cover hook replay key hardening --- CHANGELOG.md | 1 + src/gateway/hooks.ts | 26 +++++ src/gateway/server-http.ts | 151 ++++++++++++++++++++++++++-- src/gateway/server.hooks.test.ts | 167 ++++++++++++++++++++++++++++++- 4 files changed, 336 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21b960bc5..4a595786e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai - Windows/native update: make package installs use the npm update path instead of the git path, carry portable Git into native Windows updates, and mirror the installer's Windows npm env so `openclaw update` no longer dies early on missing `git` or `node-llama-cpp` download setup. - Sandbox/write: preserve pinned mutation-helper payload stdin so sandboxed `write` no longer reports success while creating empty files. (#43876) Thanks @glitch418x. - Security/exec approvals: escape invisible Unicode format characters in approval prompts so zero-width command text renders as visible `\u{...}` escapes instead of spoofing the reviewed command. (`GHSA-pcqg-f7rg-xfvv`)(#43687) Thanks @EkiXu and @vincentkoc. +- Hooks/agent deliveries: dedupe repeated hook requests by optional idempotency key so webhook retries can reuse the first run instead of launching duplicate agent executions. (#44438) Thanks @vincentkoc. - Security/exec detection: normalize compatibility Unicode and strip invisible formatting code points before obfuscation checks so zero-width and fullwidth command tricks no longer suppress heuristic detection. (`GHSA-9r3v-37xh-2cf6`)(#44091) Thanks @wooluo and @vincentkoc. - Security/exec allowlist: preserve POSIX case sensitivity and keep `?` within a single path segment so exact-looking allowlist patterns no longer overmatch executables across case or directory boundaries. (`GHSA-f8r2-vg7x-gh8m`)(#43798) Thanks @zpbrent and @vincentkoc. - Security/commands: require sender ownership for `/config` and `/debug` so authorized non-owner senders can no longer reach owner-only config and runtime debug surfaces. (`GHSA-r7vr-gr74-94p8`)(#44305) Thanks @tdjackey and @vincentkoc. diff --git a/src/gateway/hooks.ts b/src/gateway/hooks.ts index 32751369f..f371e3565 100644 --- a/src/gateway/hooks.ts +++ b/src/gateway/hooks.ts @@ -11,6 +11,7 @@ import { type HookMappingResolved, resolveHookMappings } from "./hooks-mapping.j const DEFAULT_HOOKS_PATH = "/hooks"; const DEFAULT_HOOKS_MAX_BODY_BYTES = 256 * 1024; +const MAX_HOOK_IDEMPOTENCY_KEY_LENGTH = 256; export type HooksConfigResolved = { basePath: string; @@ -223,6 +224,7 @@ export type HookAgentPayload = { message: string; name: string; agentId?: string; + idempotencyKey?: string; wakeMode: "now" | "next-heartbeat"; sessionKey?: string; deliver: boolean; @@ -263,6 +265,28 @@ export function resolveHookDeliver(raw: unknown): boolean { return raw !== false; } +function resolveOptionalHookIdempotencyKey(raw: unknown): string | undefined { + if (typeof raw !== "string") { + return undefined; + } + const trimmed = raw.trim(); + if (!trimmed || trimmed.length > MAX_HOOK_IDEMPOTENCY_KEY_LENGTH) { + return undefined; + } + return trimmed; +} + +export function resolveHookIdempotencyKey(params: { + payload: Record; + headers?: Record; +}): string | undefined { + return ( + resolveOptionalHookIdempotencyKey(params.headers?.["idempotency-key"]) || + resolveOptionalHookIdempotencyKey(params.headers?.["x-openclaw-idempotency-key"]) || + resolveOptionalHookIdempotencyKey(params.payload.idempotencyKey) + ); +} + export function resolveHookTargetAgentId( hooksConfig: HooksConfigResolved, agentId: string | undefined, @@ -366,6 +390,7 @@ export function normalizeAgentPayload(payload: Record): const agentIdRaw = payload.agentId; const agentId = typeof agentIdRaw === "string" && agentIdRaw.trim() ? agentIdRaw.trim() : undefined; + const idempotencyKey = resolveOptionalHookIdempotencyKey(payload.idempotencyKey); const wakeMode = payload.wakeMode === "next-heartbeat" ? "next-heartbeat" : "now"; const sessionKeyRaw = payload.sessionKey; const sessionKey = @@ -396,6 +421,7 @@ export function normalizeAgentPayload(payload: Record): message, name, agentId, + idempotencyKey, wakeMode, sessionKey, deliver, diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index ad3a0e305..4a6fc780d 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import { createServer as createHttpServer, type Server as HttpServer, @@ -42,6 +43,7 @@ import { isHookAgentAllowed, normalizeAgentPayload, normalizeHookHeaders, + resolveHookIdempotencyKey, normalizeWakePayload, readJsonBody, normalizeHookDispatchSessionKey, @@ -55,6 +57,7 @@ import { getBearerToken } from "./http-utils.js"; import { resolveRequestClientIp } from "./net.js"; import { handleOpenAiHttpRequest } from "./openai-http.js"; import { handleOpenResponsesHttpRequest } from "./openresponses-http.js"; +import { DEDUPE_MAX, DEDUPE_TTL_MS } from "./server-constants.js"; import { authorizeCanvasRequest, enforcePluginRouteGatewayAuth, @@ -85,6 +88,18 @@ export type HookClientIpConfig = Readonly<{ allowRealIpFallback?: boolean; }>; +type HookReplayEntry = { + ts: number; + runId: string; +}; + +type HookReplayScope = { + pathKey: string; + token: string | undefined; + idempotencyKey?: string; + dispatchScope: Record; +}; + function sendJson(res: ServerResponse, status: number, body: unknown) { res.statusCode = status; res.setHeader("Content-Type", "application/json; charset=utf-8"); @@ -361,6 +376,7 @@ export function createHooksRequestHandler( } & HookDispatchers, ): HooksRequestHandler { const { getHooksConfig, logHooks, dispatchAgentHook, dispatchWakeHook, getClientIpConfig } = opts; + const hookReplayCache = new Map(); const hookAuthLimiter = createAuthRateLimiter({ maxAttempts: HOOK_AUTH_FAILURE_LIMIT, windowMs: HOOK_AUTH_FAILURE_WINDOW_MS, @@ -381,6 +397,66 @@ export function createHooksRequestHandler( return normalizeRateLimitClientIp(clientIp); }; + const pruneHookReplayCache = (now: number) => { + const cutoff = now - DEDUPE_TTL_MS; + for (const [key, entry] of hookReplayCache) { + if (entry.ts < cutoff) { + hookReplayCache.delete(key); + } + } + while (hookReplayCache.size > DEDUPE_MAX) { + const oldestKey = hookReplayCache.keys().next().value; + if (!oldestKey) { + break; + } + hookReplayCache.delete(oldestKey); + } + }; + + const buildHookReplayCacheKey = (params: HookReplayScope): string | undefined => { + const idem = params.idempotencyKey?.trim(); + if (!idem) { + return undefined; + } + const tokenFingerprint = createHash("sha256") + .update(params.token ?? "", "utf8") + .digest("hex"); + const idempotencyFingerprint = createHash("sha256").update(idem, "utf8").digest("hex"); + const scopeFingerprint = createHash("sha256") + .update( + JSON.stringify({ + pathKey: params.pathKey, + dispatchScope: params.dispatchScope, + }), + "utf8", + ) + .digest("hex"); + return `${tokenFingerprint}:${scopeFingerprint}:${idempotencyFingerprint}`; + }; + + const resolveCachedHookRunId = (key: string | undefined, now: number): string | undefined => { + if (!key) { + return undefined; + } + pruneHookReplayCache(now); + const cached = hookReplayCache.get(key); + if (!cached) { + return undefined; + } + hookReplayCache.delete(key); + hookReplayCache.set(key, cached); + return cached.runId; + }; + + const rememberHookRunId = (key: string | undefined, runId: string, now: number) => { + if (!key) { + return; + } + hookReplayCache.delete(key); + hookReplayCache.set(key, { ts: now, runId }); + pruneHookReplayCache(now); + }; + return async (req, res) => { const hooksConfig = getHooksConfig(); if (!hooksConfig) { @@ -454,6 +530,11 @@ export function createHooksRequestHandler( const payload = typeof body.value === "object" && body.value !== null ? body.value : {}; const headers = normalizeHookHeaders(req); + const idempotencyKey = resolveHookIdempotencyKey({ + payload: payload as Record, + headers, + }); + const now = Date.now(); if (subPath === "wake") { const normalized = normalizeWakePayload(payload as Record); @@ -486,14 +567,41 @@ export function createHooksRequestHandler( return true; } const targetAgentId = resolveHookTargetAgentId(hooksConfig, normalized.value.agentId); + const replayKey = buildHookReplayCacheKey({ + pathKey: "agent", + token, + idempotencyKey, + dispatchScope: { + agentId: targetAgentId ?? null, + sessionKey: + normalized.value.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null, + message: normalized.value.message, + name: normalized.value.name, + wakeMode: normalized.value.wakeMode, + deliver: normalized.value.deliver, + channel: normalized.value.channel, + to: normalized.value.to ?? null, + model: normalized.value.model ?? null, + thinking: normalized.value.thinking ?? null, + timeoutSeconds: normalized.value.timeoutSeconds ?? null, + }, + }); + const cachedRunId = resolveCachedHookRunId(replayKey, now); + if (cachedRunId) { + sendJson(res, 200, { ok: true, runId: cachedRunId }); + return true; + } + const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({ + sessionKey: sessionKey.value, + targetAgentId, + }); const runId = dispatchAgentHook({ ...normalized.value, - sessionKey: normalizeHookDispatchSessionKey({ - sessionKey: sessionKey.value, - targetAgentId, - }), + idempotencyKey, + sessionKey: normalizedDispatchSessionKey, agentId: targetAgentId, }); + rememberHookRunId(replayKey, runId, now); sendJson(res, 200, { ok: true, runId }); return true; } @@ -543,15 +651,41 @@ export function createHooksRequestHandler( return true; } const targetAgentId = resolveHookTargetAgentId(hooksConfig, mapped.action.agentId); + const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({ + sessionKey: sessionKey.value, + targetAgentId, + }); + const replayKey = buildHookReplayCacheKey({ + pathKey: subPath || "mapping", + token, + idempotencyKey, + dispatchScope: { + agentId: targetAgentId ?? null, + sessionKey: + mapped.action.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null, + message: mapped.action.message, + name: mapped.action.name ?? "Hook", + wakeMode: mapped.action.wakeMode, + deliver: resolveHookDeliver(mapped.action.deliver), + channel, + to: mapped.action.to ?? null, + model: mapped.action.model ?? null, + thinking: mapped.action.thinking ?? null, + timeoutSeconds: mapped.action.timeoutSeconds ?? null, + }, + }); + const cachedRunId = resolveCachedHookRunId(replayKey, now); + if (cachedRunId) { + sendJson(res, 200, { ok: true, runId: cachedRunId }); + return true; + } const runId = dispatchAgentHook({ message: mapped.action.message, name: mapped.action.name ?? "Hook", + idempotencyKey, agentId: targetAgentId, wakeMode: mapped.action.wakeMode, - sessionKey: normalizeHookDispatchSessionKey({ - sessionKey: sessionKey.value, - targetAgentId, - }), + sessionKey: normalizedDispatchSessionKey, deliver: resolveHookDeliver(mapped.action.deliver), channel, to: mapped.action.to, @@ -560,6 +694,7 @@ export function createHooksRequestHandler( timeoutSeconds: mapped.action.timeoutSeconds, allowUnsafeExternalContent: mapped.action.allowUnsafeExternalContent, }); + rememberHookRunId(replayKey, runId, now); sendJson(res, 200, { ok: true, runId }); return true; } diff --git a/src/gateway/server.hooks.test.ts b/src/gateway/server.hooks.test.ts index 2a4e1c961..612e7db86 100644 --- a/src/gateway/server.hooks.test.ts +++ b/src/gateway/server.hooks.test.ts @@ -1,6 +1,8 @@ -import { describe, expect, test } from "vitest"; +import fs from "node:fs/promises"; +import { afterEach, describe, expect, test, vi } from "vitest"; import { resolveMainSessionKeyFromConfig } from "../config/sessions.js"; import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js"; +import { DEDUPE_TTL_MS } from "./server-constants.js"; import { cronIsolatedRun, installGatewayTestHooks, @@ -14,6 +16,10 @@ installGatewayTestHooks({ scope: "suite" }); const resolveMainKey = () => resolveMainSessionKeyFromConfig(); const HOOK_TOKEN = "hook-secret"; +afterEach(() => { + vi.restoreAllMocks(); +}); + function buildHookJsonHeaders(options?: { token?: string | null; headers?: Record; @@ -279,6 +285,165 @@ describe("gateway server hooks", () => { }); }); + test("dedupes repeated /hooks/agent deliveries by idempotency key", async () => { + testState.hooksConfig = { enabled: true, token: HOOK_TOKEN }; + await withGatewayServer(async ({ port }) => { + cronIsolatedRun.mockClear(); + cronIsolatedRun.mockResolvedValue({ status: "ok", summary: "done" }); + + const first = await postHook( + port, + "/hooks/agent", + { message: "Do it", name: "Email" }, + { headers: { "Idempotency-Key": "hook-idem-1" } }, + ); + expect(first.status).toBe(200); + const firstBody = (await first.json()) as { runId?: string }; + expect(firstBody.runId).toBeTruthy(); + await waitForSystemEvent(); + expect(cronIsolatedRun).toHaveBeenCalledTimes(1); + drainSystemEvents(resolveMainKey()); + + const second = await postHook( + port, + "/hooks/agent", + { message: "Do it", name: "Email" }, + { headers: { "Idempotency-Key": "hook-idem-1" } }, + ); + expect(second.status).toBe(200); + const secondBody = (await second.json()) as { runId?: string }; + expect(secondBody.runId).toBe(firstBody.runId); + expect(cronIsolatedRun).toHaveBeenCalledTimes(1); + expect(peekSystemEvents(resolveMainKey())).toHaveLength(0); + }); + }); + + test("dedupes hook retries even when trusted-proxy client IP changes", async () => { + testState.hooksConfig = { enabled: true, token: HOOK_TOKEN }; + const configPath = process.env.OPENCLAW_CONFIG_PATH; + expect(configPath).toBeTruthy(); + await fs.writeFile( + configPath!, + JSON.stringify({ gateway: { trustedProxies: ["127.0.0.1"] } }, null, 2), + "utf-8", + ); + + await withGatewayServer(async ({ port }) => { + cronIsolatedRun.mockClear(); + cronIsolatedRun.mockResolvedValue({ status: "ok", summary: "done" }); + + const first = await postHook( + port, + "/hooks/agent", + { message: "Do it", name: "Email" }, + { + headers: { + "Idempotency-Key": "hook-idem-forwarded", + "X-Forwarded-For": "198.51.100.10", + }, + }, + ); + expect(first.status).toBe(200); + const firstBody = (await first.json()) as { runId?: string }; + await waitForSystemEvent(); + drainSystemEvents(resolveMainKey()); + + const second = await postHook( + port, + "/hooks/agent", + { message: "Do it", name: "Email" }, + { + headers: { + "Idempotency-Key": "hook-idem-forwarded", + "X-Forwarded-For": "203.0.113.25", + }, + }, + ); + expect(second.status).toBe(200); + const secondBody = (await second.json()) as { runId?: string }; + expect(secondBody.runId).toBe(firstBody.runId); + expect(cronIsolatedRun).toHaveBeenCalledTimes(1); + }); + }); + + test("does not retain oversized idempotency keys for replay dedupe", async () => { + testState.hooksConfig = { enabled: true, token: HOOK_TOKEN }; + const oversizedKey = "x".repeat(257); + + await withGatewayServer(async ({ port }) => { + cronIsolatedRun.mockClear(); + cronIsolatedRun.mockResolvedValue({ status: "ok", summary: "done" }); + + const first = await postHook( + port, + "/hooks/agent", + { message: "Do it", name: "Email" }, + { headers: { "Idempotency-Key": oversizedKey } }, + ); + expect(first.status).toBe(200); + await waitForSystemEvent(); + drainSystemEvents(resolveMainKey()); + + const second = await postHook( + port, + "/hooks/agent", + { message: "Do it", name: "Email" }, + { headers: { "Idempotency-Key": oversizedKey } }, + ); + expect(second.status).toBe(200); + await waitForSystemEvent(); + + expect(cronIsolatedRun).toHaveBeenCalledTimes(2); + }); + }); + + test("expires hook idempotency entries from first delivery time", async () => { + testState.hooksConfig = { enabled: true, token: HOOK_TOKEN }; + const nowSpy = vi.spyOn(Date, "now"); + nowSpy.mockReturnValue(1_000_000); + + await withGatewayServer(async ({ port }) => { + cronIsolatedRun.mockClear(); + cronIsolatedRun.mockResolvedValue({ status: "ok", summary: "done" }); + + const first = await postHook( + port, + "/hooks/agent", + { message: "Do it", name: "Email" }, + { headers: { "Idempotency-Key": "fixed-window-idem" } }, + ); + expect(first.status).toBe(200); + const firstBody = (await first.json()) as { runId?: string }; + await waitForSystemEvent(); + drainSystemEvents(resolveMainKey()); + + nowSpy.mockReturnValue(1_000_000 + DEDUPE_TTL_MS - 1); + const second = await postHook( + port, + "/hooks/agent", + { message: "Do it", name: "Email" }, + { headers: { "Idempotency-Key": "fixed-window-idem" } }, + ); + expect(second.status).toBe(200); + const secondBody = (await second.json()) as { runId?: string }; + expect(secondBody.runId).toBe(firstBody.runId); + expect(cronIsolatedRun).toHaveBeenCalledTimes(1); + + nowSpy.mockReturnValue(1_000_000 + DEDUPE_TTL_MS + 1); + const third = await postHook( + port, + "/hooks/agent", + { message: "Do it", name: "Email" }, + { headers: { "Idempotency-Key": "fixed-window-idem" } }, + ); + expect(third.status).toBe(200); + const thirdBody = (await third.json()) as { runId?: string }; + expect(thirdBody.runId).toBeTruthy(); + expect(thirdBody.runId).not.toBe(firstBody.runId); + expect(cronIsolatedRun).toHaveBeenCalledTimes(2); + }); + }); + test("enforces hooks.allowedAgentIds for explicit agent routing", async () => { testState.hooksConfig = { enabled: true,