* docs: add ACP thread-bound agents plan doc * docs: expand ACP implementation specification * feat(acp): route ACP sessions through core dispatch and lifecycle cleanup * feat(acp): add /acp commands and Discord spawn gate * ACP: add acpx runtime plugin backend * fix(subagents): defer transient lifecycle errors before announce * Agents: harden ACP sessions_spawn and tighten spawn guidance * Agents: require explicit ACP target for runtime spawns * docs: expand ACP control-plane implementation plan * ACP: harden metadata seeding and spawn guidance * ACP: centralize runtime control-plane manager and fail-closed dispatch * ACP: harden runtime manager and unify spawn helpers * Commands: route ACP sessions through ACP runtime in agent command * ACP: require persisted metadata for runtime spawns * Sessions: preserve ACP metadata when updating entries * Plugins: harden ACP backend registry across loaders * ACPX: make availability probe compatible with adapters * E2E: add manual Discord ACP plain-language smoke script * ACPX: preserve streamed spacing across Discord delivery * Docs: add ACP Discord streaming strategy * ACP: harden Discord stream buffering for thread replies * ACP: reuse shared block reply pipeline for projector * ACP: unify streaming config and adopt coalesceIdleMs * Docs: add temporary ACP production hardening plan * Docs: trim temporary ACP hardening plan goals * Docs: gate ACP thread controls by backend capabilities * ACP: add capability-gated runtime controls and /acp operator commands * Docs: remove temporary ACP hardening plan * ACP: fix spawn target validation and close cache cleanup * ACP: harden runtime dispatch and recovery paths * ACP: split ACP command/runtime internals and centralize policy * ACP: harden runtime lifecycle, validation, and observability * ACP: surface runtime and backend session IDs in thread bindings * docs: add temp plan for binding-service migration * ACP: migrate thread binding flows to SessionBindingService * ACP: address 
review feedback and preserve prompt wording * ACPX plugin: pin runtime dependency and prefer bundled CLI * Discord: complete binding-service migration cleanup and restore ACP plan * Docs: add standalone ACP agents guide * ACP: route harness intents to thread-bound ACP sessions * ACP: fix spawn thread routing and queue-owner stall * ACP: harden startup reconciliation and command bypass handling * ACP: fix dispatch bypass type narrowing * ACP: align runtime metadata to agentSessionId * ACP: normalize session identifier handling and labels * ACP: mark thread banner session ids provisional until first reply * ACP: stabilize session identity mapping and startup reconciliation * ACP: add resolved session-id notices and cwd in thread intros * Discord: prefix thread meta notices consistently * Discord: unify ACP/thread meta notices with gear prefix * Discord: split thread persona naming from meta formatting * Extensions: bump acpx plugin dependency to 0.1.9 * Agents: gate ACP prompt guidance behind acp.enabled * Docs: remove temp experiment plan docs * Docs: scope streaming plan to holy grail refactor * Docs: refactor ACP agents guide for human-first flow * Docs/Skill: add ACP feature-flag guidance and direct acpx telephone-game flow * Docs/Skill: add OpenCode and Pi to ACP harness lists * Docs/Skill: align ACP harness list with current acpx registry * Dev/Test: move ACP plain-language smoke script and mark as keep * Docs/Skill: reorder ACP harness lists with Pi first * ACP: split control-plane manager into core/types/utils modules * Docs: refresh ACP thread-bound agents plan * ACP: extract dispatch lane and split manager domains * ACP: centralize binding context and remove reverse deps * Infra: unify system message formatting * ACP: centralize error boundaries and session id rendering * ACP: enforce init concurrency cap and strict meta clear * Tests: fix ACP dispatch binding mock typing * Tests: fix Discord thread-binding mock drift and ACP request id * ACP: gate slash 
bypass and persist cleared overrides * ACPX: await pre-abort cancel before runTurn return * Extension: pin acpx runtime dependency to 0.1.11 * Docs: add pinned acpx install strategy for ACP extension * Extensions/acpx: enforce strict local pinned startup * Extensions/acpx: tighten acp-router install guidance * ACPX: retry runtime test temp-dir cleanup * Extensions/acpx: require proactive ACPX repair for thread spawns * Extensions/acpx: require restart offer after acpx reinstall * extensions/acpx: remove workspace protocol devDependency * extensions/acpx: bump pinned acpx to 0.1.13 * extensions/acpx: sync lockfile after dependency bump * ACPX: make runtime spawn Windows-safe * fix: align doctor-config-flow repair tests with default-account migration (#23580) (thanks @osolmaz)
400 lines
12 KiB
TypeScript
400 lines
12 KiB
TypeScript
import crypto from "node:crypto";
|
|
import fs from "node:fs/promises";
|
|
import os from "node:os";
|
|
import path from "node:path";
|
|
import type { AgentTool } from "@mariozechner/pi-agent-core";
|
|
import type { ImageContent } from "@mariozechner/pi-ai";
|
|
import type { ThinkLevel } from "../../auto-reply/thinking.js";
|
|
import type { OpenClawConfig } from "../../config/config.js";
|
|
import type { CliBackendConfig } from "../../config/types.js";
|
|
import { buildTtsSystemPromptHint } from "../../tts/tts.js";
|
|
import { isRecord } from "../../utils.js";
|
|
import { buildModelAliasLines } from "../model-alias-lines.js";
|
|
import { resolveDefaultModelForAgent } from "../model-selection.js";
|
|
import { resolveOwnerDisplaySetting } from "../owner-display.js";
|
|
import type { EmbeddedContextFile } from "../pi-embedded-helpers.js";
|
|
import { detectRuntimeShell } from "../shell-utils.js";
|
|
import { buildSystemPromptParams } from "../system-prompt-params.js";
|
|
import { buildAgentSystemPrompt } from "../system-prompt.js";
|
|
export { buildCliSupervisorScopeKey, resolveCliNoOutputTimeoutMs } from "./reliability.js";
|
|
|
|
/** Tail promise of the pending run chain for each queue key; entries are removed once their chain drains. */
const CLI_RUN_QUEUE = new Map<string, Promise<unknown>>();

/**
 * Runs `task` after every previously enqueued task for the same `key` has settled.
 *
 * Tasks sharing a key execute strictly one after another; a rejected task does
 * not block the tasks queued behind it. Different keys are independent.
 *
 * @param key - Queue identifier; runs with the same key are serialized.
 * @param task - Work to execute once the queue for `key` reaches it.
 * @returns The promise produced by `task` (it rejects if `task` rejects).
 */
export function enqueueCliRun<T>(key: string, task: () => Promise<T>): Promise<T> {
  const ignoreFailure = (): undefined => undefined;
  const previousTail = CLI_RUN_QUEUE.get(key) ?? Promise.resolve();
  const result = previousTail.catch(ignoreFailure).then(task);
  // The stored tail never rejects, so the queue keeps flowing after a failed run
  // and no unhandled-rejection warning is raised for the bookkeeping promise.
  const tail: Promise<unknown> = result.catch(ignoreFailure).finally(() => {
    if (CLI_RUN_QUEUE.get(key) !== tail) {
      // A newer run has been queued behind this one; it owns the map entry now.
      return;
    }
    CLI_RUN_QUEUE.delete(key);
  });
  CLI_RUN_QUEUE.set(key, tail);
  return result;
}
|
|
|
|
/** Token counters extracted from a CLI backend's usage payload; a field is unset when not reported. */
type CliUsage = {
  /** Input token count. */
  input?: number;
  /** Output token count. */
  output?: number;
  /** Input tokens served from cache. */
  cacheRead?: number;
  /** Input tokens written to cache. */
  cacheWrite?: number;
  /** Total token count as reported by the backend. */
  total?: number;
};
|
|
|
|
/** Normalized result of parsing a CLI backend's output. */
export type CliOutput = {
  /** Reply text extracted from the backend output. */
  text: string;
  /** Backend session identifier, when the output carried one. */
  sessionId?: string;
  /** Token usage, when the output carried a usable usage record. */
  usage?: CliUsage;
};
|
|
|
|
/**
 * Assembles the agent system prompt for a CLI-backed run.
 *
 * Resolves the agent's default model into a `provider/model` label, collects
 * runtime and user-time details through `buildSystemPromptParams`, and forwards
 * everything to `buildAgentSystemPrompt`. The reasoning-tag hint is always
 * disabled here, and ACP is treated as enabled unless `config.acp.enabled` is
 * explicitly `false`.
 *
 * @param params - Workspace, config and prompt ingredients supplied by the caller.
 * @returns Whatever `buildAgentSystemPrompt` produces for these inputs.
 */
export function buildSystemPrompt(params: {
  workspaceDir: string;
  config?: OpenClawConfig;
  defaultThinkLevel?: ThinkLevel;
  extraSystemPrompt?: string;
  ownerNumbers?: string[];
  heartbeatPrompt?: string;
  docsPath?: string;
  tools: AgentTool[];
  contextFiles?: EmbeddedContextFile[];
  modelDisplay: string;
  agentId?: string;
}) {
  const { config, agentId, workspaceDir } = params;

  const { provider, model } = resolveDefaultModelForAgent({ cfg: config ?? {}, agentId });

  const promptParams = buildSystemPromptParams({
    config,
    agentId,
    workspaceDir,
    cwd: process.cwd(),
    runtime: {
      host: "openclaw",
      os: `${os.type()} ${os.release()}`,
      arch: os.arch(),
      node: process.version,
      model: params.modelDisplay,
      defaultModel: `${provider}/${model}`,
      shell: detectRuntimeShell(),
    },
  });

  // The TTS hint is only computed when a config object was supplied.
  const ttsHint = config ? buildTtsSystemPromptHint(config) : undefined;
  const { ownerDisplay, ownerDisplaySecret } = resolveOwnerDisplaySetting(config);

  return buildAgentSystemPrompt({
    workspaceDir,
    defaultThinkLevel: params.defaultThinkLevel,
    extraSystemPrompt: params.extraSystemPrompt,
    ownerNumbers: params.ownerNumbers,
    ownerDisplay,
    ownerDisplaySecret,
    reasoningTagHint: false,
    heartbeatPrompt: params.heartbeatPrompt,
    docsPath: params.docsPath,
    acpEnabled: config?.acp?.enabled !== false,
    runtimeInfo: promptParams.runtimeInfo,
    toolNames: params.tools.map((tool) => tool.name),
    modelAliasLines: buildModelAliasLines(config),
    userTimezone: promptParams.userTimezone,
    userTime: promptParams.userTime,
    userTimeFormat: promptParams.userTimeFormat,
    contextFiles: params.contextFiles,
    ttsHint,
    memoryCitationsMode: config?.memory?.citations,
  });
}
|
|
|
|
/**
 * Maps a requested model id to the backend's configured alias target.
 *
 * The trimmed id is looked up in `backend.modelAliases` as written first, then
 * lower-cased. When neither lookup yields a non-empty value, the trimmed id
 * is returned unchanged.
 *
 * @param modelId - Model id as requested; surrounding whitespace is ignored.
 * @param backend - Backend config whose `modelAliases` table is consulted.
 * @returns The alias target, or the trimmed id when no alias applies.
 */
export function normalizeCliModel(modelId: string, backend: CliBackendConfig): string {
  const requested = modelId.trim();
  if (requested === "") {
    return requested;
  }
  const aliases = backend.modelAliases;
  return aliases?.[requested] || aliases?.[requested.toLowerCase()] || requested;
}
|
|
|
|
/**
 * Extracts token counters from a backend usage record.
 *
 * Each counter is taken from the first key in its alias list that holds a
 * finite number greater than zero; anything else (missing, non-numeric, zero,
 * negative, `NaN`, `Infinity`) is treated as "not reported".
 *
 * @param raw - The `usage` object from parsed CLI output.
 * @returns The counters found (unreported ones are `undefined`), or `undefined`
 *   when no counter at all was reported.
 */
function toUsage(raw: Record<string, unknown>): CliUsage | undefined {
  const pick = (key: string): number | undefined => {
    // Read once into a local so the type guard narrows without relying on
    // repeated index access.
    const value = raw[key];
    // JSON.parse turns out-of-range literals such as 1e999 into Infinity, so
    // finiteness has to be checked explicitly.
    return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : undefined;
  };

  const input = pick("input_tokens") ?? pick("inputTokens");
  const output = pick("output_tokens") ?? pick("outputTokens");
  const cacheRead =
    pick("cache_read_input_tokens") ?? pick("cached_input_tokens") ?? pick("cacheRead");
  const cacheWrite = pick("cache_write_input_tokens") ?? pick("cacheWrite");
  const total = pick("total_tokens") ?? pick("total");

  const counters = [input, output, cacheRead, cacheWrite, total];
  if (counters.every((counter) => counter === undefined)) {
    return undefined;
  }
  return { input, output, cacheRead, cacheWrite, total };
}
|
|
|
|
/**
 * Recursively flattens a loosely-typed payload into plain text.
 *
 * Strings are returned as-is and arrays are concatenated element by element
 * with no separator. For records, the first of these that applies wins: a
 * string `text`, a string `content`, an array `content`, a record `message`.
 * Everything else (including falsy values) yields an empty string.
 *
 * @param value - Arbitrary parsed JSON fragment.
 * @returns The collected text, possibly empty.
 */
function collectText(value: unknown): string {
  const joinAll = (entries: unknown[]): string =>
    entries.reduce<string>((acc, entry) => acc + collectText(entry), "");

  if (!value) {
    return "";
  }
  if (typeof value === "string") {
    return value;
  }
  if (Array.isArray(value)) {
    return joinAll(value);
  }
  if (!isRecord(value)) {
    return "";
  }

  const { text } = value;
  if (typeof text === "string") {
    return text;
  }
  const { content } = value;
  if (typeof content === "string") {
    return content;
  }
  if (Array.isArray(content)) {
    return joinAll(content);
  }
  const { message } = value;
  return isRecord(message) ? collectText(message) : "";
}
|
|
|
|
/**
 * Finds the session identifier in a parsed output record.
 *
 * Field names come from `backend.sessionIdFields`, falling back to
 * `session_id`, `sessionId`, `conversation_id`, `conversationId` (in that
 * order). The first field holding a non-blank string wins.
 *
 * @param parsed - Parsed JSON record emitted by the CLI.
 * @param backend - Backend config that may override the candidate field names.
 * @returns The trimmed identifier, or `undefined` when no field qualifies.
 */
function pickSessionId(
  parsed: Record<string, unknown>,
  backend: CliBackendConfig,
): string | undefined {
  const candidates = backend.sessionIdFields ?? [
    "session_id",
    "sessionId",
    "conversation_id",
    "conversationId",
  ];
  for (const name of candidates) {
    const raw = parsed[name];
    if (typeof raw !== "string") {
      continue;
    }
    const id = raw.trim();
    if (id) {
      return id;
    }
  }
  return undefined;
}
|
|
|
|
/**
 * Parses a single-document JSON response from a CLI backend.
 *
 * The reply text is taken from the first of `message`, `content`, `result`,
 * or the document itself that yields non-empty text via `collectText`.
 *
 * @param raw - Raw stdout captured from the CLI.
 * @param backend - Backend config used to locate the session id field.
 * @returns The normalized output, or `null` when `raw` is blank, is not valid
 *   JSON, or does not decode to a record.
 */
export function parseCliJson(raw: string, backend: CliBackendConfig): CliOutput | null {
  const payload = raw.trim();
  if (payload.length === 0) {
    return null;
  }

  let decoded: unknown;
  try {
    decoded = JSON.parse(payload);
  } catch {
    return null;
  }
  if (!isRecord(decoded)) {
    return null;
  }

  const sessionId = pickSessionId(decoded, backend);
  const usage = isRecord(decoded.usage) ? toUsage(decoded.usage) : undefined;

  // Try the candidate sources in priority order and stop at the first that has text.
  let text = "";
  for (const candidate of [decoded.message, decoded.content, decoded.result, decoded]) {
    text = collectText(candidate);
    if (text) {
      break;
    }
  }
  return { text: text.trim(), sessionId, usage };
}
|
|
|
|
/**
 * Parses newline-delimited JSON output from a CLI backend.
 *
 * Blank lines, lines that are not valid JSON, and lines that do not decode to
 * a record are skipped. Across the remaining records:
 * - the session id is the first one found via `pickSessionId`, falling back to
 *   a non-blank `thread_id`;
 * - usage is the last record whose `usage` object yields counters;
 * - text is every `item.text` whose `item.type` is absent/non-string or
 *   contains "message" (case-insensitive), joined with newlines.
 *
 * @param raw - Raw stdout captured from the CLI.
 * @param backend - Backend config used to locate the session id field.
 * @returns The normalized output, or `null` when there are no lines or no text
 *   was collected.
 */
export function parseCliJsonl(raw: string, backend: CliBackendConfig): CliOutput | null {
  const lines = raw
    .split(/\r?\n/g)
    .map((line) => line.trim())
    .filter(Boolean);
  if (lines.length === 0) {
    return null;
  }

  let sessionId: string | undefined;
  let usage: CliUsage | undefined;
  const texts: string[] = [];

  for (const line of lines) {
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      // Tolerate non-JSON noise interleaved with the event stream.
      continue;
    }
    if (!isRecord(parsed)) {
      continue;
    }

    if (!sessionId) {
      sessionId = pickSessionId(parsed, backend);
    }
    if (!sessionId && typeof parsed.thread_id === "string") {
      // Normalize a whitespace-only thread id to "no id" so callers never see
      // an empty-string session id (matching what pickSessionId does).
      sessionId = parsed.thread_id.trim() || undefined;
    }

    if (isRecord(parsed.usage)) {
      // Keep the previous usage when this record carries no usable counters.
      usage = toUsage(parsed.usage) ?? usage;
    }

    const item = isRecord(parsed.item) ? parsed.item : null;
    if (item && typeof item.text === "string") {
      const type = typeof item.type === "string" ? item.type.toLowerCase() : "";
      if (!type || type.includes("message")) {
        texts.push(item.text);
      }
    }
  }

  const text = texts.join("\n").trim();
  if (!text) {
    return null;
  }
  return { text, sessionId, usage };
}
|
|
|
|
/**
 * Decides whether a system prompt should be passed to the CLI for this run.
 *
 * The prompt is sent only when all of the following hold: it is non-blank,
 * the backend's `systemPromptWhen` policy (default `"first"`) allows it —
 * `"never"` always suppresses, `"first"` suppresses on resumed sessions — and
 * the backend declares a non-blank `systemPromptArg`.
 *
 * @param params - Backend config, whether the session is new, and the prompt.
 * @returns The trimmed prompt to send, or `null` when none should be sent.
 */
