Refactor: split ACP dispatch delivery and settings

This commit is contained in:
Onur
2026-03-01 10:17:44 +01:00
committed by Onur Solmaz
parent 54ed2efc20
commit 752398a6ba
9 changed files with 875 additions and 630 deletions

View File

@@ -1,20 +1,7 @@
import { describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
import { createAcpReplyProjector } from "./acp-projector.js";
function createCfg(overrides?: Partial<OpenClawConfig>): OpenClawConfig {
return {
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 64,
},
},
...overrides,
} as OpenClawConfig;
}
import { createAcpTestConfig as createCfg } from "./test-fixtures/acp-runtime.js";
describe("createAcpReplyProjector", () => {
it("coalesces text deltas into bounded block chunks", async () => {

View File

@@ -4,34 +4,16 @@ import { formatToolSummary, resolveToolDisplay } from "../../agents/tool-display
import type { OpenClawConfig } from "../../config/config.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
import type { ReplyPayload } from "../types.js";
import {
isAcpTagVisible,
resolveAcpProjectionSettings,
resolveAcpStreamingConfig,
} from "./acp-stream-settings.js";
import { createBlockReplyPipeline } from "./block-reply-pipeline.js";
import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js";
import type { ReplyDispatchKind } from "./reply-dispatcher.js";
const DEFAULT_ACP_STREAM_COALESCE_IDLE_MS = 350;
const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800;
const DEFAULT_ACP_META_MODE = "minimal";
const DEFAULT_ACP_SHOW_USAGE = false;
const DEFAULT_ACP_DELIVERY_MODE = "live";
const DEFAULT_ACP_MAX_TURN_CHARS = 24_000;
const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320;
const DEFAULT_ACP_MAX_STATUS_CHARS = 320;
const DEFAULT_ACP_MAX_META_EVENTS_PER_TURN = 64;
const ACP_BLOCK_REPLY_TIMEOUT_MS = 15_000;
const ACP_TAG_VISIBILITY_DEFAULTS: Record<string, boolean> = {
agent_message_chunk: true,
tool_call: true,
tool_call_update: true,
usage_update: false,
available_commands_update: false,
current_mode_update: false,
config_option_update: false,
session_info_update: false,
plan: false,
agent_thought_chunk: false,
};
const TERMINAL_TOOL_STATUSES = new Set(["completed", "failed", "cancelled", "done", "error"]);
export type AcpProjectedDeliveryMeta = {
@@ -41,125 +23,12 @@ export type AcpProjectedDeliveryMeta = {
allowEdit?: boolean;
};
type AcpDeliveryMode = "live" | "final_only";
type AcpMetaMode = "off" | "minimal" | "verbose";
type AcpProjectionSettings = {
deliveryMode: AcpDeliveryMode;
metaMode: AcpMetaMode;
showUsage: boolean;
maxTurnChars: number;
maxToolSummaryChars: number;
maxStatusChars: number;
maxMetaEventsPerTurn: number;
tagVisibility: Partial<Record<AcpSessionUpdateTag, boolean>>;
};
type ToolLifecycleState = {
started: boolean;
terminal: boolean;
lastRenderedHash?: string;
};
function clampPositiveInteger(
value: unknown,
fallback: number,
bounds: { min: number; max: number },
): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
const rounded = Math.round(value);
if (rounded < bounds.min) {
return bounds.min;
}
if (rounded > bounds.max) {
return bounds.max;
}
return rounded;
}
function clampBoolean(value: unknown, fallback: boolean): boolean {
return typeof value === "boolean" ? value : fallback;
}
function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode {
return value === "final_only" ? "final_only" : DEFAULT_ACP_DELIVERY_MODE;
}
function resolveAcpMetaMode(value: unknown): AcpMetaMode {
if (value === "off" || value === "minimal" || value === "verbose") {
return value;
}
return DEFAULT_ACP_META_MODE;
}
function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number {
return clampPositiveInteger(
cfg.acp?.stream?.coalesceIdleMs,
DEFAULT_ACP_STREAM_COALESCE_IDLE_MS,
{
min: 0,
max: 5_000,
},
);
}
function resolveAcpStreamMaxChunkChars(cfg: OpenClawConfig): number {
return clampPositiveInteger(cfg.acp?.stream?.maxChunkChars, DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS, {
min: 50,
max: 4_000,
});
}
function resolveAcpProjectionSettings(cfg: OpenClawConfig): AcpProjectionSettings {
const stream = cfg.acp?.stream;
return {
deliveryMode: resolveAcpDeliveryMode(stream?.deliveryMode),
metaMode: resolveAcpMetaMode(stream?.metaMode),
showUsage: clampBoolean(stream?.showUsage, DEFAULT_ACP_SHOW_USAGE),
maxTurnChars: clampPositiveInteger(stream?.maxTurnChars, DEFAULT_ACP_MAX_TURN_CHARS, {
min: 1,
max: 500_000,
}),
maxToolSummaryChars: clampPositiveInteger(
stream?.maxToolSummaryChars,
DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS,
{
min: 64,
max: 8_000,
},
),
maxStatusChars: clampPositiveInteger(stream?.maxStatusChars, DEFAULT_ACP_MAX_STATUS_CHARS, {
min: 64,
max: 8_000,
}),
maxMetaEventsPerTurn: clampPositiveInteger(
stream?.maxMetaEventsPerTurn,
DEFAULT_ACP_MAX_META_EVENTS_PER_TURN,
{
min: 1,
max: 2_000,
},
),
tagVisibility: stream?.tagVisibility ?? {},
};
}
function resolveAcpStreamingConfig(params: {
cfg: OpenClawConfig;
provider?: string;
accountId?: string;
}) {
return resolveEffectiveBlockStreamingConfig({
cfg: params.cfg,
provider: params.provider,
accountId: params.accountId,
maxChunkChars: resolveAcpStreamMaxChunkChars(params.cfg),
coalesceIdleMs: resolveAcpStreamCoalesceIdleMs(params.cfg),
});
}
function truncateText(input: string, maxChars: number): string {
if (input.length <= maxChars) {
return input;
@@ -182,23 +51,6 @@ function normalizeToolStatus(status: string | undefined): string | undefined {
return normalized || undefined;
}
function isTagVisible(
settings: AcpProjectionSettings,
tag: AcpSessionUpdateTag | undefined,
): boolean {
if (!tag) {
return true;
}
const override = settings.tagVisibility[tag];
if (typeof override === "boolean") {
return override;
}
if (Object.prototype.hasOwnProperty.call(ACP_TAG_VISIBILITY_DEFAULTS, tag)) {
return ACP_TAG_VISIBILITY_DEFAULTS[tag];
}
return true;
}
function renderToolSummaryText(event: Extract<AcpRuntimeEvent, { type: "tool_call" }>): string {
const detailParts: string[] = [];
const title = event.title?.trim();
@@ -335,7 +187,7 @@ export function createAcpReplyProjector(params: {
if (!params.shouldSendToolSummaries || settings.metaMode === "off") {
return;
}
if (!isTagVisible(settings, event.tag)) {
if (!isAcpTagVisible(settings, event.tag)) {
return;
}
@@ -419,7 +271,7 @@ export function createAcpReplyProjector(params: {
if (event.stream && event.stream !== "output") {
return;
}
if (!isTagVisible(settings, event.tag)) {
if (!isAcpTagVisible(settings, event.tag)) {
return;
}
const text = event.text;
@@ -444,7 +296,7 @@ export function createAcpReplyProjector(params: {
}
if (event.type === "status") {
if (!isTagVisible(settings, event.tag)) {
if (!isAcpTagVisible(settings, event.tag)) {
return;
}
if (event.tag === "usage_update") {

View File

@@ -0,0 +1,77 @@
import { describe, expect, it } from "vitest";
import {
isAcpTagVisible,
resolveAcpProjectionSettings,
resolveAcpStreamingConfig,
} from "./acp-stream-settings.js";
import { createAcpTestConfig } from "./test-fixtures/acp-runtime.js";
describe("acp stream settings", () => {
it("resolves stable defaults", () => {
const settings = resolveAcpProjectionSettings(createAcpTestConfig());
expect(settings.deliveryMode).toBe("live");
expect(settings.metaMode).toBe("minimal");
expect(settings.showUsage).toBe(false);
expect(settings.maxTurnChars).toBe(24_000);
expect(settings.maxMetaEventsPerTurn).toBe(64);
});
it("applies explicit stream overrides", () => {
const settings = resolveAcpProjectionSettings(
createAcpTestConfig({
acp: {
enabled: true,
stream: {
deliveryMode: "final_only",
metaMode: "off",
showUsage: true,
maxTurnChars: 500,
maxMetaEventsPerTurn: 7,
tagVisibility: {
usage_update: true,
},
},
},
}),
);
expect(settings.deliveryMode).toBe("final_only");
expect(settings.metaMode).toBe("off");
expect(settings.showUsage).toBe(true);
expect(settings.maxTurnChars).toBe(500);
expect(settings.maxMetaEventsPerTurn).toBe(7);
expect(settings.tagVisibility.usage_update).toBe(true);
});
it("uses default tag visibility when no override is provided", () => {
const settings = resolveAcpProjectionSettings(createAcpTestConfig());
expect(isAcpTagVisible(settings, "tool_call")).toBe(true);
expect(isAcpTagVisible(settings, "usage_update")).toBe(false);
});
it("respects tag visibility overrides", () => {
const settings = resolveAcpProjectionSettings(
createAcpTestConfig({
acp: {
enabled: true,
stream: {
tagVisibility: {
usage_update: true,
tool_call: false,
},
},
},
}),
);
expect(isAcpTagVisible(settings, "usage_update")).toBe(true);
expect(isAcpTagVisible(settings, "tool_call")).toBe(false);
});
it("resolves chunking/coalescing from ACP stream controls", () => {
const streaming = resolveAcpStreamingConfig({
cfg: createAcpTestConfig(),
provider: "discord",
});
expect(streaming.chunking.maxChars).toBe(64);
expect(streaming.coalescing.idleMs).toBe(0);
});
});

View File

@@ -0,0 +1,156 @@
import type { AcpSessionUpdateTag } from "../../acp/runtime/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js";
const DEFAULT_ACP_STREAM_COALESCE_IDLE_MS = 350;
const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800;
const DEFAULT_ACP_META_MODE = "minimal";
const DEFAULT_ACP_SHOW_USAGE = false;
const DEFAULT_ACP_DELIVERY_MODE = "live";
const DEFAULT_ACP_MAX_TURN_CHARS = 24_000;
const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320;
const DEFAULT_ACP_MAX_STATUS_CHARS = 320;
const DEFAULT_ACP_MAX_META_EVENTS_PER_TURN = 64;
export const ACP_TAG_VISIBILITY_DEFAULTS: Record<AcpSessionUpdateTag, boolean> = {
agent_message_chunk: true,
tool_call: true,
tool_call_update: true,
usage_update: false,
available_commands_update: false,
current_mode_update: false,
config_option_update: false,
session_info_update: false,
plan: false,
agent_thought_chunk: false,
};
export type AcpDeliveryMode = "live" | "final_only";
export type AcpMetaMode = "off" | "minimal" | "verbose";
export type AcpProjectionSettings = {
deliveryMode: AcpDeliveryMode;
metaMode: AcpMetaMode;
showUsage: boolean;
maxTurnChars: number;
maxToolSummaryChars: number;
maxStatusChars: number;
maxMetaEventsPerTurn: number;
tagVisibility: Partial<Record<AcpSessionUpdateTag, boolean>>;
};
function clampPositiveInteger(
value: unknown,
fallback: number,
bounds: { min: number; max: number },
): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
const rounded = Math.round(value);
if (rounded < bounds.min) {
return bounds.min;
}
if (rounded > bounds.max) {
return bounds.max;
}
return rounded;
}
function clampBoolean(value: unknown, fallback: boolean): boolean {
return typeof value === "boolean" ? value : fallback;
}
function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode {
return value === "final_only" ? "final_only" : DEFAULT_ACP_DELIVERY_MODE;
}
function resolveAcpMetaMode(value: unknown): AcpMetaMode {
if (value === "off" || value === "minimal" || value === "verbose") {
return value;
}
return DEFAULT_ACP_META_MODE;
}
function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number {
return clampPositiveInteger(
cfg.acp?.stream?.coalesceIdleMs,
DEFAULT_ACP_STREAM_COALESCE_IDLE_MS,
{
min: 0,
max: 5_000,
},
);
}
function resolveAcpStreamMaxChunkChars(cfg: OpenClawConfig): number {
return clampPositiveInteger(cfg.acp?.stream?.maxChunkChars, DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS, {
min: 50,
max: 4_000,
});
}
export function resolveAcpProjectionSettings(cfg: OpenClawConfig): AcpProjectionSettings {
const stream = cfg.acp?.stream;
return {
deliveryMode: resolveAcpDeliveryMode(stream?.deliveryMode),
metaMode: resolveAcpMetaMode(stream?.metaMode),
showUsage: clampBoolean(stream?.showUsage, DEFAULT_ACP_SHOW_USAGE),
maxTurnChars: clampPositiveInteger(stream?.maxTurnChars, DEFAULT_ACP_MAX_TURN_CHARS, {
min: 1,
max: 500_000,
}),
maxToolSummaryChars: clampPositiveInteger(
stream?.maxToolSummaryChars,
DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS,
{
min: 64,
max: 8_000,
},
),
maxStatusChars: clampPositiveInteger(stream?.maxStatusChars, DEFAULT_ACP_MAX_STATUS_CHARS, {
min: 64,
max: 8_000,
}),
maxMetaEventsPerTurn: clampPositiveInteger(
stream?.maxMetaEventsPerTurn,
DEFAULT_ACP_MAX_META_EVENTS_PER_TURN,
{
min: 1,
max: 2_000,
},
),
tagVisibility: stream?.tagVisibility ?? {},
};
}
export function resolveAcpStreamingConfig(params: {
cfg: OpenClawConfig;
provider?: string;
accountId?: string;
}) {
return resolveEffectiveBlockStreamingConfig({
cfg: params.cfg,
provider: params.provider,
accountId: params.accountId,
maxChunkChars: resolveAcpStreamMaxChunkChars(params.cfg),
coalesceIdleMs: resolveAcpStreamCoalesceIdleMs(params.cfg),
});
}
export function isAcpTagVisible(
settings: AcpProjectionSettings,
tag: AcpSessionUpdateTag | undefined,
): boolean {
if (!tag) {
return true;
}
const override = settings.tagVisibility[tag];
if (typeof override === "boolean") {
return override;
}
if (Object.prototype.hasOwnProperty.call(ACP_TAG_VISIBILITY_DEFAULTS, tag)) {
return ACP_TAG_VISIBILITY_DEFAULTS[tag];
}
return true;
}

View File

@@ -0,0 +1,199 @@
import type { OpenClawConfig } from "../../config/config.js";
import type { TtsAutoMode } from "../../config/types.tts.js";
import { logVerbose } from "../../globals.js";
import { runMessageAction } from "../../infra/outbound/message-action-runner.js";
import { maybeApplyTtsToPayload } from "../../tts/tts.js";
import type { FinalizedMsgContext } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
import { routeReply } from "./route-reply.js";
export type AcpDispatchDeliveryMeta = {
toolCallId?: string;
allowEdit?: boolean;
};
type ToolMessageHandle = {
channel: string;
accountId?: string;
to: string;
threadId?: string | number;
messageId: string;
};
type AcpDispatchDeliveryState = {
startedReplyLifecycle: boolean;
accumulatedBlockText: string;
blockCount: number;
routedCounts: Record<ReplyDispatchKind, number>;
toolMessageByCallId: Map<string, ToolMessageHandle>;
};
export type AcpDispatchDeliveryCoordinator = {
deliver: (
kind: ReplyDispatchKind,
payload: ReplyPayload,
meta?: AcpDispatchDeliveryMeta,
) => Promise<boolean>;
getBlockCount: () => number;
getAccumulatedBlockText: () => string;
getRoutedCounts: () => Record<ReplyDispatchKind, number>;
applyRoutedCounts: (counts: Record<ReplyDispatchKind, number>) => void;
};
export function createAcpDispatchDeliveryCoordinator(params: {
cfg: OpenClawConfig;
ctx: FinalizedMsgContext;
dispatcher: ReplyDispatcher;
inboundAudio: boolean;
sessionTtsAuto?: TtsAutoMode;
ttsChannel?: string;
shouldRouteToOriginating: boolean;
originatingChannel?: string;
originatingTo?: string;
onReplyStart?: () => Promise<void> | void;
}): AcpDispatchDeliveryCoordinator {
const state: AcpDispatchDeliveryState = {
startedReplyLifecycle: false,
accumulatedBlockText: "",
blockCount: 0,
routedCounts: {
tool: 0,
block: 0,
final: 0,
},
toolMessageByCallId: new Map(),
};
const ensureReplyLifecycleStarted = async () => {
if (state.startedReplyLifecycle) {
return;
}
state.startedReplyLifecycle = true;
await params.onReplyStart?.();
};
const tryEditToolMessage = async (
payload: ReplyPayload,
toolCallId: string,
): Promise<boolean> => {
if (!params.shouldRouteToOriginating || !params.originatingChannel || !params.originatingTo) {
return false;
}
const handle = state.toolMessageByCallId.get(toolCallId);
if (!handle?.messageId) {
return false;
}
const message = payload.text?.trim();
if (!message) {
return false;
}
try {
await runMessageAction({
cfg: params.cfg,
action: "edit",
params: {
channel: handle.channel,
accountId: handle.accountId,
to: handle.to,
threadId: handle.threadId,
messageId: handle.messageId,
message,
},
sessionKey: params.ctx.SessionKey,
});
state.routedCounts.tool += 1;
return true;
} catch (error) {
logVerbose(
`dispatch-acp: tool message edit failed for ${toolCallId}: ${error instanceof Error ? error.message : String(error)}`,
);
return false;
}
};
const deliver = async (
kind: ReplyDispatchKind,
payload: ReplyPayload,
meta?: AcpDispatchDeliveryMeta,
): Promise<boolean> => {
if (kind === "block" && payload.text?.trim()) {
if (state.accumulatedBlockText.length > 0) {
state.accumulatedBlockText += "\n";
}
state.accumulatedBlockText += payload.text;
state.blockCount += 1;
}
if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) {
await ensureReplyLifecycleStarted();
}
const ttsPayload = await maybeApplyTtsToPayload({
payload,
cfg: params.cfg,
channel: params.ttsChannel,
kind,
inboundAudio: params.inboundAudio,
ttsAuto: params.sessionTtsAuto,
});
if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) {
const toolCallId = meta?.toolCallId?.trim();
if (kind === "tool" && meta?.allowEdit === true && toolCallId) {
const edited = await tryEditToolMessage(ttsPayload, toolCallId);
if (edited) {
return true;
}
}
const result = await routeReply({
payload: ttsPayload,
channel: params.originatingChannel,
to: params.originatingTo,
sessionKey: params.ctx.SessionKey,
accountId: params.ctx.AccountId,
threadId: params.ctx.MessageThreadId,
cfg: params.cfg,
});
if (!result.ok) {
logVerbose(
`dispatch-acp: route-reply (acp/${kind}) failed: ${result.error ?? "unknown error"}`,
);
return false;
}
if (kind === "tool" && meta?.toolCallId && result.messageId) {
state.toolMessageByCallId.set(meta.toolCallId, {
channel: params.originatingChannel,
accountId: params.ctx.AccountId,
to: params.originatingTo,
...(params.ctx.MessageThreadId != null ? { threadId: params.ctx.MessageThreadId } : {}),
messageId: result.messageId,
});
}
state.routedCounts[kind] += 1;
return true;
}
if (kind === "tool") {
return params.dispatcher.sendToolResult(ttsPayload);
}
if (kind === "block") {
return params.dispatcher.sendBlockReply(ttsPayload);
}
return params.dispatcher.sendFinalReply(ttsPayload);
};
return {
deliver,
getBlockCount: () => state.blockCount,
getAccumulatedBlockText: () => state.accumulatedBlockText,
getRoutedCounts: () => ({ ...state.routedCounts }),
applyRoutedCounts: (counts) => {
counts.tool += state.routedCounts.tool;
counts.block += state.routedCounts.block;
counts.final += state.routedCounts.final;
},
};
}

View File

@@ -0,0 +1,378 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { AcpRuntimeError } from "../../acp/runtime/errors.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { ReplyDispatcher } from "./reply-dispatcher.js";
import { buildTestCtx } from "./test-ctx.js";
import { createAcpSessionMeta, createAcpTestConfig } from "./test-fixtures/acp-runtime.js";
const managerMocks = vi.hoisted(() => ({
resolveSession: vi.fn(),
runTurn: vi.fn(),
getObservabilitySnapshot: vi.fn(() => ({
turns: { queueDepth: 0 },
runtimeCache: { activeSessions: 0 },
})),
}));
const policyMocks = vi.hoisted(() => ({
resolveAcpDispatchPolicyError: vi.fn(() => null),
resolveAcpAgentPolicyError: vi.fn(() => null),
}));
const routeMocks = vi.hoisted(() => ({
routeReply: vi.fn(async (_params: unknown) => ({ ok: true, messageId: "mock" })),
}));
const messageActionMocks = vi.hoisted(() => ({
runMessageAction: vi.fn(async (_params: unknown) => ({ ok: true as const })),
}));
const ttsMocks = vi.hoisted(() => ({
maybeApplyTtsToPayload: vi.fn(async (paramsUnknown: unknown) => {
const params = paramsUnknown as { payload: unknown };
return params.payload;
}),
resolveTtsConfig: vi.fn((_cfg: OpenClawConfig) => ({ mode: "final" })),
}));
const sessionMetaMocks = vi.hoisted(() => ({
readAcpSessionEntry: vi.fn(() => null),
}));
const bindingServiceMocks = vi.hoisted(() => ({
listBySession: vi.fn(() => []),
}));
vi.mock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => managerMocks,
}));
vi.mock("../../acp/policy.js", () => ({
resolveAcpDispatchPolicyError: (cfg: OpenClawConfig) =>
policyMocks.resolveAcpDispatchPolicyError(cfg),
resolveAcpAgentPolicyError: (cfg: OpenClawConfig, agent: string) =>
policyMocks.resolveAcpAgentPolicyError(cfg, agent),
}));
vi.mock("./route-reply.js", () => ({
routeReply: (params: unknown) => routeMocks.routeReply(params),
}));
vi.mock("../../infra/outbound/message-action-runner.js", () => ({
runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params),
}));
vi.mock("../../tts/tts.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg),
}));
vi.mock("../../acp/runtime/session-meta.js", () => ({
readAcpSessionEntry: (params: unknown) => sessionMetaMocks.readAcpSessionEntry(params),
}));
vi.mock("../../infra/outbound/session-binding-service.js", () => ({
getSessionBindingService: () => ({
listBySession: (sessionKey: string) => bindingServiceMocks.listBySession(sessionKey),
}),
}));
const { tryDispatchAcpReply } = await import("./dispatch-acp.js");
function createDispatcher(): {
dispatcher: ReplyDispatcher;
counts: Record<"tool" | "block" | "final", number>;
} {
const counts = { tool: 0, block: 0, final: 0 };
const dispatcher: ReplyDispatcher = {
sendToolResult: vi.fn(() => true),
sendBlockReply: vi.fn(() => true),
sendFinalReply: vi.fn(() => true),
waitForIdle: vi.fn(async () => {}),
getQueuedCounts: vi.fn(() => counts),
markComplete: vi.fn(),
};
return { dispatcher, counts };
}
function setReadyAcpResolution() {
managerMocks.resolveSession.mockReturnValue({
kind: "ready",
sessionKey: "agent:codex-acp:session-1",
meta: createAcpSessionMeta(),
});
}
describe("tryDispatchAcpReply", () => {
beforeEach(() => {
managerMocks.resolveSession.mockReset();
managerMocks.runTurn.mockReset();
managerMocks.getObservabilitySnapshot.mockReset();
managerMocks.getObservabilitySnapshot.mockReturnValue({
turns: { queueDepth: 0 },
runtimeCache: { activeSessions: 0 },
});
policyMocks.resolveAcpDispatchPolicyError.mockReset();
policyMocks.resolveAcpDispatchPolicyError.mockReturnValue(null);
policyMocks.resolveAcpAgentPolicyError.mockReset();
policyMocks.resolveAcpAgentPolicyError.mockReturnValue(null);
routeMocks.routeReply.mockReset();
routeMocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" });
messageActionMocks.runMessageAction.mockReset();
messageActionMocks.runMessageAction.mockResolvedValue({ ok: true as const });
ttsMocks.maybeApplyTtsToPayload.mockClear();
ttsMocks.resolveTtsConfig.mockReset();
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
sessionMetaMocks.readAcpSessionEntry.mockReset();
sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null);
bindingServiceMocks.listBySession.mockReset();
bindingServiceMocks.listBySession.mockReturnValue([]);
});
it("routes ACP block output to originating channel", async () => {
setReadyAcpResolution();
managerMocks.runTurn.mockImplementation(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({ type: "text_delta", text: "hello", tag: "agent_message_chunk" });
await onEvent({ type: "done" });
},
);
const { dispatcher } = createDispatcher();
const result = await tryDispatchAcpReply({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "reply",
}),
cfg: createAcpTestConfig(),
dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
shouldRouteToOriginating: true,
originatingChannel: "telegram",
originatingTo: "telegram:thread-1",
shouldSendToolSummaries: true,
bypassForCommand: false,
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
expect(result?.counts.block).toBe(1);
expect(routeMocks.routeReply).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:thread-1",
}),
);
expect(dispatcher.sendBlockReply).not.toHaveBeenCalled();
});
it("edits ACP tool lifecycle updates in place when supported", async () => {
setReadyAcpResolution();
managerMocks.runTurn.mockImplementation(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call-1",
status: "in_progress",
title: "Run command",
text: "Run command (in_progress)",
});
await onEvent({
type: "tool_call",
tag: "tool_call_update",
toolCallId: "call-1",
status: "completed",
title: "Run command",
text: "Run command (completed)",
});
await onEvent({ type: "done" });
},
);
routeMocks.routeReply.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-1" });
const { dispatcher } = createDispatcher();
await tryDispatchAcpReply({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "run tool",
}),
cfg: createAcpTestConfig(),
dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
shouldRouteToOriginating: true,
originatingChannel: "telegram",
originatingTo: "telegram:thread-1",
shouldSendToolSummaries: true,
bypassForCommand: false,
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
expect(messageActionMocks.runMessageAction).toHaveBeenCalledWith(
expect.objectContaining({
action: "edit",
params: expect.objectContaining({
messageId: "tool-msg-1",
}),
}),
);
});
it("falls back to new tool message when edit fails", async () => {
setReadyAcpResolution();
managerMocks.runTurn.mockImplementation(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call-2",
status: "in_progress",
title: "Run command",
text: "Run command (in_progress)",
});
await onEvent({
type: "tool_call",
tag: "tool_call_update",
toolCallId: "call-2",
status: "completed",
title: "Run command",
text: "Run command (completed)",
});
await onEvent({ type: "done" });
},
);
routeMocks.routeReply
.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2" })
.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2-fallback" });
messageActionMocks.runMessageAction.mockRejectedValueOnce(new Error("edit unsupported"));
const { dispatcher } = createDispatcher();
await tryDispatchAcpReply({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "run tool",
}),
cfg: createAcpTestConfig(),
dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
shouldRouteToOriginating: true,
originatingChannel: "telegram",
originatingTo: "telegram:thread-1",
shouldSendToolSummaries: true,
bypassForCommand: false,
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
expect(messageActionMocks.runMessageAction).toHaveBeenCalledTimes(1);
expect(routeMocks.routeReply).toHaveBeenCalledTimes(2);
});
it("starts reply lifecycle only when visible projected output is emitted", async () => {
setReadyAcpResolution();
const onReplyStart = vi.fn();
const { dispatcher } = createDispatcher();
managerMocks.runTurn.mockImplementationOnce(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({
type: "status",
tag: "usage_update",
text: "usage updated: 1/100",
used: 1,
size: 100,
});
await onEvent({ type: "done" });
},
);
await tryDispatchAcpReply({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "hidden",
}),
cfg: createAcpTestConfig(),
dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
shouldRouteToOriginating: false,
shouldSendToolSummaries: true,
bypassForCommand: false,
onReplyStart,
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
expect(onReplyStart).not.toHaveBeenCalled();
managerMocks.runTurn.mockImplementationOnce(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({ type: "text_delta", text: "visible", tag: "agent_message_chunk" });
await onEvent({ type: "done" });
},
);
await tryDispatchAcpReply({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "visible",
}),
cfg: createAcpTestConfig(),
dispatcher: createDispatcher().dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
shouldRouteToOriginating: false,
shouldSendToolSummaries: true,
bypassForCommand: false,
onReplyStart,
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
expect(onReplyStart).toHaveBeenCalledTimes(1);
});
it("surfaces ACP policy errors as final error replies", async () => {
setReadyAcpResolution();
policyMocks.resolveAcpDispatchPolicyError.mockReturnValue(
new AcpRuntimeError("ACP_DISPATCH_DISABLED", "ACP dispatch is disabled by policy."),
);
const { dispatcher } = createDispatcher();
await tryDispatchAcpReply({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "test",
}),
cfg: createAcpTestConfig(),
dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
shouldRouteToOriginating: false,
shouldSendToolSummaries: true,
bypassForCommand: false,
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
expect(managerMocks.runTurn).not.toHaveBeenCalled();
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
expect.objectContaining({
text: expect.stringContaining("ACP_DISPATCH_DISABLED"),
}),
);
});
});

