Files
openclaw/src/agents/subagent-registry.ts
Nikolay Petrov a9f1188785 sessions_spawn: inline attachments with redaction, lifecycle cleanup, and docs (#16761)
Add inline file attachment support for sessions_spawn (subagent runtime only):

- Schema: attachments[] (name, content, encoding, mimeType) and attachAs.mountPath hint
- Materialization: files written to .openclaw/attachments/<uuid>/ with manifest.json
- Validation: strict base64 decode, filename checks, size limits, duplicate detection
- Transcript redaction: sanitizeToolCallInputs redacts attachment content from persisted transcripts
- Lifecycle cleanup: safeRemoveAttachmentsDir with symlink-safe path containment check
- Config: tools.sessions_spawn.attachments (enabled, maxFiles, maxFileBytes, maxTotalBytes, retainOnSessionKeep)
- Registry: attachmentsDir/attachmentsRootDir/retainAttachmentsOnKeep on SubagentRunRecord
- ACP rejection: attachments rejected for runtime=acp with clear error message
- Docs: updated tools/index.md, concepts/session-tool.md, configuration-reference.md
- Tests: 85 new/updated tests across 5 test files

Fixes:
- Guard fs.rm in materialization catch block with try/catch (review concern #1)
- Remove unreachable fallback in safeRemoveAttachmentsDir (review concern #7)
- Move attachment cleanup out of retry path to avoid timing issues with announce loop

Co-authored-by: Tyler Yust <TYTYYUST@YAHOO.COM>
Co-authored-by: napetrov <napetrov@users.noreply.github.com>
2026-03-01 21:33:51 -08:00

1227 lines
35 KiB
TypeScript

import { promises as fs } from "node:fs";
import path from "node:path";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveStorePath,
type SessionEntry,
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { defaultRuntime } from "../runtime.js";
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
import { runSubagentAnnounceFlow, type SubagentRunOutcome } from "./subagent-announce.js";
import {
SUBAGENT_ENDED_OUTCOME_KILLED,
SUBAGENT_ENDED_REASON_COMPLETE,
SUBAGENT_ENDED_REASON_ERROR,
SUBAGENT_ENDED_REASON_KILLED,
type SubagentLifecycleEndedReason,
} from "./subagent-lifecycle-events.js";
import {
resolveCleanupCompletionReason,
resolveDeferredCleanupDecision,
} from "./subagent-registry-cleanup.js";
import {
emitSubagentEndedHookOnce,
resolveLifecycleOutcomeFromRunOutcome,
runOutcomesEqual,
} from "./subagent-registry-completion.js";
import {
countActiveDescendantRunsFromRuns,
countActiveRunsForSessionFromRuns,
findRunIdsByChildSessionKeyFromRuns,
listDescendantRunsForRequesterFromRuns,
listRunsForRequesterFromRuns,
resolveRequesterForChildSessionFromRuns,
} from "./subagent-registry-queries.js";
import {
getSubagentRunsSnapshotForRead,
persistSubagentRunsToDisk,
restoreSubagentRunsFromDisk,
} from "./subagent-registry-state.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
import { resolveAgentTimeoutMs } from "./timeout.js";
export type { SubagentRunRecord } from "./subagent-registry.types.js";
// In-memory registry of subagent runs, keyed by run id.
const subagentRuns = new Map<string, SubagentRunRecord>();
// Interval handle for the periodic archive sweep; null while no sweep is scheduled.
let sweeper: NodeJS.Timeout | null = null;
// True once the agent lifecycle event listener has been registered.
let listenerStarted = false;
// Unsubscribe callback returned by `onAgentEvent`; null while no listener is installed.
let listenerStop: (() => void) | null = null;
// Guards the one-time restore from disk.
// Use var to avoid TDZ when init runs across circular imports during bootstrap.
var restoreAttempted = false;
// Timeout handed to `runSubagentAnnounceFlow` (2 minutes).
const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
// Lower bound of the announce retry backoff (1 second).
const MIN_ANNOUNCE_RETRY_DELAY_MS = 1_000;
// Upper bound of the announce retry backoff (8 seconds).
const MAX_ANNOUNCE_RETRY_DELAY_MS = 8_000;
/**
 * Maximum number of announce delivery attempts before giving up.
 * Prevents infinite retry loops when `runSubagentAnnounceFlow` repeatedly
 * returns `false` due to stale state or transient conditions (#18264).
 */
const MAX_ANNOUNCE_RETRY_COUNT = 3;
/**
 * Announce entries older than this are force-expired even if delivery never
 * succeeded. Guards against stale registry entries surviving gateway restarts.
 */
const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes
// Why a registry entry is treated as orphaned: its child session entry is
// missing, or that entry has no usable session id.
type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id";
/**
 * Embedded runs can emit transient lifecycle `error` events while provider/model
 * retry is still in progress. Defer terminal error cleanup briefly so a
 * subsequent lifecycle `start` / `end` can cancel premature failure announces.
 */
const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
/**
 * Computes the exponential backoff delay before the next announce retry.
 *
 * @param retryCount Number of attempts already made; clamped to the range 0..10.
 * @returns A delay in milliseconds that doubles per attempt, bounded by
 *   MIN_ANNOUNCE_RETRY_DELAY_MS and MAX_ANNOUNCE_RETRY_DELAY_MS.
 */
function resolveAnnounceRetryDelayMs(retryCount: number) {
  const attemptsMade = Math.max(0, Math.min(retryCount, 10));
  // The first retry uses the minimum delay; each later retry doubles it.
  const doublings = Math.max(0, attemptsMade - 1);
  const uncappedDelayMs = MIN_ANNOUNCE_RETRY_DELAY_MS * Math.pow(2, doublings);
  return Math.min(uncappedDelayMs, MAX_ANNOUNCE_RETRY_DELAY_MS);
}
/**
 * Logs a warning that announce delivery for a run has been abandoned.
 *
 * @param entry Run record whose announce is being given up on.
 * @param reason Whether the retry budget was exhausted or the announce expired.
 */
function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "expiry") {
  const retryCount = entry.announceRetryCount ?? 0;
  // Time since the run ended, or "n/a" when the run has no recorded end time.
  let endedAgoLabel = "n/a";
  if (typeof entry.endedAt === "number") {
    const elapsedMs = Math.max(0, Date.now() - entry.endedAt);
    endedAgoLabel = `${Math.round(elapsedMs / 1000)}s`;
  }
  defaultRuntime.log(
    `[warn] Subagent announce give up (${reason}) run=${entry.runId} child=${entry.childSessionKey} requester=${entry.requesterSessionKey} retries=${retryCount} endedAgo=${endedAgoLabel}`,
  );
}
/** Persists the in-memory registry by passing `subagentRuns` to `persistSubagentRunsToDisk`. */
function persistSubagentRuns() {
persistSubagentRunsToDisk(subagentRuns);
}
/**
 * Looks up a session entry by key, preferring an exact match and falling back
 * to the first key that matches case-insensitively.
 *
 * @param store Session store keyed by session key.
 * @param sessionKey Key to look up.
 * @returns The matching entry, or `undefined` when no key matches.
 */
function findSessionEntryByKey(store: Record<string, SessionEntry>, sessionKey: string) {
  const exactMatch = store[sessionKey];
  if (exactMatch) {
    return exactMatch;
  }
  const wantedKey = sessionKey.toLowerCase();
  const caseInsensitiveMatch = Object.entries(store).find(
    ([candidateKey]) => candidateKey.toLowerCase() === wantedKey,
  );
  return caseInsensitiveMatch?.[1];
}
/**
 * Decides whether a run record no longer has a usable child session behind it.
 *
 * @param params.entry Run record to inspect.
 * @param params.storeCache Optional cache of loaded session stores keyed by store
 *   path, so a batch of checks loads each store only once.
 * @returns The orphan reason, or `null` when the child session looks valid or
 *   when the check itself failed.
 */
function resolveSubagentRunOrphanReason(params: {
  entry: SubagentRunRecord;
  storeCache?: Map<string, Record<string, SessionEntry>>;
}): SubagentRunOrphanReason | null {
  const { entry, storeCache } = params;
  const childKey = entry.childSessionKey?.trim();
  if (!childKey) {
    return "missing-session-entry";
  }
  try {
    const cfg = loadConfig();
    const agentId = resolveAgentIdFromSessionKey(childKey);
    const storePath = resolveStorePath(cfg.session?.store, { agentId });
    const cachedStore = storeCache?.get(storePath);
    const store = cachedStore || loadSessionStore(storePath);
    if (!cachedStore) {
      storeCache?.set(storePath, store);
    }
    const sessionEntry = findSessionEntryByKey(store, childKey);
    if (!sessionEntry) {
      return "missing-session-entry";
    }
    const sessionId = sessionEntry.sessionId;
    const hasSessionId = typeof sessionId === "string" && sessionId.trim().length > 0;
    return hasSessionId ? null : "missing-session-id";
  } catch {
    // Best effort: a transient read/config failure must not cause a run to be pruned.
    return null;
  }
}
/**
 * Marks an orphaned run as ended with an error, flags its cleanup as done, and
 * removes it from the registry and from the resumed-run set.
 *
 * @param params.runId Registry key of the run.
 * @param params.entry Run record; mutated in place.
 * @param params.reason Why the run is considered orphaned.
 * @param params.source Which code path detected the orphan (used in the log line).
 * @returns `true` when the record was changed or removed (a warning is logged),
 *   `false` when there was nothing to do.
 */
function reconcileOrphanedRun(params: {
  runId: string;
  entry: SubagentRunRecord;
  reason: SubagentRunOrphanReason;
  source: "restore" | "resume";
}) {
  const { runId, entry, reason, source } = params;
  const timestamp = Date.now();
  let touched = false;

  if (typeof entry.endedAt !== "number") {
    entry.endedAt = timestamp;
    touched = true;
  }

  const orphanOutcome: SubagentRunOutcome = {
    status: "error",
    error: `orphaned subagent run (${reason})`,
  };
  if (!runOutcomesEqual(entry.outcome, orphanOutcome)) {
    entry.outcome = orphanOutcome;
    touched = true;
  }

  if (entry.endedReason !== SUBAGENT_ENDED_REASON_ERROR) {
    entry.endedReason = SUBAGENT_ENDED_REASON_ERROR;
    touched = true;
  }

  if (entry.cleanupHandled !== true) {
    entry.cleanupHandled = true;
    touched = true;
  }

  if (typeof entry.cleanupCompletedAt !== "number") {
    entry.cleanupCompletedAt = timestamp;
    touched = true;
  }

  const wasRegistered = subagentRuns.delete(runId);
  resumedRuns.delete(runId);

  if (wasRegistered || touched) {
    defaultRuntime.log(
      `[warn] Subagent orphan run pruned source=${source} run=${runId} child=${entry.childSessionKey} reason=${reason}`,
    );
    return true;
  }
  return false;
}
/**
 * Checks every registered run for an orphaned child session and prunes the
 * ones that are orphaned. Loaded session stores are shared across the pass.
 *
 * @returns `true` when at least one run was reconciled.
 */
function reconcileOrphanedRestoredRuns() {
  const storeCache = new Map<string, Record<string, SessionEntry>>();
  let prunedAny = false;
  for (const [runId, entry] of subagentRuns.entries()) {
    const reason = resolveSubagentRunOrphanReason({ entry, storeCache });
    if (reason && reconcileOrphanedRun({ runId, entry, reason, source: "restore" })) {
      prunedAny = true;
    }
  }
  return prunedAny;
}
// Run ids for which resume work has already been started or scheduled;
// `resumeSubagentRun` returns early for ids in this set.
const resumedRuns = new Set<string>();
// Passed to `emitSubagentEndedHookOnce` as `inFlightRunIds`.
const endedHookInFlightRunIds = new Set<string>();
// Deferred lifecycle `error` completions keyed by run id: the grace-period
// timer plus the end time and error text captured when the error event arrived.
const pendingLifecycleErrorByRunId = new Map<
string,
{
timer: NodeJS.Timeout;
endedAt: number;
error?: string;
}
>();
/**
 * Cancels the deferred lifecycle-error completion for a run, if one is pending.
 *
 * @param runId Run whose pending error timer should be cancelled.
 */
function clearPendingLifecycleError(runId: string) {
  const scheduled = pendingLifecycleErrorByRunId.get(runId);
  if (scheduled) {
    clearTimeout(scheduled.timer);
    pendingLifecycleErrorByRunId.delete(runId);
  }
}
/** Cancels every pending lifecycle-error timer and empties the pending map. */
function clearAllPendingLifecycleErrors() {
  pendingLifecycleErrorByRunId.forEach(({ timer }) => {
    clearTimeout(timer);
  });
  pendingLifecycleErrorByRunId.clear();
}
/**
 * Defers completing a run as failed until LIFECYCLE_ERROR_RETRY_GRACE_MS has
 * elapsed, replacing any deferral already pending for the same run.
 *
 * When the timer fires, the run is completed with an error outcome unless the
 * deferral was replaced or cleared, the run is no longer registered, or the
 * run has meanwhile been recorded as complete or ok.
 *
 * @param params.runId Run that reported the lifecycle error.
 * @param params.endedAt End timestamp to record if the error becomes final.
 * @param params.error Optional error text to record.
 */
function schedulePendingLifecycleError(params: { runId: string; endedAt: number; error?: string }) {
  const { runId, endedAt, error } = params;
  clearPendingLifecycleError(runId);

  const timer = setTimeout(() => {
    const pending = pendingLifecycleErrorByRunId.get(runId);
    // Only the most recently scheduled timer for this run may act.
    if (!pending || pending.timer !== timer) {
      return;
    }
    pendingLifecycleErrorByRunId.delete(runId);

    const entry = subagentRuns.get(runId);
    if (!entry) {
      return;
    }
    const alreadySucceeded =
      entry.endedReason === SUBAGENT_ENDED_REASON_COMPLETE || entry.outcome?.status === "ok";
    if (alreadySucceeded) {
      return;
    }

    void completeSubagentRun({
      runId,
      endedAt: pending.endedAt,
      outcome: {
        status: "error",
        error: pending.error,
      },
      reason: SUBAGENT_ENDED_REASON_ERROR,
      sendFarewell: true,
      accountId: entry.requesterOrigin?.accountId,
      triggerCleanup: true,
    });
  }, LIFECYCLE_ERROR_RETRY_GRACE_MS);
  timer.unref?.();

  pendingLifecycleErrorByRunId.set(runId, { timer, endedAt, error });
}
/**
 * Reports whether the run's announce is suppressed because of a steer restart.
 *
 * @param entry Run record, if any.
 * @returns `true` only when `suppressAnnounceReason` is `"steer-restart"`.
 */
function suppressAnnounceForSteerRestart(entry?: SubagentRunRecord) {
  const suppressionReason = entry?.suppressAnnounceReason;
  return suppressionReason === "steer-restart";
}
/**
 * Decides whether the thread binding should be kept once the run has ended.
 *
 * @param params.entry Run record.
 * @param params.reason Why the run ended.
 * @returns `true` for session-mode runs that were not killed; `false` otherwise.
 */
function shouldKeepThreadBindingAfterRun(params: {
  entry: SubagentRunRecord;
  reason: SubagentLifecycleEndedReason;
}) {
  const wasKilled = params.reason === SUBAGENT_ENDED_REASON_KILLED;
  return !wasKilled && params.entry.spawnMode === "session";
}
/**
 * Decides whether the ended hook should be emitted for a run.
 *
 * @param params.entry Run record.
 * @param params.reason Why the run ended.
 * @returns `true` exactly when the thread binding is not being kept.
 */
function shouldEmitEndedHookForRun(params: {
  entry: SubagentRunRecord;
  reason: SubagentLifecycleEndedReason;
}) {
  const keepsThreadBinding = shouldKeepThreadBindingAfterRun(params);
  return !keepsThreadBinding;
}
/**
 * Emits the ended hook for a run through `emitSubagentEndedHookOnce`, filling
 * in defaults from the run record.
 *
 * @param params.entry Run record the hook is about.
 * @param params.reason Ended reason; defaults to the record's `endedReason`,
 *   then to SUBAGENT_ENDED_REASON_COMPLETE.
 * @param params.sendFarewell Forwarded unchanged to the hook emitter.
 * @param params.accountId Account id; defaults to the requester origin's account id.
 */
async function emitSubagentEndedHookForRun(params: {
  entry: SubagentRunRecord;
  reason?: SubagentLifecycleEndedReason;
  sendFarewell?: boolean;
  accountId?: string;
}) {
  const { entry } = params;
  const runOutcome = entry.outcome;
  const endedReason = params.reason ?? entry.endedReason ?? SUBAGENT_ENDED_REASON_COMPLETE;
  const lifecycleOutcome = resolveLifecycleOutcomeFromRunOutcome(runOutcome);
  // Only error outcomes carry an error message into the hook.
  const errorMessage = runOutcome?.status === "error" ? runOutcome.error : undefined;
  const accountId = params.accountId ?? entry.requesterOrigin?.accountId;

  await emitSubagentEndedHookOnce({
    entry,
    reason: endedReason,
    sendFarewell: params.sendFarewell,
    accountId,
    outcome: lifecycleOutcome,
    error: errorMessage,
    inFlightRunIds: endedHookInFlightRunIds,
    persist: persistSubagentRuns,
  });
}
/**
 * Records the terminal state of a run and, when requested, starts the
 * announce/cleanup flow.
 *
 * Steps: cancel any pending deferred error; update `endedAt`, `outcome` and
 * `endedReason` on the record (persisting only when something changed); emit
 * the ended hook now unless it is suppressed or left to the cleanup flow; then
 * start cleanup when `triggerCleanup` is set and no steer restart is pending.
 *
 * @param params.runId Run to complete; unknown ids are ignored.
 * @param params.endedAt End timestamp; defaults to the current time.
 * @param params.outcome Final run outcome.
 * @param params.reason Lifecycle ended reason.
 * @param params.sendFarewell Forwarded to the ended hook.
 * @param params.accountId Forwarded to the ended hook.
 * @param params.triggerCleanup Whether to start the announce/cleanup flow.
 */
async function completeSubagentRun(params: {
  runId: string;
  endedAt?: number;
  outcome: SubagentRunOutcome;
  reason: SubagentLifecycleEndedReason;
  sendFarewell?: boolean;
  accountId?: string;
  triggerCleanup: boolean;
}) {
  const { runId, outcome, reason, triggerCleanup } = params;
  clearPendingLifecycleError(runId);
  const entry = subagentRuns.get(runId);
  if (!entry) {
    return;
  }

  const endedAt = typeof params.endedAt === "number" ? params.endedAt : Date.now();
  const endedAtChanged = entry.endedAt !== endedAt;
  if (endedAtChanged) {
    entry.endedAt = endedAt;
  }
  const outcomeChanged = !runOutcomesEqual(entry.outcome, outcome);
  if (outcomeChanged) {
    entry.outcome = outcome;
  }
  const reasonChanged = entry.endedReason !== reason;
  if (reasonChanged) {
    entry.endedReason = reason;
  }
  if (endedAtChanged || outcomeChanged || reasonChanged) {
    persistSubagentRuns();
  }

  const steerRestartPending = suppressAnnounceForSteerRestart(entry);
  const hookEligible = !steerRestartPending && shouldEmitEndedHookForRun({ entry, reason });
  // When cleanup runs for a run that expects a completion message, the hook is
  // emitted later by the cleanup flow instead of here.
  const hookLeftToCleanup = triggerCleanup && entry.expectsCompletionMessage === true;
  if (hookEligible && !hookLeftToCleanup) {
    await emitSubagentEndedHookForRun({
      entry,
      reason,
      sendFarewell: params.sendFarewell,
      accountId: params.accountId,
    });
  }

  if (!triggerCleanup || steerRestartPending) {
    return;
  }
  startSubagentAnnounceCleanupFlow(runId, entry);
}
/**
 * Claims cleanup for a run and starts the announce flow in the background.
 * The result of the announce (or `false` when it throws) is handed to
 * `finalizeSubagentCleanup`.
 *
 * @param runId Registry key of the run.
 * @param entry Run record supplying the announce parameters.
 * @returns `false` when cleanup could not be claimed, `true` once the announce
 *   flow has been started.
 */
function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecord): boolean {
  const claimed = beginSubagentCleanup(runId);
  if (!claimed) {
    return false;
  }

  const finish = (didAnnounce: boolean) => {
    void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
  };

  const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
  void runSubagentAnnounceFlow({
    childSessionKey: entry.childSessionKey,
    childRunId: entry.runId,
    requesterSessionKey: entry.requesterSessionKey,
    requesterOrigin,
    requesterDisplayKey: entry.requesterDisplayKey,
    task: entry.task,
    timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
    cleanup: entry.cleanup,
    waitForCompletion: false,
    startedAt: entry.startedAt,
    endedAt: entry.endedAt,
    label: entry.label,
    outcome: entry.outcome,
    spawnMode: entry.spawnMode,
    expectsCompletionMessage: entry.expectsCompletionMessage,
  })
    .then((didAnnounce) => {
      finish(didAnnounce);
    })
    .catch((error) => {
      defaultRuntime.log(
        `[warn] Subagent announce flow failed during cleanup for run ${runId}: ${String(error)}`,
      );
      // A failed announce is treated the same as "did not announce".
      finish(false);
    });
  return true;
}
/**
 * Resumes whatever work is still outstanding for a registered run.
 *
 * In order, the function:
 * 1. returns if the id is empty, already marked as resumed, or not registered;
 * 2. prunes the run when its child session is orphaned;
 * 3. returns if cleanup already completed;
 * 4. gives up (marks cleanup completed) when the announce retry budget is
 *    exhausted or the run ended longer ago than ANNOUNCE_EXPIRY_MS;
 * 5. if a completion-message run is still inside its retry backoff window,
 *    schedules itself to run again once the window has elapsed;
 * 6. if the run has ended, starts the announce/cleanup flow (unless a steer
 *    restart suppresses it);
 * 7. otherwise waits for the run to complete again.
 *
 * @param runId Registry key of the run to resume.
 */
function resumeSubagentRun(runId: string) {
  if (!runId || resumedRuns.has(runId)) {
    return;
  }
  const entry = subagentRuns.get(runId);
  if (!entry) {
    return;
  }
  const orphanReason = resolveSubagentRunOrphanReason({ entry });
  if (orphanReason) {
    if (
      reconcileOrphanedRun({
        runId,
        entry,
        reason: orphanReason,
        source: "resume",
      })
    ) {
      persistSubagentRuns();
    }
    return;
  }
  if (entry.cleanupCompletedAt) {
    return;
  }
  // Skip entries that have exhausted their retry budget or expired (#18264).
  if ((entry.announceRetryCount ?? 0) >= MAX_ANNOUNCE_RETRY_COUNT) {
    logAnnounceGiveUp(entry, "retry-limit");
    entry.cleanupCompletedAt = Date.now();
    persistSubagentRuns();
    return;
  }
  if (typeof entry.endedAt === "number" && Date.now() - entry.endedAt > ANNOUNCE_EXPIRY_MS) {
    logAnnounceGiveUp(entry, "expiry");
    entry.cleanupCompletedAt = Date.now();
    persistSubagentRuns();
    return;
  }
  const now = Date.now();
  const delayMs = resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0);
  const earliestRetryAt = (entry.lastAnnounceRetryAt ?? 0) + delayMs;
  if (
    entry.expectsCompletionMessage === true &&
    entry.lastAnnounceRetryAt &&
    now < earliestRetryAt
  ) {
    const waitMs = Math.max(1, earliestRetryAt - now);
    // Mark the run as resumed so other callers do not schedule duplicate timers
    // while this backoff is pending.
    resumedRuns.add(runId);
    setTimeout(() => {
      // Clear the marker before re-entering: the guard at the top of this
      // function returns early for ids in `resumedRuns`, which would turn the
      // delayed retry into a no-op.
      resumedRuns.delete(runId);
      resumeSubagentRun(runId);
    }, waitMs).unref?.();
    return;
  }
  if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
    if (suppressAnnounceForSteerRestart(entry)) {
      resumedRuns.add(runId);
      return;
    }
    if (!startSubagentAnnounceCleanupFlow(runId, entry)) {
      return;
    }
    resumedRuns.add(runId);
    return;
  }
  // Wait for completion again after restart.
  const cfg = loadConfig();
  const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, entry.runTimeoutSeconds);
  void waitForSubagentCompletion(runId, waitTimeoutMs);
  resumedRuns.add(runId);
}
/**
 * Restores persisted runs into the in-memory registry at most once per
 * process, prunes orphaned runs, and resumes what remains.
 *
 * Failures during restore are swallowed on purpose.
 */