export function resolveSystemPromptUsage(params: {
  backend: CliBackendConfig;
  isNewSession: boolean;
  systemPrompt?: string;
}): string | null {
  const prompt = params.systemPrompt?.trim();
  if (!prompt) {
    return null;
  }

  const { backend, isNewSession } = params;
  const policy = backend.systemPromptWhen ?? "first";
  const suppressed = policy === "never" || (policy === "first" && !isNewSession);
  if (suppressed) {
    return null;
  }

  const hasPromptFlag = Boolean(backend.systemPromptArg?.trim());
  return hasPromptFlag ? prompt : null;
}
|
|
|
|
/**
 * Chooses the session id to hand to the CLI according to `backend.sessionMode`.
 *
 * - `"none"`: never sends an id.
 * - `"existing"`: sends the stored id only; nothing is generated.
 * - anything else (default `"always"`): sends the stored id, or a freshly
 *   generated UUID when none is stored.
 *
 * In every mode `isNew` is `true` exactly when no non-blank stored id exists.
 *
 * @param params - Backend config and the previously stored CLI session id.
 * @returns The id to send (if any) and whether this counts as a new session.
 */
export function resolveSessionIdToSend(params: {
  backend: CliBackendConfig;
  cliSessionId?: string;
}): { sessionId?: string; isNew: boolean } {
  const sessionMode = params.backend.sessionMode ?? "always";
  const stored = params.cliSessionId?.trim();

  switch (sessionMode) {
    case "none":
      return { sessionId: undefined, isNew: !stored };
    case "existing":
      return { sessionId: stored, isNew: !stored };
    default:
      return stored
        ? { sessionId: stored, isNew: false }
        : { sessionId: crypto.randomUUID(), isNew: true };
  }
}
|
|
|
|
/**
 * Decides whether the prompt travels as a command-line argument or via stdin.
 *
 * Stdin is used when the backend's `input` mode is `"stdin"`, or when the
 * backend sets a non-zero `maxPromptArgChars` and the prompt is longer than it.
 * Otherwise the prompt is passed as an argument.
 *
 * @param params - Backend config and the prompt text.
 * @returns Exactly one of `stdin` or `argsPrompt`, holding the prompt.
 */
