diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts
index 98f1e0e52..9282e679c 100644
--- a/src/gateway/protocol/index.ts
+++ b/src/gateway/protocol/index.ts
@@ -128,6 +128,16 @@ import {
   LogsTailParamsSchema,
   type LogsTailResult,
   LogsTailResultSchema,
+  type MeshPlanParams,
+  MeshPlanParamsSchema,
+  type MeshRetryParams,
+  MeshRetryParamsSchema,
+  type MeshRunParams,
+  MeshRunParamsSchema,
+  type MeshStatusParams,
+  MeshStatusParamsSchema,
+  type MeshWorkflowPlan,
+  MeshWorkflowPlanSchema,
   type ModelsListParams,
   ModelsListParamsSchema,
   type NodeDescribeParams,
@@ -358,6 +368,10 @@ export const validateExecApprovalsNodeSetParams = ajv.compile(LogsTailParamsSchema);
+// Compiled validators for the four mesh.* request payloads.
+export const validateMeshPlanParams = ajv.compile(MeshPlanParamsSchema);
+export const validateMeshRunParams = ajv.compile(MeshRunParamsSchema);
+export const validateMeshStatusParams = ajv.compile(MeshStatusParamsSchema);
+export const validateMeshRetryParams = ajv.compile(MeshRetryParamsSchema);
 export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema);
 export const validateChatSendParams = ajv.compile(ChatSendParamsSchema);
 export const validateChatAbortParams = ajv.compile(ChatAbortParamsSchema);
@@ -417,6 +431,11 @@ export {
   StateVersionSchema,
   AgentEventSchema,
   ChatEventSchema,
+  MeshPlanParamsSchema,
+  MeshWorkflowPlanSchema,
+  MeshRunParamsSchema,
+  MeshStatusParamsSchema,
+  MeshRetryParamsSchema,
   SendParamsSchema,
   PollParamsSchema,
   AgentParamsSchema,
@@ -516,6 +535,11 @@ export type {
   AgentIdentityResult,
   AgentWaitParams,
   ChatEvent,
+  MeshPlanParams,
+  MeshWorkflowPlan,
+  MeshRunParams,
+  MeshStatusParams,
+  MeshRetryParams,
   TickEvent,
   ShutdownEvent,
   WakeParams,
diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts
index 614942008..6035c659f 100644
--- a/src/gateway/protocol/schema.ts
+++ b/src/gateway/protocol/schema.ts
@@ -8,6 +8,7 @@ export * from "./schema/exec-approvals.js";
 export * from "./schema/devices.js";
 export * from "./schema/frames.js";
 export * from "./schema/logs-chat.js";
+export * from "./schema/mesh.js";
 export * from "./schema/nodes.js";
 export * from "./schema/protocol-schemas.js";
 export * from "./schema/sessions.js";
diff --git a/src/gateway/protocol/schema/mesh.ts b/src/gateway/protocol/schema/mesh.ts
new file mode 100644
index 000000000..1c296eb6e
--- /dev/null
+++ b/src/gateway/protocol/schema/mesh.ts
@@ -0,0 +1,83 @@
+import { Type, type Static } from "@sinclair/typebox";
+import { NonEmptyString } from "./primitives.js";
+
+// One step of a fully specified workflow plan. `id` and `prompt` are required;
+// `dependsOn` lists at most 64 other step ids; `timeoutMs` is an integer between
+// 1 second and 1 hour. Unknown properties are rejected.
+export const MeshPlanStepSchema = Type.Object(
+  {
+    id: NonEmptyString,
+    name: Type.Optional(NonEmptyString),
+    prompt: NonEmptyString,
+    dependsOn: Type.Optional(Type.Array(NonEmptyString, { maxItems: 64 })),
+    agentId: Type.Optional(NonEmptyString),
+    sessionKey: Type.Optional(NonEmptyString),
+    thinking: Type.Optional(Type.String()),
+    timeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })),
+  },
+  { additionalProperties: false },
+);
+
+// A complete plan: identifier, goal text, creation timestamp (non-negative integer)
+// and between 1 and 128 steps.
+export const MeshWorkflowPlanSchema = Type.Object(
+  {
+    planId: NonEmptyString,
+    goal: NonEmptyString,
+    createdAt: Type.Integer({ minimum: 0 }),
+    steps: Type.Array(MeshPlanStepSchema, { minItems: 1, maxItems: 128 }),
+  },
+  { additionalProperties: false },
+);
+
+// Parameters of `mesh.plan`: a goal plus optional step drafts. A draft carries the
+// same fields as MeshPlanStepSchema except that `id` is optional here.
+export const MeshPlanParamsSchema = Type.Object(
+  {
+    goal: NonEmptyString,
+    steps: Type.Optional(
+      Type.Array(
+        Type.Object(
+          {
+            id: Type.Optional(NonEmptyString),
+            name: Type.Optional(NonEmptyString),
+            prompt: NonEmptyString,
+            dependsOn: Type.Optional(Type.Array(NonEmptyString, { maxItems: 64 })),
+            agentId: Type.Optional(NonEmptyString),
+            sessionKey: Type.Optional(NonEmptyString),
+            thinking: Type.Optional(Type.String()),
+            timeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })),
+          },
+          { additionalProperties: false },
+        ),
+        { minItems: 1, maxItems: 128 },
+      ),
+    ),
+  },
+  { additionalProperties: false },
+);
+
+// Parameters of `mesh.run`: the plan to execute plus optional execution knobs
+// (`maxParallel` 1-16, `defaultStepTimeoutMs` 1 second to 1 hour).
+export const MeshRunParamsSchema = Type.Object(
+  {
+    plan: MeshWorkflowPlanSchema,
+    continueOnError: Type.Optional(Type.Boolean()),
+    maxParallel: Type.Optional(Type.Integer({ minimum: 1, maximum: 16 })),
+    defaultStepTimeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })),
+    lane: Type.Optional(Type.String()),
+  },
+  { additionalProperties: false },
+);
+
+// Parameters of `mesh.status`: the id of the run to look up.
+export const MeshStatusParamsSchema = Type.Object(
+  {
+    runId: NonEmptyString,
+  },
+  { additionalProperties: false },
+);
+
+// Parameters of `mesh.retry`: the run id and, optionally, 1-128 step ids to retry.
+export const MeshRetryParamsSchema = Type.Object(
+  {
+    runId: NonEmptyString,
+    stepIds: Type.Optional(Type.Array(NonEmptyString, { minItems: 1, maxItems: 128 })),
+  },
+  { additionalProperties: false },
+);
+
+// NOTE(review): the type arguments of `Static` below look lost in extraction;
+// presumably each alias is `Static<typeof …Schema>` for the matching schema - confirm.
+export type MeshPlanParams = Static;
+export type MeshWorkflowPlan = Static;
+export type MeshRunParams = Static;
+export type MeshStatusParams = Static;
+export type MeshRetryParams = Static;
diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts
index 68670a3d7..23a8ecf35 100644
--- a/src/gateway/protocol/schema/protocol-schemas.ts
+++ b/src/gateway/protocol/schema/protocol-schemas.ts
@@ -103,6 +103,13 @@ import {
   LogsTailParamsSchema,
   LogsTailResultSchema,
 } from "./logs-chat.js";