function restoreSubagentRunsOnce() {
  if (restoreAttempted) {
    return;
  }
  restoreAttempted = true;
  try {
    const restored = restoreSubagentRunsFromDisk({
      runs: subagentRuns,
      mergeOnly: true,
    });
    if (restored === 0) {
      return;
    }
    const prunedOrphans = reconcileOrphanedRestoredRuns();
    if (prunedOrphans) {
      persistSubagentRuns();
    }
    if (subagentRuns.size === 0) {
      return;
    }
    // Resume pending work.
    ensureListener();
    // The sweeper is only needed when at least one run has an archive deadline.
    for (const entry of subagentRuns.values()) {
      if (entry.archiveAtMs) {
        startSweeper();
        break;
      }
    }
    for (const runId of subagentRuns.keys()) {
      resumeSubagentRun(runId);
    }
  } catch {
    // ignore restore failures
  }
}
/**
 * Resolves how long after registration a run should be archived.
 *
 * @param cfg Config to read; loaded via `loadConfig()` when omitted.
 * @returns The delay in milliseconds (whole minutes, at least one), or
 *   `undefined` when the configured value is not a positive finite number.
 *   Defaults to 60 minutes when nothing is configured.
 */
function resolveArchiveAfterMs(cfg?: ReturnType<typeof loadConfig>) {
  const effectiveConfig = cfg ?? loadConfig();
  const configuredMinutes = effectiveConfig.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60;
  const isUsable = Number.isFinite(configuredMinutes) && configuredMinutes > 0;
  if (!isUsable) {
    return undefined;
  }
  const wholeMinutes = Math.max(1, Math.floor(configuredMinutes));
  return wholeMinutes * 60_000;
}
/**
 * Resolves the wait timeout for a subagent run by delegating to
 * `resolveAgentTimeoutMs`.
 *
 * @param cfg Loaded configuration.
 * @param runTimeoutSeconds Per-run override in seconds; `0` is passed when omitted.
 * @returns Whatever `resolveAgentTimeoutMs` returns (presumably milliseconds — confirm against that helper).
 */