export function resolvePromptInput(params: { backend: CliBackendConfig; prompt: string }): {
  argsPrompt?: string;
  stdin?: string;
} {
  const { backend, prompt } = params;
  const forcedStdin = (backend.input ?? "arg") === "stdin";
  const argLimit = backend.maxPromptArgChars;
  const exceedsArgLimit = argLimit ? prompt.length > argLimit : false;
  return forcedStdin || exceedsArgLimit ? { stdin: prompt } : { argsPrompt: prompt };
}
|
|
|
|
/**
 * Picks a file extension for an image from its MIME type.
 *
 * Matching is a case-insensitive substring test, checked in the order
 * png, jpeg/jpg, gif, webp; unrecognized types fall back to `"bin"`.
 *
 * @param mimeType - MIME type string such as `image/png`.
 * @returns One of `png`, `jpg`, `gif`, `webp`, or `bin`.
 */
function resolveImageExtension(mimeType: string): string {
  const lowered = mimeType.toLowerCase();
  // Ordered: the first row with a matching needle wins.
  const table: ReadonlyArray<readonly [string, readonly string[]]> = [
    ["png", ["png"]],
    ["jpg", ["jpeg", "jpg"]],
    ["gif", ["gif"]],
    ["webp", ["webp"]],
  ];
  const hit = table.find(([, needles]) => needles.some((needle) => lowered.includes(needle)));
  return hit ? hit[0] : "bin";
}
|
|
|
|
/**
 * Appends image file paths to a prompt, one path per line.
 *
 * Trailing whitespace is stripped from the prompt and a blank line separates
 * it from the path list. With no paths the prompt is returned untouched; with
 * an empty (or whitespace-only) prompt only the path list is returned.
 *
 * @param prompt - Original prompt text.
 * @param paths - Image file paths to list after the prompt.
 * @returns The combined prompt text.
 */