View File

@@ -11,7 +11,6 @@ import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { TtsAutoMode } from "../../config/types.tts.js";
import { logVerbose } from "../../globals.js";
import { runMessageAction } from "../../infra/outbound/message-action-runner.js";
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
import { generateSecureUuid } from "../../infra/secure-random.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
@@ -23,10 +22,9 @@ import {
shouldHandleTextCommands,
} from "../commands-registry.js";
import type { FinalizedMsgContext } from "../templating.js";
import type { ReplyPayload } from "../types.js";
import { createAcpReplyProjector } from "./acp-projector.js";
import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
import { routeReply } from "./route-reply.js";
type DispatchProcessedRecorder = (
outcome: "completed" | "skipped" | "error",
@@ -176,148 +174,24 @@ export async function tryDispatchAcpReply(params: {
return null;
}
const routedCounts: Record<ReplyDispatchKind, number> = {
tool: 0,
block: 0,
final: 0,
};
let queuedFinal = false;
let acpAccumulatedBlockText = "";
let acpBlockCount = 0;
let startedReplyLifecycle = false;
const toolUpdateMessageById = new Map<
string,
{
channel: string;
accountId?: string;
to: string;
threadId?: string | number;
messageId: string;
}
>();
const ensureReplyLifecycleStarted = async () => {
if (startedReplyLifecycle) {
return;
}
startedReplyLifecycle = true;
await params.onReplyStart?.();
};
const tryEditToolUpdate = async (payload: ReplyPayload, toolCallId: string): Promise<boolean> => {
if (!params.shouldRouteToOriginating || !params.originatingChannel || !params.originatingTo) {
return false;
}
const handle = toolUpdateMessageById.get(toolCallId);
if (!handle?.messageId) {
return false;
}
const message = payload.text?.trim();
if (!message) {
return false;
}
try {
await runMessageAction({
cfg: params.cfg,
action: "edit",
params: {
channel: handle.channel,
accountId: handle.accountId,
to: handle.to,
threadId: handle.threadId,
messageId: handle.messageId,
message,
},
sessionKey: params.ctx.SessionKey,
});
routedCounts.tool += 1;
return true;
} catch (error) {
logVerbose(
`dispatch-acp: tool message edit failed for ${toolCallId}: ${error instanceof Error ? error.message : String(error)}`,
);
return false;
}
};
const deliverAcpPayload = async (
kind: ReplyDispatchKind,
payload: ReplyPayload,
meta?: {
toolCallId?: string;
allowEdit?: boolean;
},
): Promise<boolean> => {
if (kind === "block" && payload.text?.trim()) {
if (acpAccumulatedBlockText.length > 0) {
acpAccumulatedBlockText += "\n";
}
acpAccumulatedBlockText += payload.text;
acpBlockCount += 1;
}
if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) {
await ensureReplyLifecycleStarted();
}
const ttsPayload = await maybeApplyTtsToPayload({
payload,
cfg: params.cfg,
channel: params.ttsChannel,
kind,
inboundAudio: params.inboundAudio,
ttsAuto: params.sessionTtsAuto,
});
if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) {
const toolCallId = meta?.toolCallId?.trim();
if (kind === "tool" && meta?.allowEdit === true && toolCallId) {
const edited = await tryEditToolUpdate(ttsPayload, toolCallId);
if (edited) {
return true;
}
}
const result = await routeReply({
payload: ttsPayload,
channel: params.originatingChannel,
to: params.originatingTo,
sessionKey: params.ctx.SessionKey,
accountId: params.ctx.AccountId,
threadId: params.ctx.MessageThreadId,
cfg: params.cfg,
});
if (!result.ok) {
logVerbose(
`dispatch-acp: route-reply (acp/${kind}) failed: ${result.error ?? "unknown error"}`,
);
return false;
}
if (kind === "tool" && meta?.toolCallId && result.messageId) {
toolUpdateMessageById.set(meta.toolCallId, {
channel: params.originatingChannel,
accountId: params.ctx.AccountId,
to: params.originatingTo,
...(params.ctx.MessageThreadId != null ? { threadId: params.ctx.MessageThreadId } : {}),
messageId: result.messageId,
});
}
routedCounts[kind] += 1;
return true;
}
if (kind === "tool") {
return params.dispatcher.sendToolResult(ttsPayload);
}
if (kind === "block") {
return params.dispatcher.sendBlockReply(ttsPayload);
}
return params.dispatcher.sendFinalReply(ttsPayload);
};
const delivery = createAcpDispatchDeliveryCoordinator({
cfg: params.cfg,
ctx: params.ctx,
dispatcher: params.dispatcher,
inboundAudio: params.inboundAudio,
sessionTtsAuto: params.sessionTtsAuto,
ttsChannel: params.ttsChannel,
shouldRouteToOriginating: params.shouldRouteToOriginating,
originatingChannel: params.originatingChannel,
originatingTo: params.originatingTo,
onReplyStart: params.onReplyStart,
});
const promptText = resolveAcpPromptText(params.ctx);
if (!promptText) {
const counts = params.dispatcher.getQueuedCounts();
counts.tool += routedCounts.tool;
counts.block += routedCounts.block;
counts.final += routedCounts.final;
delivery.applyRoutedCounts(counts);
params.recordProcessed("completed", { reason: "acp_empty_prompt" });
params.markIdle("message_completed");
return { queuedFinal: false, counts };
@@ -346,7 +220,7 @@ export async function tryDispatchAcpReply(params: {
const projector = createAcpReplyProjector({
cfg: params.cfg,
shouldSendToolSummaries: params.shouldSendToolSummaries,
deliver: deliverAcpPayload,
deliver: delivery.deliver,
provider: params.ctx.Surface ?? params.ctx.Provider,
accountId: params.ctx.AccountId,
});
@@ -376,10 +250,11 @@ export async function tryDispatchAcpReply(params: {
await projector.flush(true);
const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final";
if (ttsMode === "final" && acpBlockCount > 0 && acpAccumulatedBlockText.trim()) {
const accumulatedBlockText = delivery.getAccumulatedBlockText();
if (ttsMode === "final" && delivery.getBlockCount() > 0 && accumulatedBlockText.trim()) {
try {
const ttsSyntheticReply = await maybeApplyTtsToPayload({
payload: { text: acpAccumulatedBlockText },
payload: { text: accumulatedBlockText },
cfg: params.cfg,
channel: params.ttsChannel,
kind: "final",
@@ -387,7 +262,7 @@ export async function tryDispatchAcpReply(params: {
ttsAuto: params.sessionTtsAuto,
});
if (ttsSyntheticReply.mediaUrl) {
const delivered = await deliverAcpPayload("final", {
const delivered = await delivery.deliver("final", {
mediaUrl: ttsSyntheticReply.mediaUrl,
audioAsVoice: ttsSyntheticReply.audioAsVoice,
});
@@ -412,7 +287,7 @@ export async function tryDispatchAcpReply(params: {
meta: currentMeta,
});
if (resolvedDetails.length > 0) {
const delivered = await deliverAcpPayload("final", {
const delivered = await delivery.deliver("final", {
text: prefixSystemMessage(["Session ids resolved.", ...resolvedDetails].join("\n")),
});
queuedFinal = queuedFinal || delivered;
@@ -421,9 +296,7 @@ export async function tryDispatchAcpReply(params: {
}
const counts = params.dispatcher.getQueuedCounts();
counts.tool += routedCounts.tool;
counts.block += routedCounts.block;
counts.final += routedCounts.final;
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=ok latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
@@ -438,15 +311,13 @@ export async function tryDispatchAcpReply(params: {
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP turn failed before completion.",
});
const delivered = await deliverAcpPayload("final", {
const delivered = await delivery.deliver("final", {
text: formatAcpRuntimeErrorText(acpError),
isError: true,
});
queuedFinal = queuedFinal || delivered;
const counts = params.dispatcher.getQueuedCounts();
counts.tool += routedCounts.tool;
counts.block += routedCounts.block;
counts.final += routedCounts.final;
delivery.applyRoutedCounts(counts);
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
logVerbose(
`acp-dispatch: session=${sessionKey} outcome=error code=${acpError.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,

View File

@@ -41,9 +41,6 @@ const acpMocks = vi.hoisted(() => ({
const sessionBindingMocks = vi.hoisted(() => ({
listBySession: vi.fn<(targetSessionKey: string) => SessionBindingRecord[]>(() => []),
}));
const messageActionMocks = vi.hoisted(() => ({
runMessageAction: vi.fn(async (_params: unknown) => ({ ok: true as const })),
}));
const ttsMocks = vi.hoisted(() => {
const state = {
synthesizeFinalAudio: false,
@@ -145,9 +142,6 @@ vi.mock("../../tts/tts.js", () => ({
normalizeTtsAutoMode: (value: unknown) => ttsMocks.normalizeTtsAutoMode(value),
resolveTtsConfig: (cfg: OpenClawConfig) => ttsMocks.resolveTtsConfig(cfg),
}));
vi.mock("../../infra/outbound/message-action-runner.js", () => ({
runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params),
}));
const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js");
const { resetInboundDedupe } = await import("./inbound-dedupe.js");
@@ -230,8 +224,6 @@ describe("dispatchReplyFromConfig", () => {
acpMocks.upsertAcpSessionMeta.mockReset();
acpMocks.upsertAcpSessionMeta.mockResolvedValue(null);
acpMocks.requireAcpRuntimeBackend.mockReset();
messageActionMocks.runMessageAction.mockReset();
messageActionMocks.runMessageAction.mockResolvedValue({ ok: true as const });
sessionBindingMocks.listBySession.mockReset();
sessionBindingMocks.listBySession.mockReturnValue([]);
ttsMocks.state.synthesizeFinalAudio = false;
@@ -1172,306 +1164,6 @@ describe("dispatchReplyFromConfig", () => {
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("edits ACP tool lifecycle updates in place when channel edit is available", async () => {
setNoAbort();
mocks.routeReply.mockClear();
const runtime = createAcpRuntime([
{
type: "tool_call",
tag: "tool_call",
toolCallId: "call-1",
status: "in_progress",
title: "Run command",
text: "Run command (in_progress)",
},
{
type: "tool_call",
tag: "tool_call_update",
toolCallId: "call-1",
status: "completed",
title: "Run command",
text: "Run command (completed)",
},
{ type: "done" },
]);
acpMocks.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex-acp:session-1",
storeSessionKey: "agent:codex-acp:session-1",
cfg: {},
storePath: "/tmp/mock-sessions.json",
entry: {},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
});
acpMocks.requireAcpRuntimeBackend.mockReturnValue({
id: "acpx",
runtime,
});
mocks.routeReply
.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-1" })
.mockResolvedValueOnce({ ok: true, messageId: "final-msg-1" });
const cfg = {
acp: {
enabled: true,
dispatch: { enabled: true },
},
} as OpenClawConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:thread-1",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "run tool",
});
await dispatchReplyFromConfig({ ctx, cfg, dispatcher });
expect(messageActionMocks.runMessageAction).toHaveBeenCalledWith(
expect.objectContaining({
action: "edit",
params: expect.objectContaining({
channel: "telegram",
to: "telegram:thread-1",
messageId: "tool-msg-1",
}),
}),
);
expect(mocks.routeReply).toHaveBeenCalledTimes(1);
expect(dispatcher.sendToolResult).not.toHaveBeenCalled();
});
it("falls back to new ACP tool message when edit action fails", async () => {
setNoAbort();
mocks.routeReply.mockClear();
messageActionMocks.runMessageAction.mockRejectedValueOnce(new Error("edit unsupported"));
const runtime = createAcpRuntime([
{
type: "tool_call",
tag: "tool_call",
toolCallId: "call-2",
status: "in_progress",
title: "Run command",
text: "Run command (in_progress)",
},
{
type: "tool_call",
tag: "tool_call_update",
toolCallId: "call-2",
status: "completed",
title: "Run command",
text: "Run command (completed)",
},
{ type: "done" },
]);
acpMocks.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex-acp:session-1",
storeSessionKey: "agent:codex-acp:session-1",
cfg: {},
storePath: "/tmp/mock-sessions.json",
entry: {},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
});
acpMocks.requireAcpRuntimeBackend.mockReturnValue({
id: "acpx",
runtime,
});
mocks.routeReply
.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2" })
.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2-fallback" })
.mockResolvedValueOnce({ ok: true, messageId: "final-msg-2" });
const cfg = {
acp: {
enabled: true,
dispatch: { enabled: true },
},
} as OpenClawConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:thread-1",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "run tool",
});
await dispatchReplyFromConfig({ ctx, cfg, dispatcher });
expect(messageActionMocks.runMessageAction).toHaveBeenCalledTimes(1);
expect(mocks.routeReply).toHaveBeenCalledTimes(2);
expect(dispatcher.sendToolResult).not.toHaveBeenCalled();
});
it("falls back to new ACP tool message when first tool send has no message id", async () => {
setNoAbort();
mocks.routeReply.mockClear();
const runtime = createAcpRuntime([
{
type: "tool_call",
tag: "tool_call",
toolCallId: "call-3",
status: "in_progress",
title: "Run command",
text: "Run command (in_progress)",
},
{
type: "tool_call",
tag: "tool_call_update",
toolCallId: "call-3",
status: "completed",
title: "Run command",
text: "Run command (completed)",
},
{ type: "done" },
]);
acpMocks.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex-acp:session-1",
storeSessionKey: "agent:codex-acp:session-1",
cfg: {},
storePath: "/tmp/mock-sessions.json",
entry: {},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
});
acpMocks.requireAcpRuntimeBackend.mockReturnValue({
id: "acpx",
runtime,
});
mocks.routeReply
.mockResolvedValueOnce({ ok: true })
.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-3-fallback" })
.mockResolvedValueOnce({ ok: true, messageId: "final-msg-3" });
const cfg = {
acp: {
enabled: true,
dispatch: { enabled: true },
},
} as OpenClawConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:thread-1",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "run tool",
});
await dispatchReplyFromConfig({ ctx, cfg, dispatcher });
expect(messageActionMocks.runMessageAction).not.toHaveBeenCalled();
expect(mocks.routeReply).toHaveBeenCalledTimes(2);
expect(dispatcher.sendToolResult).not.toHaveBeenCalled();
});
it("starts ACP typing lifecycle only when visible output is projected", async () => {
setNoAbort();
const hiddenRuntime = createAcpRuntime([
{
type: "status",
tag: "usage_update",
text: "usage updated: 10/100",
used: 10,
size: 100,
},
{ type: "done" },
]);
acpMocks.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex-acp:session-1",
storeSessionKey: "agent:codex-acp:session-1",
cfg: {},
storePath: "/tmp/mock-sessions.json",
entry: {},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
},
});
acpMocks.requireAcpRuntimeBackend.mockReturnValue({
id: "acpx",
runtime: hiddenRuntime,
});
const cfg = {
acp: {
enabled: true,
dispatch: { enabled: true },
},
} as OpenClawConfig;
const dispatcher = createDispatcher();
const onReplyStart = vi.fn();
const hiddenCtx = buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "hidden-only",
MessageSid: "acp-hidden-1",
});
await dispatchReplyFromConfig({
ctx: hiddenCtx,
cfg,
dispatcher,
replyOptions: { onReplyStart },
});
expect(onReplyStart).not.toHaveBeenCalled();
acpManagerTesting.resetAcpSessionManagerForTests();
const visibleRuntime = createAcpRuntime([
{
type: "text_delta",
text: "visible output",
},
{ type: "done" },
]);
acpMocks.requireAcpRuntimeBackend.mockReturnValue({
id: "acpx",
runtime: visibleRuntime,
});
const visibleCtx = buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "visible",
MessageSid: "acp-visible-1",
});
await dispatchReplyFromConfig({
ctx: visibleCtx,
cfg,
dispatcher: createDispatcher(),
replyOptions: { onReplyStart },
});
expect(onReplyStart).toHaveBeenCalledTimes(1);
});
it("closes oneshot ACP sessions after the turn completes", async () => {
setNoAbort();
const runtime = createAcpRuntime([{ type: "done" }]);

View File

@@ -0,0 +1,33 @@
import type { OpenClawConfig } from "../../../config/config.js";
import type { SessionAcpMeta } from "../../../config/sessions/types.js";
export function createAcpTestConfig(overrides?: Partial<OpenClawConfig>): OpenClawConfig {
return {
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 64,
},
},
...overrides,
} as OpenClawConfig;
}
export function createAcpSessionMeta(overrides?: Partial<SessionAcpMeta>): SessionAcpMeta {
return {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:1",
mode: "persistent",
state: "idle",
lastActivityAt: Date.now(),
identity: {
state: "resolved",
acpxSessionId: "acpx-session-1",
source: "status",
lastUpdatedAt: Date.now(),
},
...overrides,
};
}