fix(acp): implicit streamToParent for mode=run without thread (#42404)
* fix(acp): implicit streamToParent for mode=run without thread When spawning ACP sessions with mode=run and no thread binding, automatically route output to parent session instead of Discord. This enables agent-to-agent supervision patterns where the spawning agent wants results returned programmatically, not posted as chat. The change makes sessions_spawn with runtime=acp and thread=false behave like direct acpx invocation - output goes to the spawning session, not to Discord. Fixes the issue where mode=run without thread still posted to Discord because hasDeliveryTarget was true when called from a Discord context. * fix: use resolved spawnMode instead of params.mode Move implicit streamToParent check to after resolveSpawnMode so that both explicit mode="run" and omitted mode (which defaults to "run" when thread is false) correctly trigger parent routing. This fixes the issue where callers that rely on default mode selection would not get the intended parent streaming behavior. * fix: tighten implicit ACP parent relay gating (#42404) (thanks @davidguttman) --------- Co-authored-by: Onur Solmaz <2453968+osolmaz@users.noreply.github.com>
This commit is contained in:
@@ -75,6 +75,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Secrets/SecretRef: reject exec SecretRef traversal ids across schema, runtime, and gateway. (#42370) Thanks @joshavant.
|
||||
- Telegram/docs: clarify that `channels.telegram.groups` allowlists chats while `groupAllowFrom` allowlists users inside those chats, and point invalid negative chat IDs at the right config key. (#42451) Thanks @altaywtf.
|
||||
- Models/Alibaba Cloud Model Studio: wire `MODELSTUDIO_API_KEY` through shared env auth, implicit provider discovery, and shell-env fallback so onboarding works outside the wizard too. (#40634) Thanks @pomelo-nwu.
|
||||
- ACP/sessions_spawn: implicitly stream `mode="run"` ACP spawns to parent only for eligible subagent orchestrator sessions (heartbeat `target: "last"` with a usable session-local route), restoring parent progress relays without thread binding. (#42404) Thanks @davidguttman.
|
||||
|
||||
## 2026.3.8
|
||||
|
||||
|
||||
@@ -180,7 +180,9 @@ export function startAcpSpawnParentStreamRelay(params: {
|
||||
};
|
||||
const wake = () => {
|
||||
requestHeartbeatNow(
|
||||
scopedHeartbeatWakeOptions(parentSessionKey, { reason: "acp:spawn:stream" }),
|
||||
scopedHeartbeatWakeOptions(parentSessionKey, {
|
||||
reason: "acp:spawn:stream",
|
||||
}),
|
||||
);
|
||||
};
|
||||
const emit = (text: string, contextKey: string) => {
|
||||
|
||||
@@ -38,6 +38,7 @@ const hoisted = vi.hoisted(() => {
|
||||
const loadSessionStoreMock = vi.fn();
|
||||
const resolveStorePathMock = vi.fn();
|
||||
const resolveSessionTranscriptFileMock = vi.fn();
|
||||
const areHeartbeatsEnabledMock = vi.fn();
|
||||
const state = {
|
||||
cfg: createDefaultSpawnConfig(),
|
||||
};
|
||||
@@ -55,6 +56,7 @@ const hoisted = vi.hoisted(() => {
|
||||
loadSessionStoreMock,
|
||||
resolveStorePathMock,
|
||||
resolveSessionTranscriptFileMock,
|
||||
areHeartbeatsEnabledMock,
|
||||
state,
|
||||
};
|
||||
});
|
||||
@@ -128,6 +130,14 @@ vi.mock("../infra/outbound/session-binding-service.js", async (importOriginal) =
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../infra/heartbeat-wake.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../infra/heartbeat-wake.js")>();
|
||||
return {
|
||||
...actual,
|
||||
areHeartbeatsEnabled: () => hoisted.areHeartbeatsEnabledMock(),
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./acp-spawn-parent-stream.js", () => ({
|
||||
startAcpSpawnParentStreamRelay: (...args: unknown[]) =>
|
||||
hoisted.startAcpSpawnParentStreamRelayMock(...args),
|
||||
@@ -192,6 +202,7 @@ function expectResolvedIntroTextInBindMetadata(): void {
|
||||
describe("spawnAcpDirect", () => {
|
||||
beforeEach(() => {
|
||||
hoisted.state.cfg = createDefaultSpawnConfig();
|
||||
hoisted.areHeartbeatsEnabledMock.mockReset().mockReturnValue(true);
|
||||
|
||||
hoisted.callGatewayMock.mockReset().mockImplementation(async (argsUnknown: unknown) => {
|
||||
const args = argsUnknown as { method?: string };
|
||||
@@ -393,6 +404,8 @@ describe("spawnAcpDirect", () => {
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
|
||||
expect(hoisted.resolveSessionTranscriptFileMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
sessionId: "sess-123",
|
||||
@@ -633,6 +646,290 @@ describe("spawnAcpDirect", () => {
|
||||
expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("implicitly streams mode=run ACP spawns for subagent requester sessions", async () => {
|
||||
hoisted.state.cfg = {
|
||||
...hoisted.state.cfg,
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: {
|
||||
every: "30m",
|
||||
target: "last",
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
const firstHandle = createRelayHandle();
|
||||
const secondHandle = createRelayHandle();
|
||||
hoisted.startAcpSpawnParentStreamRelayMock
|
||||
.mockReset()
|
||||
.mockReturnValueOnce(firstHandle)
|
||||
.mockReturnValueOnce(secondHandle);
|
||||
hoisted.loadSessionStoreMock.mockReset().mockImplementation(() => {
|
||||
const store: Record<
|
||||
string,
|
||||
{ sessionId: string; updatedAt: number; deliveryContext?: unknown }
|
||||
> = {
|
||||
"agent:main:subagent:parent": {
|
||||
sessionId: "parent-sess-1",
|
||||
updatedAt: Date.now(),
|
||||
deliveryContext: {
|
||||
channel: "discord",
|
||||
to: "channel:parent-channel",
|
||||
accountId: "default",
|
||||
},
|
||||
},
|
||||
};
|
||||
return new Proxy(store, {
|
||||
get(target, prop) {
|
||||
if (typeof prop === "string" && prop.startsWith("agent:codex:acp:")) {
|
||||
return { sessionId: "sess-123", updatedAt: Date.now() };
|
||||
}
|
||||
return target[prop as keyof typeof target];
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
const result = await spawnAcpDirect(
|
||||
{
|
||||
task: "Investigate flaky tests",
|
||||
agentId: "codex",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "agent:main:subagent:parent",
|
||||
agentChannel: "discord",
|
||||
agentAccountId: "default",
|
||||
agentTo: "channel:parent-channel",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBe("/tmp/sess-main.acp-stream.jsonl");
|
||||
const agentCall = hoisted.callGatewayMock.mock.calls
|
||||
.map((call: unknown[]) => call[0] as { method?: string; params?: Record<string, unknown> })
|
||||
.find((request) => request.method === "agent");
|
||||
expect(agentCall?.params?.deliver).toBe(false);
|
||||
expect(agentCall?.params?.channel).toBeUndefined();
|
||||
expect(agentCall?.params?.to).toBeUndefined();
|
||||
expect(agentCall?.params?.threadId).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
parentSessionKey: "agent:main:subagent:parent",
|
||||
agentId: "codex",
|
||||
logPath: "/tmp/sess-main.acp-stream.jsonl",
|
||||
emitStartNotice: false,
|
||||
}),
|
||||
);
|
||||
expect(firstHandle.dispose).toHaveBeenCalledTimes(1);
|
||||
expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not implicitly stream when heartbeat target is not session-local", async () => {
|
||||
hoisted.state.cfg = {
|
||||
...hoisted.state.cfg,
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: {
|
||||
every: "30m",
|
||||
target: "discord",
|
||||
to: "channel:ops-room",
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const result = await spawnAcpDirect(
|
||||
{
|
||||
task: "Investigate flaky tests",
|
||||
agentId: "codex",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "agent:main:subagent:fixed-target",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not implicitly stream when session scope is global", async () => {
|
||||
hoisted.state.cfg = {
|
||||
...hoisted.state.cfg,
|
||||
session: {
|
||||
...hoisted.state.cfg.session,
|
||||
scope: "global",
|
||||
},
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: {
|
||||
every: "30m",
|
||||
target: "last",
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const result = await spawnAcpDirect(
|
||||
{
|
||||
task: "Investigate flaky tests",
|
||||
agentId: "codex",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "agent:main:subagent:global-scope",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not implicitly stream for subagent requester sessions when heartbeat is disabled", async () => {
|
||||
hoisted.state.cfg = {
|
||||
...hoisted.state.cfg,
|
||||
agents: {
|
||||
list: [{ id: "main", heartbeat: { every: "30m" } }, { id: "research" }],
|
||||
},
|
||||
};
|
||||
|
||||
const result = await spawnAcpDirect(
|
||||
{
|
||||
task: "Investigate flaky tests",
|
||||
agentId: "codex",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "agent:research:subagent:orchestrator",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not implicitly stream for subagent requester sessions when heartbeat cadence is invalid", async () => {
|
||||
hoisted.state.cfg = {
|
||||
...hoisted.state.cfg,
|
||||
agents: {
|
||||
list: [
|
||||
{
|
||||
id: "research",
|
||||
heartbeat: { every: "0m" },
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
|
||||
const result = await spawnAcpDirect(
|
||||
{
|
||||
task: "Investigate flaky tests",
|
||||
agentId: "codex",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "agent:research:subagent:invalid-heartbeat",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not implicitly stream when heartbeats are runtime-disabled", async () => {
|
||||
hoisted.areHeartbeatsEnabledMock.mockReturnValue(false);
|
||||
|
||||
const result = await spawnAcpDirect(
|
||||
{
|
||||
task: "Investigate flaky tests",
|
||||
agentId: "codex",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "agent:main:subagent:runtime-disabled",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not implicitly stream for legacy subagent requester session keys", async () => {
|
||||
const result = await spawnAcpDirect(
|
||||
{
|
||||
task: "Investigate flaky tests",
|
||||
agentId: "codex",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "subagent:legacy-worker",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not implicitly stream for subagent requester sessions with thread context", async () => {
|
||||
const result = await spawnAcpDirect(
|
||||
{
|
||||
task: "Investigate flaky tests",
|
||||
agentId: "codex",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "agent:main:subagent:thread-context",
|
||||
agentChannel: "discord",
|
||||
agentAccountId: "default",
|
||||
agentTo: "channel:parent-channel",
|
||||
agentThreadId: "requester-thread",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not implicitly stream for thread-bound subagent requester sessions", async () => {
|
||||
hoisted.sessionBindingListBySessionMock.mockImplementation((targetSessionKey: string) => {
|
||||
if (targetSessionKey === "agent:main:subagent:thread-bound") {
|
||||
return [
|
||||
createSessionBinding({
|
||||
targetSessionKey,
|
||||
targetKind: "subagent",
|
||||
status: "active",
|
||||
}),
|
||||
];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
const result = await spawnAcpDirect(
|
||||
{
|
||||
task: "Investigate flaky tests",
|
||||
agentId: "codex",
|
||||
},
|
||||
{
|
||||
agentSessionKey: "agent:main:subagent:thread-bound",
|
||||
agentChannel: "discord",
|
||||
agentAccountId: "default",
|
||||
agentTo: "channel:parent-channel",
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe("accepted");
|
||||
expect(result.mode).toBe("run");
|
||||
expect(result.streamLogPath).toBeUndefined();
|
||||
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("announces parent relay start only after successful child dispatch", async () => {
|
||||
const firstHandle = createRelayHandle();
|
||||
const secondHandle = createRelayHandle();
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
resolveAcpThreadSessionDetailLines,
|
||||
} from "../acp/runtime/session-identifiers.js";
|
||||
import type { AcpRuntimeSessionMode } from "../acp/runtime/types.js";
|
||||
import { DEFAULT_HEARTBEAT_EVERY } from "../auto-reply/heartbeat.js";
|
||||
import {
|
||||
resolveThreadBindingIntroText,
|
||||
resolveThreadBindingThreadName,
|
||||
@@ -21,11 +22,13 @@ import {
|
||||
resolveThreadBindingMaxAgeMsForChannel,
|
||||
resolveThreadBindingSpawnPolicy,
|
||||
} from "../channels/thread-bindings-policy.js";
|
||||
import { parseDurationMs } from "../cli/parse-duration.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { loadSessionStore, resolveStorePath, type SessionEntry } from "../config/sessions.js";
|
||||
import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { areHeartbeatsEnabled } from "../infra/heartbeat-wake.js";
|
||||
import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
|
||||
import {
|
||||
getSessionBindingService,
|
||||
@@ -33,13 +36,18 @@ import {
|
||||
type SessionBindingRecord,
|
||||
} from "../infra/outbound/session-binding-service.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { normalizeAgentId } from "../routing/session-key.js";
|
||||
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import {
|
||||
isSubagentSessionKey,
|
||||
normalizeAgentId,
|
||||
parseAgentSessionKey,
|
||||
} from "../routing/session-key.js";
|
||||
import { deliveryContextFromSession, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import {
|
||||
type AcpSpawnParentRelayHandle,
|
||||
resolveAcpSpawnStreamLogPath,
|
||||
startAcpSpawnParentStreamRelay,
|
||||
} from "./acp-spawn-parent-stream.js";
|
||||
import { resolveAgentConfig, resolveDefaultAgentId } from "./agent-scope.js";
|
||||
import { resolveSandboxRuntimeStatus } from "./sandbox/runtime-status.js";
|
||||
import { resolveInternalSessionKey, resolveMainSessionAlias } from "./tools/sessions-helpers.js";
|
||||
|
||||
@@ -130,6 +138,95 @@ function resolveAcpSessionMode(mode: SpawnAcpMode): AcpRuntimeSessionMode {
|
||||
return mode === "session" ? "persistent" : "oneshot";
|
||||
}
|
||||
|
||||
function isHeartbeatEnabledForSessionAgent(params: {
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey?: string;
|
||||
}): boolean {
|
||||
if (!areHeartbeatsEnabled()) {
|
||||
return false;
|
||||
}
|
||||
const requesterAgentId = parseAgentSessionKey(params.sessionKey)?.agentId;
|
||||
if (!requesterAgentId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const agentEntries = params.cfg.agents?.list ?? [];
|
||||
const hasExplicitHeartbeatAgents = agentEntries.some((entry) => Boolean(entry?.heartbeat));
|
||||
const enabledByPolicy = hasExplicitHeartbeatAgents
|
||||
? agentEntries.some(
|
||||
(entry) => Boolean(entry?.heartbeat) && normalizeAgentId(entry?.id) === requesterAgentId,
|
||||
)
|
||||
: requesterAgentId === resolveDefaultAgentId(params.cfg);
|
||||
if (!enabledByPolicy) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const heartbeatEvery =
|
||||
resolveAgentConfig(params.cfg, requesterAgentId)?.heartbeat?.every ??
|
||||
params.cfg.agents?.defaults?.heartbeat?.every ??
|
||||
DEFAULT_HEARTBEAT_EVERY;
|
||||
const trimmedEvery = typeof heartbeatEvery === "string" ? heartbeatEvery.trim() : "";
|
||||
if (!trimmedEvery) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
return parseDurationMs(trimmedEvery, { defaultUnit: "m" }) > 0;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function resolveHeartbeatConfigForAgent(params: {
|
||||
cfg: OpenClawConfig;
|
||||
agentId: string;
|
||||
}): NonNullable<NonNullable<OpenClawConfig["agents"]>["defaults"]>["heartbeat"] {
|
||||
const defaults = params.cfg.agents?.defaults?.heartbeat;
|
||||
const overrides = resolveAgentConfig(params.cfg, params.agentId)?.heartbeat;
|
||||
if (!defaults && !overrides) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
...defaults,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function hasSessionLocalHeartbeatRelayRoute(params: {
|
||||
cfg: OpenClawConfig;
|
||||
parentSessionKey: string;
|
||||
requesterAgentId: string;
|
||||
}): boolean {
|
||||
const scope = params.cfg.session?.scope ?? "per-sender";
|
||||
if (scope === "global") {
|
||||
return false;
|
||||
}
|
||||
|
||||
const heartbeat = resolveHeartbeatConfigForAgent({
|
||||
cfg: params.cfg,
|
||||
agentId: params.requesterAgentId,
|
||||
});
|
||||
if ((heartbeat?.target ?? "none") !== "last") {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Explicit delivery overrides are not session-local and can route updates
|
||||
// to unrelated destinations (for example a pinned ops channel).
|
||||
if (typeof heartbeat?.to === "string" && heartbeat.to.trim().length > 0) {
|
||||
return false;
|
||||
}
|
||||
if (typeof heartbeat?.accountId === "string" && heartbeat.accountId.trim().length > 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const storePath = resolveStorePath(params.cfg.session?.store, {
|
||||
agentId: params.requesterAgentId,
|
||||
});
|
||||
const sessionStore = loadSessionStore(storePath);
|
||||
const parentEntry = sessionStore[params.parentSessionKey];
|
||||
const parentDeliveryContext = deliveryContextFromSession(parentEntry);
|
||||
return Boolean(parentDeliveryContext?.channel && parentDeliveryContext.to);
|
||||
}
|
||||
|
||||
function resolveTargetAcpAgentId(params: {
|
||||
requestedAgentId?: string;
|
||||
cfg: OpenClawConfig;
|
||||
@@ -326,6 +423,8 @@ export async function spawnAcpDirect(
|
||||
error: 'sessions_spawn streamTo="parent" requires an active requester session context.',
|
||||
};
|
||||
}
|
||||
|
||||
const requestThreadBinding = params.thread === true;
|
||||
const runtimePolicyError = resolveAcpSpawnRuntimePolicyError({
|
||||
cfg,
|
||||
requesterSessionKey: ctx.agentSessionKey,
|
||||
@@ -339,7 +438,6 @@ export async function spawnAcpDirect(
|
||||
};
|
||||
}
|
||||
|
||||
const requestThreadBinding = params.thread === true;
|
||||
const spawnMode = resolveSpawnMode({
|
||||
requestedMode: params.mode,
|
||||
threadRequested: requestThreadBinding,
|
||||
@@ -351,6 +449,52 @@ export async function spawnAcpDirect(
|
||||
};
|
||||
}
|
||||
|
||||
const bindingService = getSessionBindingService();
|
||||
const requesterParsedSession = parseAgentSessionKey(parentSessionKey);
|
||||
const requesterIsSubagentSession =
|
||||
Boolean(requesterParsedSession) && isSubagentSessionKey(parentSessionKey);
|
||||
const requesterHasActiveSubagentBinding =
|
||||
requesterIsSubagentSession && parentSessionKey
|
||||
? bindingService
|
||||
.listBySession(parentSessionKey)
|
||||
.some((record) => record.targetKind === "subagent" && record.status !== "ended")
|
||||
: false;
|
||||
const requesterHasThreadContext =
|
||||
typeof ctx.agentThreadId === "string"
|
||||
? ctx.agentThreadId.trim().length > 0
|
||||
: ctx.agentThreadId != null;
|
||||
const requesterHeartbeatEnabled = isHeartbeatEnabledForSessionAgent({
|
||||
cfg,
|
||||
sessionKey: parentSessionKey,
|
||||
});
|
||||
const requesterAgentId = requesterParsedSession?.agentId;
|
||||
const requesterHeartbeatRelayRouteUsable =
|
||||
parentSessionKey && requesterAgentId
|
||||
? hasSessionLocalHeartbeatRelayRoute({
|
||||
cfg,
|
||||
parentSessionKey,
|
||||
requesterAgentId,
|
||||
})
|
||||
: false;
|
||||
|
||||
// For mode=run without thread binding, implicitly route output to parent
|
||||
// only for spawned subagent orchestrator sessions with heartbeat enabled
|
||||
// AND a session-local heartbeat delivery route (target=last + usable last route).
|
||||
// Skip requester sessions that are thread-bound (or carrying thread context)
|
||||
// so user-facing threads do not receive unsolicited ACP progress chatter
|
||||
// unless streamTo="parent" is explicitly requested. Use resolved spawnMode
|
||||
// (not params.mode) so default mode selection works.
|
||||
const implicitStreamToParent =
|
||||
!streamToParentRequested &&
|
||||
spawnMode === "run" &&
|
||||
!requestThreadBinding &&
|
||||
requesterIsSubagentSession &&
|
||||
!requesterHasActiveSubagentBinding &&
|
||||
!requesterHasThreadContext &&
|
||||
requesterHeartbeatEnabled &&
|
||||
requesterHeartbeatRelayRouteUsable;
|
||||
const effectiveStreamToParent = streamToParentRequested || implicitStreamToParent;
|
||||
|
||||
const targetAgentResult = resolveTargetAcpAgentId({
|
||||
requestedAgentId: params.agentId,
|
||||
cfg,
|
||||
@@ -392,7 +536,6 @@ export async function spawnAcpDirect(
|
||||
}
|
||||
|
||||
const acpManager = getAcpSessionManager();
|
||||
const bindingService = getSessionBindingService();
|
||||
let binding: SessionBindingRecord | null = null;
|
||||
let sessionCreated = false;
|
||||
let initializedRuntime: AcpSpawnRuntimeCloseHandle | undefined;
|
||||
@@ -530,17 +673,17 @@ export async function spawnAcpDirect(
|
||||
// Fresh one-shot ACP runs should bootstrap the worker first, then let higher layers
|
||||
// decide how to relay status. Inline delivery is reserved for thread-bound sessions.
|
||||
const useInlineDelivery =
|
||||
hasDeliveryTarget && spawnMode === "session" && !streamToParentRequested;
|
||||
hasDeliveryTarget && spawnMode === "session" && !effectiveStreamToParent;
|
||||
const childIdem = crypto.randomUUID();
|
||||
let childRunId: string = childIdem;
|
||||
const streamLogPath =
|
||||
streamToParentRequested && parentSessionKey
|
||||
effectiveStreamToParent && parentSessionKey
|
||||
? resolveAcpSpawnStreamLogPath({
|
||||
childSessionKey: sessionKey,
|
||||
})
|
||||
: undefined;
|
||||
let parentRelay: AcpSpawnParentRelayHandle | undefined;
|
||||
if (streamToParentRequested && parentSessionKey) {
|
||||
if (effectiveStreamToParent && parentSessionKey) {
|
||||
// Register relay before dispatch so fast lifecycle failures are not missed.
|
||||
parentRelay = startAcpSpawnParentStreamRelay({
|
||||
runId: childIdem,
|
||||
@@ -585,7 +728,7 @@ export async function spawnAcpDirect(
|
||||
};
|
||||
}
|
||||
|
||||
if (streamToParentRequested && parentSessionKey) {
|
||||
if (effectiveStreamToParent && parentSessionKey) {
|
||||
if (parentRelay && childRunId !== childIdem) {
|
||||
parentRelay.dispose();
|
||||
// Defensive fallback if gateway returns a runId that differs from idempotency key.
|
||||
|
||||
@@ -19,6 +19,7 @@ describe("heartbeat-reason", () => {
|
||||
expect(resolveHeartbeatReasonKind("manual")).toBe("manual");
|
||||
expect(resolveHeartbeatReasonKind("exec-event")).toBe("exec-event");
|
||||
expect(resolveHeartbeatReasonKind("wake")).toBe("wake");
|
||||
expect(resolveHeartbeatReasonKind("acp:spawn:stream")).toBe("wake");
|
||||
expect(resolveHeartbeatReasonKind("cron:job-1")).toBe("cron");
|
||||
expect(resolveHeartbeatReasonKind("hook:wake")).toBe("hook");
|
||||
expect(resolveHeartbeatReasonKind(" hook:wake ")).toBe("hook");
|
||||
@@ -35,6 +36,7 @@ describe("heartbeat-reason", () => {
|
||||
expect(isHeartbeatEventDrivenReason("exec-event")).toBe(true);
|
||||
expect(isHeartbeatEventDrivenReason("cron:job-1")).toBe(true);
|
||||
expect(isHeartbeatEventDrivenReason("wake")).toBe(true);
|
||||
expect(isHeartbeatEventDrivenReason("acp:spawn:stream")).toBe(true);
|
||||
expect(isHeartbeatEventDrivenReason("hook:gmail:sync")).toBe(true);
|
||||
expect(isHeartbeatEventDrivenReason("interval")).toBe(false);
|
||||
expect(isHeartbeatEventDrivenReason("manual")).toBe(false);
|
||||
|
||||
@@ -34,6 +34,9 @@ export function resolveHeartbeatReasonKind(reason?: string): HeartbeatReasonKind
|
||||
if (trimmed === "wake") {
|
||||
return "wake";
|
||||
}
|
||||
if (trimmed.startsWith("acp:spawn:")) {
|
||||
return "wake";
|
||||
}
|
||||
if (trimmed.startsWith("cron:")) {
|
||||
return "cron";
|
||||
}
|
||||
|
||||
@@ -38,7 +38,11 @@ import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { getQueueSize } from "../process/command-queue.js";
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js";
|
||||
import {
|
||||
normalizeAgentId,
|
||||
parseAgentSessionKey,
|
||||
toAgentStoreSessionKey,
|
||||
} from "../routing/session-key.js";
|
||||
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { escapeRegExp } from "../utils.js";
|
||||
import { formatErrorMessage, hasErrnoCode } from "./errors.js";
|
||||
@@ -53,9 +57,11 @@ import { emitHeartbeatEvent, resolveIndicatorType } from "./heartbeat-events.js"
|
||||
import { resolveHeartbeatReasonKind } from "./heartbeat-reason.js";
|
||||
import { resolveHeartbeatVisibility } from "./heartbeat-visibility.js";
|
||||
import {
|
||||
areHeartbeatsEnabled,
|
||||
type HeartbeatRunResult,
|
||||
type HeartbeatWakeHandler,
|
||||
requestHeartbeatNow,
|
||||
setHeartbeatsEnabled,
|
||||
setHeartbeatWakeHandler,
|
||||
} from "./heartbeat-wake.js";
|
||||
import type { OutboundSendDeps } from "./outbound/deliver.js";
|
||||
@@ -75,11 +81,8 @@ export type HeartbeatDeps = OutboundSendDeps &
|
||||
};
|
||||
|
||||
const log = createSubsystemLogger("gateway/heartbeat");
|
||||
let heartbeatsEnabled = true;
|
||||
|
||||
export function setHeartbeatsEnabled(enabled: boolean) {
|
||||
heartbeatsEnabled = enabled;
|
||||
}
|
||||
export { areHeartbeatsEnabled, setHeartbeatsEnabled };
|
||||
|
||||
type HeartbeatConfig = AgentDefaultsConfig["heartbeat"];
|
||||
type HeartbeatAgent = {
|
||||
@@ -611,9 +614,14 @@ export async function runHeartbeatOnce(opts: {
|
||||
deps?: HeartbeatDeps;
|
||||
}): Promise<HeartbeatRunResult> {
|
||||
const cfg = opts.cfg ?? loadConfig();
|
||||
const agentId = normalizeAgentId(opts.agentId ?? resolveDefaultAgentId(cfg));
|
||||
const explicitAgentId = typeof opts.agentId === "string" ? opts.agentId.trim() : "";
|
||||
const forcedSessionAgentId =
|
||||
explicitAgentId.length > 0 ? undefined : parseAgentSessionKey(opts.sessionKey)?.agentId;
|
||||
const agentId = normalizeAgentId(
|
||||
explicitAgentId || forcedSessionAgentId || resolveDefaultAgentId(cfg),
|
||||
);
|
||||
const heartbeat = opts.heartbeat ?? resolveHeartbeatConfig(cfg, agentId);
|
||||
if (!heartbeatsEnabled) {
|
||||
if (!areHeartbeatsEnabled()) {
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
if (!isHeartbeatEnabledForAgent(cfg, agentId)) {
|
||||
@@ -1114,7 +1122,7 @@ export function startHeartbeatRunner(opts: {
|
||||
reason: "disabled",
|
||||
} satisfies HeartbeatRunResult;
|
||||
}
|
||||
if (!heartbeatsEnabled) {
|
||||
if (!areHeartbeatsEnabled()) {
|
||||
return {
|
||||
status: "skipped",
|
||||
reason: "disabled",
|
||||
|
||||
@@ -15,6 +15,16 @@ export type HeartbeatWakeHandler = (opts: {
|
||||
sessionKey?: string;
|
||||
}) => Promise<HeartbeatRunResult>;
|
||||
|
||||
let heartbeatsEnabled = true;
|
||||
|
||||
export function setHeartbeatsEnabled(enabled: boolean) {
|
||||
heartbeatsEnabled = enabled;
|
||||
}
|
||||
|
||||
export function areHeartbeatsEnabled(): boolean {
|
||||
return heartbeatsEnabled;
|
||||
}
|
||||
|
||||
type WakeTimerKind = "normal" | "retry";
|
||||
type PendingWakeReason = {
|
||||
reason: string;
|
||||
|
||||
Reference in New Issue
Block a user