export function appendImagePathsToPrompt(prompt: string, paths: string[]): string {
  if (paths.length === 0) {
    return prompt;
  }
  const body = prompt.trimEnd();
  const listing = paths.join("\n");
  return body ? `${body}\n\n${listing}` : listing;
}
|
|
|
|
/**
 * Writes base64-encoded images into a fresh temporary directory.
 *
 * Files are named `image-<n>.<ext>` (1-based, extension derived from the MIME
 * type) and created with mode `0o600`. Images are written one after another
 * in input order.
 *
 * If any image fails to decode or write, the temporary directory is removed
 * before the error is rethrown, so a failed call leaves nothing behind.
 *
 * @param images - Images whose `data` is base64 and whose `mimeType` selects the extension.
 * @returns The written file paths (in input order) and a `cleanup` function
 *   that recursively removes the temporary directory.
 * @throws Whatever the underlying directory creation, decode, or write throws.
 */
export async function writeCliImages(
  images: ImageContent[],
): Promise<{ paths: string[]; cleanup: () => Promise<void> }> {
  const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cli-images-"));
  const cleanup = async (): Promise<void> => {
    await fs.rm(tempDir, { recursive: true, force: true });
  };

  const paths: string[] = [];
  try {
    for (const [index, image] of images.entries()) {
      const ext = resolveImageExtension(image.mimeType);
      const filePath = path.join(tempDir, `image-${index + 1}.${ext}`);
      const buffer = Buffer.from(image.data, "base64");
      await fs.writeFile(filePath, buffer, { mode: 0o600 });
      paths.push(filePath);
    }
  } catch (err) {
    // The caller never receives `cleanup` on this path, so remove the directory
    // here. Removal is best-effort: the original failure is the one to surface.
    await cleanup().catch(() => undefined);
    throw err;
  }

  return { paths, cleanup };
}
|
|
|
|
/**
 * Builds the argument vector for a CLI backend invocation.
 *
 * Arguments are appended to a copy of `baseArgs` in this order: model flag,
 * system-prompt flag, session flag(s), image flag(s), then the prompt.
 * Model, system-prompt and session arguments are omitted when `useResume` is
 * set. Each of them is also omitted when its value or its backend flag is missing.
 *
 * Session arguments use `backend.sessionArgs` templates when provided (every
 * `{sessionId}` placeholder is replaced with the id, inserted literally),
 * otherwise `backend.sessionArg` followed by the id. Images are passed as one
 * comma-joined value when `backend.imageMode` is `"list"`, otherwise as one
 * flag/value pair per path.
 *
 * @param params - Backend config plus the per-run values to encode.
 * @returns A new argument array; `baseArgs` is not mutated.
 */