function resolveSubagentWaitTimeoutMs(
cfg: ReturnType<typeof loadConfig>,
runTimeoutSeconds?: number,
) {
return resolveAgentTimeoutMs({ cfg, overrideSeconds: runTimeoutSeconds ?? 0 });
}
/**
 * Starts the archive sweep, which runs once a minute, unless it is already
 * running. The interval is unref'd where supported.
 */
function startSweeper() {
  if (!sweeper) {
    const handle = setInterval(() => {
      void sweepSubagentRuns();
    }, 60_000);
    handle.unref?.();
    sweeper = handle;
  }
}
/** Stops the archive sweep if it is running and clears the stored handle. */
function stopSweeper() {
  if (sweeper) {
    clearInterval(sweeper);
    sweeper = null;
  }
}
/**
 * Removes every run whose archive deadline has passed: cancels its pending
 * error timer, drops it from the registry, deletes its attachments, and asks
 * the gateway to delete the child session (failures ignored).
 *
 * Persists the registry if anything was removed and stops the sweeper once
 * the registry is empty.
 */
async function sweepSubagentRuns() {
  const sweepTime = Date.now();
  let removedAny = false;
  for (const [runId, entry] of subagentRuns.entries()) {
    const archiveAt = entry.archiveAtMs;
    if (!archiveAt || archiveAt > sweepTime) {
      continue;
    }
    clearPendingLifecycleError(runId);
    subagentRuns.delete(runId);
    removedAny = true;
    // Archiving is terminal for the run record, so retained attachments go too.
    await safeRemoveAttachmentsDir(entry);
    try {
      await callGateway({
        method: "sessions.delete",
        params: {
          key: entry.childSessionKey,
          deleteTranscript: true,
          emitLifecycleHooks: false,
        },
        timeoutMs: 10_000,
      });
    } catch {
      // ignore
    }
  }
  if (removedAny) {
    persistSubagentRuns();
  }
  if (subagentRuns.size === 0) {
    stopSweeper();
  }
}
/**
 * Registers the agent-event listener once (guarded by `listenerStarted`) and
 * stores the value returned by `onAgentEvent` in `listenerStop`.
 *
 * Only `lifecycle` events whose `runId` is present in `subagentRuns` are handled:
 * - `start`: cancels any pending deferred error and records `startedAt` when provided.
 * - `error`: schedules a deferred error completion instead of failing the run immediately.
 * - `end`: completes the run as `timeout` when `aborted` is set, otherwise as `ok`.
 * Other phases are ignored.
 */
