Files
openclaw/src/discord/monitor/message-handler.queue.test.ts

463 lines
15 KiB
TypeScript

import { describe, expect, it, vi } from "vitest";
import {
createDiscordHandlerParams,
createDiscordPreflightContext,
} from "./message-handler.test-helpers.js";
const preflightDiscordMessageMock = vi.hoisted(() => vi.fn());
const processDiscordMessageMock = vi.hoisted(() => vi.fn());
const eventualReplyDeliveredMock = vi.hoisted(() => vi.fn());
type SetStatusFn = (patch: Record<string, unknown>) => void;
vi.mock("./message-handler.preflight.js", () => ({
preflightDiscordMessage: preflightDiscordMessageMock,
}));
vi.mock("./message-handler.process.js", () => ({
processDiscordMessage: processDiscordMessageMock,
}));
const { createDiscordMessageHandler } = await import("./message-handler.js");
function createDeferred<T = void>() {
let resolve: (value: T | PromiseLike<T>) => void = () => {};
const promise = new Promise<T>((innerResolve) => {
resolve = innerResolve;
});
return { promise, resolve };
}
function createMessageData(messageId: string, channelId = "ch-1") {
return {
channel_id: channelId,
author: { id: "user-1" },
message: {
id: messageId,
author: { id: "user-1", bot: false },
content: "hello",
channel_id: channelId,
attachments: [{ id: `att-${messageId}` }],
},
};
}
function createPreflightContext(channelId = "ch-1") {
return createDiscordPreflightContext(channelId);
}
async function createLifecycleStopScenario(params: {
createHandler: (status: SetStatusFn) => {
handler: (data: never, opts: never) => Promise<void>;
stop: () => void;
};
}) {
const runInFlight = createDeferred();
processDiscordMessageMock.mockImplementation(async () => {
await runInFlight.promise;
});
preflightDiscordMessageMock.mockImplementation(
async (contextParams: { data: { channel_id: string } }) =>
createPreflightContext(contextParams.data.channel_id),
);
const setStatus = vi.fn<SetStatusFn>();
const { handler, stop } = params.createHandler(setStatus);
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
const callsBeforeStop = setStatus.mock.calls.length;
stop();
return {
setStatus,
callsBeforeStop,
finish: async () => {
runInFlight.resolve();
await runInFlight.promise;
await Promise.resolve();
},
};
}
describe("createDiscordMessageHandler queue behavior", () => {
it("resets busy counters when the handler is created", () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const setStatus = vi.fn();
createDiscordMessageHandler(createDiscordHandlerParams({ setStatus }));
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({
activeRuns: 0,
busy: false,
}),
);
});
it("returns immediately and tracks busy status while queued runs execute", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const firstRun = createDeferred();
const secondRun = createDeferred();
processDiscordMessageMock
.mockImplementationOnce(async () => {
await firstRun.promise;
})
.mockImplementationOnce(async () => {
await secondRun.promise;
});
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const setStatus = vi.fn();
const handler = createDiscordMessageHandler(createDiscordHandlerParams({ setStatus }));
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({
activeRuns: 1,
busy: true,
}),
);
await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(2);
});
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
firstRun.resolve();
await firstRun.promise;
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
secondRun.resolve();
await secondRun.promise;
await vi.waitFor(() => {
expect(setStatus).toHaveBeenLastCalledWith(
expect.objectContaining({
activeRuns: 0,
busy: false,
}),
);
});
});
it("applies explicit inbound worker timeout to queued runs so stalled runs do not block the queue", async () => {
vi.useFakeTimers();
try {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
processDiscordMessageMock
.mockImplementationOnce(async (ctx: { abortSignal?: AbortSignal }) => {
await new Promise<void>((resolve) => {
if (ctx.abortSignal?.aborted) {
resolve();
return;
}
ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
});
})
.mockImplementationOnce(async () => undefined);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const params = createDiscordHandlerParams({ workerRunTimeoutMs: 50 });
const handler = createDiscordMessageHandler(params);
await expect(
handler(createMessageData("m-1") as never, {} as never),
).resolves.toBeUndefined();
await expect(
handler(createMessageData("m-2") as never, {} as never),
).resolves.toBeUndefined();
await vi.advanceTimersByTimeAsync(60);
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
const firstCtx = processDiscordMessageMock.mock.calls[0]?.[0] as
| { abortSignal?: AbortSignal }
| undefined;
expect(firstCtx?.abortSignal?.aborted).toBe(true);
expect(params.runtime.error).toHaveBeenCalledWith(
expect.stringContaining("discord inbound worker timed out after"),
);
} finally {
vi.useRealTimers();
}
});
it("does not time out queued runs when the inbound worker timeout is disabled", async () => {
vi.useFakeTimers();
try {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
eventualReplyDeliveredMock.mockReset();
processDiscordMessageMock.mockImplementationOnce(
async (ctx: { abortSignal?: AbortSignal }) => {
await new Promise<void>((resolve) => {
setTimeout(() => {
if (!ctx.abortSignal?.aborted) {
eventualReplyDeliveredMock();
}
resolve();
}, 80);
});
},
);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const params = createDiscordHandlerParams({ workerRunTimeoutMs: 0 });
const handler = createDiscordMessageHandler(params);
await expect(
handler(createMessageData("m-1") as never, {} as never),
).resolves.toBeUndefined();
await vi.advanceTimersByTimeAsync(80);
await Promise.resolve();
expect(eventualReplyDeliveredMock).toHaveBeenCalledTimes(1);
expect(params.runtime.error).not.toHaveBeenCalledWith(
expect.stringContaining("discord inbound worker timed out after"),
);
} finally {
vi.useRealTimers();
}
});
it("refreshes run activity while active runs are in progress", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const runInFlight = createDeferred();
processDiscordMessageMock.mockImplementation(async () => {
await runInFlight.promise;
});
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
let heartbeatTick: () => void = () => {};
let capturedHeartbeat = false;
const setIntervalSpy = vi
.spyOn(globalThis, "setInterval")
.mockImplementation((callback: TimerHandler) => {
if (typeof callback === "function") {
heartbeatTick = () => {
callback();
};
capturedHeartbeat = true;
}
return 1 as unknown as ReturnType<typeof setInterval>;
});
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval");
try {
const setStatus = vi.fn();
const handler = createDiscordMessageHandler(createDiscordHandlerParams({ setStatus }));
await expect(
handler(createMessageData("m-1") as never, {} as never),
).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
expect(capturedHeartbeat).toBe(true);
const busyCallsBefore = setStatus.mock.calls.filter(
([patch]) => (patch as { busy?: boolean }).busy === true,
).length;
heartbeatTick();
const busyCallsAfter = setStatus.mock.calls.filter(
([patch]) => (patch as { busy?: boolean }).busy === true,
).length;
expect(busyCallsAfter).toBeGreaterThan(busyCallsBefore);
runInFlight.resolve();
await runInFlight.promise;
await vi.waitFor(() => {
expect(clearIntervalSpy).toHaveBeenCalled();
});
} finally {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
}
});
it("stops status publishing after lifecycle abort", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const { setStatus, callsBeforeStop, finish } = await createLifecycleStopScenario({
createHandler: (status) => {
const abortController = new AbortController();
const handler = createDiscordMessageHandler(
createDiscordHandlerParams({ setStatus: status, abortSignal: abortController.signal }),
);
return { handler, stop: () => abortController.abort() };
},
});
await finish();
expect(setStatus.mock.calls.length).toBe(callsBeforeStop);
});
it("stops status publishing after handler deactivation", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const { setStatus, callsBeforeStop, finish } = await createLifecycleStopScenario({
createHandler: (status) => {
const handler = createDiscordMessageHandler(
createDiscordHandlerParams({ setStatus: status }),
);
return { handler, stop: () => handler.deactivate() };
},
});
await finish();
expect(setStatus.mock.calls.length).toBe(callsBeforeStop);
});
it("skips queued runs that have not started yet after deactivation", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const firstRun = createDeferred();
processDiscordMessageMock
.mockImplementationOnce(async () => {
await firstRun.promise;
})
.mockImplementationOnce(async () => undefined);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const handler = createDiscordMessageHandler(createDiscordHandlerParams());
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined();
handler.deactivate();
firstRun.resolve();
await firstRun.promise;
await Promise.resolve();
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
it("preserves non-debounced message ordering by awaiting debouncer enqueue", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const firstPreflight = createDeferred();
const processedMessageIds: string[] = [];
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string; message?: { id?: string } } }) => {
const messageId = params.data.message?.id ?? "unknown";
if (messageId === "m-1") {
await firstPreflight.promise;
}
return {
...createPreflightContext(params.data.channel_id),
messageId,
};
},
);
processDiscordMessageMock.mockImplementation(async (ctx: { messageId?: string }) => {
processedMessageIds.push(ctx.messageId ?? "unknown");
});
const handler = createDiscordMessageHandler(createDiscordHandlerParams());
const sequentialDispatch = (async () => {
await handler(createMessageData("m-1") as never, {} as never);
await handler(createMessageData("m-2") as never, {} as never);
})();
await vi.waitFor(() => {
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1);
});
await Promise.resolve();
expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1);
firstPreflight.resolve();
await sequentialDispatch;
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
expect(processedMessageIds).toEqual(["m-1", "m-2"]);
});
it("recovers queue progress after a run failure without leaving busy state stuck", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
const firstRun = createDeferred();
processDiscordMessageMock
.mockImplementationOnce(async () => {
await firstRun.promise;
throw new Error("simulated run failure");
})
.mockImplementationOnce(async () => undefined);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const setStatus = vi.fn();
const handler = createDiscordMessageHandler(createDiscordHandlerParams({ setStatus }));
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined();
firstRun.resolve();
await firstRun.promise.catch(() => undefined);
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
await vi.waitFor(() => {
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({ activeRuns: 0, busy: false }),
);
});
});
});