diff --git a/src/auto-reply/reply/acp-projector.test.ts b/src/auto-reply/reply/acp-projector.test.ts index 3c870f017..b97408c91 100644 --- a/src/auto-reply/reply/acp-projector.test.ts +++ b/src/auto-reply/reply/acp-projector.test.ts @@ -1,20 +1,7 @@ import { describe, expect, it } from "vitest"; -import type { OpenClawConfig } from "../../config/config.js"; import { prefixSystemMessage } from "../../infra/system-message.js"; import { createAcpReplyProjector } from "./acp-projector.js"; - -function createCfg(overrides?: Partial): OpenClawConfig { - return { - acp: { - enabled: true, - stream: { - coalesceIdleMs: 0, - maxChunkChars: 64, - }, - }, - ...overrides, - } as OpenClawConfig; -} +import { createAcpTestConfig as createCfg } from "./test-fixtures/acp-runtime.js"; describe("createAcpReplyProjector", () => { it("coalesces text deltas into bounded block chunks", async () => { diff --git a/src/auto-reply/reply/acp-projector.ts b/src/auto-reply/reply/acp-projector.ts index e1d5cbc9a..8d0083cef 100644 --- a/src/auto-reply/reply/acp-projector.ts +++ b/src/auto-reply/reply/acp-projector.ts @@ -4,34 +4,16 @@ import { formatToolSummary, resolveToolDisplay } from "../../agents/tool-display import type { OpenClawConfig } from "../../config/config.js"; import { prefixSystemMessage } from "../../infra/system-message.js"; import type { ReplyPayload } from "../types.js"; +import { + isAcpTagVisible, + resolveAcpProjectionSettings, + resolveAcpStreamingConfig, +} from "./acp-stream-settings.js"; import { createBlockReplyPipeline } from "./block-reply-pipeline.js"; -import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js"; import type { ReplyDispatchKind } from "./reply-dispatcher.js"; -const DEFAULT_ACP_STREAM_COALESCE_IDLE_MS = 350; -const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800; -const DEFAULT_ACP_META_MODE = "minimal"; -const DEFAULT_ACP_SHOW_USAGE = false; -const DEFAULT_ACP_DELIVERY_MODE = "live"; -const DEFAULT_ACP_MAX_TURN_CHARS = 24_000; -const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320; -const DEFAULT_ACP_MAX_STATUS_CHARS = 320; -const DEFAULT_ACP_MAX_META_EVENTS_PER_TURN = 64; const ACP_BLOCK_REPLY_TIMEOUT_MS = 15_000; -const ACP_TAG_VISIBILITY_DEFAULTS: Record = { - agent_message_chunk: true, - tool_call: true, - tool_call_update: true, - usage_update: false, - available_commands_update: false, - current_mode_update: false, - config_option_update: false, - session_info_update: false, - plan: false, - agent_thought_chunk: false, -}; - const TERMINAL_TOOL_STATUSES = new Set(["completed", "failed", "cancelled", "done", "error"]); export type AcpProjectedDeliveryMeta = { @@ -41,125 +23,12 @@ export type AcpProjectedDeliveryMeta = { allowEdit?: boolean; }; -type AcpDeliveryMode = "live" | "final_only"; -type AcpMetaMode = "off" | "minimal" | "verbose"; - -type AcpProjectionSettings = { - deliveryMode: AcpDeliveryMode; - metaMode: AcpMetaMode; - showUsage: boolean; - maxTurnChars: number; - maxToolSummaryChars: number; - maxStatusChars: number; - maxMetaEventsPerTurn: number; - tagVisibility: Partial>; -}; - type ToolLifecycleState = { started: boolean; terminal: boolean; lastRenderedHash?: string; }; -function clampPositiveInteger( - value: unknown, - fallback: number, - bounds: { min: number; max: number }, -): number { - if (typeof value !== "number" || !Number.isFinite(value)) { - return fallback; - } - const rounded = Math.round(value); - if (rounded < bounds.min) { - return bounds.min; - } - if (rounded > bounds.max) { - return bounds.max; - } - return rounded; -} - -function clampBoolean(value: unknown, fallback: boolean): boolean { - return typeof value === "boolean" ? value : fallback; -} - -function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode { - return value === "final_only" ? "final_only" : DEFAULT_ACP_DELIVERY_MODE; -} - -function resolveAcpMetaMode(value: unknown): AcpMetaMode { - if (value === "off" || value === "minimal" || value === "verbose") { - return value; - } - return DEFAULT_ACP_META_MODE; -} - -function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number { - return clampPositiveInteger( - cfg.acp?.stream?.coalesceIdleMs, - DEFAULT_ACP_STREAM_COALESCE_IDLE_MS, - { - min: 0, - max: 5_000, - }, - ); -} - -function resolveAcpStreamMaxChunkChars(cfg: OpenClawConfig): number { - return clampPositiveInteger(cfg.acp?.stream?.maxChunkChars, DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS, { - min: 50, - max: 4_000, - }); -} - -function resolveAcpProjectionSettings(cfg: OpenClawConfig): AcpProjectionSettings { - const stream = cfg.acp?.stream; - return { - deliveryMode: resolveAcpDeliveryMode(stream?.deliveryMode), - metaMode: resolveAcpMetaMode(stream?.metaMode), - showUsage: clampBoolean(stream?.showUsage, DEFAULT_ACP_SHOW_USAGE), - maxTurnChars: clampPositiveInteger(stream?.maxTurnChars, DEFAULT_ACP_MAX_TURN_CHARS, { - min: 1, - max: 500_000, - }), - maxToolSummaryChars: clampPositiveInteger( - stream?.maxToolSummaryChars, - DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS, - { - min: 64, - max: 8_000, - }, - ), - maxStatusChars: clampPositiveInteger(stream?.maxStatusChars, DEFAULT_ACP_MAX_STATUS_CHARS, { - min: 64, - max: 8_000, - }), - maxMetaEventsPerTurn: clampPositiveInteger( - stream?.maxMetaEventsPerTurn, - DEFAULT_ACP_MAX_META_EVENTS_PER_TURN, - { - min: 1, - max: 2_000, - }, - ), - tagVisibility: stream?.tagVisibility ?? {}, - }; -} - -function resolveAcpStreamingConfig(params: { - cfg: OpenClawConfig; - provider?: string; - accountId?: string; -}) { - return resolveEffectiveBlockStreamingConfig({ - cfg: params.cfg, - provider: params.provider, - accountId: params.accountId, - maxChunkChars: resolveAcpStreamMaxChunkChars(params.cfg), - coalesceIdleMs: resolveAcpStreamCoalesceIdleMs(params.cfg), - }); -} - function truncateText(input: string, maxChars: number): string { if (input.length <= maxChars) { return input; @@ -182,23 +51,6 @@ function normalizeToolStatus(status: string | undefined): string | undefined { return normalized || undefined; } -function isTagVisible( - settings: AcpProjectionSettings, - tag: AcpSessionUpdateTag | undefined, -): boolean { - if (!tag) { - return true; - } - const override = settings.tagVisibility[tag]; - if (typeof override === "boolean") { - return override; - } - if (Object.prototype.hasOwnProperty.call(ACP_TAG_VISIBILITY_DEFAULTS, tag)) { - return ACP_TAG_VISIBILITY_DEFAULTS[tag]; - } - return true; -} - function renderToolSummaryText(event: Extract): string { const detailParts: string[] = []; const title = event.title?.trim(); @@ -335,7 +187,7 @@ export function createAcpReplyProjector(params: { if (!params.shouldSendToolSummaries || settings.metaMode === "off") { return; } - if (!isTagVisible(settings, event.tag)) { + if (!isAcpTagVisible(settings, event.tag)) { return; } @@ -419,7 +271,7 @@ export function createAcpReplyProjector(params: { if (event.stream && event.stream !== "output") { return; } - if (!isTagVisible(settings, event.tag)) { + if (!isAcpTagVisible(settings, event.tag)) { return; } const text = event.text; @@ -444,7 +296,7 @@ export function createAcpReplyProjector(params: { } if (event.type === "status") { - if (!isTagVisible(settings, event.tag)) { + if (!isAcpTagVisible(settings, event.tag)) { return; } if (event.tag === "usage_update") { diff --git a/src/auto-reply/reply/acp-stream-settings.test.ts b/src/auto-reply/reply/acp-stream-settings.test.ts new file mode 100644 index 000000000..950b07b27 --- /dev/null +++ b/src/auto-reply/reply/acp-stream-settings.test.ts @@ -0,0 +1,77 @@ +import { describe, expect, it } from "vitest"; +import { + isAcpTagVisible, + resolveAcpProjectionSettings, + resolveAcpStreamingConfig, +} from "./acp-stream-settings.js"; +import { createAcpTestConfig } from "./test-fixtures/acp-runtime.js"; + +describe("acp stream settings", () => { + it("resolves stable defaults", () => { + const settings = resolveAcpProjectionSettings(createAcpTestConfig()); + expect(settings.deliveryMode).toBe("live"); + expect(settings.metaMode).toBe("minimal"); + expect(settings.showUsage).toBe(false); + expect(settings.maxTurnChars).toBe(24_000); + expect(settings.maxMetaEventsPerTurn).toBe(64); + }); + + it("applies explicit stream overrides", () => { + const settings = resolveAcpProjectionSettings( + createAcpTestConfig({ + acp: { + enabled: true, + stream: { + deliveryMode: "final_only", + metaMode: "off", + showUsage: true, + maxTurnChars: 500, + maxMetaEventsPerTurn: 7, + tagVisibility: { + usage_update: true, + }, + }, + }, + }), + ); + expect(settings.deliveryMode).toBe("final_only"); + expect(settings.metaMode).toBe("off"); + expect(settings.showUsage).toBe(true); + expect(settings.maxTurnChars).toBe(500); + expect(settings.maxMetaEventsPerTurn).toBe(7); + expect(settings.tagVisibility.usage_update).toBe(true); + }); + + it("uses default tag visibility when no override is provided", () => { + const settings = resolveAcpProjectionSettings(createAcpTestConfig()); + expect(isAcpTagVisible(settings, "tool_call")).toBe(true); + expect(isAcpTagVisible(settings, "usage_update")).toBe(false); + }); + + it("respects tag visibility overrides", () => { + const settings = resolveAcpProjectionSettings( + createAcpTestConfig({ + acp: { + enabled: true, + stream: { + tagVisibility: { + usage_update: true, + tool_call: false, + }, + }, + }, + }), + ); + expect(isAcpTagVisible(settings, "usage_update")).toBe(true); + expect(isAcpTagVisible(settings, "tool_call")).toBe(false); + }); + + it("resolves chunking/coalescing from ACP stream controls", () => { + const streaming = resolveAcpStreamingConfig({ + cfg: createAcpTestConfig(), + provider: "discord", + }); + expect(streaming.chunking.maxChars).toBe(64); + expect(streaming.coalescing.idleMs).toBe(0); + }); +}); diff --git a/src/auto-reply/reply/acp-stream-settings.ts b/src/auto-reply/reply/acp-stream-settings.ts new file mode 100644 index 000000000..6655ba9f3 --- /dev/null +++ b/src/auto-reply/reply/acp-stream-settings.ts @@ -0,0 +1,156 @@ +import type { AcpSessionUpdateTag } from "../../acp/runtime/types.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js"; + +const DEFAULT_ACP_STREAM_COALESCE_IDLE_MS = 350; +const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800; +const DEFAULT_ACP_META_MODE = "minimal"; +const DEFAULT_ACP_SHOW_USAGE = false; +const DEFAULT_ACP_DELIVERY_MODE = "live"; +const DEFAULT_ACP_MAX_TURN_CHARS = 24_000; +const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320; +const DEFAULT_ACP_MAX_STATUS_CHARS = 320; +const DEFAULT_ACP_MAX_META_EVENTS_PER_TURN = 64; + +export const ACP_TAG_VISIBILITY_DEFAULTS: Record = { + agent_message_chunk: true, + tool_call: true, + tool_call_update: true, + usage_update: false, + available_commands_update: false, + current_mode_update: false, + config_option_update: false, + session_info_update: false, + plan: false, + agent_thought_chunk: false, +}; + +export type AcpDeliveryMode = "live" | "final_only"; +export type AcpMetaMode = "off" | "minimal" | "verbose"; + +export type AcpProjectionSettings = { + deliveryMode: AcpDeliveryMode; + metaMode: AcpMetaMode; + showUsage: boolean; + maxTurnChars: number; + maxToolSummaryChars: number; + maxStatusChars: number; + maxMetaEventsPerTurn: number; + tagVisibility: Partial>; +}; + +function clampPositiveInteger( + value: unknown, + fallback: number, + bounds: { min: number; max: number }, +): number { + if (typeof value !== "number" || !Number.isFinite(value)) { + return fallback; + } + const rounded = Math.round(value); + if (rounded < bounds.min) { + return bounds.min; + } + if (rounded > bounds.max) { + return bounds.max; + } + return rounded; +} + +function clampBoolean(value: unknown, fallback: boolean): boolean { + return typeof value === "boolean" ? value : fallback; +} + +function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode { + return value === "final_only" ? "final_only" : DEFAULT_ACP_DELIVERY_MODE; +} + +function resolveAcpMetaMode(value: unknown): AcpMetaMode { + if (value === "off" || value === "minimal" || value === "verbose") { + return value; + } + return DEFAULT_ACP_META_MODE; +} + +function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number { + return clampPositiveInteger( + cfg.acp?.stream?.coalesceIdleMs, + DEFAULT_ACP_STREAM_COALESCE_IDLE_MS, + { + min: 0, + max: 5_000, + }, + ); +} + +function resolveAcpStreamMaxChunkChars(cfg: OpenClawConfig): number { + return clampPositiveInteger(cfg.acp?.stream?.maxChunkChars, DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS, { + min: 50, + max: 4_000, + }); +} + +export function resolveAcpProjectionSettings(cfg: OpenClawConfig): AcpProjectionSettings { + const stream = cfg.acp?.stream; + return { + deliveryMode: resolveAcpDeliveryMode(stream?.deliveryMode), + metaMode: resolveAcpMetaMode(stream?.metaMode), + showUsage: clampBoolean(stream?.showUsage, DEFAULT_ACP_SHOW_USAGE), + maxTurnChars: clampPositiveInteger(stream?.maxTurnChars, DEFAULT_ACP_MAX_TURN_CHARS, { + min: 1, + max: 500_000, + }), + maxToolSummaryChars: clampPositiveInteger( + stream?.maxToolSummaryChars, + DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS, + { + min: 64, + max: 8_000, + }, + ), + maxStatusChars: clampPositiveInteger(stream?.maxStatusChars, DEFAULT_ACP_MAX_STATUS_CHARS, { + min: 64, + max: 8_000, + }), + maxMetaEventsPerTurn: clampPositiveInteger( + stream?.maxMetaEventsPerTurn, + DEFAULT_ACP_MAX_META_EVENTS_PER_TURN, + { + min: 1, + max: 2_000, + }, + ), + tagVisibility: stream?.tagVisibility ?? {}, + }; +} + +export function resolveAcpStreamingConfig(params: { + cfg: OpenClawConfig; + provider?: string; + accountId?: string; +}) { + return resolveEffectiveBlockStreamingConfig({ + cfg: params.cfg, + provider: params.provider, + accountId: params.accountId, + maxChunkChars: resolveAcpStreamMaxChunkChars(params.cfg), + coalesceIdleMs: resolveAcpStreamCoalesceIdleMs(params.cfg), + }); +} + +export function isAcpTagVisible( + settings: AcpProjectionSettings, + tag: AcpSessionUpdateTag | undefined, +): boolean { + if (!tag) { + return true; + } + const override = settings.tagVisibility[tag]; + if (typeof override === "boolean") { + return override; + } + if (Object.prototype.hasOwnProperty.call(ACP_TAG_VISIBILITY_DEFAULTS, tag)) { + return ACP_TAG_VISIBILITY_DEFAULTS[tag]; + } + return true; +} diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts new file mode 100644 index 000000000..84d8f33a5 --- /dev/null +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -0,0 +1,199 @@ +import type { OpenClawConfig } from "../../config/config.js"; +import type { TtsAutoMode } from "../../config/types.tts.js"; +import { logVerbose } from "../../globals.js"; +import { runMessageAction } from "../../infra/outbound/message-action-runner.js"; +import { maybeApplyTtsToPayload } from "../../tts/tts.js"; +import type { FinalizedMsgContext } from "../templating.js"; +import type { ReplyPayload } from "../types.js"; +import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; +import { routeReply } from "./route-reply.js"; + +export type AcpDispatchDeliveryMeta = { + toolCallId?: string; + allowEdit?: boolean; +}; + +type ToolMessageHandle = { + channel: string; + accountId?: string; + to: string; + threadId?: string | number; + messageId: string; +}; + +type AcpDispatchDeliveryState = { + startedReplyLifecycle: boolean; + accumulatedBlockText: string; + blockCount: number; + routedCounts: Record; + toolMessageByCallId: Map; +}; + +export type AcpDispatchDeliveryCoordinator = { + deliver: ( + kind: ReplyDispatchKind, + payload: ReplyPayload, + meta?: AcpDispatchDeliveryMeta, + ) => Promise; + getBlockCount: () => number; + getAccumulatedBlockText: () => string; + getRoutedCounts: () => Record; + applyRoutedCounts: (counts: Record) => void; +}; + +export function createAcpDispatchDeliveryCoordinator(params: { + cfg: OpenClawConfig; + ctx: FinalizedMsgContext; + dispatcher: ReplyDispatcher; + inboundAudio: boolean; + sessionTtsAuto?: TtsAutoMode; + ttsChannel?: string; + shouldRouteToOriginating: boolean; + originatingChannel?: string; + originatingTo?: string; + onReplyStart?: () => Promise | void; +}): AcpDispatchDeliveryCoordinator { + const state: AcpDispatchDeliveryState = { + startedReplyLifecycle: false, + accumulatedBlockText: "", + blockCount: 0, + routedCounts: { + tool: 0, + block: 0, + final: 0, + }, + toolMessageByCallId: new Map(), + }; + + const ensureReplyLifecycleStarted = async () => { + if (state.startedReplyLifecycle) { + return; + } + state.startedReplyLifecycle = true; + await params.onReplyStart?.(); + }; + + const tryEditToolMessage = async ( + payload: ReplyPayload, + toolCallId: string, + ): Promise => { + if (!params.shouldRouteToOriginating || !params.originatingChannel || !params.originatingTo) { + return false; + } + const handle = state.toolMessageByCallId.get(toolCallId); + if (!handle?.messageId) { + return false; + } + const message = payload.text?.trim(); + if (!message) { + return false; + } + + try { + await runMessageAction({ + cfg: params.cfg, + action: "edit", + params: { + channel: handle.channel, + accountId: handle.accountId, + to: handle.to, + threadId: handle.threadId, + messageId: handle.messageId, + message, + }, + sessionKey: params.ctx.SessionKey, + }); + state.routedCounts.tool += 1; + return true; + } catch (error) { + logVerbose( + `dispatch-acp: tool message edit failed for ${toolCallId}: ${error instanceof Error ? error.message : String(error)}`, + ); + return false; + } + }; + + const deliver = async ( + kind: ReplyDispatchKind, + payload: ReplyPayload, + meta?: AcpDispatchDeliveryMeta, + ): Promise => { + if (kind === "block" && payload.text?.trim()) { + if (state.accumulatedBlockText.length > 0) { + state.accumulatedBlockText += "\n"; + } + state.accumulatedBlockText += payload.text; + state.blockCount += 1; + } + + if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) { + await ensureReplyLifecycleStarted(); + } + + const ttsPayload = await maybeApplyTtsToPayload({ + payload, + cfg: params.cfg, + channel: params.ttsChannel, + kind, + inboundAudio: params.inboundAudio, + ttsAuto: params.sessionTtsAuto, + }); + + if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) { + const toolCallId = meta?.toolCallId?.trim(); + if (kind === "tool" && meta?.allowEdit === true && toolCallId) { + const edited = await tryEditToolMessage(ttsPayload, toolCallId); + if (edited) { + return true; + } + } + + const result = await routeReply({ + payload: ttsPayload, + channel: params.originatingChannel, + to: params.originatingTo, + sessionKey: params.ctx.SessionKey, + accountId: params.ctx.AccountId, + threadId: params.ctx.MessageThreadId, + cfg: params.cfg, + }); + if (!result.ok) { + logVerbose( + `dispatch-acp: route-reply (acp/${kind}) failed: ${result.error ?? "unknown error"}`, + ); + return false; + } + if (kind === "tool" && meta?.toolCallId && result.messageId) { + state.toolMessageByCallId.set(meta.toolCallId, { + channel: params.originatingChannel, + accountId: params.ctx.AccountId, + to: params.originatingTo, + ...(params.ctx.MessageThreadId != null ? { threadId: params.ctx.MessageThreadId } : {}), + messageId: result.messageId, + }); + } + state.routedCounts[kind] += 1; + return true; + } + + if (kind === "tool") { + return params.dispatcher.sendToolResult(ttsPayload); + } + if (kind === "block") { + return params.dispatcher.sendBlockReply(ttsPayload); + } + return params.dispatcher.sendFinalReply(ttsPayload); + }; + + return { + deliver, + getBlockCount: () => state.blockCount, + getAccumulatedBlockText: () => state.accumulatedBlockText, + getRoutedCounts: () => ({ ...state.routedCounts }), + applyRoutedCounts: (counts) => { + counts.tool += state.routedCounts.tool; + counts.block += state.routedCounts.block; + counts.final += state.routedCounts.final; + }, + }; +} diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts new file mode 100644 index 000000000..92342d503 --- /dev/null +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -0,0 +1,378 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { AcpRuntimeError } from "../../acp/runtime/errors.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import type { ReplyDispatcher } from "./reply-dispatcher.js"; +import { buildTestCtx } from "./test-ctx.js"; +import { createAcpSessionMeta, createAcpTestConfig } from "./test-fixtures/acp-runtime.js"; + +const managerMocks = vi.hoisted(() => ({ + resolveSession: vi.fn(), + runTurn: vi.fn(), + getObservabilitySnapshot: vi.fn(() => ({ + turns: { queueDepth: 0 }, + runtimeCache: { activeSessions: 0 }, + })), +})); + +const policyMocks = vi.hoisted(() => ({ + resolveAcpDispatchPolicyError: vi.fn(() => null), + resolveAcpAgentPolicyError: vi.fn(() => null), +})); + +const routeMocks = vi.hoisted(() => ({ + routeReply: vi.fn(async (_params: unknown) => ({ ok: true, messageId: "mock" })), +})); + +const messageActionMocks = vi.hoisted(() => ({ + runMessageAction: vi.fn(async (_params: unknown) => ({ ok: true as const })), +})); + +const ttsMocks = vi.hoisted(() => ({ + maybeApplyTtsToPayload: vi.fn(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { payload: unknown }; + return params.payload; + }), + resolveTtsConfig: vi.fn((_cfg: OpenClawConfig) => ({ mode: "final" })), +})); + +const sessionMetaMocks = vi.hoisted(() => ({ + readAcpSessionEntry: vi.fn(() => null), +})); + +const bindingServiceMocks = vi.hoisted(() => ({ + listBySession: vi.fn(() => []), +})); + +vi.mock("../../acp/control-plane/manager.js", () => ({ + getAcpSessionManager: () => managerMocks, +})); + +vi.mock("../../acp/policy.js", () => ({ + resolveAcpDispatchPolicyError: (cfg: OpenClawConfig) => + policyMocks.resolveAcpDispatchPolicyError(cfg), + resolveAcpAgentPolicyError: (cfg: OpenClawConfig, agent: string) => + policyMocks.resolveAcpAgentPolicyError(cfg, agent), +})); + +vi.mock("./route-reply.js", () => ({ + routeReply: (params: unknown) => routeMocks.routeReply(params), +})); + +vi.mock("../../infra/outbound/message-action-runner.js", () => ({ + runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params), +})); + +vi.mock("../../tts/tts.js", () => ({ + maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params), + resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg), +})); + +vi.mock("../../acp/runtime/session-meta.js", () => ({ + readAcpSessionEntry: (params: unknown) => sessionMetaMocks.readAcpSessionEntry(params), +})); + +vi.mock("../../infra/outbound/session-binding-service.js", () => ({ + getSessionBindingService: () => ({ + listBySession: (sessionKey: string) => bindingServiceMocks.listBySession(sessionKey), + }), +})); + +const { tryDispatchAcpReply } = await import("./dispatch-acp.js"); + +function createDispatcher(): { + dispatcher: ReplyDispatcher; + counts: Record<"tool" | "block" | "final", number>; +} { + const counts = { tool: 0, block: 0, final: 0 }; + const dispatcher: ReplyDispatcher = { + sendToolResult: vi.fn(() => true), + sendBlockReply: vi.fn(() => true), + sendFinalReply: vi.fn(() => true), + waitForIdle: vi.fn(async () => {}), + getQueuedCounts: vi.fn(() => counts), + markComplete: vi.fn(), + }; + return { dispatcher, counts }; +} + +function setReadyAcpResolution() { + managerMocks.resolveSession.mockReturnValue({ + kind: "ready", + sessionKey: "agent:codex-acp:session-1", + meta: createAcpSessionMeta(), + }); +} + +describe("tryDispatchAcpReply", () => { + beforeEach(() => { + managerMocks.resolveSession.mockReset(); + managerMocks.runTurn.mockReset(); + managerMocks.getObservabilitySnapshot.mockReset(); + managerMocks.getObservabilitySnapshot.mockReturnValue({ + turns: { queueDepth: 0 }, + runtimeCache: { activeSessions: 0 }, + }); + policyMocks.resolveAcpDispatchPolicyError.mockReset(); + policyMocks.resolveAcpDispatchPolicyError.mockReturnValue(null); + policyMocks.resolveAcpAgentPolicyError.mockReset(); + policyMocks.resolveAcpAgentPolicyError.mockReturnValue(null); + routeMocks.routeReply.mockReset(); + routeMocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" }); + messageActionMocks.runMessageAction.mockReset(); + messageActionMocks.runMessageAction.mockResolvedValue({ ok: true as const }); + ttsMocks.maybeApplyTtsToPayload.mockClear(); + ttsMocks.resolveTtsConfig.mockReset(); + ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); + sessionMetaMocks.readAcpSessionEntry.mockReset(); + sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null); + bindingServiceMocks.listBySession.mockReset(); + bindingServiceMocks.listBySession.mockReturnValue([]); + }); + + it("routes ACP block output to originating channel", async () => { + setReadyAcpResolution(); + managerMocks.runTurn.mockImplementation( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ type: "text_delta", text: "hello", tag: "agent_message_chunk" }); + await onEvent({ type: "done" }); + }, + ); + + const { dispatcher } = createDispatcher(); + const result = await tryDispatchAcpReply({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: "reply", + }), + cfg: createAcpTestConfig(), + dispatcher, + sessionKey: "agent:codex-acp:session-1", + inboundAudio: false, + shouldRouteToOriginating: true, + originatingChannel: "telegram", + originatingTo: "telegram:thread-1", + shouldSendToolSummaries: true, + bypassForCommand: false, + recordProcessed: vi.fn(), + markIdle: vi.fn(), + }); + + expect(result?.counts.block).toBe(1); + expect(routeMocks.routeReply).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + to: "telegram:thread-1", + }), + ); + expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); + }); + + it("edits ACP tool lifecycle updates in place when supported", async () => { + setReadyAcpResolution(); + managerMocks.runTurn.mockImplementation( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ + type: "tool_call", + tag: "tool_call", + toolCallId: "call-1", + status: "in_progress", + title: "Run command", + text: "Run command (in_progress)", + }); + await onEvent({ + type: "tool_call", + tag: "tool_call_update", + toolCallId: "call-1", + status: "completed", + title: "Run command", + text: "Run command (completed)", + }); + await onEvent({ type: "done" }); + }, + ); + routeMocks.routeReply.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-1" }); + + const { dispatcher } = createDispatcher(); + await tryDispatchAcpReply({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: "run tool", + }), + cfg: createAcpTestConfig(), + dispatcher, + sessionKey: "agent:codex-acp:session-1", + inboundAudio: false, + shouldRouteToOriginating: true, + originatingChannel: "telegram", + originatingTo: "telegram:thread-1", + shouldSendToolSummaries: true, + bypassForCommand: false, + recordProcessed: vi.fn(), + markIdle: vi.fn(), + }); + + expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); + expect(messageActionMocks.runMessageAction).toHaveBeenCalledWith( + expect.objectContaining({ + action: "edit", + params: expect.objectContaining({ + messageId: "tool-msg-1", + }), + }), + ); + }); + + it("falls back to new tool message when edit fails", async () => { + setReadyAcpResolution(); + managerMocks.runTurn.mockImplementation( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ + type: "tool_call", + tag: "tool_call", + toolCallId: "call-2", + status: "in_progress", + title: "Run command", + text: "Run command (in_progress)", + }); + await onEvent({ + type: "tool_call", + tag: "tool_call_update", + toolCallId: "call-2", + status: "completed", + title: "Run command", + text: "Run command (completed)", + }); + await onEvent({ type: "done" }); + }, + ); + routeMocks.routeReply + .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2" }) + .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2-fallback" }); + messageActionMocks.runMessageAction.mockRejectedValueOnce(new Error("edit unsupported")); + + const { dispatcher } = createDispatcher(); + await tryDispatchAcpReply({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: "run tool", + }), + cfg: createAcpTestConfig(), + dispatcher, + sessionKey: "agent:codex-acp:session-1", + inboundAudio: false, + shouldRouteToOriginating: true, + originatingChannel: "telegram", + originatingTo: "telegram:thread-1", + shouldSendToolSummaries: true, + bypassForCommand: false, + recordProcessed: vi.fn(), + markIdle: vi.fn(), + }); + + expect(messageActionMocks.runMessageAction).toHaveBeenCalledTimes(1); + expect(routeMocks.routeReply).toHaveBeenCalledTimes(2); + }); + + it("starts reply lifecycle only when visible projected output is emitted", async () => { + setReadyAcpResolution(); + const onReplyStart = vi.fn(); + const { dispatcher } = createDispatcher(); + + managerMocks.runTurn.mockImplementationOnce( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ + type: "status", + tag: "usage_update", + text: "usage updated: 1/100", + used: 1, + size: 100, + }); + await onEvent({ type: "done" }); + }, + ); + await tryDispatchAcpReply({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: "hidden", + }), + cfg: createAcpTestConfig(), + dispatcher, + sessionKey: "agent:codex-acp:session-1", + inboundAudio: false, + shouldRouteToOriginating: false, + shouldSendToolSummaries: true, + bypassForCommand: false, + onReplyStart, + recordProcessed: vi.fn(), + markIdle: vi.fn(), + }); + expect(onReplyStart).not.toHaveBeenCalled(); + + managerMocks.runTurn.mockImplementationOnce( + async ({ onEvent }: { onEvent: (event: unknown) => Promise }) => { + await onEvent({ type: "text_delta", text: "visible", tag: "agent_message_chunk" }); + await onEvent({ type: "done" }); + }, + ); + await tryDispatchAcpReply({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: "visible", + }), + cfg: createAcpTestConfig(), + dispatcher: createDispatcher().dispatcher, + sessionKey: "agent:codex-acp:session-1", + inboundAudio: false, + shouldRouteToOriginating: false, + shouldSendToolSummaries: true, + bypassForCommand: false, + onReplyStart, + recordProcessed: vi.fn(), + markIdle: vi.fn(), + }); + expect(onReplyStart).toHaveBeenCalledTimes(1); + }); + + it("surfaces ACP policy errors as final error replies", async () => { + setReadyAcpResolution(); + policyMocks.resolveAcpDispatchPolicyError.mockReturnValue( + new AcpRuntimeError("ACP_DISPATCH_DISABLED", "ACP dispatch is disabled by policy."), + ); + const { dispatcher } = createDispatcher(); + + await tryDispatchAcpReply({ + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + SessionKey: "agent:codex-acp:session-1", + BodyForAgent: "test", + }), + cfg: createAcpTestConfig(), + dispatcher, + sessionKey: "agent:codex-acp:session-1", + inboundAudio: false, + shouldRouteToOriginating: false, + shouldSendToolSummaries: true, + bypassForCommand: false, + recordProcessed: vi.fn(), + markIdle: vi.fn(), + }); + + expect(managerMocks.runTurn).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( + expect.objectContaining({ + text: expect.stringContaining("ACP_DISPATCH_DISABLED"), + }), + ); + }); +}); diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 573504f69..c40965a76 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -11,7 +11,6 @@ import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { TtsAutoMode } from "../../config/types.tts.js"; import { logVerbose } from "../../globals.js"; -import { runMessageAction } from "../../infra/outbound/message-action-runner.js"; import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js"; import { generateSecureUuid } from "../../infra/secure-random.js"; import { prefixSystemMessage } from "../../infra/system-message.js"; @@ -23,10 +22,9 @@ import { shouldHandleTextCommands, } from "../commands-registry.js"; import type { FinalizedMsgContext } from "../templating.js"; -import type { ReplyPayload } from "../types.js"; import { createAcpReplyProjector } from "./acp-projector.js"; +import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; -import { routeReply } from "./route-reply.js"; type DispatchProcessedRecorder = ( outcome: "completed" | "skipped" | "error", @@ -176,148 +174,24 @@ export async function tryDispatchAcpReply(params: { return null; } - const routedCounts: Record = { - tool: 0, - block: 0, - final: 0, - }; let queuedFinal = false; - let acpAccumulatedBlockText = ""; - let acpBlockCount = 0; - let startedReplyLifecycle = false; - const toolUpdateMessageById = new Map< - string, - { - channel: string; - accountId?: string; - to: string; - threadId?: string | number; - messageId: string; - } - >(); - - const ensureReplyLifecycleStarted = async () => { - if (startedReplyLifecycle) { - return; - } - startedReplyLifecycle = true; - await params.onReplyStart?.(); - }; - - const tryEditToolUpdate = async (payload: ReplyPayload, toolCallId: string): Promise => { - if (!params.shouldRouteToOriginating || !params.originatingChannel || !params.originatingTo) { - return false; - } - const handle = toolUpdateMessageById.get(toolCallId); - if (!handle?.messageId) { - return false; - } - const message = payload.text?.trim(); - if (!message) { - return false; - } - try { - await runMessageAction({ - cfg: params.cfg, - action: "edit", - params: { - channel: handle.channel, - accountId: handle.accountId, - to: handle.to, - threadId: handle.threadId, - messageId: handle.messageId, - message, - }, - sessionKey: params.ctx.SessionKey, - }); - routedCounts.tool += 1; - return true; - } catch (error) { - logVerbose( - `dispatch-acp: tool message edit failed for ${toolCallId}: ${error instanceof Error ? error.message : String(error)}`, - ); - return false; - } - }; - - const deliverAcpPayload = async ( - kind: ReplyDispatchKind, - payload: ReplyPayload, - meta?: { - toolCallId?: string; - allowEdit?: boolean; - }, - ): Promise => { - if (kind === "block" && payload.text?.trim()) { - if (acpAccumulatedBlockText.length > 0) { - acpAccumulatedBlockText += "\n"; - } - acpAccumulatedBlockText += payload.text; - acpBlockCount += 1; - } - if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) { - await ensureReplyLifecycleStarted(); - } - - const ttsPayload = await maybeApplyTtsToPayload({ - payload, - cfg: params.cfg, - channel: params.ttsChannel, - kind, - inboundAudio: params.inboundAudio, - ttsAuto: params.sessionTtsAuto, - }); - - if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) { - const toolCallId = meta?.toolCallId?.trim(); - if (kind === "tool" && meta?.allowEdit === true && toolCallId) { - const edited = await tryEditToolUpdate(ttsPayload, toolCallId); - if (edited) { - return true; - } - } - const result = await routeReply({ - payload: ttsPayload, - channel: params.originatingChannel, - to: params.originatingTo, - sessionKey: params.ctx.SessionKey, - accountId: params.ctx.AccountId, - threadId: params.ctx.MessageThreadId, - cfg: params.cfg, - }); - if (!result.ok) { - logVerbose( - `dispatch-acp: route-reply (acp/${kind}) failed: ${result.error ?? "unknown error"}`, - ); - return false; - } - if (kind === "tool" && meta?.toolCallId && result.messageId) { - toolUpdateMessageById.set(meta.toolCallId, { - channel: params.originatingChannel, - accountId: params.ctx.AccountId, - to: params.originatingTo, - ...(params.ctx.MessageThreadId != null ? { threadId: params.ctx.MessageThreadId } : {}), - messageId: result.messageId, - }); - } - routedCounts[kind] += 1; - return true; - } - if (kind === "tool") { - return params.dispatcher.sendToolResult(ttsPayload); - } - if (kind === "block") { - return params.dispatcher.sendBlockReply(ttsPayload); - } - return params.dispatcher.sendFinalReply(ttsPayload); - }; + const delivery = createAcpDispatchDeliveryCoordinator({ + cfg: params.cfg, + ctx: params.ctx, + dispatcher: params.dispatcher, + inboundAudio: params.inboundAudio, + sessionTtsAuto: params.sessionTtsAuto, + ttsChannel: params.ttsChannel, + shouldRouteToOriginating: params.shouldRouteToOriginating, + originatingChannel: params.originatingChannel, + originatingTo: params.originatingTo, + onReplyStart: params.onReplyStart, + }); const promptText = resolveAcpPromptText(params.ctx); if (!promptText) { const counts = params.dispatcher.getQueuedCounts(); - counts.tool += routedCounts.tool; - counts.block += routedCounts.block; - counts.final += routedCounts.final; + delivery.applyRoutedCounts(counts); params.recordProcessed("completed", { reason: "acp_empty_prompt" }); params.markIdle("message_completed"); return { queuedFinal: false, counts }; @@ -346,7 +220,7 @@ export async function tryDispatchAcpReply(params: { const projector = createAcpReplyProjector({ cfg: params.cfg, shouldSendToolSummaries: params.shouldSendToolSummaries, - deliver: deliverAcpPayload, + deliver: delivery.deliver, provider: params.ctx.Surface ?? params.ctx.Provider, accountId: params.ctx.AccountId, }); @@ -376,10 +250,11 @@ export async function tryDispatchAcpReply(params: { await projector.flush(true); const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final"; - if (ttsMode === "final" && acpBlockCount > 0 && acpAccumulatedBlockText.trim()) { + const accumulatedBlockText = delivery.getAccumulatedBlockText(); + if (ttsMode === "final" && delivery.getBlockCount() > 0 && accumulatedBlockText.trim()) { try { const ttsSyntheticReply = await maybeApplyTtsToPayload({ - payload: { text: acpAccumulatedBlockText }, + payload: { text: accumulatedBlockText }, cfg: params.cfg, channel: params.ttsChannel, kind: "final", @@ -387,7 +262,7 @@ export async function tryDispatchAcpReply(params: { ttsAuto: params.sessionTtsAuto, }); if (ttsSyntheticReply.mediaUrl) { - const delivered = await deliverAcpPayload("final", { + const delivered = await delivery.deliver("final", { mediaUrl: ttsSyntheticReply.mediaUrl, audioAsVoice: ttsSyntheticReply.audioAsVoice, }); @@ -412,7 +287,7 @@ export async function tryDispatchAcpReply(params: { meta: currentMeta, }); if (resolvedDetails.length > 0) { - const delivered = await deliverAcpPayload("final", { + const delivered = await delivery.deliver("final", { text: prefixSystemMessage(["Session ids resolved.", ...resolvedDetails].join("\n")), }); queuedFinal = queuedFinal || delivered; @@ -421,9 +296,7 @@ export async function tryDispatchAcpReply(params: { } const counts = params.dispatcher.getQueuedCounts(); - counts.tool += routedCounts.tool; - counts.block += routedCounts.block; - counts.final += routedCounts.final; + delivery.applyRoutedCounts(counts); const acpStats = acpManager.getObservabilitySnapshot(params.cfg); logVerbose( `acp-dispatch: session=${sessionKey} outcome=ok latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, @@ -438,15 +311,13 @@ export async function tryDispatchAcpReply(params: { fallbackCode: "ACP_TURN_FAILED", fallbackMessage: "ACP turn failed before completion.", }); - const delivered = await deliverAcpPayload("final", { + const delivered = await delivery.deliver("final", { text: formatAcpRuntimeErrorText(acpError), isError: true, }); queuedFinal = queuedFinal || delivered; const counts = params.dispatcher.getQueuedCounts(); - counts.tool += routedCounts.tool; - counts.block += routedCounts.block; - counts.final += routedCounts.final; + delivery.applyRoutedCounts(counts); const acpStats = acpManager.getObservabilitySnapshot(params.cfg); logVerbose( `acp-dispatch: session=${sessionKey} outcome=error code=${acpError.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`, diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 1e8f17c4d..95968ea95 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -41,9 +41,6 @@ const acpMocks = vi.hoisted(() => ({ const sessionBindingMocks = vi.hoisted(() => ({ listBySession: vi.fn<(targetSessionKey: string) => SessionBindingRecord[]>(() => []), })); -const messageActionMocks = vi.hoisted(() => ({ - runMessageAction: vi.fn(async (_params: unknown) => ({ ok: true as const })), -})); const ttsMocks = vi.hoisted(() => { const state = { synthesizeFinalAudio: false, @@ -145,9 +142,6 @@ vi.mock("../../tts/tts.js", () => ({ normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value), resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg), })); -vi.mock("../../infra/outbound/message-action-runner.js", () => ({ - runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params), -})); const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js"); const { resetInboundDedupe } = await import("./inbound-dedupe.js"); @@ -230,8 +224,6 @@ describe("dispatchReplyFromConfig", () => { acpMocks.upsertAcpSessionMeta.mockReset(); acpMocks.upsertAcpSessionMeta.mockResolvedValue(null); acpMocks.requireAcpRuntimeBackend.mockReset(); - messageActionMocks.runMessageAction.mockReset(); - messageActionMocks.runMessageAction.mockResolvedValue({ ok: true as const }); sessionBindingMocks.listBySession.mockReset(); sessionBindingMocks.listBySession.mockReturnValue([]); ttsMocks.state.synthesizeFinalAudio = false; @@ -1172,306 +1164,6 @@ describe("dispatchReplyFromConfig", () => { expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); - it("edits ACP tool lifecycle updates in place when channel edit is available", async () => { - setNoAbort(); - mocks.routeReply.mockClear(); - const runtime = createAcpRuntime([ - { - type: "tool_call", - tag: "tool_call", - toolCallId: "call-1", - status: "in_progress", - title: "Run command", - text: "Run command (in_progress)", - }, - { - type: "tool_call", - tag: "tool_call_update", - toolCallId: "call-1", - status: "completed", - title: "Run command", - text: "Run command (completed)", - }, - { type: "done" }, - ]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - mocks.routeReply - .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-1" }) - .mockResolvedValueOnce({ ok: true, messageId: "final-msg-1" }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - OriginatingChannel: "telegram", - OriginatingTo: "telegram:thread-1", - SessionKey: "agent:codex-acp:session-1", - BodyForAgent: "run tool", - }); - - await dispatchReplyFromConfig({ ctx, cfg, dispatcher }); - - expect(messageActionMocks.runMessageAction).toHaveBeenCalledWith( - expect.objectContaining({ - action: "edit", - params: expect.objectContaining({ - channel: "telegram", - to: "telegram:thread-1", - messageId: "tool-msg-1", - }), - }), - ); - expect(mocks.routeReply).toHaveBeenCalledTimes(1); - expect(dispatcher.sendToolResult).not.toHaveBeenCalled(); - }); - - it("falls back to new ACP tool message when edit action fails", async () => { - setNoAbort(); - mocks.routeReply.mockClear(); - messageActionMocks.runMessageAction.mockRejectedValueOnce(new Error("edit unsupported")); - const runtime = createAcpRuntime([ - { - type: "tool_call", - tag: "tool_call", - toolCallId: "call-2", - status: "in_progress", - title: "Run command", - text: "Run command (in_progress)", - }, - { - type: "tool_call", - tag: "tool_call_update", - toolCallId: "call-2", - status: "completed", - title: "Run command", - text: "Run command (completed)", - }, - { type: "done" }, - ]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - mocks.routeReply - .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2" }) - .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2-fallback" }) - .mockResolvedValueOnce({ ok: true, messageId: "final-msg-2" }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - OriginatingChannel: "telegram", - OriginatingTo: "telegram:thread-1", - SessionKey: "agent:codex-acp:session-1", - BodyForAgent: "run tool", - }); - - await dispatchReplyFromConfig({ ctx, cfg, dispatcher }); - - expect(messageActionMocks.runMessageAction).toHaveBeenCalledTimes(1); - expect(mocks.routeReply).toHaveBeenCalledTimes(2); - expect(dispatcher.sendToolResult).not.toHaveBeenCalled(); - }); - - it("falls back to new ACP tool message when first tool send has no message id", async () => { - setNoAbort(); - mocks.routeReply.mockClear(); - const runtime = createAcpRuntime([ - { - type: "tool_call", - tag: "tool_call", - toolCallId: "call-3", - status: "in_progress", - title: "Run command", - text: "Run command (in_progress)", - }, - { - type: "tool_call", - tag: "tool_call_update", - toolCallId: "call-3", - status: "completed", - title: "Run command", - text: "Run command (completed)", - }, - { type: "done" }, - ]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime, - }); - mocks.routeReply - .mockResolvedValueOnce({ ok: true }) - .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-3-fallback" }) - .mockResolvedValueOnce({ ok: true, messageId: "final-msg-3" }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const ctx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - OriginatingChannel: "telegram", - OriginatingTo: "telegram:thread-1", - SessionKey: "agent:codex-acp:session-1", - BodyForAgent: "run tool", - }); - - await dispatchReplyFromConfig({ ctx, cfg, dispatcher }); - - expect(messageActionMocks.runMessageAction).not.toHaveBeenCalled(); - expect(mocks.routeReply).toHaveBeenCalledTimes(2); - expect(dispatcher.sendToolResult).not.toHaveBeenCalled(); - }); - - it("starts ACP typing lifecycle only when visible output is projected", async () => { - setNoAbort(); - const hiddenRuntime = createAcpRuntime([ - { - type: "status", - tag: "usage_update", - text: "usage updated: 10/100", - used: 10, - size: 100, - }, - { type: "done" }, - ]); - acpMocks.readAcpSessionEntry.mockReturnValue({ - sessionKey: "agent:codex-acp:session-1", - storeSessionKey: "agent:codex-acp:session-1", - cfg: {}, - storePath: "/tmp/mock-sessions.json", - entry: {}, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime:1", - mode: "persistent", - state: "idle", - lastActivityAt: Date.now(), - }, - }); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime: hiddenRuntime, - }); - - const cfg = { - acp: { - enabled: true, - dispatch: { enabled: true }, - }, - } as OpenClawConfig; - const dispatcher = createDispatcher(); - const onReplyStart = vi.fn(); - const hiddenCtx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - SessionKey: "agent:codex-acp:session-1", - BodyForAgent: "hidden-only", - MessageSid: "acp-hidden-1", - }); - await dispatchReplyFromConfig({ - ctx: hiddenCtx, - cfg, - dispatcher, - replyOptions: { onReplyStart }, - }); - expect(onReplyStart).not.toHaveBeenCalled(); - - acpManagerTesting.resetAcpSessionManagerForTests(); - - const visibleRuntime = createAcpRuntime([ - { - type: "text_delta", - text: "visible output", - }, - { type: "done" }, - ]); - acpMocks.requireAcpRuntimeBackend.mockReturnValue({ - id: "acpx", - runtime: visibleRuntime, - }); - const visibleCtx = buildTestCtx({ - Provider: "discord", - Surface: "discord", - SessionKey: "agent:codex-acp:session-1", - BodyForAgent: "visible", - MessageSid: "acp-visible-1", - }); - await dispatchReplyFromConfig({ - ctx: visibleCtx, - cfg, - dispatcher: createDispatcher(), - replyOptions: { onReplyStart }, - }); - expect(onReplyStart).toHaveBeenCalledTimes(1); - }); - it("closes oneshot ACP sessions after the turn completes", async () => { setNoAbort(); const runtime = createAcpRuntime([{ type: "done" }]); diff --git a/src/auto-reply/reply/test-fixtures/acp-runtime.ts b/src/auto-reply/reply/test-fixtures/acp-runtime.ts new file mode 100644 index 000000000..4e4e034a0 --- /dev/null +++ b/src/auto-reply/reply/test-fixtures/acp-runtime.ts @@ -0,0 +1,33 @@ +import type { OpenClawConfig } from "../../../config/config.js"; +import type { SessionAcpMeta } from "../../../config/sessions/types.js"; + +export function createAcpTestConfig(overrides?: Partial): OpenClawConfig { + return { + acp: { + enabled: true, + stream: { + coalesceIdleMs: 0, + maxChunkChars: 64, + }, + }, + ...overrides, + } as OpenClawConfig; +} + +export function createAcpSessionMeta(overrides?: Partial): SessionAcpMeta { + return { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime:1", + mode: "persistent", + state: "idle", + lastActivityAt: Date.now(), + identity: { + state: "resolved", + acpxSessionId: "acpx-session-1", + source: "status", + lastUpdatedAt: Date.now(), + }, + ...overrides, + }; +}