function ensureListener() {
if (listenerStarted) {
return;
}
listenerStarted = true;
listenerStop = onAgentEvent((evt) => {
// Handle each event in a detached async task; the listener itself returns nothing.
void (async () => {
if (!evt || evt.stream !== "lifecycle") {
return;
}
const entry = subagentRuns.get(evt.runId);
if (!entry) {
return;
}
const phase = evt.data?.phase;
if (phase === "start") {
// A new start cancels any error that was waiting out its grace period.
clearPendingLifecycleError(evt.runId);
const startedAt = typeof evt.data?.startedAt === "number" ? evt.data.startedAt : undefined;
if (startedAt) {
entry.startedAt = startedAt;
persistSubagentRuns();
}
return;
}
if (phase !== "end" && phase !== "error") {
return;
}
// Fall back to the current time when the event carries no numeric end timestamp.
const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : Date.now();
const error = typeof evt.data?.error === "string" ? evt.data.error : undefined;
if (phase === "error") {
// Errors are deferred so a later start/end can cancel them.
schedulePendingLifecycleError({
runId: evt.runId,
endedAt,
error,
});
return;
}
clearPendingLifecycleError(evt.runId);
// An aborted end is recorded as a timeout outcome.
const outcome: SubagentRunOutcome = evt.data?.aborted
? { status: "timeout" }
: { status: "ok" };
await completeSubagentRun({
runId: evt.runId,
endedAt,
outcome,
reason: SUBAGENT_ENDED_REASON_COMPLETE,
sendFarewell: true,
accountId: entry.requesterOrigin?.accountId,
triggerCleanup: true,
});
})();
});
}
/**
 * Best-effort removal of a run's attachments directory.
 *
 * The directory is removed only when its real path lies strictly inside the
 * real path of the attachments root, so a symlink cannot redirect the removal
 * outside the root. Nothing is removed when either path is unset or when the
 * directory does not exist, and any error is swallowed.
 *
 * @param entry Run record carrying `attachmentsDir` and `attachmentsRootDir`.
 */