export function buildCliArgs(params: {
  backend: CliBackendConfig;
  baseArgs: string[];
  modelId: string;
  sessionId?: string;
  systemPrompt?: string | null;
  imagePaths?: string[];
  promptArg?: string;
  useResume: boolean;
}): string[] {
  const { backend, sessionId, imagePaths } = params;
  const args: string[] = [...params.baseArgs];

  if (!params.useResume) {
    if (backend.modelArg && params.modelId) {
      args.push(backend.modelArg, params.modelId);
    }
    if (params.systemPrompt && backend.systemPromptArg) {
      args.push(backend.systemPromptArg, params.systemPrompt);
    }
    if (sessionId) {
      if (backend.sessionArgs && backend.sessionArgs.length > 0) {
        for (const template of backend.sessionArgs) {
          // A replacer function inserts the id verbatim; a plain replacement
          // string would expand patterns such as `$&` or `$$` inside the id.
          args.push(template.replaceAll("{sessionId}", () => sessionId));
        }
      } else if (backend.sessionArg) {
        args.push(backend.sessionArg, sessionId);
      }
    }
  }

  const imageArg = backend.imageArg;
  if (imagePaths && imagePaths.length > 0 && imageArg) {
    const mode = backend.imageMode ?? "repeat";
    if (mode === "list") {
      args.push(imageArg, imagePaths.join(","));
    } else {
      for (const imagePath of imagePaths) {
        args.push(imageArg, imagePath);
      }
    }
  }

  // An empty-string prompt is still a deliberate argument; only `undefined` means "none".
  if (params.promptArg !== undefined) {
    args.push(params.promptArg);
  }
  return args;
}
|