fix: harden queue retry debounce and add regression tests

This commit is contained in:
Peter Steinberger
2026-02-24 03:52:31 +00:00
parent a216f2dabe
commit 6c1ed9493c
9 changed files with 257 additions and 6 deletions

View File

@@ -39,6 +39,7 @@ vi.mock("zod", () => ({
}));
const { createSynologyChatPlugin } = await import("./channel.js");
const { registerPluginHttpRoute } = await import("openclaw/plugin-sdk");
describe("createSynologyChatPlugin", () => {
it("returns a plugin object with all required sections", () => {
@@ -336,5 +337,39 @@ describe("createSynologyChatPlugin", () => {
const result = await plugin.gateway.startAccount(ctx);
expect(typeof result.stop).toBe("function");
});
it("deregisters stale route before re-registering same account/path", async () => {
const unregisterFirst = vi.fn();
const unregisterSecond = vi.fn();
const registerMock = vi.mocked(registerPluginHttpRoute);
registerMock.mockReturnValueOnce(unregisterFirst).mockReturnValueOnce(unregisterSecond);
const plugin = createSynologyChatPlugin();
const ctx = {
cfg: {
channels: {
"synology-chat": {
enabled: true,
token: "t",
incomingUrl: "https://nas/incoming",
webhookPath: "/webhook/synology",
},
},
},
accountId: "default",
log: { info: vi.fn(), warn: vi.fn(), error: vi.fn() },
};
const first = await plugin.gateway.startAccount(ctx);
const second = await plugin.gateway.startAccount(ctx);
expect(registerMock).toHaveBeenCalledTimes(2);
expect(unregisterFirst).toHaveBeenCalledTimes(1);
expect(unregisterSecond).not.toHaveBeenCalled();
// Clean up active route map so this module-level state doesn't leak across tests.
first.stop();
second.stop();
});
});
});

View File

@@ -1,7 +1,11 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { ImageContent } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { injectHistoryImagesIntoMessages, resolvePromptBuildHookResult } from "./attempt.js";
import {
injectHistoryImagesIntoMessages,
resolvePromptBuildHookResult,
resolvePromptModeForSession,
} from "./attempt.js";
describe("injectHistoryImagesIntoMessages", () => {
const image: ImageContent = { type: "image", data: "abc", mimeType: "image/png" };
@@ -103,3 +107,14 @@ describe("resolvePromptBuildHookResult", () => {
expect(result.prependContext).toBe("from-hook");
});
});
describe("resolvePromptModeForSession", () => {
it("uses minimal mode for subagent sessions", () => {
expect(resolvePromptModeForSession("agent:main:subagent:child")).toBe("minimal");
});
it("uses full mode for cron sessions", () => {
expect(resolvePromptModeForSession("agent:main:cron:job-1")).toBe("full");
expect(resolvePromptModeForSession("agent:main:cron:job-1:run:run-abc")).toBe("full");
});
});

View File