async function safeRemoveAttachmentsDir(entry: SubagentRunRecord): Promise<void> {
  const { attachmentsDir, attachmentsRootDir } = entry;
  if (!attachmentsDir || !attachmentsRootDir) {
    return;
  }

  // Resolves symlinks; a missing path yields null, any other failure propagates.
  const realpathOrNull = (targetPath: string): Promise<string | null> =>
    fs.realpath(targetPath).catch((err: unknown) => {
      if ((err as NodeJS.ErrnoException | undefined)?.code === "ENOENT") {
        return null;
      }
      throw err;
    });

  try {
    const [realRoot, realDir] = await Promise.all([
      realpathOrNull(attachmentsRootDir),
      realpathOrNull(attachmentsDir),
    ]);
    if (!realDir) {
      return;
    }
    const containmentRoot = realRoot ?? path.resolve(attachmentsRootDir);
    // The trailing separator stops "/root-other" from matching the prefix "/root".
    const containmentPrefix = containmentRoot.endsWith(path.sep)
      ? containmentRoot
      : `${containmentRoot}${path.sep}`;
    if (realDir.startsWith(containmentPrefix)) {
      await fs.rm(realDir, { recursive: true, force: true });
    }
  } catch {
    // best effort
  }
}
/**
 * Finishes (or re-schedules) cleanup for a run after an announce attempt.
 *
 * - Announce succeeded: emit the completion ended hook if needed, delete
 *   attachments unless they are retained, then complete bookkeeping with the
 *   requested cleanup mode.
 * - Announce did not succeed: consult `resolveDeferredCleanupDecision` and
 *   either wait for descendants, give up, or release the cleanup claim so a
 *   later resume can retry.
 *
 * @param runId Registry key of the run; unknown ids are ignored.
 * @param cleanup Cleanup mode requested for the run.
 * @param didAnnounce Whether the announce flow reported success.
 */
async function finalizeSubagentCleanup(
runId: string,
cleanup: "delete" | "keep",
didAnnounce: boolean,
) {
const entry = subagentRuns.get(runId);
if (!entry) {
return;
}
if (didAnnounce) {
const completionReason = resolveCleanupCompletionReason(entry);
await emitCompletionEndedHookIfNeeded(entry, completionReason);
// Clean up attachments before the run record is removed.
// Attachments survive only for cleanup="keep" with retainAttachmentsOnKeep set.
const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep;
if (shouldDeleteAttachments) {
await safeRemoveAttachmentsDir(entry);
}
completeCleanupBookkeeping({
runId,
entry,
cleanup,
completedAt: Date.now(),
});
return;
}
const now = Date.now();
const deferredDecision = resolveDeferredCleanupDecision({
entry,
now,
activeDescendantRuns: Math.max(0, countActiveDescendantRuns(entry.childSessionKey)),
announceExpiryMs: ANNOUNCE_EXPIRY_MS,
maxAnnounceRetryCount: MAX_ANNOUNCE_RETRY_COUNT,
deferDescendantDelayMs: MIN_ANNOUNCE_RETRY_DELAY_MS,
resolveAnnounceRetryDelayMs,
});
if (deferredDecision.kind === "defer-descendants") {
// Release the cleanup claim and the resumed marker so the scheduled resume can run.
// The retry count is left untouched on this path.
entry.lastAnnounceRetryAt = now;
entry.cleanupHandled = false;
resumedRuns.delete(runId);
persistSubagentRuns();
setTimeout(() => {
resumeSubagentRun(runId);
}, deferredDecision.delayMs).unref?.();
return;
}
if (deferredDecision.retryCount != null) {
entry.announceRetryCount = deferredDecision.retryCount;
entry.lastAnnounceRetryAt = now;
}
if (deferredDecision.kind === "give-up") {
const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep;
if (shouldDeleteAttachments) {
await safeRemoveAttachmentsDir(entry);
}
const completionReason = resolveCleanupCompletionReason(entry);
await emitCompletionEndedHookIfNeeded(entry, completionReason);
logAnnounceGiveUp(entry, deferredDecision.reason);
// Bookkeeping always uses "keep" here, so the record stays registered and is
// only marked as cleanup-completed, even when cleanup="delete" was requested.
completeCleanupBookkeeping({
runId,
entry,
cleanup: "keep",
completedAt: now,
});
return;
}
// Allow retry on the next wake if announce was deferred or failed.
// Applies to both keep/delete cleanup modes so delete-runs are only removed
// after a successful announce (or terminal give-up).
entry.cleanupHandled = false;
resumedRuns.delete(runId);
persistSubagentRuns();
// Without a resume delay no timer is scheduled; the retry then depends on a
// later call to resumeSubagentRun from elsewhere.
if (deferredDecision.resumeDelayMs == null) {
return;
}
setTimeout(() => {
resumeSubagentRun(runId);
}, deferredDecision.resumeDelayMs).unref?.();
}
/**
 * Emits the ended hook (with a farewell) for runs that expect a completion
 * message, provided the hook should be emitted for the given reason.
 *
 * @param entry Run record.
 * @param reason Lifecycle ended reason to report.
 */
