Agents: add fallback error observations (#41337)
Merged via squash. Prepared head SHA: 852469c82ff28fb0e1be7f1019f5283e712c4283 Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com> Co-authored-by: altaywtf <9790196+altaywtf@users.noreply.github.com> Reviewed-by: @altaywtf
This commit is contained in:
@@ -34,6 +34,7 @@ Docs: https://docs.openclaw.ai
|
||||
- ACP/regressions: add gateway RPC coverage for ACP lineage patching, ACPX runtime coverage for image prompt serialization, and an operator smoke-test procedure for live ACP spawn verification. (#41456) Thanks @mbelinky.
|
||||
- Agents/billing recovery: probe single-provider billing cooldowns on the existing throttle so topping up credits can recover without a manual gateway restart. (#41422) thanks @altaywtf.
|
||||
- ACP/follow-up hardening: make session restore and prompt completion degrade gracefully on transcript/update failures, enforce bounded tool-location traversal, and skip non-image ACPX turns the runtime cannot serialize. (#41464) Thanks @mbelinky.
|
||||
- Agents/fallback observability: add structured, sanitized model-fallback decision and auth-profile failure-state events with correlated run IDs so cooldown probes and failover paths are easier to trace in logs. (#41337) thanks @altaywtf.
|
||||
|
||||
## 2026.3.8
|
||||
|
||||
|
||||
38
src/agents/auth-profiles/state-observation.test.ts
Normal file
38
src/agents/auth-profiles/state-observation.test.ts
Normal file
@@ -0,0 +1,38 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { resetLogger, setLoggerOverride } from "../../logging/logger.js";
|
||||
import { logAuthProfileFailureStateChange } from "./state-observation.js";
|
||||
|
||||
afterEach(() => {
|
||||
setLoggerOverride(null);
|
||||
resetLogger();
|
||||
});
|
||||
|
||||
describe("logAuthProfileFailureStateChange", () => {
|
||||
it("sanitizes consoleMessage fields before logging", () => {
|
||||
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
|
||||
setLoggerOverride({ level: "silent", consoleLevel: "warn" });
|
||||
|
||||
logAuthProfileFailureStateChange({
|
||||
runId: "run-1\nforged\tentry\rtest",
|
||||
profileId: "openai:profile-1",
|
||||
provider: "openai\u001b]8;;https://evil.test\u0007",
|
||||
reason: "overloaded",
|
||||
previous: undefined,
|
||||
next: {
|
||||
errorCount: 1,
|
||||
cooldownUntil: 1_700_000_060_000,
|
||||
failureCounts: { overloaded: 1 },
|
||||
},
|
||||
now: 1_700_000_000_000,
|
||||
});
|
||||
|
||||
const consoleLine = warnSpy.mock.calls[0]?.[0];
|
||||
expect(typeof consoleLine).toBe("string");
|
||||
expect(consoleLine).toContain("runId=run-1 forged entry test");
|
||||
expect(consoleLine).toContain("provider=openai]8;;https://evil.test");
|
||||
expect(consoleLine).not.toContain("\n");
|
||||
expect(consoleLine).not.toContain("\r");
|
||||
expect(consoleLine).not.toContain("\t");
|
||||
expect(consoleLine).not.toContain("\u001b");
|
||||
});
|
||||
});
|
||||
59
src/agents/auth-profiles/state-observation.ts
Normal file
59
src/agents/auth-profiles/state-observation.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import { redactIdentifier } from "../../logging/redact-identifier.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { sanitizeForConsole } from "../pi-embedded-error-observation.js";
|
||||
import type { AuthProfileFailureReason, ProfileUsageStats } from "./types.js";
|
||||
|
||||
const observationLog = createSubsystemLogger("agent/embedded");
|
||||
|
||||
export function logAuthProfileFailureStateChange(params: {
|
||||
runId?: string;
|
||||
profileId: string;
|
||||
provider: string;
|
||||
reason: AuthProfileFailureReason;
|
||||
previous: ProfileUsageStats | undefined;
|
||||
next: ProfileUsageStats;
|
||||
now: number;
|
||||
}): void {
|
||||
const windowType =
|
||||
params.reason === "billing" || params.reason === "auth_permanent" ? "disabled" : "cooldown";
|
||||
const previousCooldownUntil = params.previous?.cooldownUntil;
|
||||
const previousDisabledUntil = params.previous?.disabledUntil;
|
||||
// Active cooldown/disable windows are intentionally immutable; log whether this
|
||||
// update reused the existing window instead of extending it.
|
||||
const windowReused =
|
||||
windowType === "disabled"
|
||||
? typeof previousDisabledUntil === "number" &&
|
||||
Number.isFinite(previousDisabledUntil) &&
|
||||
previousDisabledUntil > params.now &&
|
||||
previousDisabledUntil === params.next.disabledUntil
|
||||
: typeof previousCooldownUntil === "number" &&
|
||||
Number.isFinite(previousCooldownUntil) &&
|
||||
previousCooldownUntil > params.now &&
|
||||
previousCooldownUntil === params.next.cooldownUntil;
|
||||
const safeProfileId = redactIdentifier(params.profileId, { len: 12 });
|
||||
const safeRunId = sanitizeForConsole(params.runId) ?? "-";
|
||||
const safeProvider = sanitizeForConsole(params.provider) ?? "-";
|
||||
|
||||
observationLog.warn("auth profile failure state updated", {
|
||||
event: "auth_profile_failure_state_updated",
|
||||
tags: ["error_handling", "auth_profiles", windowType],
|
||||
runId: params.runId,
|
||||
profileId: safeProfileId,
|
||||
provider: params.provider,
|
||||
reason: params.reason,
|
||||
windowType,
|
||||
windowReused,
|
||||
previousErrorCount: params.previous?.errorCount,
|
||||
errorCount: params.next.errorCount,
|
||||
previousCooldownUntil,
|
||||
cooldownUntil: params.next.cooldownUntil,
|
||||
previousDisabledUntil,
|
||||
disabledUntil: params.next.disabledUntil,
|
||||
previousDisabledReason: params.previous?.disabledReason,
|
||||
disabledReason: params.next.disabledReason,
|
||||
failureCounts: params.next.failureCounts,
|
||||
consoleMessage:
|
||||
`auth profile failure state updated: runId=${safeRunId} profile=${safeProfileId} provider=${safeProvider} ` +
|
||||
`reason=${params.reason} window=${windowType} reused=${String(windowReused)}`,
|
||||
});
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { normalizeProviderId } from "../model-selection.js";
|
||||
import { logAuthProfileFailureStateChange } from "./state-observation.js";
|
||||
import { saveAuthProfileStore, updateAuthProfileStoreWithLock } from "./store.js";
|
||||
import type { AuthProfileFailureReason, AuthProfileStore, ProfileUsageStats } from "./types.js";
|
||||
|
||||
@@ -462,12 +463,16 @@ export async function markAuthProfileFailure(params: {
|
||||
reason: AuthProfileFailureReason;
|
||||
cfg?: OpenClawConfig;
|
||||
agentDir?: string;
|
||||
runId?: string;
|
||||
}): Promise<void> {
|
||||
const { store, profileId, reason, agentDir, cfg } = params;
|
||||
const { store, profileId, reason, agentDir, cfg, runId } = params;
|
||||
const profile = store.profiles[profileId];
|
||||
if (!profile || isAuthCooldownBypassedForProvider(profile.provider)) {
|
||||
return;
|
||||
}
|
||||
let nextStats: ProfileUsageStats | undefined;
|
||||
let previousStats: ProfileUsageStats | undefined;
|
||||
let updateTime = 0;
|
||||
const updated = await updateAuthProfileStoreWithLock({
|
||||
agentDir,
|
||||
updater: (freshStore) => {
|
||||
@@ -482,19 +487,32 @@ export async function markAuthProfileFailure(params: {
|
||||
providerId: providerKey,
|
||||
});
|
||||
|
||||
updateUsageStatsEntry(freshStore, profileId, (existing) =>
|
||||
computeNextProfileUsageStats({
|
||||
existing: existing ?? {},
|
||||
now,
|
||||
reason,
|
||||
cfgResolved,
|
||||
}),
|
||||
);
|
||||
previousStats = freshStore.usageStats?.[profileId];
|
||||
updateTime = now;
|
||||
const computed = computeNextProfileUsageStats({
|
||||
existing: previousStats ?? {},
|
||||
now,
|
||||
reason,
|
||||
cfgResolved,
|
||||
});
|
||||
nextStats = computed;
|
||||
updateUsageStatsEntry(freshStore, profileId, () => computed);
|
||||
return true;
|
||||
},
|
||||
});
|
||||
if (updated) {
|
||||
store.usageStats = updated.usageStats;
|
||||
if (nextStats) {
|
||||
logAuthProfileFailureStateChange({
|
||||
runId,
|
||||
profileId,
|
||||
provider: profile.provider,
|
||||
reason,
|
||||
previous: previousStats,
|
||||
next: nextStats,
|
||||
now: updateTime,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!store.profiles[profileId]) {
|
||||
@@ -508,15 +526,25 @@ export async function markAuthProfileFailure(params: {
|
||||
providerId: providerKey,
|
||||
});
|
||||
|
||||
updateUsageStatsEntry(store, profileId, (existing) =>
|
||||
computeNextProfileUsageStats({
|
||||
existing: existing ?? {},
|
||||
now,
|
||||
reason,
|
||||
cfgResolved,
|
||||
}),
|
||||
);
|
||||
previousStats = store.usageStats?.[profileId];
|
||||
const computed = computeNextProfileUsageStats({
|
||||
existing: previousStats ?? {},
|
||||
now,
|
||||
reason,
|
||||
cfgResolved,
|
||||
});
|
||||
nextStats = computed;
|
||||
updateUsageStatsEntry(store, profileId, () => computed);
|
||||
saveAuthProfileStore(store, agentDir);
|
||||
logAuthProfileFailureStateChange({
|
||||
runId,
|
||||
profileId,
|
||||
provider: store.profiles[profileId]?.provider ?? profile.provider,
|
||||
reason,
|
||||
previous: previousStats,
|
||||
next: nextStats,
|
||||
now,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -528,12 +556,14 @@ export async function markAuthProfileCooldown(params: {
|
||||
store: AuthProfileStore;
|
||||
profileId: string;
|
||||
agentDir?: string;
|
||||
runId?: string;
|
||||
}): Promise<void> {
|
||||
await markAuthProfileFailure({
|
||||
store: params.store,
|
||||
profileId: params.profileId,
|
||||
reason: "unknown",
|
||||
agentDir: params.agentDir,
|
||||
runId: params.runId,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
93
src/agents/model-fallback-observation.ts
Normal file
93
src/agents/model-fallback-observation.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { sanitizeForLog } from "../terminal/ansi.js";
|
||||
import type { FallbackAttempt, ModelCandidate } from "./model-fallback.types.js";
|
||||
import { buildTextObservationFields } from "./pi-embedded-error-observation.js";
|
||||
import type { FailoverReason } from "./pi-embedded-helpers.js";
|
||||
|
||||
const decisionLog = createSubsystemLogger("model-fallback").child("decision");
|
||||
|
||||
function buildErrorObservationFields(error?: string): {
|
||||
errorPreview?: string;
|
||||
errorHash?: string;
|
||||
errorFingerprint?: string;
|
||||
httpCode?: string;
|
||||
providerErrorType?: string;
|
||||
providerErrorMessagePreview?: string;
|
||||
requestIdHash?: string;
|
||||
} {
|
||||
const observed = buildTextObservationFields(error);
|
||||
return {
|
||||
errorPreview: observed.textPreview,
|
||||
errorHash: observed.textHash,
|
||||
errorFingerprint: observed.textFingerprint,
|
||||
httpCode: observed.httpCode,
|
||||
providerErrorType: observed.providerErrorType,
|
||||
providerErrorMessagePreview: observed.providerErrorMessagePreview,
|
||||
requestIdHash: observed.requestIdHash,
|
||||
};
|
||||
}
|
||||
|
||||
export function logModelFallbackDecision(params: {
|
||||
decision:
|
||||
| "skip_candidate"
|
||||
| "probe_cooldown_candidate"
|
||||
| "candidate_failed"
|
||||
| "candidate_succeeded";
|
||||
runId?: string;
|
||||
requestedProvider: string;
|
||||
requestedModel: string;
|
||||
candidate: ModelCandidate;
|
||||
attempt?: number;
|
||||
total?: number;
|
||||
reason?: FailoverReason | null;
|
||||
status?: number;
|
||||
code?: string;
|
||||
error?: string;
|
||||
nextCandidate?: ModelCandidate;
|
||||
isPrimary?: boolean;
|
||||
requestedModelMatched?: boolean;
|
||||
fallbackConfigured?: boolean;
|
||||
allowTransientCooldownProbe?: boolean;
|
||||
profileCount?: number;
|
||||
previousAttempts?: FallbackAttempt[];
|
||||
}): void {
|
||||
const nextText = params.nextCandidate
|
||||
? `${sanitizeForLog(params.nextCandidate.provider)}/${sanitizeForLog(params.nextCandidate.model)}`
|
||||
: "none";
|
||||
const reasonText = params.reason ?? "unknown";
|
||||
const observedError = buildErrorObservationFields(params.error);
|
||||
decisionLog.warn("model fallback decision", {
|
||||
event: "model_fallback_decision",
|
||||
tags: ["error_handling", "model_fallback", params.decision],
|
||||
runId: params.runId,
|
||||
decision: params.decision,
|
||||
requestedProvider: params.requestedProvider,
|
||||
requestedModel: params.requestedModel,
|
||||
candidateProvider: params.candidate.provider,
|
||||
candidateModel: params.candidate.model,
|
||||
attempt: params.attempt,
|
||||
total: params.total,
|
||||
reason: params.reason,
|
||||
status: params.status,
|
||||
code: params.code,
|
||||
...observedError,
|
||||
nextCandidateProvider: params.nextCandidate?.provider,
|
||||
nextCandidateModel: params.nextCandidate?.model,
|
||||
isPrimary: params.isPrimary,
|
||||
requestedModelMatched: params.requestedModelMatched,
|
||||
fallbackConfigured: params.fallbackConfigured,
|
||||
allowTransientCooldownProbe: params.allowTransientCooldownProbe,
|
||||
profileCount: params.profileCount,
|
||||
previousAttempts: params.previousAttempts?.map((attempt) => ({
|
||||
provider: attempt.provider,
|
||||
model: attempt.model,
|
||||
reason: attempt.reason,
|
||||
status: attempt.status,
|
||||
code: attempt.code,
|
||||
...buildErrorObservationFields(attempt.error),
|
||||
})),
|
||||
consoleMessage:
|
||||
`model fallback decision: decision=${params.decision} requested=${sanitizeForLog(params.requestedProvider)}/${sanitizeForLog(params.requestedModel)} ` +
|
||||
`candidate=${sanitizeForLog(params.candidate.provider)}/${sanitizeForLog(params.candidate.model)} reason=${reasonText} next=${nextText}`,
|
||||
});
|
||||
}
|
||||
@@ -1,5 +1,8 @@
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { registerLogTransport, resetLogger, setLoggerOverride } from "../logging/logger.js";
|
||||
import type { AuthProfileStore } from "./auth-profiles.js";
|
||||
import { makeModelFallbackCfg } from "./test-helpers/model-fallback-config-fixture.js";
|
||||
|
||||
@@ -28,6 +31,7 @@ const mockedResolveProfilesUnavailableReason = vi.mocked(resolveProfilesUnavaila
|
||||
const mockedResolveAuthProfileOrder = vi.mocked(resolveAuthProfileOrder);
|
||||
|
||||
const makeCfg = makeModelFallbackCfg;
|
||||
let unregisterLogTransport: (() => void) | undefined;
|
||||
|
||||
function expectFallbackUsed(
|
||||
result: { result: unknown; attempts: Array<{ reason?: string }> },
|
||||
@@ -149,6 +153,10 @@ describe("runWithModelFallback – probe logic", () => {
|
||||
|
||||
afterEach(() => {
|
||||
Date.now = realDateNow;
|
||||
unregisterLogTransport?.();
|
||||
unregisterLogTransport = undefined;
|
||||
setLoggerOverride(null);
|
||||
resetLogger();
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
@@ -194,6 +202,99 @@ describe("runWithModelFallback – probe logic", () => {
|
||||
expectPrimaryProbeSuccess(result, run, "probed-ok");
|
||||
});
|
||||
|
||||
it("logs primary metadata on probe success and failure fallback decisions", async () => {
|
||||
const cfg = makeCfg();
|
||||
const records: Array<Record<string, unknown>> = [];
|
||||
mockedGetSoonestCooldownExpiry.mockReturnValue(NOW + 60 * 1000);
|
||||
setLoggerOverride({
|
||||
level: "trace",
|
||||
consoleLevel: "silent",
|
||||
file: path.join(os.tmpdir(), `openclaw-model-fallback-probe-${Date.now()}.log`),
|
||||
});
|
||||
unregisterLogTransport = registerLogTransport((record) => {
|
||||
records.push(record);
|
||||
});
|
||||
|
||||
const run = vi.fn().mockResolvedValue("probed-ok");
|
||||
|
||||
const result = await runPrimaryCandidate(cfg, run);
|
||||
|
||||
expectPrimaryProbeSuccess(result, run, "probed-ok");
|
||||
|
||||
_probeThrottleInternals.lastProbeAttempt.clear();
|
||||
|
||||
const fallbackCfg = makeCfg({
|
||||
agents: {
|
||||
defaults: {
|
||||
model: {
|
||||
primary: "openai/gpt-4.1-mini",
|
||||
fallbacks: ["anthropic/claude-haiku-3-5", "google/gemini-2-flash"],
|
||||
},
|
||||
},
|
||||
},
|
||||
} as Partial<OpenClawConfig>);
|
||||
mockedGetSoonestCooldownExpiry.mockReturnValue(NOW + 60 * 1000);
|
||||
const fallbackRun = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(Object.assign(new Error("rate limited"), { status: 429 }))
|
||||
.mockResolvedValueOnce("fallback-ok");
|
||||
|
||||
const fallbackResult = await runPrimaryCandidate(fallbackCfg, fallbackRun);
|
||||
|
||||
expect(fallbackResult.result).toBe("fallback-ok");
|
||||
expect(fallbackRun).toHaveBeenNthCalledWith(1, "openai", "gpt-4.1-mini", {
|
||||
allowTransientCooldownProbe: true,
|
||||
});
|
||||
expect(fallbackRun).toHaveBeenNthCalledWith(2, "anthropic", "claude-haiku-3-5");
|
||||
|
||||
const decisionPayloads = records
|
||||
.filter(
|
||||
(record) =>
|
||||
record["2"] === "model fallback decision" &&
|
||||
record["1"] &&
|
||||
typeof record["1"] === "object",
|
||||
)
|
||||
.map((record) => record["1"] as Record<string, unknown>);
|
||||
|
||||
expect(decisionPayloads).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
event: "model_fallback_decision",
|
||||
decision: "probe_cooldown_candidate",
|
||||
candidateProvider: "openai",
|
||||
candidateModel: "gpt-4.1-mini",
|
||||
allowTransientCooldownProbe: true,
|
||||
}),
|
||||
expect.objectContaining({
|
||||
event: "model_fallback_decision",
|
||||
decision: "candidate_succeeded",
|
||||
candidateProvider: "openai",
|
||||
candidateModel: "gpt-4.1-mini",
|
||||
isPrimary: true,
|
||||
requestedModelMatched: true,
|
||||
}),
|
||||
expect.objectContaining({
|
||||
event: "model_fallback_decision",
|
||||
decision: "candidate_failed",
|
||||
candidateProvider: "openai",
|
||||
candidateModel: "gpt-4.1-mini",
|
||||
isPrimary: true,
|
||||
requestedModelMatched: true,
|
||||
nextCandidateProvider: "anthropic",
|
||||
nextCandidateModel: "claude-haiku-3-5",
|
||||
}),
|
||||
expect.objectContaining({
|
||||
event: "model_fallback_decision",
|
||||
decision: "candidate_succeeded",
|
||||
candidateProvider: "anthropic",
|
||||
candidateModel: "claude-haiku-3-5",
|
||||
isPrimary: false,
|
||||
requestedModelMatched: false,
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it("probes primary model when cooldown already expired", async () => {
|
||||
const cfg = makeCfg();
|
||||
// Cooldown expired 5 min ago
|
||||
|
||||
@@ -207,6 +207,7 @@ async function runEmbeddedFallback(params: {
|
||||
cfg,
|
||||
provider: "openai",
|
||||
model: "mock-1",
|
||||
runId: params.runId,
|
||||
agentDir: params.agentDir,
|
||||
run: (provider, model, options) =>
|
||||
runEmbeddedPiAgent({
|
||||
|
||||
@@ -536,7 +536,9 @@ describe("runWithModelFallback", () => {
|
||||
});
|
||||
|
||||
expect(result.result).toBe("ok");
|
||||
const warning = warnSpy.mock.calls[0]?.[0] as string;
|
||||
const warning = warnSpy.mock.calls
|
||||
.map((call) => call[0] as string)
|
||||
.find((value) => value.includes('Model "openai/gpt-6spoof" not found'));
|
||||
expect(warning).toContain('Model "openai/gpt-6spoof" not found');
|
||||
expect(warning).not.toContain("\u001B");
|
||||
expect(warning).not.toContain("\n");
|
||||
|
||||
@@ -19,6 +19,8 @@ import {
|
||||
isFailoverError,
|
||||
isTimeoutError,
|
||||
} from "./failover-error.js";
|
||||
import { logModelFallbackDecision } from "./model-fallback-observation.js";
|
||||
import type { FallbackAttempt, ModelCandidate } from "./model-fallback.types.js";
|
||||
import {
|
||||
buildConfiguredAllowlistKeys,
|
||||
buildModelAliasIndex,
|
||||
@@ -32,11 +34,6 @@ import { isLikelyContextOverflowError } from "./pi-embedded-helpers.js";
|
||||
|
||||
const log = createSubsystemLogger("model-fallback");
|
||||
|
||||
type ModelCandidate = {
|
||||
provider: string;
|
||||
model: string;
|
||||
};
|
||||
|
||||
export type ModelFallbackRunOptions = {
|
||||
allowTransientCooldownProbe?: boolean;
|
||||
};
|
||||
@@ -47,15 +44,6 @@ type ModelFallbackRunFn<T> = (
|
||||
options?: ModelFallbackRunOptions,
|
||||
) => Promise<T>;
|
||||
|
||||
type FallbackAttempt = {
|
||||
provider: string;
|
||||
model: string;
|
||||
error: string;
|
||||
reason?: FailoverReason;
|
||||
status?: number;
|
||||
code?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Fallback abort check. Only treats explicit AbortError names as user aborts.
|
||||
* Message-based checks (e.g., "aborted") can mask timeouts and skip fallback.
|
||||
@@ -515,6 +503,7 @@ export async function runWithModelFallback<T>(params: {
|
||||
cfg: OpenClawConfig | undefined;
|
||||
provider: string;
|
||||
model: string;
|
||||
runId?: string;
|
||||
agentDir?: string;
|
||||
/** Optional explicit fallbacks list; when provided (even empty), replaces agents.defaults.model.fallbacks. */
|
||||
fallbacksOverride?: string[];
|
||||
@@ -537,7 +526,11 @@ export async function runWithModelFallback<T>(params: {
|
||||
|
||||
for (let i = 0; i < candidates.length; i += 1) {
|
||||
const candidate = candidates[i];
|
||||
const isPrimary = i === 0;
|
||||
const requestedModel =
|
||||
params.provider === candidate.provider && params.model === candidate.model;
|
||||
let runOptions: ModelFallbackRunOptions | undefined;
|
||||
let attemptedDuringCooldown = false;
|
||||
if (authStore) {
|
||||
const profileIds = resolveAuthProfileOrder({
|
||||
cfg: params.cfg,
|
||||
@@ -548,9 +541,6 @@ export async function runWithModelFallback<T>(params: {
|
||||
|
||||
if (profileIds.length > 0 && !isAnyProfileAvailable) {
|
||||
// All profiles for this provider are in cooldown.
|
||||
const isPrimary = i === 0;
|
||||
const requestedModel =
|
||||
params.provider === candidate.provider && params.model === candidate.model;
|
||||
const now = Date.now();
|
||||
const probeThrottleKey = resolveProbeThrottleKey(candidate.provider, params.agentDir);
|
||||
const decision = resolveCooldownDecision({
|
||||
@@ -571,6 +561,22 @@ export async function runWithModelFallback<T>(params: {
|
||||
error: decision.error,
|
||||
reason: decision.reason,
|
||||
});
|
||||
logModelFallbackDecision({
|
||||
decision: "skip_candidate",
|
||||
runId: params.runId,
|
||||
requestedProvider: params.provider,
|
||||
requestedModel: params.model,
|
||||
candidate,
|
||||
attempt: i + 1,
|
||||
total: candidates.length,
|
||||
reason: decision.reason,
|
||||
error: decision.error,
|
||||
nextCandidate: candidates[i + 1],
|
||||
isPrimary,
|
||||
requestedModelMatched: requestedModel,
|
||||
fallbackConfigured: hasFallbackCandidates,
|
||||
profileCount: profileIds.length,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -584,6 +590,23 @@ export async function runWithModelFallback<T>(params: {
|
||||
) {
|
||||
runOptions = { allowTransientCooldownProbe: true };
|
||||
}
|
||||
attemptedDuringCooldown = true;
|
||||
logModelFallbackDecision({
|
||||
decision: "probe_cooldown_candidate",
|
||||
runId: params.runId,
|
||||
requestedProvider: params.provider,
|
||||
requestedModel: params.model,
|
||||
candidate,
|
||||
attempt: i + 1,
|
||||
total: candidates.length,
|
||||
reason: decision.reason,
|
||||
nextCandidate: candidates[i + 1],
|
||||
isPrimary,
|
||||
requestedModelMatched: requestedModel,
|
||||
fallbackConfigured: hasFallbackCandidates,
|
||||
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
|
||||
profileCount: profileIds.length,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -594,6 +617,21 @@ export async function runWithModelFallback<T>(params: {
|
||||
options: runOptions,
|
||||
});
|
||||
if ("success" in attemptRun) {
|
||||
if (i > 0 || attempts.length > 0 || attemptedDuringCooldown) {
|
||||
logModelFallbackDecision({
|
||||
decision: "candidate_succeeded",
|
||||
runId: params.runId,
|
||||
requestedProvider: params.provider,
|
||||
requestedModel: params.model,
|
||||
candidate,
|
||||
attempt: i + 1,
|
||||
total: candidates.length,
|
||||
previousAttempts: attempts,
|
||||
isPrimary,
|
||||
requestedModelMatched: requestedModel,
|
||||
fallbackConfigured: hasFallbackCandidates,
|
||||
});
|
||||
}
|
||||
const notFoundAttempt =
|
||||
i > 0 ? attempts.find((a) => a.reason === "model_not_found") : undefined;
|
||||
if (notFoundAttempt) {
|
||||
@@ -637,6 +675,23 @@ export async function runWithModelFallback<T>(params: {
|
||||
status: described.status,
|
||||
code: described.code,
|
||||
});
|
||||
logModelFallbackDecision({
|
||||
decision: "candidate_failed",
|
||||
runId: params.runId,
|
||||
requestedProvider: params.provider,
|
||||
requestedModel: params.model,
|
||||
candidate,
|
||||
attempt: i + 1,
|
||||
total: candidates.length,
|
||||
reason: described.reason,
|
||||
status: described.status,
|
||||
code: described.code,
|
||||
error: described.message,
|
||||
nextCandidate: candidates[i + 1],
|
||||
isPrimary,
|
||||
requestedModelMatched: requestedModel,
|
||||
fallbackConfigured: hasFallbackCandidates,
|
||||
});
|
||||
await params.onError?.({
|
||||
provider: candidate.provider,
|
||||
model: candidate.model,
|
||||
|
||||
15
src/agents/model-fallback.types.ts
Normal file
15
src/agents/model-fallback.types.ts
Normal file
@@ -0,0 +1,15 @@
|
||||
import type { FailoverReason } from "./pi-embedded-helpers.js";
|
||||
|
||||
export type ModelCandidate = {
|
||||
provider: string;
|
||||
model: string;
|
||||
};
|
||||
|
||||
export type FallbackAttempt = {
|
||||
provider: string;
|
||||
model: string;
|
||||
error: string;
|
||||
reason?: FailoverReason;
|
||||
status?: number;
|
||||
code?: string;
|
||||
};
|
||||
@@ -2,8 +2,10 @@ import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { AssistantMessage } from "@mariozechner/pi-ai";
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { registerLogTransport, resetLogger, setLoggerOverride } from "../logging/logger.js";
|
||||
import { redactIdentifier } from "../logging/redact-identifier.js";
|
||||
import type { AuthProfileFailureReason } from "./auth-profiles.js";
|
||||
import type { EmbeddedRunAttemptResult } from "./pi-embedded-runner/run/types.js";
|
||||
|
||||
@@ -51,6 +53,7 @@ vi.mock("./models-config.js", async (importOriginal) => {
|
||||
});
|
||||
|
||||
let runEmbeddedPiAgent: typeof import("./pi-embedded-runner/run.js").runEmbeddedPiAgent;
|
||||
let unregisterLogTransport: (() => void) | undefined;
|
||||
|
||||
beforeAll(async () => {
|
||||
({ runEmbeddedPiAgent } = await import("./pi-embedded-runner/run.js"));
|
||||
@@ -64,6 +67,13 @@ beforeEach(() => {
|
||||
sleepWithAbortMock.mockClear();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
unregisterLogTransport?.();
|
||||
unregisterLogTransport = undefined;
|
||||
setLoggerOverride(null);
|
||||
resetLogger();
|
||||
});
|
||||
|
||||
const baseUsage = {
|
||||
input: 0,
|
||||
output: 0,
|
||||
@@ -720,6 +730,61 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
|
||||
expect(sleepWithAbortMock).toHaveBeenCalledWith(321, undefined);
|
||||
});
|
||||
|
||||
it("logs structured failover decision metadata for overloaded assistant rotation", async () => {
|
||||
const records: Array<Record<string, unknown>> = [];
|
||||
setLoggerOverride({
|
||||
level: "trace",
|
||||
consoleLevel: "silent",
|
||||
file: path.join(os.tmpdir(), `openclaw-auth-rotation-${Date.now()}.log`),
|
||||
});
|
||||
unregisterLogTransport = registerLogTransport((record) => {
|
||||
records.push(record);
|
||||
});
|
||||
|
||||
await runAutoPinnedRotationCase({
|
||||
errorMessage:
|
||||
'{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":"req_overload"}',
|
||||
sessionKey: "agent:test:overloaded-logging",
|
||||
runId: "run:overloaded-logging",
|
||||
});
|
||||
|
||||
const decisionRecord = records.find(
|
||||
(record) =>
|
||||
record["2"] === "embedded run failover decision" &&
|
||||
record["1"] &&
|
||||
typeof record["1"] === "object" &&
|
||||
(record["1"] as Record<string, unknown>).decision === "rotate_profile",
|
||||
);
|
||||
|
||||
expect(decisionRecord).toBeDefined();
|
||||
const safeProfileId = redactIdentifier("openai:p1", { len: 12 });
|
||||
expect((decisionRecord as Record<string, unknown>)["1"]).toMatchObject({
|
||||
event: "embedded_run_failover_decision",
|
||||
runId: "run:overloaded-logging",
|
||||
decision: "rotate_profile",
|
||||
failoverReason: "overloaded",
|
||||
profileId: safeProfileId,
|
||||
providerErrorType: "overloaded_error",
|
||||
rawErrorPreview: expect.stringContaining('"request_id":"sha256:'),
|
||||
});
|
||||
|
||||
const stateRecord = records.find(
|
||||
(record) =>
|
||||
record["2"] === "auth profile failure state updated" &&
|
||||
record["1"] &&
|
||||
typeof record["1"] === "object" &&
|
||||
(record["1"] as Record<string, unknown>).profileId === safeProfileId,
|
||||
);
|
||||
|
||||
expect(stateRecord).toBeDefined();
|
||||
expect((stateRecord as Record<string, unknown>)["1"]).toMatchObject({
|
||||
event: "auth_profile_failure_state_updated",
|
||||
runId: "run:overloaded-logging",
|
||||
profileId: safeProfileId,
|
||||
reason: "overloaded",
|
||||
});
|
||||
});
|
||||
|
||||
it("rotates for overloaded prompt failures across auto-pinned profiles", async () => {
|
||||
const { usageStats } = await runAutoPinnedPromptErrorRotationCase({
|
||||
errorMessage: '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
|
||||
|
||||
@@ -763,6 +763,7 @@ export async function runEmbeddedPiAgent(
|
||||
reason,
|
||||
cfg: params.config,
|
||||
agentDir,
|
||||
runId: params.runId,
|
||||
});
|
||||
};
|
||||
const resolveAuthProfileFailureReason = (
|
||||
|
||||
@@ -199,6 +199,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
const onToolResult = params.opts?.onToolResult;
|
||||
const fallbackResult = await runWithModelFallback({
|
||||
...resolveModelFallbackOptions(params.followupRun.run),
|
||||
runId,
|
||||
run: (provider, model, runOptions) => {
|
||||
// Notify that model selection is complete (including after fallback).
|
||||
// This allows responsePrefix template interpolation with the actual model.
|
||||
|
||||
@@ -474,6 +474,7 @@ export async function runMemoryFlushIfNeeded(params: {
|
||||
try {
|
||||
await runWithModelFallback({
|
||||
...resolveModelFallbackOptions(params.followupRun.run),
|
||||
runId: flushRunId,
|
||||
run: async (provider, model, runOptions) => {
|
||||
const { authProfile, embeddedContext, senderContext } = buildEmbeddedRunContexts({
|
||||
run: params.followupRun.run,
|
||||
|
||||
@@ -159,6 +159,7 @@ export function createFollowupRunner(params: {
|
||||
cfg: queued.run.config,
|
||||
provider: queued.run.provider,
|
||||
model: queued.run.model,
|
||||
runId,
|
||||
agentDir: queued.run.agentDir,
|
||||
fallbacksOverride: resolveRunModelFallbacksOverride({
|
||||
cfg: queued.run.config,
|
||||
|
||||
@@ -1103,6 +1103,7 @@ async function agentCommandInternal(
|
||||
cfg,
|
||||
provider,
|
||||
model,
|
||||
runId,
|
||||
agentDir,
|
||||
fallbacksOverride: effectiveFallbacksOverride,
|
||||
run: (providerOverride, modelOverride, runOptions) => {
|
||||
|
||||
@@ -553,6 +553,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
cfg: cfgWithAgentDefaults,
|
||||
provider,
|
||||
model,
|
||||
runId: cronSession.sessionEntry.sessionId,
|
||||
agentDir,
|
||||
fallbacksOverride:
|
||||
payloadFallbacks ?? resolveAgentModelFallbacksOverride(params.cfg, agentId),
|
||||
|
||||
Reference in New Issue
Block a user