fix(context-engine): guard compact() throw + fire hooks for ownsCompaction engines (#41361)
Merged via squash. Prepared head SHA: 0957b32dc63b16d710403565953b77bfbd2bd987 Co-authored-by: davidrudduck <47308254+davidrudduck@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman
This commit is contained in:
@@ -121,6 +121,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/config errors: surface up to three validation issues in top-level `config.set`, `config.patch`, and `config.apply` error messages while preserving structured issue details. (#42664) Thanks @huntharo.
|
||||
- Hooks/plugin context parity followup: pass `trigger` and `channelId` through embedded `llm_input`, `agent_end`, and `llm_output` hook contexts so plugins receive the same agent metadata across hook phases. (#42362) Thanks @zhoulf1006.
|
||||
- ACP/main session aliases: canonicalize `main` before ACP session lookup so restarted ACP main sessions rehydrate instead of failing closed with `Session is not ACP-enabled: main`. (#43285, fixes #25692)
|
||||
- Agents/context-engine compaction: guard thrown engine-owned overflow compaction attempts and fire compaction hooks for `ownsCompaction` engines so overflow recovery no longer crashes and plugin subscribers still observe compact runs. (#41361) thanks @davidrudduck.
|
||||
|
||||
## 2026.3.8
|
||||
|
||||
@@ -4036,6 +4037,7 @@ Thanks @AlexMikhalev, @CoreyH, @John-Rood, @KrauseFx, @MaudeBot, @Nachx639, @Nic
|
||||
- Gateway/Daemon/Doctor: atomic config writes; repair gateway service entrypoint + install switches; non-interactive legacy migrations; systemd unit alignment + KillMode=process; node bridge keepalive/pings; Launch at Login persistence; bundle MoltbotKit resources + Swift 6.2 compat dylib; relay version check + remove smoke test; regen Swift GatewayModels + keep agent provider string; cron jobId alias + channel alias migration + main session key normalization; heartbeat Telegram accountId resolution; avoid WhatsApp fallback for internal runs; gateway listener error wording; serveBaseUrl param; honor gateway --dev; fix wide-area discovery updates; align agents.defaults schema; provider account metadata in daemon status; refresh Carbon patch for gateway fixes; restore doctor prompter initialValue handling.
|
||||
- Control UI/TUI: persist per-session verbose off + hide tool cards; logs tab opens at bottom; relative asset paths + landing cleanup; session labels lookup/persistence; stop pinning main session in recents; start logs at bottom; TUI status bar refresh + timeout handling + hide reasoning label when off.
|
||||
- Onboarding/Configure: QuickStart single-select provider picker; avoid Codex CLI false-expiry warnings; clarify WhatsApp owner prompt; fix Minimax hosted onboarding (agents.defaults + msteams heartbeat target); remove configure Control UI prompt; honor gateway --dev flag.
|
||||
- Agent loop: guard overflow compaction throws and restore compaction hooks for engine-owned context engines. (#41361) — thanks @davidrudduck
|
||||
|
||||
### Maintenance
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ const {
|
||||
sessionCompactImpl,
|
||||
triggerInternalHook,
|
||||
sanitizeSessionHistoryMock,
|
||||
contextEngineCompactMock,
|
||||
} = vi.hoisted(() => ({
|
||||
hookRunner: {
|
||||
hasHooks: vi.fn(),
|
||||
@@ -28,6 +29,14 @@ const {
|
||||
})),
|
||||
triggerInternalHook: vi.fn(),
|
||||
sanitizeSessionHistoryMock: vi.fn(async (params: { messages: unknown[] }) => params.messages),
|
||||
contextEngineCompactMock: vi.fn(async () => ({
|
||||
ok: true as boolean,
|
||||
compacted: true as boolean,
|
||||
reason: undefined as string | undefined,
|
||||
result: { summary: "engine-summary", tokensAfter: 50 } as
|
||||
| { summary: string; tokensAfter: number }
|
||||
| undefined,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("../../plugins/hook-runner-global.js", () => ({
|
||||
@@ -123,6 +132,27 @@ vi.mock("../session-write-lock.js", () => ({
|
||||
resolveSessionLockMaxHoldFromTimeout: vi.fn(() => 0),
|
||||
}));
|
||||
|
||||
vi.mock("../../context-engine/index.js", () => ({
|
||||
ensureContextEnginesInitialized: vi.fn(),
|
||||
resolveContextEngine: vi.fn(async () => ({
|
||||
info: { ownsCompaction: true },
|
||||
compact: contextEngineCompactMock,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("../../process/command-queue.js", () => ({
|
||||
enqueueCommandInLane: vi.fn((_lane: unknown, task: () => unknown) => task()),
|
||||
}));
|
||||
|
||||
vi.mock("./lanes.js", () => ({
|
||||
resolveSessionLane: vi.fn(() => "test-session-lane"),
|
||||
resolveGlobalLane: vi.fn(() => "test-global-lane"),
|
||||
}));
|
||||
|
||||
vi.mock("../context-window-guard.js", () => ({
|
||||
resolveContextWindowInfo: vi.fn(() => ({ tokens: 128_000 })),
|
||||
}));
|
||||
|
||||
vi.mock("../bootstrap-files.js", () => ({
|
||||
makeBootstrapWarn: vi.fn(() => () => {}),
|
||||
resolveBootstrapContextForRun: vi.fn(async () => ({ contextFiles: [] })),
|
||||
@@ -160,7 +190,7 @@ vi.mock("../transcript-policy.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("./extensions.js", () => ({
|
||||
buildEmbeddedExtensionFactories: vi.fn(() => []),
|
||||
buildEmbeddedExtensionFactories: vi.fn(() => ({ factories: [] })),
|
||||
}));
|
||||
|
||||
vi.mock("./history.js", () => ({
|
||||
@@ -251,7 +281,7 @@ vi.mock("./utils.js", () => ({
|
||||
|
||||
import { getApiProvider, unregisterApiProviders } from "@mariozechner/pi-ai";
|
||||
import { getCustomApiRegistrySourceId } from "../custom-api-registry.js";
|
||||
import { compactEmbeddedPiSessionDirect } from "./compact.js";
|
||||
import { compactEmbeddedPiSessionDirect, compactEmbeddedPiSession } from "./compact.js";
|
||||
|
||||
const sessionHook = (action: string) =>
|
||||
triggerInternalHook.mock.calls.find(
|
||||
@@ -436,3 +466,103 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
|
||||
beforeEach(() => {
|
||||
hookRunner.hasHooks.mockReset();
|
||||
hookRunner.runBeforeCompaction.mockReset();
|
||||
hookRunner.runAfterCompaction.mockReset();
|
||||
contextEngineCompactMock.mockReset();
|
||||
contextEngineCompactMock.mockResolvedValue({
|
||||
ok: true,
|
||||
compacted: true,
|
||||
reason: undefined,
|
||||
result: { summary: "engine-summary", tokensAfter: 50 },
|
||||
});
|
||||
resolveModelMock.mockReset();
|
||||
resolveModelMock.mockReturnValue({
|
||||
model: { provider: "openai", api: "responses", id: "fake", input: [] },
|
||||
error: null,
|
||||
authStorage: { setRuntimeApiKey: vi.fn() },
|
||||
modelRegistry: {},
|
||||
});
|
||||
});
|
||||
|
||||
it("fires before_compaction with sentinel -1 and after_compaction on success", async () => {
|
||||
hookRunner.hasHooks.mockReturnValue(true);
|
||||
|
||||
const result = await compactEmbeddedPiSession({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
workspaceDir: "/tmp",
|
||||
messageChannel: "telegram",
|
||||
customInstructions: "focus on decisions",
|
||||
enqueue: (task) => task(),
|
||||
});
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
expect(result.compacted).toBe(true);
|
||||
|
||||
expect(hookRunner.runBeforeCompaction).toHaveBeenCalledWith(
|
||||
{ messageCount: -1, sessionFile: "/tmp/session.jsonl" },
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:main:session-1",
|
||||
messageProvider: "telegram",
|
||||
}),
|
||||
);
|
||||
expect(hookRunner.runAfterCompaction).toHaveBeenCalledWith(
|
||||
{
|
||||
messageCount: -1,
|
||||
compactedCount: -1,
|
||||
tokenCount: 50,
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
},
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:main:session-1",
|
||||
messageProvider: "telegram",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not fire after_compaction when compaction fails", async () => {
|
||||
hookRunner.hasHooks.mockReturnValue(true);
|
||||
contextEngineCompactMock.mockResolvedValue({
|
||||
ok: false,
|
||||
compacted: false,
|
||||
reason: "nothing to compact",
|
||||
result: undefined,
|
||||
});
|
||||
|
||||
const result = await compactEmbeddedPiSession({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
workspaceDir: "/tmp",
|
||||
customInstructions: "focus on decisions",
|
||||
enqueue: (task) => task(),
|
||||
});
|
||||
|
||||
expect(result.ok).toBe(false);
|
||||
expect(hookRunner.runBeforeCompaction).toHaveBeenCalled();
|
||||
expect(hookRunner.runAfterCompaction).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("catches and logs hook exceptions without aborting compaction", async () => {
|
||||
hookRunner.hasHooks.mockReturnValue(true);
|
||||
hookRunner.runBeforeCompaction.mockRejectedValue(new Error("hook boom"));
|
||||
|
||||
const result = await compactEmbeddedPiSession({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
workspaceDir: "/tmp",
|
||||
customInstructions: "focus on decisions",
|
||||
enqueue: (task) => task(),
|
||||
});
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
expect(result.compacted).toBe(true);
|
||||
expect(contextEngineCompactMock).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -936,6 +936,43 @@ export async function compactEmbeddedPiSession(
|
||||
modelContextWindow: ceModel?.contextWindow,
|
||||
defaultTokens: DEFAULT_CONTEXT_TOKENS,
|
||||
});
|
||||
// When the context engine owns compaction, its compact() implementation
|
||||
// bypasses compactEmbeddedPiSessionDirect (which fires the hooks internally).
|
||||
// Fire before_compaction / after_compaction hooks here so plugin subscribers
|
||||
// are notified regardless of which engine is active.
|
||||
const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
|
||||
const hookRunner = engineOwnsCompaction ? getGlobalHookRunner() : null;
|
||||
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
|
||||
const { sessionAgentId } = resolveSessionAgentIds({
|
||||
sessionKey: params.sessionKey,
|
||||
config: params.config,
|
||||
});
|
||||
const resolvedMessageProvider = params.messageChannel ?? params.messageProvider;
|
||||
const hookCtx = {
|
||||
sessionId: params.sessionId,
|
||||
agentId: sessionAgentId,
|
||||
sessionKey: hookSessionKey,
|
||||
workspaceDir: resolveUserPath(params.workspaceDir),
|
||||
messageProvider: resolvedMessageProvider,
|
||||
};
|
||||
// Engine-owned compaction doesn't load the transcript at this level, so
|
||||
// message counts are unavailable. We pass sessionFile so hook subscribers
|
||||
// can read the transcript themselves if they need exact counts.
|
||||
if (hookRunner?.hasHooks("before_compaction")) {
|
||||
try {
|
||||
await hookRunner.runBeforeCompaction(
|
||||
{
|
||||
messageCount: -1,
|
||||
sessionFile: params.sessionFile,
|
||||
},
|
||||
hookCtx,
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn("before_compaction hook failed", {
|
||||
errorMessage: err instanceof Error ? err.message : String(err),
|
||||
});
|
||||
}
|
||||
}
|
||||
const result = await contextEngine.compact({
|
||||
sessionId: params.sessionId,
|
||||
sessionFile: params.sessionFile,
|
||||
@@ -944,6 +981,23 @@ export async function compactEmbeddedPiSession(
|
||||
force: params.trigger === "manual",
|
||||
runtimeContext: params as Record<string, unknown>,
|
||||
});
|
||||
if (result.ok && result.compacted && hookRunner?.hasHooks("after_compaction")) {
|
||||
try {
|
||||
await hookRunner.runAfterCompaction(
|
||||
{
|
||||
messageCount: -1,
|
||||
compactedCount: -1,
|
||||
tokenCount: result.result?.tokensAfter,
|
||||
sessionFile: params.sessionFile,
|
||||
},
|
||||
hookCtx,
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn("after_compaction hook failed", {
|
||||
errorMessage: err instanceof Error ? err.message : String(err),
|
||||
});
|
||||
}
|
||||
}
|
||||
return {
|
||||
ok: result.ok,
|
||||
compacted: result.compacted,
|
||||
|
||||
@@ -9,16 +9,18 @@ export function makeOverflowError(message: string = DEFAULT_OVERFLOW_ERROR_MESSA
|
||||
|
||||
export function makeCompactionSuccess(params: {
|
||||
summary: string;
|
||||
firstKeptEntryId: string;
|
||||
tokensBefore: number;
|
||||
firstKeptEntryId?: string;
|
||||
tokensBefore?: number;
|
||||
tokensAfter?: number;
|
||||
}) {
|
||||
return {
|
||||
ok: true as const,
|
||||
compacted: true as const,
|
||||
result: {
|
||||
summary: params.summary,
|
||||
firstKeptEntryId: params.firstKeptEntryId,
|
||||
tokensBefore: params.tokensBefore,
|
||||
...(params.firstKeptEntryId ? { firstKeptEntryId: params.firstKeptEntryId } : {}),
|
||||
...(params.tokensBefore !== undefined ? { tokensBefore: params.tokensBefore } : {}),
|
||||
...(params.tokensAfter !== undefined ? { tokensAfter: params.tokensAfter } : {}),
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -55,8 +57,9 @@ type MockCompactDirect = {
|
||||
compacted: true;
|
||||
result: {
|
||||
summary: string;
|
||||
firstKeptEntryId: string;
|
||||
tokensBefore: number;
|
||||
firstKeptEntryId?: string;
|
||||
tokensBefore?: number;
|
||||
tokensAfter?: number;
|
||||
};
|
||||
}) => unknown;
|
||||
};
|
||||
|
||||
@@ -2,9 +2,13 @@ import "./run.overflow-compaction.mocks.shared.js";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { isCompactionFailureError, isLikelyContextOverflowError } from "../pi-embedded-helpers.js";
|
||||
|
||||
vi.mock("../../utils.js", () => ({
|
||||
resolveUserPath: vi.fn((p: string) => p),
|
||||
}));
|
||||
vi.mock(import("../../utils.js"), async (importOriginal) => {
|
||||
const actual = await importOriginal();
|
||||
return {
|
||||
...actual,
|
||||
resolveUserPath: vi.fn((p: string) => p),
|
||||
};
|
||||
});
|
||||
|
||||
import { log } from "./logger.js";
|
||||
import { runEmbeddedPiAgent } from "./run.js";
|
||||
@@ -16,6 +20,7 @@ import {
|
||||
queueOverflowAttemptWithOversizedToolOutput,
|
||||
} from "./run.overflow-compaction.fixture.js";
|
||||
import {
|
||||
mockedContextEngine,
|
||||
mockedCompactDirect,
|
||||
mockedRunEmbeddedAttempt,
|
||||
mockedSessionLikelyHasOversizedToolResults,
|
||||
@@ -30,6 +35,11 @@ const mockedIsLikelyContextOverflowError = vi.mocked(isLikelyContextOverflowErro
|
||||
describe("overflow compaction in run loop", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mockedRunEmbeddedAttempt.mockReset();
|
||||
mockedCompactDirect.mockReset();
|
||||
mockedSessionLikelyHasOversizedToolResults.mockReset();
|
||||
mockedTruncateOversizedToolResultsInSession.mockReset();
|
||||
mockedContextEngine.info.ownsCompaction = false;
|
||||
mockedIsCompactionFailureError.mockImplementation((msg?: string) => {
|
||||
if (!msg) {
|
||||
return false;
|
||||
@@ -72,7 +82,9 @@ describe("overflow compaction in run loop", () => {
|
||||
|
||||
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
|
||||
expect(mockedCompactDirect).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ authProfileId: "test-profile" }),
|
||||
expect.objectContaining({
|
||||
runtimeContext: expect.objectContaining({ authProfileId: "test-profile" }),
|
||||
}),
|
||||
);
|
||||
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2);
|
||||
expect(log.warn).toHaveBeenCalledWith(
|
||||
|
||||
@@ -6,6 +6,25 @@ import type {
|
||||
PluginHookBeforePromptBuildResult,
|
||||
} from "../../plugins/types.js";
|
||||
|
||||
type MockCompactionResult =
|
||||
| {
|
||||
ok: true;
|
||||
compacted: true;
|
||||
result: {
|
||||
summary: string;
|
||||
firstKeptEntryId?: string;
|
||||
tokensBefore?: number;
|
||||
tokensAfter?: number;
|
||||
};
|
||||
reason?: string;
|
||||
}
|
||||
| {
|
||||
ok: false;
|
||||
compacted: false;
|
||||
reason: string;
|
||||
result?: undefined;
|
||||
};
|
||||
|
||||
export const mockedGlobalHookRunner = {
|
||||
hasHooks: vi.fn((_hookName: string) => false),
|
||||
runBeforeAgentStart: vi.fn(
|
||||
@@ -26,12 +45,35 @@ export const mockedGlobalHookRunner = {
|
||||
_ctx: PluginHookAgentContext,
|
||||
): Promise<PluginHookBeforeModelResolveResult | undefined> => undefined,
|
||||
),
|
||||
runBeforeCompaction: vi.fn(async () => undefined),
|
||||
runAfterCompaction: vi.fn(async () => undefined),
|
||||
};
|
||||
|
||||
export const mockedContextEngine = {
|
||||
info: { ownsCompaction: false as boolean },
|
||||
compact: vi.fn<(params: unknown) => Promise<MockCompactionResult>>(async () => ({
|
||||
ok: false as const,
|
||||
compacted: false as const,
|
||||
reason: "nothing to compact",
|
||||
})),
|
||||
};
|
||||
|
||||
export const mockedContextEngineCompact = vi.mocked(mockedContextEngine.compact);
|
||||
export const mockedEnsureRuntimePluginsLoaded: (...args: unknown[]) => void = vi.fn();
|
||||
|
||||
vi.mock("../../plugins/hook-runner-global.js", () => ({
|
||||
getGlobalHookRunner: vi.fn(() => mockedGlobalHookRunner),
|
||||
}));
|
||||
|
||||
vi.mock("../../context-engine/index.js", () => ({
|
||||
ensureContextEnginesInitialized: vi.fn(),
|
||||
resolveContextEngine: vi.fn(async () => mockedContextEngine),
|
||||
}));
|
||||
|
||||
vi.mock("../runtime-plugins.js", () => ({
|
||||
ensureRuntimePluginsLoaded: mockedEnsureRuntimePluginsLoaded,
|
||||
}));
|
||||
|
||||
vi.mock("../auth-profiles.js", () => ({
|
||||
isProfileInCooldown: vi.fn(() => false),
|
||||
markAuthProfileFailure: vi.fn(async () => {}),
|
||||
@@ -141,9 +183,13 @@ vi.mock("../../process/command-queue.js", () => ({
|
||||
enqueueCommandInLane: vi.fn((_lane: string, task: () => unknown) => task()),
|
||||
}));
|
||||
|
||||
vi.mock("../../utils/message-channel.js", () => ({
|
||||
isMarkdownCapableMessageChannel: vi.fn(() => true),
|
||||
}));
|
||||
vi.mock(import("../../utils/message-channel.js"), async (importOriginal) => {
|
||||
const actual = await importOriginal();
|
||||
return {
|
||||
...actual,
|
||||
isMarkdownCapableMessageChannel: vi.fn(() => true),
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../agent-paths.js", () => ({
|
||||
resolveOpenClawAgentDir: vi.fn(() => "/tmp/agent-dir"),
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import { vi } from "vitest";
|
||||
import { compactEmbeddedPiSessionDirect } from "./compact.js";
|
||||
import {
|
||||
mockedContextEngine,
|
||||
mockedContextEngineCompact,
|
||||
} from "./run.overflow-compaction.mocks.shared.js";
|
||||
import { runEmbeddedAttempt } from "./run/attempt.js";
|
||||
import {
|
||||
sessionLikelyHasOversizedToolResults,
|
||||
@@ -7,13 +10,14 @@ import {
|
||||
} from "./tool-result-truncation.js";
|
||||
|
||||
export const mockedRunEmbeddedAttempt = vi.mocked(runEmbeddedAttempt);
|
||||
export const mockedCompactDirect = vi.mocked(compactEmbeddedPiSessionDirect);
|
||||
export const mockedCompactDirect = mockedContextEngineCompact;
|
||||
export const mockedSessionLikelyHasOversizedToolResults = vi.mocked(
|
||||
sessionLikelyHasOversizedToolResults,
|
||||
);
|
||||
export const mockedTruncateOversizedToolResultsInSession = vi.mocked(
|
||||
truncateOversizedToolResultsInSession,
|
||||
);
|
||||
export { mockedContextEngine };
|
||||
|
||||
export const overflowBaseRunParams = {
|
||||
sessionId: "test-session",
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
} from "./run.overflow-compaction.fixture.js";
|
||||
import { mockedGlobalHookRunner } from "./run.overflow-compaction.mocks.shared.js";
|
||||
import {
|
||||
mockedContextEngine,
|
||||
mockedCompactDirect,
|
||||
mockedRunEmbeddedAttempt,
|
||||
mockedSessionLikelyHasOversizedToolResults,
|
||||
@@ -22,6 +23,25 @@ const mockedPickFallbackThinkingLevel = vi.mocked(pickFallbackThinkingLevel);
|
||||
describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mockedRunEmbeddedAttempt.mockReset();
|
||||
mockedCompactDirect.mockReset();
|
||||
mockedSessionLikelyHasOversizedToolResults.mockReset();
|
||||
mockedTruncateOversizedToolResultsInSession.mockReset();
|
||||
mockedGlobalHookRunner.runBeforeAgentStart.mockReset();
|
||||
mockedGlobalHookRunner.runBeforeCompaction.mockReset();
|
||||
mockedGlobalHookRunner.runAfterCompaction.mockReset();
|
||||
mockedContextEngine.info.ownsCompaction = false;
|
||||
mockedCompactDirect.mockResolvedValue({
|
||||
ok: false,
|
||||
compacted: false,
|
||||
reason: "nothing to compact",
|
||||
});
|
||||
mockedSessionLikelyHasOversizedToolResults.mockReturnValue(false);
|
||||
mockedTruncateOversizedToolResultsInSession.mockResolvedValue({
|
||||
truncated: false,
|
||||
truncatedCount: 0,
|
||||
reason: "no oversized tool results",
|
||||
});
|
||||
mockedGlobalHookRunner.hasHooks.mockImplementation(() => false);
|
||||
});
|
||||
|
||||
@@ -81,8 +101,12 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
|
||||
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
|
||||
expect(mockedCompactDirect).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
trigger: "overflow",
|
||||
authProfileId: "test-profile",
|
||||
sessionId: "test-session",
|
||||
sessionFile: "/tmp/session.json",
|
||||
runtimeContext: expect.objectContaining({
|
||||
trigger: "overflow",
|
||||
authProfileId: "test-profile",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
@@ -132,6 +156,63 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
|
||||
expect(result.meta.error?.kind).toBe("context_overflow");
|
||||
});
|
||||
|
||||
it("fires compaction hooks during overflow recovery for ownsCompaction engines", async () => {
|
||||
mockedContextEngine.info.ownsCompaction = true;
|
||||
mockedGlobalHookRunner.hasHooks.mockImplementation(
|
||||
(hookName) => hookName === "before_compaction" || hookName === "after_compaction",
|
||||
);
|
||||
mockedRunEmbeddedAttempt
|
||||
.mockResolvedValueOnce(makeAttemptResult({ promptError: makeOverflowError() }))
|
||||
.mockResolvedValueOnce(makeAttemptResult({ promptError: null }));
|
||||
mockedCompactDirect.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
compacted: true,
|
||||
result: {
|
||||
summary: "engine-owned compaction",
|
||||
tokensAfter: 50,
|
||||
},
|
||||
});
|
||||
|
||||
await runEmbeddedPiAgent(overflowBaseRunParams);
|
||||
|
||||
expect(mockedGlobalHookRunner.runBeforeCompaction).toHaveBeenCalledWith(
|
||||
{ messageCount: -1, sessionFile: "/tmp/session.json" },
|
||||
expect.objectContaining({
|
||||
sessionKey: "test-key",
|
||||
}),
|
||||
);
|
||||
expect(mockedGlobalHookRunner.runAfterCompaction).toHaveBeenCalledWith(
|
||||
{
|
||||
messageCount: -1,
|
||||
compactedCount: -1,
|
||||
tokenCount: 50,
|
||||
sessionFile: "/tmp/session.json",
|
||||
},
|
||||
expect.objectContaining({
|
||||
sessionKey: "test-key",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("guards thrown engine-owned overflow compaction attempts", async () => {
|
||||
mockedContextEngine.info.ownsCompaction = true;
|
||||
mockedGlobalHookRunner.hasHooks.mockImplementation(
|
||||
(hookName) => hookName === "before_compaction" || hookName === "after_compaction",
|
||||
);
|
||||
mockedRunEmbeddedAttempt.mockResolvedValueOnce(
|
||||
makeAttemptResult({ promptError: makeOverflowError() }),
|
||||
);
|
||||
mockedCompactDirect.mockRejectedValueOnce(new Error("engine boom"));
|
||||
|
||||
const result = await runEmbeddedPiAgent(overflowBaseRunParams);
|
||||
|
||||
expect(mockedCompactDirect).toHaveBeenCalledTimes(1);
|
||||
expect(mockedGlobalHookRunner.runBeforeCompaction).toHaveBeenCalledTimes(1);
|
||||
expect(mockedGlobalHookRunner.runAfterCompaction).not.toHaveBeenCalled();
|
||||
expect(result.meta.error?.kind).toBe("context_overflow");
|
||||
expect(result.payloads?.[0]?.isError).toBe(true);
|
||||
});
|
||||
|
||||
it("returns retry_limit when repeated retries never converge", async () => {
|
||||
mockedRunEmbeddedAttempt.mockClear();
|
||||
mockedCompactDirect.mockClear();
|
||||
|
||||
@@ -1028,37 +1028,84 @@ export async function runEmbeddedPiAgent(
|
||||
log.warn(
|
||||
`context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`,
|
||||
);
|
||||
const compactResult = await contextEngine.compact({
|
||||
sessionId: params.sessionId,
|
||||
sessionFile: params.sessionFile,
|
||||
tokenBudget: ctxInfo.tokens,
|
||||
force: true,
|
||||
compactionTarget: "budget",
|
||||
runtimeContext: {
|
||||
sessionKey: params.sessionKey,
|
||||
messageChannel: params.messageChannel,
|
||||
messageProvider: params.messageProvider,
|
||||
agentAccountId: params.agentAccountId,
|
||||
authProfileId: lastProfileId,
|
||||
workspaceDir: resolvedWorkspace,
|
||||
agentDir,
|
||||
config: params.config,
|
||||
skillsSnapshot: params.skillsSnapshot,
|
||||
senderIsOwner: params.senderIsOwner,
|
||||
provider,
|
||||
model: modelId,
|
||||
runId: params.runId,
|
||||
thinkLevel,
|
||||
reasoningLevel: params.reasoningLevel,
|
||||
bashElevated: params.bashElevated,
|
||||
extraSystemPrompt: params.extraSystemPrompt,
|
||||
ownerNumbers: params.ownerNumbers,
|
||||
trigger: "overflow",
|
||||
diagId: overflowDiagId,
|
||||
attempt: overflowCompactionAttempts,
|
||||
maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS,
|
||||
},
|
||||
});
|
||||
let compactResult: Awaited<ReturnType<typeof contextEngine.compact>>;
|
||||
// When the engine owns compaction, hooks are not fired inside
|
||||
// compactEmbeddedPiSessionDirect (which is bypassed). Fire them
|
||||
// here so subscribers (memory extensions, usage trackers) are
|
||||
// notified even on overflow-recovery compactions.
|
||||
const overflowEngineOwnsCompaction = contextEngine.info.ownsCompaction === true;
|
||||
const overflowHookRunner = overflowEngineOwnsCompaction ? hookRunner : null;
|
||||
if (overflowHookRunner?.hasHooks("before_compaction")) {
|
||||
try {
|
||||
await overflowHookRunner.runBeforeCompaction(
|
||||
{ messageCount: -1, sessionFile: params.sessionFile },
|
||||
hookCtx,
|
||||
);
|
||||
} catch (hookErr) {
|
||||
log.warn(
|
||||
`before_compaction hook failed during overflow recovery: ${String(hookErr)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
try {
|
||||
compactResult = await contextEngine.compact({
|
||||
sessionId: params.sessionId,
|
||||
sessionFile: params.sessionFile,
|
||||
tokenBudget: ctxInfo.tokens,
|
||||
force: true,
|
||||
compactionTarget: "budget",
|
||||
runtimeContext: {
|
||||
sessionKey: params.sessionKey,
|
||||
messageChannel: params.messageChannel,
|
||||
messageProvider: params.messageProvider,
|
||||
agentAccountId: params.agentAccountId,
|
||||
authProfileId: lastProfileId,
|
||||
workspaceDir: resolvedWorkspace,
|
||||
agentDir,
|
||||
config: params.config,
|
||||
skillsSnapshot: params.skillsSnapshot,
|
||||
senderIsOwner: params.senderIsOwner,
|
||||
provider,
|
||||
model: modelId,
|
||||
runId: params.runId,
|
||||
thinkLevel,
|
||||
reasoningLevel: params.reasoningLevel,
|
||||
bashElevated: params.bashElevated,
|
||||
extraSystemPrompt: params.extraSystemPrompt,
|
||||
ownerNumbers: params.ownerNumbers,
|
||||
trigger: "overflow",
|
||||
diagId: overflowDiagId,
|
||||
attempt: overflowCompactionAttempts,
|
||||
maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS,
|
||||
},
|
||||
});
|
||||
} catch (compactErr) {
|
||||
log.warn(
|
||||
`contextEngine.compact() threw during overflow recovery for ${provider}/${modelId}: ${String(compactErr)}`,
|
||||
);
|
||||
compactResult = { ok: false, compacted: false, reason: String(compactErr) };
|
||||
}
|
||||
if (
|
||||
compactResult.ok &&
|
||||
compactResult.compacted &&
|
||||
overflowHookRunner?.hasHooks("after_compaction")
|
||||
) {
|
||||
try {
|
||||
await overflowHookRunner.runAfterCompaction(
|
||||
{
|
||||
messageCount: -1,
|
||||
compactedCount: -1,
|
||||
tokenCount: compactResult.result?.tokensAfter,
|
||||
sessionFile: params.sessionFile,
|
||||
},
|
||||
hookCtx,
|
||||
);
|
||||
} catch (hookErr) {
|
||||
log.warn(
|
||||
`after_compaction hook failed during overflow recovery: ${String(hookErr)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
if (compactResult.compacted) {
|
||||
autoCompactionCount += 1;
|
||||
log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);
|
||||
|
||||
Reference in New Issue
Block a user