async function emitCompletionEndedHookIfNeeded(
  entry: SubagentRunRecord,
  reason: SubagentLifecycleEndedReason,
) {
  if (entry.expectsCompletionMessage !== true) {
    return;
  }
  if (!shouldEmitEndedHookForRun({ entry, reason })) {
    return;
  }
  await emitSubagentEndedHookForRun({
    entry,
    reason,
    sendFarewell: true,
  });
}
/**
 * Records that cleanup for a run is finished.
 *
 * For `"delete"` the run is removed from the registry (and its pending error
 * timer cancelled); for `"keep"` the record stays and gets
 * `cleanupCompletedAt`. In both cases the registry is persisted and other
 * runs with deferred announces are retried.
 *
 * @param params.runId Registry key of the run.
 * @param params.entry Run record; mutated for `"keep"`.
 * @param params.cleanup Cleanup mode to apply.
 * @param params.completedAt Timestamp stored as `cleanupCompletedAt` for `"keep"`.
 */
function completeCleanupBookkeeping(params: {
  runId: string;
  entry: SubagentRunRecord;
  cleanup: "delete" | "keep";
  completedAt: number;
}) {
  const { runId, entry, cleanup, completedAt } = params;
  if (cleanup === "delete") {
    clearPendingLifecycleError(runId);
    subagentRuns.delete(runId);
  } else {
    entry.cleanupCompletedAt = completedAt;
  }
  persistSubagentRuns();
  retryDeferredCompletedAnnounces(runId);
}
/**
 * Re-attempts announce/cleanup for ended runs whose cleanup is neither
 * completed nor currently claimed.
 *
 * Runs that ended longer ago than ANNOUNCE_EXPIRY_MS are force-expired
 * instead of retried (#18264). Runs suppressed for a steer restart are skipped.
 *
 * @param excludeRunId Optional run id to leave untouched.
 */
function retryDeferredCompletedAnnounces(excludeRunId?: string) {
  const now = Date.now();
  for (const [runId, entry] of subagentRuns.entries()) {
    const isExcluded = Boolean(excludeRunId) && runId === excludeRunId;
    const hasEnded = typeof entry.endedAt === "number";
    const cleanupSettled = Boolean(entry.cleanupCompletedAt || entry.cleanupHandled);
    if (isExcluded || !hasEnded || cleanupSettled || suppressAnnounceForSteerRestart(entry)) {
      continue;
    }
    const endedAgoMs = now - (entry.endedAt ?? now);
    if (endedAgoMs > ANNOUNCE_EXPIRY_MS) {
      logAnnounceGiveUp(entry, "expiry");
      entry.cleanupCompletedAt = now;
      persistSubagentRuns();
      continue;
    }
    // Drop the resumed marker so the resume below is not skipped.
    resumedRuns.delete(runId);
    resumeSubagentRun(runId);
  }
}
/**
 * Claims cleanup for a run so that only one cleanup flow runs at a time.
 *
 * @param runId Registry key of the run.
 * @returns `true` when the claim was taken (and persisted); `false` when the
 *   run is unknown, already cleaned up, or already claimed.
 */
function beginSubagentCleanup(runId: string) {
  const entry = subagentRuns.get(runId);
  if (!entry || entry.cleanupCompletedAt || entry.cleanupHandled) {
    return false;
  }
  entry.cleanupHandled = true;
  persistSubagentRuns();
  return true;
}
/**
 * Flags a run so that its announce is suppressed while a steer restart is in
 * progress.
 *
 * @param runId Run id; surrounding whitespace is ignored.
 * @returns `false` when the id is blank or unknown, otherwise `true`. The
 *   registry is persisted only when the flag actually changes.
 */
export function markSubagentRunForSteerRestart(runId: string) {
  const key = runId.trim();
  const entry = key ? subagentRuns.get(key) : undefined;
  if (!entry) {
    return false;
  }
  if (entry.suppressAnnounceReason !== "steer-restart") {
    entry.suppressAnnounceReason = "steer-restart";
    persistSubagentRuns();
  }
  return true;
}
/**
 * Removes the steer-restart suppression from a run.
 *
 * If the run already ended while suppression was active and its cleanup has
 * not completed, cleanup is resumed right away so the completion output is
 * not lost when the restart dispatch fails.
 *
 * @param runId Run id; surrounding whitespace is ignored.
 * @returns `false` when the id is blank or unknown, otherwise `true`.
 */
export function clearSubagentRunSteerRestart(runId: string) {
  const key = runId.trim();
  const entry = key ? subagentRuns.get(key) : undefined;
  if (!entry) {
    return false;
  }
  if (entry.suppressAnnounceReason === "steer-restart") {
    entry.suppressAnnounceReason = undefined;
    persistSubagentRuns();
    // Drop the resumed marker so a resume is not skipped.
    resumedRuns.delete(key);
    const endedWithoutCleanup = typeof entry.endedAt === "number" && !entry.cleanupCompletedAt;
    if (endedWithoutCleanup) {
      resumeSubagentRun(key);
    }
  }
  return true;
}
/**
 * Replaces a run record with a fresh one under a new run id after a steer.
 *
 * The new record copies the previous record (or `fallback` when the previous
 * run is no longer registered) and resets all end-of-run, cleanup and announce
 * retry state. The previous record is removed when the ids differ. The
 * registry is persisted, the listener and sweeper are started as needed, and a
 * completion wait is started for the new run.
 *
 * @param params.previousRunId Id of the run being replaced.
 * @param params.nextRunId Id of the replacement run.
 * @param params.fallback Record to copy from when the previous run is unknown.
 * @param params.runTimeoutSeconds Timeout override; defaults to the source record's value, then 0.
 * @returns `false` when either id is blank or no source record is available, otherwise `true`.
 */
