fix(voice-call): verify call status with provider before loading stale calls

On gateway restart, persisted non-terminal calls are now verified with
the provider (Twilio/Plivo/Telnyx) before being restored to memory.
This prevents phantom calls from blocking the concurrent call limit.

- Add getCallStatus() to VoiceCallProvider interface
- Implement for all providers with SSRF-guarded fetch
- Transient errors (5xx, network) keep the call with timer fallback
- 404/known-terminal statuses drop the call
- Restart max-duration timers for restored answered calls
- Skip calls older than maxDurationSeconds or without providerCallId
This commit is contained in:
garnetlyx
2026-03-01 22:13:24 -08:00
parent 3049ca840f
commit ffa7c13c9b
11 changed files with 436 additions and 26 deletions

View File

@@ -1,3 +1,4 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
@@ -5,6 +6,8 @@ import { VoiceCallConfigSchema } from "./config.js";
import { CallManager } from "./manager.js";
import type { VoiceCallProvider } from "./providers/base.js";
import type {
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@@ -22,6 +25,7 @@ class FakeProvider implements VoiceCallProvider {
readonly hangupCalls: HangupCallInput[] = [];
readonly startListeningCalls: StartListeningInput[] = [];
readonly stopListeningCalls: StopListeningInput[] = [];
getCallStatusResult: GetCallStatusResult = { status: "in-progress", isTerminal: false };
constructor(name: "plivo" | "twilio" = "plivo") {
this.name = name;
@@ -48,6 +52,9 @@ class FakeProvider implements VoiceCallProvider {
async stopListening(input: StopListeningInput): Promise<void> {
this.stopListeningCalls.push(input);
}
async getCallStatus(_input: GetCallStatusInput): Promise<GetCallStatusResult> {
return this.getCallStatusResult;
}
}
let storeSeq = 0;
@@ -57,13 +64,13 @@ function createTestStorePath(): string {
return path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}-${storeSeq}`);
}
function createManagerHarness(
async function createManagerHarness(
configOverrides: Record<string, unknown> = {},
provider = new FakeProvider(),
): {
): Promise<{
manager: CallManager;
provider: FakeProvider;
} {
}> {
const config = VoiceCallConfigSchema.parse({
enabled: true,
provider: "plivo",
@@ -71,7 +78,7 @@ function createManagerHarness(
...configOverrides,
});
const manager = new CallManager(config, createTestStorePath());
manager.initialize(provider, "https://example.com/voice/webhook");
await manager.initialize(provider, "https://example.com/voice/webhook");
return { manager, provider };
}
@@ -87,7 +94,7 @@ function markCallAnswered(manager: CallManager, callId: string, eventId: string)
describe("CallManager", () => {
it("upgrades providerCallId mapping when provider ID changes", async () => {
const { manager } = createManagerHarness();
const { manager } = await createManagerHarness();
const { callId, success, error } = await manager.initiateCall("+15550000001");
expect(success).toBe(true);
@@ -112,7 +119,7 @@ describe("CallManager", () => {
});
it("speaks initial message on answered for notify mode (non-Twilio)", async () => {
const { manager, provider } = createManagerHarness();
const { manager, provider } = await createManagerHarness();
const { callId, success } = await manager.initiateCall("+15550000002", undefined, {
message: "Hello there",
@@ -134,8 +141,8 @@ describe("CallManager", () => {
expect(provider.playTtsCalls[0]?.text).toBe("Hello there");
});
it("rejects inbound calls with missing caller ID when allowlist enabled", () => {
const { manager, provider } = createManagerHarness({
it("rejects inbound calls with missing caller ID when allowlist enabled", async () => {
const { manager, provider } = await createManagerHarness({
inboundPolicy: "allowlist",
allowFrom: ["+15550001234"],
});
@@ -155,8 +162,8 @@ describe("CallManager", () => {
expect(provider.hangupCalls[0]?.providerCallId).toBe("provider-missing");
});
it("rejects inbound calls with anonymous caller ID when allowlist enabled", () => {
const { manager, provider } = createManagerHarness({
it("rejects inbound calls with anonymous caller ID when allowlist enabled", async () => {
const { manager, provider } = await createManagerHarness({
inboundPolicy: "allowlist",
allowFrom: ["+15550001234"],
});
@@ -177,8 +184,8 @@ describe("CallManager", () => {
expect(provider.hangupCalls[0]?.providerCallId).toBe("provider-anon");
});
it("rejects inbound calls that only match allowlist suffixes", () => {
const { manager, provider } = createManagerHarness({
it("rejects inbound calls that only match allowlist suffixes", async () => {
const { manager, provider } = await createManagerHarness({
inboundPolicy: "allowlist",
allowFrom: ["+15550001234"],
});
@@ -199,8 +206,8 @@ describe("CallManager", () => {
expect(provider.hangupCalls[0]?.providerCallId).toBe("provider-suffix");
});
it("rejects duplicate inbound events with a single hangup call", () => {
const { manager, provider } = createManagerHarness({
it("rejects duplicate inbound events with a single hangup call", async () => {
const { manager, provider } = await createManagerHarness({
inboundPolicy: "disabled",
});
@@ -231,8 +238,8 @@ describe("CallManager", () => {
expect(provider.hangupCalls[0]?.providerCallId).toBe("provider-dup");
});
it("accepts inbound calls that exactly match the allowlist", () => {
const { manager } = createManagerHarness({
it("accepts inbound calls that exactly match the allowlist", async () => {
const { manager } = await createManagerHarness({
inboundPolicy: "allowlist",
allowFrom: ["+15550001234"],
});
@@ -252,7 +259,7 @@ describe("CallManager", () => {
});
it("completes a closed-loop turn without live audio", async () => {
const { manager, provider } = createManagerHarness({
const { manager, provider } = await createManagerHarness({
transcriptTimeoutMs: 5000,
});
@@ -292,7 +299,7 @@ describe("CallManager", () => {
});
it("rejects overlapping continueCall requests for the same call", async () => {
const { manager, provider } = createManagerHarness({
const { manager, provider } = await createManagerHarness({
transcriptTimeoutMs: 5000,
});
@@ -324,7 +331,7 @@ describe("CallManager", () => {
});
it("ignores speech events with mismatched turnToken while waiting for transcript", async () => {
const { manager, provider } = createManagerHarness(
const { manager, provider } = await createManagerHarness(
{
transcriptTimeoutMs: 5000,
},
@@ -379,7 +386,7 @@ describe("CallManager", () => {
});
it("tracks latency metadata across multiple closed-loop turns", async () => {
const { manager, provider } = createManagerHarness({
const { manager, provider } = await createManagerHarness({
transcriptTimeoutMs: 5000,
});
@@ -432,7 +439,7 @@ describe("CallManager", () => {
});
it("handles repeated closed-loop turns without waiter churn", async () => {
const { manager, provider } = createManagerHarness({
const { manager, provider } = await createManagerHarness({
transcriptTimeoutMs: 5000,
});
@@ -465,3 +472,152 @@ describe("CallManager", () => {
expect(provider.stopListeningCalls).toHaveLength(5);
});
});
// ---------------------------------------------------------------------------
// Call verification on restore
// ---------------------------------------------------------------------------
function writeCallsToStore(storePath: string, calls: Record<string, unknown>[]): void {
fs.mkdirSync(storePath, { recursive: true });
const logPath = path.join(storePath, "calls.jsonl");
const lines = calls.map((c) => JSON.stringify(c)).join("\n") + "\n";
fs.writeFileSync(logPath, lines);
}
function makePersistedCall(overrides: Record<string, unknown> = {}): Record<string, unknown> {
return {
callId: `call-${Date.now()}-${Math.random().toString(36).slice(2)}`,
providerCallId: `prov-${Date.now()}-${Math.random().toString(36).slice(2)}`,
provider: "plivo",
direction: "outbound",
state: "answered",
from: "+15550000000",
to: "+15550000001",
startedAt: Date.now() - 30_000,
answeredAt: Date.now() - 25_000,
transcript: [],
processedEventIds: [],
...overrides,
};
}
describe("CallManager verification on restore", () => {
it("skips stale calls reported terminal by provider", async () => {
const storePath = createTestStorePath();
const call = makePersistedCall();
writeCallsToStore(storePath, [call]);
const provider = new FakeProvider();
provider.getCallStatusResult = { status: "completed", isTerminal: true };
const config = VoiceCallConfigSchema.parse({
enabled: true,
provider: "plivo",
fromNumber: "+15550000000",
});
const manager = new CallManager(config, storePath);
await manager.initialize(provider, "https://example.com/voice/webhook");
expect(manager.getActiveCalls()).toHaveLength(0);
});
it("keeps calls reported active by provider", async () => {
const storePath = createTestStorePath();
const call = makePersistedCall();
writeCallsToStore(storePath, [call]);
const provider = new FakeProvider();
provider.getCallStatusResult = { status: "in-progress", isTerminal: false };
const config = VoiceCallConfigSchema.parse({
enabled: true,
provider: "plivo",
fromNumber: "+15550000000",
});
const manager = new CallManager(config, storePath);
await manager.initialize(provider, "https://example.com/voice/webhook");
expect(manager.getActiveCalls()).toHaveLength(1);
expect(manager.getActiveCalls()[0]?.callId).toBe(call.callId);
});
it("keeps calls when provider returns unknown (transient error)", async () => {
const storePath = createTestStorePath();
const call = makePersistedCall();
writeCallsToStore(storePath, [call]);
const provider = new FakeProvider();
provider.getCallStatusResult = { status: "error", isTerminal: false, isUnknown: true };
const config = VoiceCallConfigSchema.parse({
enabled: true,
provider: "plivo",
fromNumber: "+15550000000",
});
const manager = new CallManager(config, storePath);
await manager.initialize(provider, "https://example.com/voice/webhook");
expect(manager.getActiveCalls()).toHaveLength(1);
});
it("skips calls older than maxDurationSeconds", async () => {
const storePath = createTestStorePath();
const call = makePersistedCall({
startedAt: Date.now() - 600_000, // 10 minutes ago
answeredAt: Date.now() - 590_000,
});
writeCallsToStore(storePath, [call]);
const provider = new FakeProvider();
const config = VoiceCallConfigSchema.parse({
enabled: true,
provider: "plivo",
fromNumber: "+15550000000",
maxDurationSeconds: 300, // 5 minutes
});
const manager = new CallManager(config, storePath);
await manager.initialize(provider, "https://example.com/voice/webhook");
expect(manager.getActiveCalls()).toHaveLength(0);
});
it("skips calls without providerCallId", async () => {
const storePath = createTestStorePath();
const call = makePersistedCall({ providerCallId: undefined, state: "initiated" });
writeCallsToStore(storePath, [call]);
const provider = new FakeProvider();
const config = VoiceCallConfigSchema.parse({
enabled: true,
provider: "plivo",
fromNumber: "+15550000000",
});
const manager = new CallManager(config, storePath);
await manager.initialize(provider, "https://example.com/voice/webhook");
expect(manager.getActiveCalls()).toHaveLength(0);
});
it("keeps call when getCallStatus throws (verification failure)", async () => {
const storePath = createTestStorePath();
const call = makePersistedCall();
writeCallsToStore(storePath, [call]);
const provider = new FakeProvider();
provider.getCallStatus = async () => {
throw new Error("network failure");
};
const config = VoiceCallConfigSchema.parse({
enabled: true,
provider: "plivo",
fromNumber: "+15550000000",
});
const manager = new CallManager(config, storePath);
await manager.initialize(provider, "https://example.com/voice/webhook");
expect(manager.getActiveCalls()).toHaveLength(1);
});
});

View File

@@ -13,8 +13,15 @@ import {
speakInitialMessage as speakInitialMessageWithContext,
} from "./manager/outbound.js";
import { getCallHistoryFromStore, loadActiveCallsFromStore } from "./manager/store.js";
import { startMaxDurationTimer } from "./manager/timers.js";
import type { VoiceCallProvider } from "./providers/base.js";
import type { CallId, CallRecord, NormalizedEvent, OutboundCallOptions } from "./types.js";
import {
TerminalStates,
type CallId,
type CallRecord,
type NormalizedEvent,
type OutboundCallOptions,
} from "./types.js";
import { resolveUserPath } from "./utils.js";
function resolveDefaultStoreBase(config: VoiceCallConfig, storePath?: string): string {
@@ -65,18 +72,126 @@ export class CallManager {
/**
* Initialize the call manager with a provider.
* Verifies persisted calls with the provider and restarts timers.
*/
initialize(provider: VoiceCallProvider, webhookUrl: string): void {
async initialize(provider: VoiceCallProvider, webhookUrl: string): Promise<void> {
this.provider = provider;
this.webhookUrl = webhookUrl;
fs.mkdirSync(this.storePath, { recursive: true });
const persisted = loadActiveCallsFromStore(this.storePath);
this.activeCalls = persisted.activeCalls;
this.providerCallIdMap = persisted.providerCallIdMap;
this.processedEventIds = persisted.processedEventIds;
this.rejectedProviderCallIds = persisted.rejectedProviderCallIds;
const verified = await this.verifyRestoredCalls(provider, persisted.activeCalls);
this.activeCalls = verified;
// Rebuild providerCallIdMap from verified calls only
this.providerCallIdMap = new Map();
for (const [callId, call] of verified) {
if (call.providerCallId) {
this.providerCallIdMap.set(call.providerCallId, callId);
}
}
// Restart max-duration timers for restored calls that are past the answered state
for (const [callId, call] of verified) {
if (call.answeredAt && !TerminalStates.has(call.state)) {
const elapsed = Date.now() - call.answeredAt;
const maxDurationMs = this.config.maxDurationSeconds * 1000;
if (elapsed >= maxDurationMs) {
// Already expired — remove instead of keeping
verified.delete(callId);
if (call.providerCallId) {
this.providerCallIdMap.delete(call.providerCallId);
}
console.log(
`[voice-call] Skipping restored call ${callId} (max duration already elapsed)`,
);
continue;
}
startMaxDurationTimer({
ctx: this.getContext(),
callId,
onTimeout: async (id) => {
await endCallWithContext(this.getContext(), id);
},
});
console.log(`[voice-call] Restarted max-duration timer for restored call ${callId}`);
}
}
if (verified.size > 0) {
console.log(`[voice-call] Restored ${verified.size} active call(s) from store`);
}
}
/**
* Verify persisted calls with the provider before restoring.
* Calls without providerCallId or older than maxDurationSeconds are skipped.
* Transient provider errors keep the call (rely on timer fallback).
*/
private async verifyRestoredCalls(
provider: VoiceCallProvider,
candidates: Map<CallId, CallRecord>,
): Promise<Map<CallId, CallRecord>> {
if (candidates.size === 0) {
return new Map();
}
const maxAgeMs = this.config.maxDurationSeconds * 1000;
const now = Date.now();
const verified = new Map<CallId, CallRecord>();
const verifyTasks: Array<{ callId: CallId; call: CallRecord; promise: Promise<void> }> = [];
for (const [callId, call] of candidates) {
// Skip calls without a provider ID — can't verify
if (!call.providerCallId) {
console.log(`[voice-call] Skipping restored call ${callId} (no providerCallId)`);
continue;
}
// Skip calls older than maxDurationSeconds (time-based fallback)
if (now - call.startedAt > maxAgeMs) {
console.log(
`[voice-call] Skipping restored call ${callId} (older than maxDurationSeconds)`,
);
continue;
}
const task = {
callId,
call,
promise: provider
.getCallStatus({ providerCallId: call.providerCallId })
.then((result) => {
if (result.isTerminal) {
console.log(
`[voice-call] Skipping restored call ${callId} (provider status: ${result.status})`,
);
} else if (result.isUnknown) {
console.log(
`[voice-call] Keeping restored call ${callId} (provider status unknown, relying on timer)`,
);
verified.set(callId, call);
} else {
verified.set(callId, call);
}
})
.catch(() => {
// Verification failed entirely — keep the call, rely on timer
console.log(
`[voice-call] Keeping restored call ${callId} (verification failed, relying on timer)`,
);
verified.set(callId, call);
}),
};
verifyTasks.push(task);
}
await Promise.allSettled(verifyTasks.map((t) => t.promise));
return verified;
}
/**

View File

@@ -41,6 +41,7 @@ function createProvider(overrides: Partial<VoiceCallProvider> = {}): VoiceCallPr
playTts: async () => {},
startListening: async () => {},
stopListening: async () => {},
getCallStatus: async () => ({ status: "in-progress", isTerminal: false }),
...overrides,
};
}

View File

@@ -1,4 +1,6 @@
import type {
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@@ -65,4 +67,12 @@ export interface VoiceCallProvider {
* Stop listening for user speech (deactivate STT).
*/
stopListening(input: StopListeningInput): Promise<void>;
/**
* Query provider for current call status.
* Used to verify persisted calls are still active on restart.
* Must return `isUnknown: true` for transient errors (network, 5xx)
* so the caller can keep the call and rely on timer-based fallback.
*/
getCallStatus(input: GetCallStatusInput): Promise<GetCallStatusResult>;
}

View File

@@ -1,6 +1,8 @@
import crypto from "node:crypto";
import type {
EndReason,
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@@ -166,4 +168,12 @@ export class MockProvider implements VoiceCallProvider {
async stopListening(_input: StopListeningInput): Promise<void> {
// No-op for mock
}
async getCallStatus(input: GetCallStatusInput): Promise<GetCallStatusResult> {
const id = input.providerCallId.toLowerCase();
if (id.includes("stale") || id.includes("ended") || id.includes("completed")) {
return { status: "completed", isTerminal: true };
}
return { status: "in-progress", isTerminal: false };
}
}

View File

@@ -2,6 +2,8 @@ import crypto from "node:crypto";
import type { PlivoConfig, WebhookSecurityConfig } from "../config.js";
import { getHeader } from "../http-headers.js";
import type {
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@@ -441,6 +443,41 @@ export class PlivoProvider implements VoiceCallProvider {
// GetInput ends automatically when speech ends.
}
async getCallStatus(input: GetCallStatusInput): Promise<GetCallStatusResult> {
const terminalStatuses = new Set([
"completed",
"busy",
"failed",
"timeout",
"no-answer",
"cancel",
"machine",
"hangup",
]);
try {
const data = await guardedJsonApiRequest<{ call_status?: string }>({
url: `${this.baseUrl}/Call/${input.providerCallId}/`,
method: "GET",
headers: {
Authorization: `Basic ${Buffer.from(`${this.authId}:${this.authToken}`).toString("base64")}`,
},
allowNotFound: true,
allowedHostnames: [this.apiHost],
auditContext: "plivo-get-call-status",
errorPrefix: "Plivo get call status error",
});
if (!data) {
return { status: "not-found", isTerminal: true };
}
const status = data.call_status ?? "unknown";
return { status, isTerminal: terminalStatuses.has(status) };
} catch {
return { status: "error", isTerminal: false, isUnknown: true };
}
}
private static normalizeNumber(numberOrSip: string): string {
const trimmed = numberOrSip.trim();
if (trimmed.toLowerCase().startsWith("sip:")) {

View File

@@ -2,6 +2,8 @@ import crypto from "node:crypto";
import type { TelnyxConfig } from "../config.js";
import type {
EndReason,
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@@ -291,6 +293,37 @@ export class TelnyxProvider implements VoiceCallProvider {
{ allowNotFound: true },
);
}
async getCallStatus(input: GetCallStatusInput): Promise<GetCallStatusResult> {
try {
const data = await guardedJsonApiRequest<{ data?: { state?: string; is_alive?: boolean } }>({
url: `${this.baseUrl}/calls/${input.providerCallId}`,
method: "GET",
headers: {
Authorization: `Bearer ${this.apiKey}`,
"Content-Type": "application/json",
},
allowNotFound: true,
allowedHostnames: [this.apiHost],
auditContext: "telnyx-get-call-status",
errorPrefix: "Telnyx get call status error",
});
if (!data) {
return { status: "not-found", isTerminal: true };
}
const state = data.data?.state ?? "unknown";
const isAlive = data.data?.is_alive;
// If is_alive is missing, treat as unknown rather than terminal (P1 fix)
if (isAlive === undefined) {
return { status: state, isTerminal: false, isUnknown: true };
}
return { status: state, isTerminal: !isAlive };
} catch {
return { status: "error", isTerminal: false, isUnknown: true };
}
}
}
// -----------------------------------------------------------------------------

View File

@@ -5,6 +5,8 @@ import type { MediaStreamHandler } from "../media-stream.js";
import { chunkAudio } from "../telephony-audio.js";
import type { TelephonyTtsProvider } from "../telephony-tts.js";
import type {
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@@ -19,6 +21,7 @@ import type {
} from "../types.js";
import { escapeXml, mapVoiceToPolly } from "../voice-mapping.js";
import type { VoiceCallProvider } from "./base.js";
import { guardedJsonApiRequest } from "./shared/guarded-json-api.js";
import { twilioApiRequest } from "./twilio/api.js";
import { verifyTwilioProviderWebhook } from "./twilio/webhook.js";
@@ -671,6 +674,33 @@ export class TwilioProvider implements VoiceCallProvider {
// Twilio's <Gather> automatically stops on speech end
// No explicit action needed
}
async getCallStatus(input: GetCallStatusInput): Promise<GetCallStatusResult> {
const terminalStatuses = new Set(["completed", "failed", "busy", "no-answer", "canceled"]);
try {
const data = await guardedJsonApiRequest<{ status?: string }>({
url: `${this.baseUrl}/Calls/${input.providerCallId}.json`,
method: "GET",
headers: {
Authorization: `Basic ${Buffer.from(`${this.accountSid}:${this.authToken}`).toString("base64")}`,
},
allowNotFound: true,
allowedHostnames: ["api.twilio.com"],
auditContext: "twilio-get-call-status",
errorPrefix: "Twilio get call status error",
});
if (!data) {
return { status: "not-found", isTerminal: true };
}
const status = data.status ?? "unknown";
return { status, isTerminal: terminalStatuses.has(status) };
} catch {
// Transient error — keep the call and rely on timer fallback
return { status: "error", isTerminal: false, isUnknown: true };
}
}
}
// -----------------------------------------------------------------------------

View File

@@ -189,7 +189,7 @@ export async function createVoiceCallRuntime(params: {
}
}
manager.initialize(provider, webhookUrl);
await manager.initialize(provider, webhookUrl);
const stop = async () => {
if (tunnelResult) {

View File

@@ -248,6 +248,23 @@ export type StopListeningInput = {
providerCallId: ProviderCallId;
};
// -----------------------------------------------------------------------------
// Call Status Verification (used on restart to verify persisted calls)
// -----------------------------------------------------------------------------
export type GetCallStatusInput = {
providerCallId: ProviderCallId;
};
export type GetCallStatusResult = {
/** Provider-specific status string (e.g. "completed", "in-progress") */
status: string;
/** True when the provider confirms the call has ended */
isTerminal: boolean;
/** True when the status could not be determined (transient error) */
isUnknown?: boolean;
};
// -----------------------------------------------------------------------------
// Outbound Call Options
// -----------------------------------------------------------------------------

View File

@@ -14,6 +14,7 @@ const provider: VoiceCallProvider = {
playTts: async () => {},
startListening: async () => {},
stopListening: async () => {},
getCallStatus: async () => ({ status: "in-progress", isTerminal: false }),
};
const createConfig = (overrides: Partial<VoiceCallConfig> = {}): VoiceCallConfig => {