fix(hooks): dedupe repeated agent deliveries by idempotency key (#44438)
* Hooks: add hook idempotency key resolution * Hooks: dedupe repeated agent deliveries by idempotency key * Tests: cover hook idempotency dedupe * Changelog: note hook idempotency dedupe * Hooks: cap hook idempotency key length * Gateway: hash hook replay cache keys * Tests: cover hook replay key hardening
This commit is contained in:
@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Windows/native update: make package installs use the npm update path instead of the git path, carry portable Git into native Windows updates, and mirror the installer's Windows npm env so `openclaw update` no longer dies early on missing `git` or `node-llama-cpp` download setup.
|
||||
- Sandbox/write: preserve pinned mutation-helper payload stdin so sandboxed `write` no longer reports success while creating empty files. (#43876) Thanks @glitch418x.
|
||||
- Security/exec approvals: escape invisible Unicode format characters in approval prompts so zero-width command text renders as visible `\u{...}` escapes instead of spoofing the reviewed command. (`GHSA-pcqg-f7rg-xfvv`)(#43687) Thanks @EkiXu and @vincentkoc.
|
||||
- Hooks/agent deliveries: dedupe repeated hook requests by optional idempotency key so webhook retries can reuse the first run instead of launching duplicate agent executions. (#44438) Thanks @vincentkoc.
|
||||
- Security/exec detection: normalize compatibility Unicode and strip invisible formatting code points before obfuscation checks so zero-width and fullwidth command tricks no longer suppress heuristic detection. (`GHSA-9r3v-37xh-2cf6`)(#44091) Thanks @wooluo and @vincentkoc.
|
||||
- Security/exec allowlist: preserve POSIX case sensitivity and keep `?` within a single path segment so exact-looking allowlist patterns no longer overmatch executables across case or directory boundaries. (`GHSA-f8r2-vg7x-gh8m`)(#43798) Thanks @zpbrent and @vincentkoc.
|
||||
- Security/commands: require sender ownership for `/config` and `/debug` so authorized non-owner senders can no longer reach owner-only config and runtime debug surfaces. (`GHSA-r7vr-gr74-94p8`)(#44305) Thanks @tdjackey and @vincentkoc.
|
||||
|
||||
@@ -11,6 +11,7 @@ import { type HookMappingResolved, resolveHookMappings } from "./hooks-mapping.j
|
||||
|
||||
const DEFAULT_HOOKS_PATH = "/hooks";
|
||||
const DEFAULT_HOOKS_MAX_BODY_BYTES = 256 * 1024;
|
||||
const MAX_HOOK_IDEMPOTENCY_KEY_LENGTH = 256;
|
||||
|
||||
export type HooksConfigResolved = {
|
||||
basePath: string;
|
||||
@@ -223,6 +224,7 @@ export type HookAgentPayload = {
|
||||
message: string;
|
||||
name: string;
|
||||
agentId?: string;
|
||||
idempotencyKey?: string;
|
||||
wakeMode: "now" | "next-heartbeat";
|
||||
sessionKey?: string;
|
||||
deliver: boolean;
|
||||
@@ -263,6 +265,28 @@ export function resolveHookDeliver(raw: unknown): boolean {
|
||||
return raw !== false;
|
||||
}
|
||||
|
||||
function resolveOptionalHookIdempotencyKey(raw: unknown): string | undefined {
|
||||
if (typeof raw !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = raw.trim();
|
||||
if (!trimmed || trimmed.length > MAX_HOOK_IDEMPOTENCY_KEY_LENGTH) {
|
||||
return undefined;
|
||||
}
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
export function resolveHookIdempotencyKey(params: {
|
||||
payload: Record<string, unknown>;
|
||||
headers?: Record<string, string>;
|
||||
}): string | undefined {
|
||||
return (
|
||||
resolveOptionalHookIdempotencyKey(params.headers?.["idempotency-key"]) ||
|
||||
resolveOptionalHookIdempotencyKey(params.headers?.["x-openclaw-idempotency-key"]) ||
|
||||
resolveOptionalHookIdempotencyKey(params.payload.idempotencyKey)
|
||||
);
|
||||
}
|
||||
|
||||
export function resolveHookTargetAgentId(
|
||||
hooksConfig: HooksConfigResolved,
|
||||
agentId: string | undefined,
|
||||
@@ -366,6 +390,7 @@ export function normalizeAgentPayload(payload: Record<string, unknown>):
|
||||
const agentIdRaw = payload.agentId;
|
||||
const agentId =
|
||||
typeof agentIdRaw === "string" && agentIdRaw.trim() ? agentIdRaw.trim() : undefined;
|
||||
const idempotencyKey = resolveOptionalHookIdempotencyKey(payload.idempotencyKey);
|
||||
const wakeMode = payload.wakeMode === "next-heartbeat" ? "next-heartbeat" : "now";
|
||||
const sessionKeyRaw = payload.sessionKey;
|
||||
const sessionKey =
|
||||
@@ -396,6 +421,7 @@ export function normalizeAgentPayload(payload: Record<string, unknown>):
|
||||
message,
|
||||
name,
|
||||
agentId,
|
||||
idempotencyKey,
|
||||
wakeMode,
|
||||
sessionKey,
|
||||
deliver,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import {
|
||||
createServer as createHttpServer,
|
||||
type Server as HttpServer,
|
||||
@@ -42,6 +43,7 @@ import {
|
||||
isHookAgentAllowed,
|
||||
normalizeAgentPayload,
|
||||
normalizeHookHeaders,
|
||||
resolveHookIdempotencyKey,
|
||||
normalizeWakePayload,
|
||||
readJsonBody,
|
||||
normalizeHookDispatchSessionKey,
|
||||
@@ -55,6 +57,7 @@ import { getBearerToken } from "./http-utils.js";
|
||||
import { resolveRequestClientIp } from "./net.js";
|
||||
import { handleOpenAiHttpRequest } from "./openai-http.js";
|
||||
import { handleOpenResponsesHttpRequest } from "./openresponses-http.js";
|
||||
import { DEDUPE_MAX, DEDUPE_TTL_MS } from "./server-constants.js";
|
||||
import {
|
||||
authorizeCanvasRequest,
|
||||
enforcePluginRouteGatewayAuth,
|
||||
@@ -85,6 +88,18 @@ export type HookClientIpConfig = Readonly<{
|
||||
allowRealIpFallback?: boolean;
|
||||
}>;
|
||||
|
||||
type HookReplayEntry = {
|
||||
ts: number;
|
||||
runId: string;
|
||||
};
|
||||
|
||||
type HookReplayScope = {
|
||||
pathKey: string;
|
||||
token: string | undefined;
|
||||
idempotencyKey?: string;
|
||||
dispatchScope: Record<string, unknown>;
|
||||
};
|
||||
|
||||
function sendJson(res: ServerResponse, status: number, body: unknown) {
|
||||
res.statusCode = status;
|
||||
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
||||
@@ -361,6 +376,7 @@ export function createHooksRequestHandler(
|
||||
} & HookDispatchers,
|
||||
): HooksRequestHandler {
|
||||
const { getHooksConfig, logHooks, dispatchAgentHook, dispatchWakeHook, getClientIpConfig } = opts;
|
||||
const hookReplayCache = new Map<string, HookReplayEntry>();
|
||||
const hookAuthLimiter = createAuthRateLimiter({
|
||||
maxAttempts: HOOK_AUTH_FAILURE_LIMIT,
|
||||
windowMs: HOOK_AUTH_FAILURE_WINDOW_MS,
|
||||
@@ -381,6 +397,66 @@ export function createHooksRequestHandler(
|
||||
return normalizeRateLimitClientIp(clientIp);
|
||||
};
|
||||
|
||||
const pruneHookReplayCache = (now: number) => {
|
||||
const cutoff = now - DEDUPE_TTL_MS;
|
||||
for (const [key, entry] of hookReplayCache) {
|
||||
if (entry.ts < cutoff) {
|
||||
hookReplayCache.delete(key);
|
||||
}
|
||||
}
|
||||
while (hookReplayCache.size > DEDUPE_MAX) {
|
||||
const oldestKey = hookReplayCache.keys().next().value;
|
||||
if (!oldestKey) {
|
||||
break;
|
||||
}
|
||||
hookReplayCache.delete(oldestKey);
|
||||
}
|
||||
};
|
||||
|
||||
const buildHookReplayCacheKey = (params: HookReplayScope): string | undefined => {
|
||||
const idem = params.idempotencyKey?.trim();
|
||||
if (!idem) {
|
||||
return undefined;
|
||||
}
|
||||
const tokenFingerprint = createHash("sha256")
|
||||
.update(params.token ?? "", "utf8")
|
||||
.digest("hex");
|
||||
const idempotencyFingerprint = createHash("sha256").update(idem, "utf8").digest("hex");
|
||||
const scopeFingerprint = createHash("sha256")
|
||||
.update(
|
||||
JSON.stringify({
|
||||
pathKey: params.pathKey,
|
||||
dispatchScope: params.dispatchScope,
|
||||
}),
|
||||
"utf8",
|
||||
)
|
||||
.digest("hex");
|
||||
return `${tokenFingerprint}:${scopeFingerprint}:${idempotencyFingerprint}`;
|
||||
};
|
||||
|
||||
const resolveCachedHookRunId = (key: string | undefined, now: number): string | undefined => {
|
||||
if (!key) {
|
||||
return undefined;
|
||||
}
|
||||
pruneHookReplayCache(now);
|
||||
const cached = hookReplayCache.get(key);
|
||||
if (!cached) {
|
||||
return undefined;
|
||||
}
|
||||
hookReplayCache.delete(key);
|
||||
hookReplayCache.set(key, cached);
|
||||
return cached.runId;
|
||||
};
|
||||
|
||||
const rememberHookRunId = (key: string | undefined, runId: string, now: number) => {
|
||||
if (!key) {
|
||||
return;
|
||||
}
|
||||
hookReplayCache.delete(key);
|
||||
hookReplayCache.set(key, { ts: now, runId });
|
||||
pruneHookReplayCache(now);
|
||||
};
|
||||
|
||||
return async (req, res) => {
|
||||
const hooksConfig = getHooksConfig();
|
||||
if (!hooksConfig) {
|
||||
@@ -454,6 +530,11 @@ export function createHooksRequestHandler(
|
||||
|
||||
const payload = typeof body.value === "object" && body.value !== null ? body.value : {};
|
||||
const headers = normalizeHookHeaders(req);
|
||||
const idempotencyKey = resolveHookIdempotencyKey({
|
||||
payload: payload as Record<string, unknown>,
|
||||
headers,
|
||||
});
|
||||
const now = Date.now();
|
||||
|
||||
if (subPath === "wake") {
|
||||
const normalized = normalizeWakePayload(payload as Record<string, unknown>);
|
||||
@@ -486,14 +567,41 @@ export function createHooksRequestHandler(
|
||||
return true;
|
||||
}
|
||||
const targetAgentId = resolveHookTargetAgentId(hooksConfig, normalized.value.agentId);
|
||||
const replayKey = buildHookReplayCacheKey({
|
||||
pathKey: "agent",
|
||||
token,
|
||||
idempotencyKey,
|
||||
dispatchScope: {
|
||||
agentId: targetAgentId ?? null,
|
||||
sessionKey:
|
||||
normalized.value.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null,
|
||||
message: normalized.value.message,
|
||||
name: normalized.value.name,
|
||||
wakeMode: normalized.value.wakeMode,
|
||||
deliver: normalized.value.deliver,
|
||||
channel: normalized.value.channel,
|
||||
to: normalized.value.to ?? null,
|
||||
model: normalized.value.model ?? null,
|
||||
thinking: normalized.value.thinking ?? null,
|
||||
timeoutSeconds: normalized.value.timeoutSeconds ?? null,
|
||||
},
|
||||
});
|
||||
const cachedRunId = resolveCachedHookRunId(replayKey, now);
|
||||
if (cachedRunId) {
|
||||
sendJson(res, 200, { ok: true, runId: cachedRunId });
|
||||
return true;
|
||||
}
|
||||
const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({
|
||||
sessionKey: sessionKey.value,
|
||||
targetAgentId,
|
||||
});
|
||||
const runId = dispatchAgentHook({
|
||||
...normalized.value,
|
||||
sessionKey: normalizeHookDispatchSessionKey({
|
||||
sessionKey: sessionKey.value,
|
||||
targetAgentId,
|
||||
}),
|
||||
idempotencyKey,
|
||||
sessionKey: normalizedDispatchSessionKey,
|
||||
agentId: targetAgentId,
|
||||
});
|
||||
rememberHookRunId(replayKey, runId, now);
|
||||
sendJson(res, 200, { ok: true, runId });
|
||||
return true;
|
||||
}
|
||||
@@ -543,15 +651,41 @@ export function createHooksRequestHandler(
|
||||
return true;
|
||||
}
|
||||
const targetAgentId = resolveHookTargetAgentId(hooksConfig, mapped.action.agentId);
|
||||
const normalizedDispatchSessionKey = normalizeHookDispatchSessionKey({
|
||||
sessionKey: sessionKey.value,
|
||||
targetAgentId,
|
||||
});
|
||||
const replayKey = buildHookReplayCacheKey({
|
||||
pathKey: subPath || "mapping",
|
||||
token,
|
||||
idempotencyKey,
|
||||
dispatchScope: {
|
||||
agentId: targetAgentId ?? null,
|
||||
sessionKey:
|
||||
mapped.action.sessionKey ?? hooksConfig.sessionPolicy.defaultSessionKey ?? null,
|
||||
message: mapped.action.message,
|
||||
name: mapped.action.name ?? "Hook",
|
||||
wakeMode: mapped.action.wakeMode,
|
||||
deliver: resolveHookDeliver(mapped.action.deliver),
|
||||
channel,
|
||||
to: mapped.action.to ?? null,
|
||||
model: mapped.action.model ?? null,
|
||||
thinking: mapped.action.thinking ?? null,
|
||||
timeoutSeconds: mapped.action.timeoutSeconds ?? null,
|
||||
},
|
||||
});
|
||||
const cachedRunId = resolveCachedHookRunId(replayKey, now);
|
||||
if (cachedRunId) {
|
||||
sendJson(res, 200, { ok: true, runId: cachedRunId });
|
||||
return true;
|
||||
}
|
||||
const runId = dispatchAgentHook({
|
||||
message: mapped.action.message,
|
||||
name: mapped.action.name ?? "Hook",
|
||||
idempotencyKey,
|
||||
agentId: targetAgentId,
|
||||
wakeMode: mapped.action.wakeMode,
|
||||
sessionKey: normalizeHookDispatchSessionKey({
|
||||
sessionKey: sessionKey.value,
|
||||
targetAgentId,
|
||||
}),
|
||||
sessionKey: normalizedDispatchSessionKey,
|
||||
deliver: resolveHookDeliver(mapped.action.deliver),
|
||||
channel,
|
||||
to: mapped.action.to,
|
||||
@@ -560,6 +694,7 @@ export function createHooksRequestHandler(
|
||||
timeoutSeconds: mapped.action.timeoutSeconds,
|
||||
allowUnsafeExternalContent: mapped.action.allowUnsafeExternalContent,
|
||||
});
|
||||
rememberHookRunId(replayKey, runId, now);
|
||||
sendJson(res, 200, { ok: true, runId });
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { describe, expect, test } from "vitest";
|
||||
import fs from "node:fs/promises";
|
||||
import { afterEach, describe, expect, test, vi } from "vitest";
|
||||
import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
|
||||
import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js";
|
||||
import { DEDUPE_TTL_MS } from "./server-constants.js";
|
||||
import {
|
||||
cronIsolatedRun,
|
||||
installGatewayTestHooks,
|
||||
@@ -14,6 +16,10 @@ installGatewayTestHooks({ scope: "suite" });
|
||||
const resolveMainKey = () => resolveMainSessionKeyFromConfig();
|
||||
const HOOK_TOKEN = "hook-secret";
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
function buildHookJsonHeaders(options?: {
|
||||
token?: string | null;
|
||||
headers?: Record<string, string>;
|
||||
@@ -279,6 +285,165 @@ describe("gateway server hooks", () => {
|
||||
});
|
||||
});
|
||||
|
||||
test("dedupes repeated /hooks/agent deliveries by idempotency key", async () => {
|
||||
testState.hooksConfig = { enabled: true, token: HOOK_TOKEN };
|
||||
await withGatewayServer(async ({ port }) => {
|
||||
cronIsolatedRun.mockClear();
|
||||
cronIsolatedRun.mockResolvedValue({ status: "ok", summary: "done" });
|
||||
|
||||
const first = await postHook(
|
||||
port,
|
||||
"/hooks/agent",
|
||||
{ message: "Do it", name: "Email" },
|
||||
{ headers: { "Idempotency-Key": "hook-idem-1" } },
|
||||
);
|
||||
expect(first.status).toBe(200);
|
||||
const firstBody = (await first.json()) as { runId?: string };
|
||||
expect(firstBody.runId).toBeTruthy();
|
||||
await waitForSystemEvent();
|
||||
expect(cronIsolatedRun).toHaveBeenCalledTimes(1);
|
||||
drainSystemEvents(resolveMainKey());
|
||||
|
||||
const second = await postHook(
|
||||
port,
|
||||
"/hooks/agent",
|
||||
{ message: "Do it", name: "Email" },
|
||||
{ headers: { "Idempotency-Key": "hook-idem-1" } },
|
||||
);
|
||||
expect(second.status).toBe(200);
|
||||
const secondBody = (await second.json()) as { runId?: string };
|
||||
expect(secondBody.runId).toBe(firstBody.runId);
|
||||
expect(cronIsolatedRun).toHaveBeenCalledTimes(1);
|
||||
expect(peekSystemEvents(resolveMainKey())).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
test("dedupes hook retries even when trusted-proxy client IP changes", async () => {
|
||||
testState.hooksConfig = { enabled: true, token: HOOK_TOKEN };
|
||||
const configPath = process.env.OPENCLAW_CONFIG_PATH;
|
||||
expect(configPath).toBeTruthy();
|
||||
await fs.writeFile(
|
||||
configPath!,
|
||||
JSON.stringify({ gateway: { trustedProxies: ["127.0.0.1"] } }, null, 2),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
await withGatewayServer(async ({ port }) => {
|
||||
cronIsolatedRun.mockClear();
|
||||
cronIsolatedRun.mockResolvedValue({ status: "ok", summary: "done" });
|
||||
|
||||
const first = await postHook(
|
||||
port,
|
||||
"/hooks/agent",
|
||||
{ message: "Do it", name: "Email" },
|
||||
{
|
||||
headers: {
|
||||
"Idempotency-Key": "hook-idem-forwarded",
|
||||
"X-Forwarded-For": "198.51.100.10",
|
||||
},
|
||||
},
|
||||
);
|
||||
expect(first.status).toBe(200);
|
||||
const firstBody = (await first.json()) as { runId?: string };
|
||||
await waitForSystemEvent();
|
||||
drainSystemEvents(resolveMainKey());
|
||||
|
||||
const second = await postHook(
|
||||
port,
|
||||
"/hooks/agent",
|
||||
{ message: "Do it", name: "Email" },
|
||||
{
|
||||
headers: {
|
||||
"Idempotency-Key": "hook-idem-forwarded",
|
||||
"X-Forwarded-For": "203.0.113.25",
|
||||
},
|
||||
},
|
||||
);
|
||||
expect(second.status).toBe(200);
|
||||
const secondBody = (await second.json()) as { runId?: string };
|
||||
expect(secondBody.runId).toBe(firstBody.runId);
|
||||
expect(cronIsolatedRun).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
test("does not retain oversized idempotency keys for replay dedupe", async () => {
|
||||
testState.hooksConfig = { enabled: true, token: HOOK_TOKEN };
|
||||
const oversizedKey = "x".repeat(257);
|
||||
|
||||
await withGatewayServer(async ({ port }) => {
|
||||
cronIsolatedRun.mockClear();
|
||||
cronIsolatedRun.mockResolvedValue({ status: "ok", summary: "done" });
|
||||
|
||||
const first = await postHook(
|
||||
port,
|
||||
"/hooks/agent",
|
||||
{ message: "Do it", name: "Email" },
|
||||
{ headers: { "Idempotency-Key": oversizedKey } },
|
||||
);
|
||||
expect(first.status).toBe(200);
|
||||
await waitForSystemEvent();
|
||||
drainSystemEvents(resolveMainKey());
|
||||
|
||||
const second = await postHook(
|
||||
port,
|
||||
"/hooks/agent",
|
||||
{ message: "Do it", name: "Email" },
|
||||
{ headers: { "Idempotency-Key": oversizedKey } },
|
||||
);
|
||||
expect(second.status).toBe(200);
|
||||
await waitForSystemEvent();
|
||||
|
||||
expect(cronIsolatedRun).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
test("expires hook idempotency entries from first delivery time", async () => {
|
||||
testState.hooksConfig = { enabled: true, token: HOOK_TOKEN };
|
||||
const nowSpy = vi.spyOn(Date, "now");
|
||||
nowSpy.mockReturnValue(1_000_000);
|
||||
|
||||
await withGatewayServer(async ({ port }) => {
|
||||
cronIsolatedRun.mockClear();
|
||||
cronIsolatedRun.mockResolvedValue({ status: "ok", summary: "done" });
|
||||
|
||||
const first = await postHook(
|
||||
port,
|
||||
"/hooks/agent",
|
||||
{ message: "Do it", name: "Email" },
|
||||
{ headers: { "Idempotency-Key": "fixed-window-idem" } },
|
||||
);
|
||||
expect(first.status).toBe(200);
|
||||
const firstBody = (await first.json()) as { runId?: string };
|
||||
await waitForSystemEvent();
|
||||
drainSystemEvents(resolveMainKey());
|
||||
|
||||
nowSpy.mockReturnValue(1_000_000 + DEDUPE_TTL_MS - 1);
|
||||
const second = await postHook(
|
||||
port,
|
||||
"/hooks/agent",
|
||||
{ message: "Do it", name: "Email" },
|
||||
{ headers: { "Idempotency-Key": "fixed-window-idem" } },
|
||||
);
|
||||
expect(second.status).toBe(200);
|
||||
const secondBody = (await second.json()) as { runId?: string };
|
||||
expect(secondBody.runId).toBe(firstBody.runId);
|
||||
expect(cronIsolatedRun).toHaveBeenCalledTimes(1);
|
||||
|
||||
nowSpy.mockReturnValue(1_000_000 + DEDUPE_TTL_MS + 1);
|
||||
const third = await postHook(
|
||||
port,
|
||||
"/hooks/agent",
|
||||
{ message: "Do it", name: "Email" },
|
||||
{ headers: { "Idempotency-Key": "fixed-window-idem" } },
|
||||
);
|
||||
expect(third.status).toBe(200);
|
||||
const thirdBody = (await third.json()) as { runId?: string };
|
||||
expect(thirdBody.runId).toBeTruthy();
|
||||
expect(thirdBody.runId).not.toBe(firstBody.runId);
|
||||
expect(cronIsolatedRun).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
test("enforces hooks.allowedAgentIds for explicit agent routing", async () => {
|
||||
testState.hooksConfig = {
|
||||
enabled: true,
|
||||
|
||||
Reference in New Issue
Block a user