+import {
+  MeshPlanParamsSchema,
+  MeshRetryParamsSchema,
+  MeshRunParamsSchema,
+  MeshStatusParamsSchema,
+  MeshWorkflowPlanSchema,
+} from "./mesh.js";
 import {
   NodeDescribeParamsSchema,
   NodeEventParamsSchema,
@@ -254,6 +261,11 @@ export const ProtocolSchemas: Record = {
   ChatAbortParams: ChatAbortParamsSchema,
   ChatInjectParams: ChatInjectParamsSchema,
   ChatEvent: ChatEventSchema,
+  MeshPlanParams: MeshPlanParamsSchema,
+  MeshWorkflowPlan: MeshWorkflowPlanSchema,
+  MeshRunParams: MeshRunParamsSchema,
+  MeshStatusParams: MeshStatusParamsSchema,
+  MeshRetryParams: MeshRetryParamsSchema,
   UpdateRunParams: UpdateRunParamsSchema,
   TickEvent: TickEventSchema,
   ShutdownEvent: ShutdownEventSchema,
diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts
index
bb691f08e..eb571a06f 100644
--- a/src/gateway/server-methods-list.ts
+++ b/src/gateway/server-methods-list.ts
@@ -85,6 +85,10 @@ const BASE_METHODS = [
   "agent",
   "agent.identity.get",
   "agent.wait",
+  "mesh.plan",
+  "mesh.run",
+  "mesh.status",
+  "mesh.retry",
   "browser.request",
   // WebChat WebSocket-native chat methods
   "chat.history",
diff --git a/src/gateway/server-methods.ts b/src/gateway/server-methods.ts
index e6086301c..0794a9ff9 100644
--- a/src/gateway/server-methods.ts
+++ b/src/gateway/server-methods.ts
@@ -12,6 +12,7 @@ import { deviceHandlers } from "./server-methods/devices.js";
 import { execApprovalsHandlers } from "./server-methods/exec-approvals.js";
 import { healthHandlers } from "./server-methods/health.js";
 import { logsHandlers } from "./server-methods/logs.js";
+import { meshHandlers } from "./server-methods/mesh.js";
 import { modelsHandlers } from "./server-methods/models.js";
 import { nodeHandlers } from "./server-methods/nodes.js";
 import { sendHandlers } from "./server-methods/send.js";
@@ -78,6 +79,8 @@ const READ_METHODS = new Set([
   "chat.history",
   "config.get",
   "talk.config",
+  "mesh.plan",
+  "mesh.status",
 ]);
 const WRITE_METHODS = new Set([
   "send",
@@ -94,6 +97,8 @@ const WRITE_METHODS = new Set([
   "chat.send",
   "chat.abort",
   "browser.request",
+  "mesh.run",
+  "mesh.retry",
 ]);
 
 function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["client"]) {
@@ -171,6 +176,7 @@ function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["c
 export const coreGatewayHandlers: GatewayRequestHandlers = {
   ...connectHandlers,
   ...logsHandlers,
+  ...meshHandlers,
   ...voicewakeHandlers,
   ...healthHandlers,
   ...channelsHandlers,
diff --git a/src/gateway/server-methods/mesh.test.ts b/src/gateway/server-methods/mesh.test.ts
new file mode 100644
index 000000000..7441455e7
--- /dev/null
+++ b/src/gateway/server-methods/mesh.test.ts
@@ -0,0 +1,138 @@
+import { afterEach, describe, expect, it, vi } from "vitest";
+import type { GatewayRequestContext } from "./types.js";
+import { __resetMeshRunsForTest, meshHandlers } from "./mesh.js";
+
+// Hoisted so the vi.mock factory below can reference the spies.
+const mocks = vi.hoisted(() => ({
+  agent: vi.fn(),
+  agentWait: vi.fn(),
+}));
+
+// Replace the real agent handlers with thin forwarders to the spies above.
+vi.mock("./agent.js", () => ({
+  agentHandlers: {
+    agent: (...args: unknown[]) => mocks.agent(...args),
+    "agent.wait": (...args: unknown[]) => mocks.agentWait(...args),
+  },
+}));
+
+// Minimal stand-in for the gateway context; only the members listed here are provided.
+const makeContext = (): GatewayRequestContext =>
+  ({
+    dedupe: new Map(),
+    addChatRun: vi.fn(),
+    logGateway: { info: vi.fn(), error: vi.fn() },
+  }) as unknown as GatewayRequestContext;
+
+// Invokes one mesh handler and resolves with whatever it passes to `respond`.
+async function callMesh(method: keyof typeof meshHandlers, params: Record) {
+  return await new Promise<{ ok: boolean; payload?: unknown; error?: unknown }>((resolve) => {
+    void meshHandlers[method]({
+      req: { type: "req", id: `test-${method}`, method },
+      params,
+      respond: (ok, payload, error) => resolve({ ok, payload, error }),
+      context: makeContext(),
+      client: null,
+      isWebchatConnect: () => false,
+    });
+  });
+}
+
+// Drop stored runs and mock implementations so tests do not leak into each other.
+afterEach(() => {
+  __resetMeshRunsForTest();
+  mocks.agent.mockReset();
+  mocks.agentWait.mockReset();
+});
+
+describe("mesh handlers", () => {
+  it("builds a default single-step plan", async () => {
+    const res = await callMesh("mesh.plan", { goal: "Write release notes" });
+    expect(res.ok).toBe(true);
+    const payload = res.payload as { plan: { goal: string; steps: Array<{ id: string }> } };
+    expect(payload.plan.goal).toBe("Write release notes");
+    expect(payload.plan.steps).toHaveLength(1);
+    expect(payload.plan.steps[0]?.id).toBe("step-1");
+  });
+
+  it("rejects cyclic plans", async () => {
+    // a -> b -> a
+    const cyclePlan = {
+      planId: "mesh-plan-1",
+      goal: "cycle",
+      createdAt: Date.now(),
+      steps: [
+        { id: "a", prompt: "a", dependsOn: ["b"] },
+        { id: "b", prompt: "b", dependsOn: ["a"] },
+      ],
+    };
+    const res = await callMesh("mesh.run", { plan: cyclePlan });
+    expect(res.ok).toBe(false);
+  });
+
+  it("runs steps in DAG order and supports retrying failed steps", async () => {
+    // Outcome that the mocked agent.wait reports for each accepted agent run id.
+    const runState = new Map();
+    mocks.agent.mockImplementation(
+      (opts: { params: { idempotencyKey: string }; respond: (ok: boolean, payload?: unknown) => void }) => {
+        const agentRunId = `agent-${opts.params.idempotencyKey}`;
+        runState.set(agentRunId, "ok");
+        // The idempotency key ends in `<stepId>:<attempt>`, so only the first attempt of "review" fails.
+        if (opts.params.idempotencyKey.includes(":review:1")) {
+          runState.set(agentRunId, "error");
+        }
+        opts.respond(true, { runId: agentRunId, status: "accepted" });
+      },
+    );
+    mocks.agentWait.mockImplementation(
+      (opts: { params: { runId: string }; respond: (ok: boolean, payload?: unknown) => void }) => {
+        const status = runState.get(opts.params.runId) ?? "error";
+        if (status === "ok") {
+          opts.respond(true, { runId: opts.params.runId, status: "ok" });
+          return;
+        }
+        opts.respond(true, {
+          runId: opts.params.runId,
+          status: "error",
+          error: "simulated failure",
+        });
+      },
+    );
+
+    // Linear chain: research -> build -> review.
+    const plan = {
+      planId: "mesh-plan-2",
+      goal: "Ship patch",
+      createdAt: Date.now(),
+      steps: [
+        { id: "research", prompt: "Research requirements" },
+        { id: "build", prompt: "Build feature", dependsOn: ["research"] },
+        { id: "review", prompt: "Review result", dependsOn: ["build"] },
+      ],
+    };
+
+    const runRes = await callMesh("mesh.run", { plan });
+    expect(runRes.ok).toBe(true);
+    const runPayload = runRes.payload as {
+      runId: string;
+      status: string;
+      stats: { failed: number };
+    };
+    expect(runPayload.status).toBe("failed");
+    expect(runPayload.stats.failed).toBe(1);
+
+    // Make subsequent retries succeed
+    mocks.agent.mockImplementation(
+      (opts: { params: { idempotencyKey: string }; respond: (ok: boolean, payload?: unknown) => void }) => {
+        const agentRunId = `agent-${opts.params.idempotencyKey}`;
+        runState.set(agentRunId, "ok");
+        opts.respond(true, { runId: agentRunId, status: "accepted" });
+      },
+    );
+
+    const retryRes = await callMesh("mesh.retry", {
+      runId: runPayload.runId,
+      stepIds: ["review"],
+    });
+    expect(retryRes.ok).toBe(true);
+    const retryPayload = retryRes.payload as { status: string; stats: {
failed: number } };
+    expect(retryPayload.status).toBe("completed");
+    expect(retryPayload.stats.failed).toBe(0);
+
+    const statusRes = await callMesh("mesh.status", { runId: runPayload.runId });
+    expect(statusRes.ok).toBe(true);
+    const statusPayload = statusRes.payload as { status: string };
+    expect(statusPayload.status).toBe("completed");
+  });
+});
diff --git a/src/gateway/server-methods/mesh.ts b/src/gateway/server-methods/mesh.ts
new file mode 100644
index 000000000..37587ef85
--- /dev/null
+++ b/src/gateway/server-methods/mesh.ts
@@ -0,0 +1,706 @@
+import { randomUUID } from "node:crypto";
+import type { GatewayRequestHandlerOptions, GatewayRequestHandlers, RespondFn } from "./types.js";
+import {
+  ErrorCodes,
+  errorShape,
+  formatValidationErrors,
+  validateMeshPlanParams,
+  validateMeshRetryParams,
+  validateMeshRunParams,
+  validateMeshStatusParams,
+  type MeshRunParams,
+  type MeshWorkflowPlan,
+} from "../protocol/index.js";
+import { agentHandlers } from "./agent.js";
+
+type MeshStepStatus = "pending" | "running" | "succeeded" | "failed" | "skipped";
+type MeshRunStatus = "pending" | "running" | "completed" | "failed";
+
+/** Mutable execution state of one plan step inside a run. */
+type MeshStepRuntime = {
+  id: string;
+  name?: string;
+  prompt: string;
+  dependsOn: string[];
+  agentId?: string;
+  sessionKey?: string;
+  thinking?: string;
+  timeoutMs?: number;
+  status: MeshStepStatus;
+  attempts: number;
+  startedAt?: number;
+  endedAt?: number;
+  agentRunId?: string;
+  error?: string;
+};
+
+/** In-memory record of one workflow run: settings, per-step state and an event history. */
+type MeshRunRecord = {
+  runId: string;
+  plan: MeshWorkflowPlan;
+  status: MeshRunStatus;
+  startedAt: number;
+  endedAt?: number;
+  continueOnError: boolean;
+  maxParallel: number;
+  defaultStepTimeoutMs: number;
+  lane?: string;
+  stepOrder: string[];
+  steps: Record<string, MeshStepRuntime>;
+  history: Array<{ ts: number; type: string; stepId?: string; data?: Record<string, unknown> }>;
+};
+
+// Runs are kept in process memory only, keyed by runId.
+const meshRuns = new Map<string, MeshRunRecord>();
+const MAX_KEEP_RUNS = 200;
+
+/**
+ * Evicts the oldest finished runs once more than MAX_KEEP_RUNS are stored.
+ *
+ * Only runs in a terminal state ("completed" or "failed") are eligible. Evicting a
+ * run that is still pending or running would make it unreachable through
+ * `mesh.status` / `mesh.retry` while its steps are still executing. The map may
+ * therefore exceed the cap temporarily while many runs are in flight.
+ */
+function trimMap() {
+  const overflow = meshRuns.size - MAX_KEEP_RUNS;
+  if (overflow <= 0) {
+    return;
+  }
+  const evictable = [...meshRuns.values()]
+    .filter((run) => run.status === "completed" || run.status === "failed")
+    .sort((a, b) => a.startedAt - b.startedAt);
+  for (const stale of evictable.slice(0, overflow)) {
+    meshRuns.delete(stale.runId);
+  }
+}
+
+/**
+ * Returns the dependency ids trimmed, with blank entries and duplicates removed,
+ * preserving first-seen order. A non-array input yields an empty list.
+ */
+function normalizeDependsOn(dependsOn: string[] | undefined): string[] {
+  if (!Array.isArray(dependsOn)) {
+    return [];
+  }
+  const seen = new Set<string>();
+  const normalized: string[] = [];
+  for (const raw of dependsOn) {
+    const trimmed = String(raw ?? "").trim();
+    if (!trimmed || seen.has(trimmed)) {
+      continue;
+    }
+    seen.add(trimmed);
+    normalized.push(trimmed);
+  }
+  return normalized;
+}
+
+/**
+ * Returns a copy of the plan with string fields trimmed, optional strings that
+ * become empty turned into undefined, dependencies normalized and timeouts
+ * floored and raised to at least 1000 ms.
+ */
+function normalizePlan(plan: MeshWorkflowPlan): MeshWorkflowPlan {
+  return {
+    planId: plan.planId.trim(),
+    goal: plan.goal.trim(),
+    createdAt: plan.createdAt,
+    steps: plan.steps.map((step) => ({
+      id: step.id.trim(),
+      name: typeof step.name === "string" ? step.name.trim() || undefined : undefined,
+      prompt: step.prompt.trim(),
+      dependsOn: normalizeDependsOn(step.dependsOn),
+      agentId: typeof step.agentId === "string" ? step.agentId.trim() || undefined : undefined,
+      sessionKey:
+        typeof step.sessionKey === "string" ? step.sessionKey.trim() || undefined : undefined,
+      thinking: typeof step.thinking === "string" ? step.thinking : undefined,
+      timeoutMs:
+        typeof step.timeoutMs === "number" && Number.isFinite(step.timeoutMs)
+          ? Math.max(1_000, Math.floor(step.timeoutMs))
+          : undefined,
+    })),
+  };
+}
+
+/**
+ * Builds a new plan from `mesh.plan` parameters.
+ *
+ * Without steps, a single "step-1" step is created whose prompt is the goal.
+ * Steps without an id receive `step-<position>` (1-based). The plan gets a fresh
+ * random planId and the current time as createdAt.
+ */
+function createPlanFromParams(params: {
+  goal: string;
+  steps?: Array<{
+    id?: string;
+    name?: string;
+    prompt: string;
+    dependsOn?: string[];
+    agentId?: string;
+    sessionKey?: string;
+    thinking?: string;
+    timeoutMs?: number;
+  }>;
+}): MeshWorkflowPlan {
+  const now = Date.now();
+  const goal = params.goal.trim();
+  const sourceSteps = params.steps?.length
+    ? params.steps
+    : [
+        {
+          id: "step-1",
+          name: "Primary Task",
+          prompt: goal,
+        },
+      ];
+
+  const steps = sourceSteps.map((step, index) => {
+    const stepId = step.id?.trim() || `step-${index + 1}`;
+    return {
+      id: stepId,
+      name: step.name?.trim() || undefined,
+      prompt: step.prompt.trim(),
+      dependsOn: normalizeDependsOn(step.dependsOn),
+      agentId: step.agentId?.trim() || undefined,
+      sessionKey: step.sessionKey?.trim() || undefined,
+      thinking: typeof step.thinking === "string" ? step.thinking : undefined,
+      timeoutMs:
+        typeof step.timeoutMs === "number" && Number.isFinite(step.timeoutMs)
+          ? Math.max(1_000, Math.floor(step.timeoutMs))
+          : undefined,
+    };
+  });
+
+  return {
+    planId: `mesh-plan-${randomUUID()}`,
+    goal,
+    createdAt: now,
+    steps,
+  };
+}
+
+/**
+ * Checks that the plan forms a valid DAG and returns a topological order of step ids.
+ *
+ * Rejects blank ids, duplicate ids, dependencies on unknown steps, self-dependencies
+ * and cycles. Steps with no dependencies keep their plan order at the front.
+ */
+function validatePlanGraph(plan: MeshWorkflowPlan): { ok: true; order: string[] } | { ok: false; error: string } {
+  const ids = new Set<string>();
+  for (const step of plan.steps) {
+    // A whitespace-only id trims to "" during normalization; the topological loop
+    // below treats a falsy id as absent, so it would otherwise surface as a
+    // misleading "dependency cycle" error.
+    if (!step.id) {
+      return { ok: false, error: "step id must not be blank" };
+    }
+    if (ids.has(step.id)) {
+      return { ok: false, error: `duplicate step id: ${step.id}` };
+    }
+    ids.add(step.id);
+  }
+
+  for (const step of plan.steps) {
+    for (const depId of step.dependsOn ?? []) {
+      if (!ids.has(depId)) {
+        return { ok: false, error: `unknown dependency "${depId}" on step "${step.id}"` };
+      }
+      if (depId === step.id) {
+        return { ok: false, error: `step "${step.id}" cannot depend on itself` };
+      }
+    }
+  }
+
+  // Kahn's algorithm: count incoming edges, then repeatedly release steps with none left.
+  const inDegree = new Map<string, number>();
+  const outgoing = new Map<string, string[]>();
+  for (const step of plan.steps) {
+    inDegree.set(step.id, 0);
+    outgoing.set(step.id, []);
+  }
+  for (const step of plan.steps) {
+    for (const dep of step.dependsOn ?? []) {
+      inDegree.set(step.id, (inDegree.get(step.id) ?? 0) + 1);
+      const list = outgoing.get(dep);
+      if (list) {
+        list.push(step.id);
+      }
+    }
+  }
+
+  const queue = plan.steps.filter((step) => (inDegree.get(step.id) ??
0) === 0).map((s) => s.id);
+  const order: string[] = [];
+
+  while (queue.length > 0) {
+    const current = queue.shift();
+    if (!current) {
+      continue;
+    }
+    order.push(current);
+    const targets = outgoing.get(current) ?? [];
+    for (const next of targets) {
+      const degree = (inDegree.get(next) ?? 0) - 1;
+      inDegree.set(next, degree);
+      if (degree === 0) {
+        queue.push(next);
+      }
+    }
+  }
+
+  // Any step never released still has an unmet dependency, which means a cycle.
+  if (order.length !== plan.steps.length) {
+    return { ok: false, error: "workflow contains a dependency cycle" };
+  }
+  return { ok: true, order };
+}
+
+/**
+ * Invokes another gateway handler in-process and resolves with the first response
+ * it reports through `respond`.
+ *
+ * The caller's `respond` in `opts` is replaced by a capturing one; later calls to
+ * it are ignored. If the handler throws - synchronously or by rejecting - before
+ * responding, the result is `{ ok: false, error }`. The returned promise never
+ * rejects, but it stays pending if the handler neither responds nor throws.
+ */
+async function callGatewayHandler(
+  handler: (opts: GatewayRequestHandlerOptions) => Promise<void> | void,
+  opts: GatewayRequestHandlerOptions,
+): Promise<{ ok: boolean; payload?: unknown; error?: unknown; meta?: Record<string, unknown> }> {
+  return await new Promise((resolve) => {
+    let settled = false;
+    const settle = (result: {
+      ok: boolean;
+      payload?: unknown;
+      error?: unknown;
+      meta?: Record<string, unknown>;
+    }) => {
+      if (settled) {
+        return;
+      }
+      settled = true;
+      resolve(result);
+    };
+    const respond: RespondFn = (ok, payload, error, meta) => {
+      settle({ ok, payload, error, meta });
+    };
+    // Call the handler from inside an async function so that a synchronous throw
+    // becomes a rejection handled below. Calling it directly as an argument of
+    // Promise.resolve() would throw inside this executor and reject the outer
+    // promise instead of producing an `{ ok: false }` result.
+    const invoke = async () => {
+      await handler({
+        ...opts,
+        respond,
+      });
+    };
+    void invoke().catch((err) => {
+      settle({ ok: false, error: err });
+    });
+  });
+}
+
+/**
+ * Returns the prompt sent to the agent for a step: the step's own prompt, followed
+ * by one "Dependency context" line per dependency (status and agent run id if known).
+ */
+function buildStepPrompt(step: MeshStepRuntime, run: MeshRunRecord): string {
+  if (step.dependsOn.length === 0) {
+    return step.prompt;
+  }
+  const lines = step.dependsOn.map((depId) => {
+    const dep = run.steps[depId];
+    const details = dep.agentRunId ?
` runId=${dep.agentRunId}` : "";
+    return `- ${depId}: ${dep.status}${details}`;
+  });
+  return `${step.prompt}\n\nDependency context:\n${lines.join("\n")}`;
+}
+
+/** Returns the step's own timeout (floored, at least 1000 ms) or the run default. */
+function resolveStepTimeoutMs(step: MeshStepRuntime, run: MeshRunRecord): number {
+  if (typeof step.timeoutMs === "number" && Number.isFinite(step.timeoutMs)) {
+    return Math.max(1_000, Math.floor(step.timeoutMs));
+  }
+  return run.defaultStepTimeoutMs;
+}
+
+/**
+ * Converts an error value reported by a gateway handler into a readable message.
+ *
+ * Strings are returned as-is. Objects with a non-empty string `message` (Error
+ * instances and, presumably, the objects built by `errorShape` - confirm against
+ * its definition) yield that message. Other objects are JSON-encoded so they do
+ * not collapse to "[object Object]". `fallback` is used for null/undefined.
+ */
+function describeStepError(error: unknown, fallback: string): string {
+  if (error === undefined || error === null) {
+    return fallback;
+  }
+  if (typeof error === "string") {
+    return error;
+  }
+  if (typeof error === "object") {
+    const message = (error as { message?: unknown }).message;
+    if (typeof message === "string" && message) {
+      return message;
+    }
+    try {
+      return JSON.stringify(error);
+    } catch {
+      // Circular or otherwise unserializable value; fall through to String().
+    }
+  }
+  return String(error);
+}
+
+/**
+ * Executes one step: submits it through the `agent` handler, then blocks on
+ * `agent.wait` for the returned run id, recording the outcome on the step and in
+ * the run history.
+ *
+ * The step always ends as "succeeded" or "failed"; this function does not reject.
+ * An exception raised while talking to the agent handlers is recorded as a step
+ * failure, so the step cannot be left in the "running" state.
+ */
+async function executeStep(params: {
+  run: MeshRunRecord;
+  step: MeshStepRuntime;
+  opts: GatewayRequestHandlerOptions;
+}) {
+  const { run, step, opts } = params;
+  step.status = "running";
+  step.startedAt = Date.now();
+  step.endedAt = undefined;
+  step.error = undefined;
+  step.attempts += 1;
+  run.history.push({ ts: Date.now(), type: "step.start", stepId: step.id });
+
+  // Single place that marks the step failed and logs a "step.error" history entry.
+  const fail = (error: string, data: Record<string, unknown> = {}) => {
+    step.status = "failed";
+    step.endedAt = Date.now();
+    step.error = error;
+    run.history.push({
+      ts: Date.now(),
+      type: "step.error",
+      stepId: step.id,
+      data: { ...data, error },
+    });
+  };
+
+  try {
+    // The attempt number is part of the key, so every retry is a distinct agent request.
+    const agentRequestId = `${run.runId}:${step.id}:${step.attempts}`;
+    const prompt = buildStepPrompt(step, run);
+    const timeoutMs = resolveStepTimeoutMs(step, run);
+    const timeoutSeconds = Math.ceil(timeoutMs / 1000);
+
+    const accepted = await callGatewayHandler(agentHandlers.agent, {
+      ...opts,
+      req: {
+        type: "req",
+        id: `${agentRequestId}:agent`,
+        method: "agent",
+        params: {},
+      },
+      params: {
+        message: prompt,
+        idempotencyKey: agentRequestId,
+        ...(step.agentId ? { agentId: step.agentId } : {}),
+        ...(step.sessionKey ? { sessionKey: step.sessionKey } : {}),
+        ...(step.thinking ? { thinking: step.thinking } : {}),
+        ...(run.lane ? { lane: run.lane } : {}),
+        timeout: timeoutSeconds,
+        deliver: false,
+      },
+    });
+
+    if (!accepted.ok) {
+      fail(describeStepError(accepted.error, "agent request failed"));
+      return;
+    }
+
+    const acceptedPayload = accepted.payload as { runId?: unknown } | undefined;
+    const runId = typeof acceptedPayload?.runId === "string" ? acceptedPayload.runId : undefined;
+    step.agentRunId = runId;
+
+    if (!runId) {
+      fail("agent did not return runId");
+      return;
+    }
+
+    const waited = await callGatewayHandler(agentHandlers["agent.wait"], {
+      ...opts,
+      req: {
+        type: "req",
+        id: `${agentRequestId}:wait`,
+        method: "agent.wait",
+        params: {},
+      },
+      params: {
+        runId,
+        timeoutMs,
+      },
+    });
+
+    const waitPayload = waited.payload as { status?: unknown; error?: unknown } | undefined;
+    const waitStatus = typeof waitPayload?.status === "string" ? waitPayload.status : "error";
+    if (waited.ok && waitStatus === "ok") {
+      step.status = "succeeded";
+      step.endedAt = Date.now();
+      run.history.push({ ts: Date.now(), type: "step.ok", stepId: step.id, data: { runId } });
+      return;
+    }
+
+    fail(
+      typeof waitPayload?.error === "string"
+        ? waitPayload.error
+        : describeStepError(waited.error, `agent.wait returned status ${waitStatus}`),
+      { runId, status: waitStatus },
+    );
+  } catch (err) {
+    fail(describeStepError(err, "step execution failed"));
+  }
+}
+
+/**
+ * Creates the initial record for a run: every plan step starts as "pending" with
+ * zero attempts, and the run itself starts as "pending" with an empty history.
+ */
+function createRunRecord(params: {
+  runId: string;
+  plan: MeshWorkflowPlan;
+  order: string[];
+  continueOnError: boolean;
+  maxParallel: number;
+  defaultStepTimeoutMs: number;
+  lane?: string;
+}): MeshRunRecord {
+  const steps: Record<string, MeshStepRuntime> = {};
+  for (const step of params.plan.steps) {
+    steps[step.id] = {
+      id: step.id,
+      name: step.name,
+      prompt: step.prompt,
+      dependsOn: step.dependsOn ??
[],
+      agentId: step.agentId,
+      sessionKey: step.sessionKey,
+      thinking: step.thinking,
+      timeoutMs: step.timeoutMs,
+      status: "pending",
+      attempts: 0,
+    };
+  }
+  return {
+    runId: params.runId,
+    plan: params.plan,
+    status: "pending",
+    startedAt: Date.now(),
+    continueOnError: params.continueOnError,
+    maxParallel: params.maxParallel,
+    defaultStepTimeoutMs: params.defaultStepTimeoutMs,
+    lane: params.lane,
+    stepOrder: params.order,
+    steps,
+    history: [],
+  };
+}
+
+/**
+ * Returns the pending steps whose dependencies have all succeeded, in run order.
+ *
+ * Side effect: a pending step with a failed or skipped dependency is marked
+ * "skipped" here ("dependency failed"), so skips cascade along the step order.
+ */
+function findReadySteps(run: MeshRunRecord): MeshStepRuntime[] {
+  const ready: MeshStepRuntime[] = [];
+  for (const stepId of run.stepOrder) {
+    const step = run.steps[stepId];
+    if (!step || step.status !== "pending") {
+      continue;
+    }
+    const deps = step.dependsOn.map((depId) => run.steps[depId]).filter(Boolean);
+    if (deps.some((dep) => dep.status === "failed" || dep.status === "skipped")) {
+      step.status = "skipped";
+      step.endedAt = Date.now();
+      step.error = "dependency failed";
+      continue;
+    }
+    if (deps.every((dep) => dep.status === "succeeded")) {
+      ready.push(step);
+    }
+  }
+  return ready;
+}
+
+/**
+ * Drives a run to completion: schedules ready steps up to `maxParallel` at a time,
+ * waits for any in-flight step to finish, and repeats until nothing is left.
+ *
+ * Unless `continueOnError` is set, scheduling stops after the first step that
+ * fails during this invocation; steps still pending are then marked skipped.
+ * The run ends "failed" if any step is failed, otherwise "completed".
+ */
+async function runWorkflow(run: MeshRunRecord, opts: GatewayRequestHandlerOptions) {
+  run.status = "running";
+  run.history.push({ ts: Date.now(), type: "run.start" });
+
+  // Steps already failed when this invocation starts are leftovers of an earlier
+  // attempt that the caller chose not to retry. They must not trip the
+  // stop-on-failure check, or a partial retry would skip every selected step
+  // without running it. They still count toward the final run status below.
+  const staleFailures = new Set(
+    Object.values(run.steps)
+      .filter((step) => step.status === "failed")
+      .map((step) => step.id),
+  );
+
+  const inFlight = new Set<Promise<void>>();
+  let stopScheduling = false;
+  while (true) {
+    const failed = Object.values(run.steps).some(
+      (step) => step.status === "failed" && !staleFailures.has(step.id),
+    );
+    if (failed && !run.continueOnError) {
+      stopScheduling = true;
+    }
+
+    if (!stopScheduling) {
+      const ready = findReadySteps(run);
+      for (const step of ready) {
+        if (inFlight.size >= run.maxParallel) {
+          break;
+        }
+        const task: Promise<void> = executeStep({ run, step, opts })
+          .catch((err: unknown) => {
+            // Keep a rejected step from escaping through Promise.race below,
+            // which would abandon the run in the "running" state.
+            step.status = "failed";
+            step.endedAt = Date.now();
+            step.error = err instanceof Error ? err.message : String(err);
+            run.history.push({
+              ts: Date.now(),
+              type: "step.error",
+              stepId: step.id,
+              data: { error: step.error },
+            });
+          })
+          .finally(() => {
+            inFlight.delete(task);
+          });
+        inFlight.add(task);
+      }
+    }
+
+    if (inFlight.size > 0) {
+      await Promise.race(inFlight);
+      continue;
+    }
+
+    // Nothing is running and nothing could be scheduled: whatever is still
+    // pending can never start in this invocation.
+    const pending = Object.values(run.steps).filter((step) => step.status === "pending");
+    if (pending.length === 0) {
+      break;
+    }
+    for (const step of pending) {
+      step.status = "skipped";
+      step.endedAt = Date.now();
+      step.error = stopScheduling ? "cancelled after failure" : "unresolvable dependencies";
+    }
+    break;
+  }
+
+  const hasFailure = Object.values(run.steps).some((step) => step.status === "failed");
+  run.status = hasFailure ? "failed" : "completed";
+  run.endedAt = Date.now();
+  run.history.push({
+    ts: Date.now(),
+    type: "run.end",
+    data: { status: run.status },
+  });
+}
+
+/**
+ * Returns the step ids to retry: the requested ids (trimmed, blanks dropped) when
+ * given, otherwise every step currently failed or skipped.
+ */
+function resolveStepIdsForRetry(run: MeshRunRecord, requested?: string[]): string[] {
+  if (Array.isArray(requested) && requested.length > 0) {
+    return requested.map((stepId) => stepId.trim()).filter(Boolean);
+  }
+  return Object.values(run.steps)
+    .filter((step) => step.status === "failed" || step.status === "skipped")
+    .map((step) => step.id);
+}
+
+/** Returns the ids of all steps that depend, directly or transitively, on any root. */
+function descendantsOf(run: MeshRunRecord, roots: Set<string>): Set<string> {
+  const descendants = new Set<string>();
+  const queue = [...roots];
+  while (queue.length > 0) {
+    const current = queue.shift();
+    if (!current) {
+      continue;
+    }
+    for (const step of Object.values(run.steps)) {
+      if (!step.dependsOn.includes(current) || descendants.has(step.id)) {
+        continue;
+      }
+      descendants.add(step.id);
+      queue.push(step.id);
+    }
+  }
+  return descendants;
+}
+
+/**
+ * Resets the given steps and their descendants to "pending" so they can run again.
+ *
+ * Descendants that already succeeded are left untouched; explicitly requested
+ * steps are reset regardless of status and also lose their previous agentRunId.
+ */
+function resetStepsForRetry(run: MeshRunRecord, stepIds: string[]) {
+  const rootSet = new Set(stepIds);
+  const descendants = descendantsOf(run, rootSet);
+  const resetIds = new Set([...rootSet, ...descendants]);
+  for (const stepId of resetIds) {
+    const step = run.steps[stepId];
+    if (!step) {
+      continue;
+    }
+    if (step.status === "succeeded" && !rootSet.has(stepId)) {
+      continue;
+    }
+    step.status = "pending";
+    step.startedAt = undefined;
+    step.endedAt = undefined;
+    step.error = undefined;
+    if (rootSet.has(stepId)) {
+      step.agentRunId = undefined;
+    }
+  }
+}
+
+/**
+ * Builds the payload returned by mesh.run / mesh.status / mesh.retry: run metadata,
+ * per-status step counts, the steps in execution order and the event history.
+ */
+function summarizeRun(run: MeshRunRecord) {
+  return {
+    runId: run.runId,
+    plan: run.plan,
+    status: run.status,
+    startedAt: run.startedAt,
+    endedAt: run.endedAt,
+    stats: {
total: Object.keys(run.steps).length,
+      succeeded: Object.values(run.steps).filter((step) => step.status === "succeeded").length,
+      failed: Object.values(run.steps).filter((step) => step.status === "failed").length,
+      skipped: Object.values(run.steps).filter((step) => step.status === "skipped").length,
+      running: Object.values(run.steps).filter((step) => step.status === "running").length,
+      pending: Object.values(run.steps).filter((step) => step.status === "pending").length,
+    },
+    steps: run.stepOrder.map((stepId) => run.steps[stepId]),
+    history: run.history,
+  };
+}
+
+/**
+ * Puts a run into a terminal "failed" state after the workflow loop itself threw.
+ *
+ * Without this the run would stay "running" forever and `mesh.retry` would keep
+ * refusing it as "currently running".
+ */
+function markRunCrashed(run: MeshRunRecord, err: unknown) {
+  const message = err instanceof Error ? err.message : String(err);
+  run.status = "failed";
+  run.endedAt = Date.now();
+  run.history.push({
+    ts: Date.now(),
+    type: "run.error",
+    data: { error: message },
+  });
+}
+
+/**
+ * Gateway handlers for the mesh workflow methods.
+ *
+ * - `mesh.plan`   builds and validates a plan without executing it.
+ * - `mesh.run`    executes a plan and responds with the run summary once it ends.
+ * - `mesh.status` returns the summary of a stored run.
+ * - `mesh.retry`  re-runs selected (or all failed/skipped) steps of a stored run.
+ *
+ * Invalid parameters and invalid plan graphs are answered with INVALID_REQUEST,
+ * unknown run ids with NOT_FOUND.
+ */
+export const meshHandlers: GatewayRequestHandlers = {
+  "mesh.plan": ({ params, respond }) => {
+    if (!validateMeshPlanParams(params)) {
+      respond(
+        false,
+        undefined,
+        errorShape(
+          ErrorCodes.INVALID_REQUEST,
+          `invalid mesh.plan params: ${formatValidationErrors(validateMeshPlanParams.errors)}`,
+        ),
+      );
+      return;
+    }
+    const p = params;
+    const plan = normalizePlan(
+      createPlanFromParams({
+        goal: p.goal,
+        steps: p.steps,
+      }),
+    );
+    const graph = validatePlanGraph(plan);
+    if (!graph.ok) {
+      respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, graph.error));
+      return;
+    }
+    respond(
+      true,
+      {
+        plan,
+        order: graph.order,
+      },
+      undefined,
+    );
+  },
+  "mesh.run": async (opts) => {
+    const { params, respond } = opts;
+    if (!validateMeshRunParams(params)) {
+      respond(
+        false,
+        undefined,
+        errorShape(
+          ErrorCodes.INVALID_REQUEST,
+          `invalid mesh.run params: ${formatValidationErrors(validateMeshRunParams.errors)}`,
+        ),
+      );
+      return;
+    }
+    const p = params as MeshRunParams;
+    const plan = normalizePlan(p.plan);
+    const graph = validatePlanGraph(plan);
+    if (!graph.ok) {
+      respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, graph.error));
+      return;
+    }
+
+    // Defaults: 2 parallel steps, 120 s per step.
+    const maxParallel =
+      typeof p.maxParallel === "number" && Number.isFinite(p.maxParallel)
+        ? Math.min(16, Math.max(1, Math.floor(p.maxParallel)))
+        : 2;
+    const defaultStepTimeoutMs =
+      typeof p.defaultStepTimeoutMs === "number" && Number.isFinite(p.defaultStepTimeoutMs)
+        ? Math.max(1_000, Math.floor(p.defaultStepTimeoutMs))
+        : 120_000;
+    const runId = `mesh-run-${randomUUID()}`;
+    const record = createRunRecord({
+      runId,
+      plan,
+      order: graph.order,
+      continueOnError: p.continueOnError === true,
+      maxParallel,
+      defaultStepTimeoutMs,
+      lane: typeof p.lane === "string" ? p.lane : undefined,
+    });
+    meshRuns.set(runId, record);
+    trimMap();
+
+    try {
+      await runWorkflow(record, opts);
+    } catch (err) {
+      // Leave the record in a terminal state, then let the error propagate as before.
+      markRunCrashed(record, err);
+      throw err;
+    }
+    respond(true, summarizeRun(record), undefined);
+  },
+  "mesh.status": ({ params, respond }) => {
+    if (!validateMeshStatusParams(params)) {
+      respond(
+        false,
+        undefined,
+        errorShape(
+          ErrorCodes.INVALID_REQUEST,
+          `invalid mesh.status params: ${formatValidationErrors(validateMeshStatusParams.errors)}`,
+        ),
+      );
+      return;
+    }
+    const run = meshRuns.get(params.runId.trim());
+    if (!run) {
+      respond(false, undefined, errorShape(ErrorCodes.NOT_FOUND, "mesh run not found"));
+      return;
+    }
+    respond(true, summarizeRun(run), undefined);
+  },
+  "mesh.retry": async (opts) => {
+    const { params, respond } = opts;
+    if (!validateMeshRetryParams(params)) {
+      respond(
+        false,
+        undefined,
+        errorShape(
+          ErrorCodes.INVALID_REQUEST,
+          `invalid mesh.retry params: ${formatValidationErrors(validateMeshRetryParams.errors)}`,
+        ),
+      );
+      return;
+    }
+    const runId = params.runId.trim();
+    const run = meshRuns.get(runId);
+    if (!run) {
+      respond(false, undefined, errorShape(ErrorCodes.NOT_FOUND, "mesh run not found"));
+      return;
+    }
+    // Only one execution per run at a time.
+    if (run.status === "running") {
+      respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, "mesh run is currently running"));
+      return;
+    }
+    const stepIds = resolveStepIdsForRetry(run, params.stepIds);
+    if (stepIds.length === 0) {
+      respond(
+        false,
+        undefined,
+        errorShape(ErrorCodes.INVALID_REQUEST, "no failed or skipped steps available to retry"),
+      );
+      return;
+    }
+    // Validate every id before mutating any step state.
+    for (const stepId of stepIds) {
+      if (!run.steps[stepId]) {
+        respond(
+          false,
+          undefined,
+          errorShape(ErrorCodes.INVALID_REQUEST, `unknown retry step id: ${stepId}`),
+        );
+        return;
+      }
+    }
+
+    resetStepsForRetry(run, stepIds);
+    run.status = "pending";
+    run.endedAt = undefined;
+    run.history.push({
+      ts: Date.now(),
+      type: "run.retry",
+      data: { stepIds },
+    });
+    try {
+      await runWorkflow(run, opts);
+    } catch (err) {
+      // Leave the record in a terminal state, then let the error propagate as before.
+      markRunCrashed(run, err);
+      throw err;
+    }
+    respond(true, summarizeRun(run), undefined);
+  },
+};
+
+/** Test hook: forgets every stored run. */
+export function __resetMeshRunsForTest() {
+  meshRuns.clear();
+}