export function replaceSubagentRunAfterSteer(params: {
  previousRunId: string;
  nextRunId: string;
  fallback?: SubagentRunRecord;
  runTimeoutSeconds?: number;
}) {
  const previousRunId = params.previousRunId.trim();
  const nextRunId = params.nextRunId.trim();
  if (!previousRunId || !nextRunId) {
    return false;
  }

  const source = subagentRuns.get(previousRunId) ?? params.fallback;
  if (!source) {
    return false;
  }

  if (previousRunId !== nextRunId) {
    clearPendingLifecycleError(previousRunId);
    subagentRuns.delete(previousRunId);
    resumedRuns.delete(previousRunId);
  }

  const replacedAt = Date.now();
  const cfg = loadConfig();
  const archiveAfterMs = resolveArchiveAfterMs(cfg);
  const spawnMode = source.spawnMode === "session" ? "session" : "run";
  // Session-mode runs are never scheduled for archiving.
  let archiveAtMs: number | undefined;
  if (spawnMode !== "session" && archiveAfterMs) {
    archiveAtMs = replacedAt + archiveAfterMs;
  }
  const runTimeoutSeconds = params.runTimeoutSeconds ?? source.runTimeoutSeconds ?? 0;
  const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);

  const replacement: SubagentRunRecord = {
    ...source,
    runId: nextRunId,
    startedAt: replacedAt,
    endedAt: undefined,
    endedReason: undefined,
    endedHookEmittedAt: undefined,
    outcome: undefined,
    cleanupCompletedAt: undefined,
    cleanupHandled: false,
    suppressAnnounceReason: undefined,
    announceRetryCount: undefined,
    lastAnnounceRetryAt: undefined,
    spawnMode,
    archiveAtMs,
    runTimeoutSeconds,
  };
  subagentRuns.set(nextRunId, replacement);

  ensureListener();
  persistSubagentRuns();
  if (archiveAtMs) {
    startSweeper();
  }
  void waitForSubagentCompletion(nextRunId, waitTimeoutMs);
  return true;
}
/**
 * Registers a newly spawned subagent run and starts tracking it.
 *
 * Stores the record, makes sure the lifecycle listener is running, persists
 * the registry, starts the archive sweeper when the run has an archive
 * deadline, and begins waiting for the run to complete.
 *
 * @param params Run identity, requester details, cleanup mode and optional
 *   attachment locations. `spawnMode` defaults to `"run"` and
 *   `runTimeoutSeconds` to `0`.
 */
export function registerSubagentRun(params: {
  runId: string;
  childSessionKey: string;
  requesterSessionKey: string;
  requesterOrigin?: DeliveryContext;
  requesterDisplayKey: string;
  task: string;
  cleanup: "delete" | "keep";
  label?: string;
  model?: string;
  runTimeoutSeconds?: number;
  expectsCompletionMessage?: boolean;
  spawnMode?: "run" | "session";
  attachmentsDir?: string;
  attachmentsRootDir?: string;
  retainAttachmentsOnKeep?: boolean;
}) {
  const registeredAt = Date.now();
  const cfg = loadConfig();
  const archiveAfterMs = resolveArchiveAfterMs(cfg);
  const spawnMode = params.spawnMode === "session" ? "session" : "run";
  // Session-mode runs are never scheduled for archiving.
  let archiveAtMs: number | undefined;
  if (spawnMode !== "session" && archiveAfterMs) {
    archiveAtMs = registeredAt + archiveAfterMs;
  }
  const runTimeoutSeconds = params.runTimeoutSeconds ?? 0;
  const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
  const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);

  const record: SubagentRunRecord = {
    runId: params.runId,
    childSessionKey: params.childSessionKey,
    requesterSessionKey: params.requesterSessionKey,
    requesterOrigin,
    requesterDisplayKey: params.requesterDisplayKey,
    task: params.task,
    cleanup: params.cleanup,
    expectsCompletionMessage: params.expectsCompletionMessage,
    spawnMode,
    label: params.label,
    model: params.model,
    runTimeoutSeconds,
    createdAt: registeredAt,
    startedAt: registeredAt,
    archiveAtMs,
    cleanupHandled: false,
    attachmentsDir: params.attachmentsDir,
    attachmentsRootDir: params.attachmentsRootDir,
    retainAttachmentsOnKeep: params.retainAttachmentsOnKeep,
  };
  subagentRuns.set(params.runId, record);

  ensureListener();
  persistSubagentRuns();
  if (archiveAtMs) {
    startSweeper();
  }
  // Wait for subagent completion via gateway RPC (cross-process).
  // The in-process lifecycle listener is a fallback for embedded runs.
  void waitForSubagentCompletion(params.runId, waitTimeoutMs);
}
/**
 * Waits for a run to finish via the gateway `agent.wait` call, then records
 * the result and completes the run.
 *
 * Only the statuses `ok`, `error` and `timeout` are acted on; any other
 * response is ignored. All errors (including gateway failures) are swallowed.
 *
 * @param runId Run to wait for.
 * @param waitTimeoutMs Wait timeout in milliseconds; floored and clamped to at least 1.
 */
async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
try {
const timeoutMs = Math.max(1, Math.floor(waitTimeoutMs));
const wait = await callGateway<{
status?: string;
startedAt?: number;
endedAt?: number;
error?: string;
}>({
method: "agent.wait",
params: {
runId,
timeoutMs,
},
// The gateway call itself is given 10 seconds more than the wait it requests.
timeoutMs: timeoutMs + 10_000,
});
if (wait?.status !== "ok" && wait?.status !== "error" && wait?.status !== "timeout") {
return;
}
// The run may have been removed from the registry while waiting.
const entry = subagentRuns.get(runId);
if (!entry) {
return;
}
let mutated = false;
if (typeof wait.startedAt === "number") {
entry.startedAt = wait.startedAt;
mutated = true;
}
if (typeof wait.endedAt === "number") {
entry.endedAt = wait.endedAt;
mutated = true;
}
// Fall back to the current time when no end time is known.
if (!entry.endedAt) {
entry.endedAt = Date.now();
mutated = true;
}
const waitError = typeof wait.error === "string" ? wait.error : undefined;
const outcome: SubagentRunOutcome =
wait.status === "error"
? { status: "error", error: waitError }
: wait.status === "timeout"
? { status: "timeout" }
: { status: "ok" };
if (!runOutcomesEqual(entry.outcome, outcome)) {
entry.outcome = outcome;
mutated = true;
}
if (mutated) {
persistSubagentRuns();
}
// A timeout status is reported with the "complete" ended reason; only the
// error status maps to the error reason.
await completeSubagentRun({
runId,
endedAt: entry.endedAt,
outcome,
reason:
wait.status === "error" ? SUBAGENT_ENDED_REASON_ERROR : SUBAGENT_ENDED_REASON_COMPLETE,
sendFarewell: true,
accountId: entry.requesterOrigin?.accountId,
triggerCleanup: true,
});
} catch {
// ignore
}
}
/**
 * Test helper: resets all in-memory registry state, timers, listener and
 * restore guard.
 *
 * @param opts.persist Pass `false` to skip persisting the now-empty registry.
 */
export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
  subagentRuns.clear();
  resumedRuns.clear();
  endedHookInFlightRunIds.clear();
  clearAllPendingLifecycleErrors();
  resetAnnounceQueuesForTests();
  stopSweeper();
  restoreAttempted = false;

  const unsubscribe = listenerStop;
  if (unsubscribe) {
    unsubscribe();
    listenerStop = null;
  }
  listenerStarted = false;

  const skipPersist = opts?.persist === false;
  if (!skipPersist) {
    persistSubagentRuns();
  }
}
/**
 * Test helper: inserts a run record directly into the in-memory registry,
 * keyed by `entry.runId`. Nothing is persisted and no listener, sweeper or
 * completion wait is started.
 */