@@ -221,6 +221,13 @@ export async function resolvePromptBuildHookResult(params: {
};
}
export function resolvePromptModeForSession(sessionKey?: string): "minimal" | "full" {
if (!sessionKey) {
return "full";
}
return isSubagentSessionKey(sessionKey) ? "minimal" : "full";
}
function summarizeMessagePayload(msg: AgentMessage): { textChars: number; imageBlocks: number } {
const content = (msg as { content?: unknown }).content;
if (typeof content === "string") {
@@ -494,7 +501,7 @@ export async function runEmbeddedAttempt(
},
});
const isDefaultAgent = sessionAgentId === defaultAgentId;
const promptMode = isSubagentSessionKey(params.sessionKey) ? "minimal" : "full";
const promptMode = resolvePromptModeForSession(params.sessionKey);
const docsPath = await resolveOpenClawDocsPath({
workspaceDir: effectiveWorkspace,
argv1: process.argv[1],

View File

@@ -27,6 +27,7 @@ function createRetryingSend() {
describe("subagent-announce-queue", () => {
afterEach(() => {
vi.useRealTimers();
resetAnnounceQueuesForTests();
});
@@ -116,4 +117,52 @@ describe("subagent-announce-queue", () => {
expect(sender.prompts[1]).toContain("Queued #2");
expect(sender.prompts[1]).toContain("queued item two");
});
it("uses debounce floor for retries when debounce exceeds backoff", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
const previousFast = process.env.OPENCLAW_TEST_FAST;
delete process.env.OPENCLAW_TEST_FAST;
try {
const attempts: number[] = [];
const send = vi.fn(async () => {
attempts.push(Date.now());
if (attempts.length === 1) {
throw new Error("transient timeout");
}
});
enqueueAnnounce({
key: "announce:test:retry-debounce-floor",
item: {
prompt: "subagent completed",
enqueuedAt: Date.now(),
sessionKey: "agent:main:telegram:dm:u1",
},
settings: { mode: "followup", debounceMs: 5_000 },
send,
});
await vi.advanceTimersByTimeAsync(5_000);
expect(send).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(4_999);
expect(send).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(1);
expect(send).toHaveBeenCalledTimes(2);
const [firstAttempt, secondAttempt] = attempts;
if (firstAttempt === undefined || secondAttempt === undefined) {
throw new Error("expected two retry attempts");
}
expect(secondAttempt - firstAttempt).toBeGreaterThanOrEqual(5_000);
} finally {
if (previousFast === undefined) {
delete process.env.OPENCLAW_TEST_FAST;
} else {
process.env.OPENCLAW_TEST_FAST = previousFast;
}
}
});
});

View File

@@ -183,9 +183,10 @@ function scheduleAnnounceDrain(key: string) {
queue.consecutiveFailures++;
// Exponential backoff on consecutive failures: 2s, 4s, 8s, ... capped at 60s.
const errorBackoffMs = Math.min(1000 * Math.pow(2, queue.consecutiveFailures), 60_000);
queue.lastEnqueuedAt = Date.now() + errorBackoffMs - queue.debounceMs;
const retryDelayMs = Math.max(errorBackoffMs, queue.debounceMs);
queue.lastEnqueuedAt = Date.now() + retryDelayMs - queue.debounceMs;
defaultRuntime.error?.(
`announce queue drain failed for ${key} (attempt ${queue.consecutiveFailures}, retry in ${Math.round(errorBackoffMs / 1000)}s): ${String(err)}`,
`announce queue drain failed for ${key} (attempt ${queue.consecutiveFailures}, retry in ${Math.round(retryDelayMs / 1000)}s): ${String(err)}`,
);
} finally {
queue.draining = false;
@@ -205,7 +206,8 @@ export function enqueueAnnounce(params: {
send: (item: AnnounceQueueItem) => Promise<void>;
}): boolean {
const queue = getAnnounceQueue(params.key, params.settings, params.send);
queue.lastEnqueuedAt = Date.now();
// Preserve any retry backoff marker already encoded in lastEnqueuedAt.
queue.lastEnqueuedAt = Math.max(queue.lastEnqueuedAt, Date.now());
const shouldEnqueue = applyQueueDropPolicy({
queue,

View File

@@ -31,6 +31,7 @@ const deliverDiscordReply = deliveryMocks.deliverDiscordReply;
const createDiscordDraftStream = deliveryMocks.createDiscordDraftStream;
type DispatchInboundParams = {
dispatcher: {
sendBlockReply: (payload: { text?: string }) => boolean | Promise<boolean>;
sendFinalReply: (payload: { text?: string }) => boolean | Promise<boolean>;
};
replyOptions?: {
@@ -75,7 +76,10 @@ vi.mock("../../auto-reply/reply/reply-dispatcher.js", () => ({
(opts: { deliver: (payload: unknown, info: { kind: string }) => Promise<void> | void }) => ({
dispatcher: {
sendToolResult: vi.fn(() => true),
sendBlockReply: vi.fn(() => true),
sendBlockReply: vi.fn((payload: unknown) => {
void opts.deliver(payload as never, { kind: "block" });
return true;
}),
sendFinalReply: vi.fn((payload: unknown) => {
void opts.deliver(payload as never, { kind: "final" });
return true;
@@ -423,6 +427,20 @@ describe("processDiscordMessage draft streaming", () => {
expect(deliverDiscordReply).toHaveBeenCalledTimes(1);
});
it("suppresses block-kind payload delivery to Discord", async () => {
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendBlockReply({ text: "thinking..." });
return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } };
});
const ctx = await createBaseContext({ discordConfig: { streamMode: "off" } });
// oxlint-disable-next-line typescript/no-explicit-any
await processDiscordMessage(ctx as any);
expect(deliverDiscordReply).not.toHaveBeenCalled();
});
it("streams block previews using draft chunking", async () => {
const draftStream = createMockDraftStream();
createDiscordDraftStream.mockReturnValueOnce(draftStream);

View File

@@ -1,5 +1,6 @@
import { describe, expect, it } from "vitest";
import { buildHistoryContextFromEntries } from "../auto-reply/reply/history.js";
import { extractTextFromChatContent } from "../shared/chat-content.js";
import { buildAgentMessageFromConversationEntries } from "./agent-prompt.js";
describe("gateway agent prompt", () => {
@@ -15,6 +16,24 @@ describe("gateway agent prompt", () => {
).toBe("hi");
});
it("extracts text from content-array body when there is no history", () => {
expect(
buildAgentMessageFromConversationEntries([
{
role: "user",
entry: {
sender: "User",
body: [
{ type: "text", text: "hi" },
{ type: "image", data: "base64-image", mimeType: "image/png" },
{ type: "text", text: "there" },
] as unknown as string,
},
},
]),
).toBe("hi there");
});
it("uses history context when there is history", () => {
const entries = [
{ role: "assistant", entry: { sender: "Assistant", body: "prev" } },
@@ -45,4 +64,34 @@ describe("gateway agent prompt", () => {
expect(buildAgentMessageFromConversationEntries([...entries])).toBe(expected);
});
it("normalizes content-array bodies in history and current message", () => {
const entries = [
{
role: "assistant",
entry: {
sender: "Assistant",
body: [{ type: "text", text: "prev" }] as unknown as string,
},
},
{
role: "user",
entry: {
sender: "User",
body: [
{ type: "text", text: "next" },
{ type: "text", text: "step" },
] as unknown as string,
},
},
] as const;
const expected = buildHistoryContextFromEntries({
entries: entries.map((e) => e.entry),
currentMessage: "User: next step",
formatEntry: (e) => `${e.sender}: ${extractTextFromChatContent(e.body) ?? ""}`,
});
expect(buildAgentMessageFromConversationEntries([...entries])).toBe(expected);
});
});

View File

@@ -87,6 +87,38 @@ describe("gateway sessions patch", () => {
expect(res.entry.thinkingLevel).toBeUndefined();
});
test("persists reasoningLevel=off (does not clear)", async () => {
const store: Record<string, SessionEntry> = {};
const res = await applySessionsPatchToStore({
cfg: {} as OpenClawConfig,
store,
storeKey: "agent:main:main",
patch: { key: "agent:main:main", reasoningLevel: "off" },
});
expect(res.ok).toBe(true);
if (!res.ok) {
return;
}
expect(res.entry.reasoningLevel).toBe("off");
});
test("clears reasoningLevel when patch sets null", async () => {
const store: Record<string, SessionEntry> = {
"agent:main:main": { reasoningLevel: "stream" } as SessionEntry,
};
const res = await applySessionsPatchToStore({
cfg: {} as OpenClawConfig,
store,
storeKey: "agent:main:main",
patch: { key: "agent:main:main", reasoningLevel: null },
});
expect(res.ok).toBe(true);
if (!res.ok) {
return;
}
expect(res.entry.reasoningLevel).toBeUndefined();
});
test("persists elevatedLevel=off (does not clear)", async () => {
const store: Record<string, SessionEntry> = {};
const res = await applySessionsPatchToStore({

View File

@@ -513,6 +513,50 @@ describe("installPluginFromDir", () => {
expect(manifest.devDependencies?.openclaw).toBeUndefined();
expect(manifest.devDependencies?.vitest).toBe("^3.0.0");
});
it("uses openclaw.plugin.json id as install key when it differs from package name", async () => {
const { pluginDir, extensionsDir } = setupPluginInstallDirs();
fs.mkdirSync(path.join(pluginDir, "dist"), { recursive: true });
fs.writeFileSync(
path.join(pluginDir, "package.json"),
JSON.stringify({
name: "@openclaw/cognee-openclaw",
version: "0.0.1",
openclaw: { extensions: ["./dist/index.js"] },
}),
"utf-8",
);
fs.writeFileSync(path.join(pluginDir, "dist", "index.js"), "export {};", "utf-8");
fs.writeFileSync(
path.join(pluginDir, "openclaw.plugin.json"),
JSON.stringify({
id: "memory-cognee",
configSchema: { type: "object", properties: {} },
}),
"utf-8",
);
const infoMessages: string[] = [];
const res = await installPluginFromDir({
dirPath: pluginDir,
extensionsDir,
logger: { info: (msg: string) => infoMessages.push(msg), warn: () => {} },
});
expect(res.ok).toBe(true);
if (!res.ok) {
return;
}
expect(res.pluginId).toBe("memory-cognee");
expect(res.targetDir).toBe(path.join(extensionsDir, "memory-cognee"));
expect(
infoMessages.some((msg) =>
msg.includes(
'Plugin manifest id "memory-cognee" differs from npm package name "cognee-openclaw"',
),
),
).toBe(true);
});
});
describe("installPluginFromNpmSpec", () => {