export function addSubagentRunForTests(entry: SubagentRunRecord) {
subagentRuns.set(entry.runId, entry);
}
/**
 * Removes a run from the registry and drops every piece of per-run
 * bookkeeping held for it.
 *
 * Cancels the run's pending lifecycle-error timer, removes its "already
 * resumed" marker, persists the registry when a record was actually removed,
 * and stops the sweeper once no runs remain.
 *
 * @param runId Registry key of the run to release.
 */
export function releaseSubagentRun(runId: string) {
  clearPendingLifecycleError(runId);
  // Drop the resumed marker together with the record, as the other removal
  // paths that clear both do. A stale marker would leak and would make
  // `resumeSubagentRun` skip a run later registered under the same id.
  resumedRuns.delete(runId);
  const didDelete = subagentRuns.delete(runId);
  if (didDelete) {
    persistSubagentRuns();
  }
  if (subagentRuns.size === 0) {
    stopSweeper();
  }
}
/**
 * Returns the ids of registered runs for the given child session key, as
 * computed by `findRunIdsByChildSessionKeyFromRuns` over the live registry.
 */
function findRunIdsByChildSessionKey(childSessionKey: string): string[] {
return findRunIdsByChildSessionKeyFromRuns(subagentRuns, childSessionKey);
}
/**
 * Finds the requester that spawned the given child session.
 *
 * @param childSessionKey Session key of the child.
 * @returns The requester's session key and normalized delivery context, or
 *   `null` when no matching run is found.
 */
export function resolveRequesterForChildSession(childSessionKey: string): {
  requesterSessionKey: string;
  requesterOrigin?: DeliveryContext;
} | null {
  const runsSnapshot = getSubagentRunsSnapshotForRead(subagentRuns);
  const match = resolveRequesterForChildSessionFromRuns(runsSnapshot, childSessionKey);
  return match
    ? {
        requesterSessionKey: match.requesterSessionKey,
        requesterOrigin: normalizeDeliveryContext(match.requesterOrigin),
      }
    : null;
}
/**
 * Reports whether any registered run for the child session is still active,
 * i.e. has no numeric `endedAt`.
 *
 * @param childSessionKey Session key of the child.
 */
export function isSubagentSessionRunActive(childSessionKey: string): boolean {
  return findRunIdsByChildSessionKey(childSessionKey).some((runId) => {
    const entry = subagentRuns.get(runId);
    if (!entry) {
      return false;
    }
    return typeof entry.endedAt !== "number";
  });
}
/**
 * Marks runs as killed. Targets the explicit `runId` and/or every run of
 * `childSessionKey`; runs that already ended are left untouched.
 *
 * Each affected run gets an error outcome, the killed ended reason, and its
 * cleanup flagged as complete with announces suppressed. The ended hook is
 * then emitted once per distinct child session; hook failures are ignored.
 *
 * @param params.runId Optional run id to terminate.
 * @param params.childSessionKey Optional child session whose runs are terminated.
 * @param params.reason Error text to record; defaults to `"killed"`.
 * @returns Number of runs that were updated.
 */
export function markSubagentRunTerminated(params: {
  runId?: string;
  childSessionKey?: string;
  reason?: string;
}): number {
  const targetRunIds = new Set<string>();
  const explicitRunId = typeof params.runId === "string" ? params.runId.trim() : "";
  if (explicitRunId) {
    targetRunIds.add(explicitRunId);
  }
  if (typeof params.childSessionKey === "string" && params.childSessionKey.trim()) {
    findRunIdsByChildSessionKey(params.childSessionKey).forEach((runId) => {
      targetRunIds.add(runId);
    });
  }
  if (targetRunIds.size === 0) {
    return 0;
  }

  const terminatedAt = Date.now();
  const reason = params.reason?.trim() || "killed";
  // The first terminated entry per child session is the one used for the hook.
  const hookEntryByChildSessionKey = new Map<string, SubagentRunRecord>();
  let updated = 0;
  for (const runId of targetRunIds) {
    clearPendingLifecycleError(runId);
    const entry = subagentRuns.get(runId);
    if (!entry || typeof entry.endedAt === "number") {
      continue;
    }
    entry.endedAt = terminatedAt;
    entry.outcome = { status: "error", error: reason };
    entry.endedReason = SUBAGENT_ENDED_REASON_KILLED;
    entry.cleanupHandled = true;
    entry.cleanupCompletedAt = terminatedAt;
    entry.suppressAnnounceReason = "killed";
    if (!hookEntryByChildSessionKey.has(entry.childSessionKey)) {
      hookEntryByChildSessionKey.set(entry.childSessionKey, entry);
    }
    updated += 1;
  }
  if (updated === 0) {
    return updated;
  }

  persistSubagentRuns();
  for (const entry of hookEntryByChildSessionKey.values()) {
    void emitSubagentEndedHookOnce({
      entry,
      reason: SUBAGENT_ENDED_REASON_KILLED,
      sendFarewell: true,
      outcome: SUBAGENT_ENDED_OUTCOME_KILLED,
      error: reason,
      inFlightRunIds: endedHookInFlightRunIds,
      persist: persistSubagentRuns,
    }).catch(() => {
      // Hook failures should not break termination flow.
    });
  }
  return updated;
}
/**
 * Lists the run records for the given requester session, as computed by
 * `listRunsForRequesterFromRuns` over the live registry.
 */
export function listSubagentRunsForRequester(requesterSessionKey: string): SubagentRunRecord[] {
return listRunsForRequesterFromRuns(subagentRuns, requesterSessionKey);
}
/**
 * Counts the active runs for the given requester session, as computed by
 * `countActiveRunsForSessionFromRuns` over the value returned by
 * `getSubagentRunsSnapshotForRead(subagentRuns)`.
 */
export function countActiveRunsForSession(requesterSessionKey: string): number {
return countActiveRunsForSessionFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
requesterSessionKey,
);
}
/**
 * Counts the active descendant runs of the given root session, as computed by
 * `countActiveDescendantRunsFromRuns` over the value returned by
 * `getSubagentRunsSnapshotForRead(subagentRuns)`.
 */
export function countActiveDescendantRuns(rootSessionKey: string): number {
return countActiveDescendantRunsFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
rootSessionKey,
);
}
/**
 * Lists the descendant run records of the given root session, as computed by
 * `listDescendantRunsForRequesterFromRuns` over the value returned by
 * `getSubagentRunsSnapshotForRead(subagentRuns)`.
 */
export function listDescendantRunsForRequester(rootSessionKey: string): SubagentRunRecord[] {
return listDescendantRunsForRequesterFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
rootSessionKey,
);
}
/**
 * Initializes the registry by running the one-time restore of persisted runs.
 * Repeated calls are no-ops because the restore is guarded by `restoreAttempted`.
 */
export function initSubagentRegistry() {
restoreSubagentRunsOnce();
}