From e6383a2c13e59c14fccd56a04417f869dac88bd6 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Feb 2026 21:27:03 +0100 Subject: [PATCH] fix(gateway): probe port liveness for stale lock recovery Co-authored-by: Operative-001 <261882263+Operative-001@users.noreply.github.com> --- CHANGELOG.md | 1 + src/cli/gateway-cli/run-loop.test.ts | 44 +++++++++++++++-- src/cli/gateway-cli/run-loop.ts | 5 +- src/cli/gateway-cli/run.ts | 1 + src/infra/gateway-lock.test.ts | 74 ++++++++++++++++++++++++++++ src/infra/gateway-lock.ts | 46 +++++++++++++++-- 6 files changed, 163 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a3fd8512..bfcc35552 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -100,6 +100,7 @@ Docs: https://docs.openclaw.ai - CLI/Sessions: pass the configured sessions directory when resolving transcript paths in `agentCommand`, so custom `session.store` locations resume sessions reliably. Thanks @davidrudduck. - Gateway/Chat UI: strip inline reply/audio directive tags from non-streaming final webchat broadcasts (including `chat.inject`) while preserving empty-string message content when tags are the entire reply. (#23298) Thanks @SidQin-cyber. - Gateway/Restart: fix restart-loop edge cases by keeping `openclaw.mjs -> dist/entry.js` bootstrap detection explicit, reacquiring the gateway lock for in-process restart fallback paths, and tightening restart-loop regression coverage. (#23416) Thanks @jeffwnli. +- Gateway/Lock: use optional gateway-port reachability as a primary stale-lock liveness signal (and wire gateway run-loop lock acquisition to the resolved port), reducing false "already running" lockouts after unclean exits. (#23760) Thanks @Operative-001. - Signal/Monitor: treat user-initiated abort shutdowns as clean exits when auto-started `signal-cli` is terminated, while still surfacing unexpected daemon exits as startup/runtime failures. (#23379) Thanks @frankekn. 
- Channels/Dedupe: centralize plugin dedupe primitives in plugin SDK (memory + persistent), move Feishu inbound dedupe to a namespace-scoped persistent store, and reuse shared dedupe cache logic for Zalo webhook replay + Tlon processed-message tracking to reduce duplicate handling during reconnect/replay paths. (#23377) Thanks @SidQin-cyber. - Channels/Delivery: remove hardcoded WhatsApp delivery fallbacks; require explicit/session channel context or auto-pick the sole configured channel when unambiguous. (#23357) Thanks @lbo728. diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 592f5bd46..e03073878 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -2,7 +2,7 @@ import { describe, expect, it, vi } from "vitest"; import type { GatewayBonjourBeacon } from "../../infra/bonjour-discovery.js"; import { pickBeaconHost, pickGatewayPort } from "./discover.js"; -const acquireGatewayLock = vi.fn(async () => ({ +const acquireGatewayLock = vi.fn(async (_opts?: { port?: number }) => ({ release: vi.fn(async () => {}), })); const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true); @@ -22,7 +22,7 @@ const gatewayLog = { }; vi.mock("../../infra/gateway-lock.js", () => ({ - acquireGatewayLock: () => acquireGatewayLock(), + acquireGatewayLock: (opts?: { port?: number }) => acquireGatewayLock(opts), })); vi.mock("../../infra/restart.js", () => ({ @@ -109,12 +109,17 @@ function createSignaledStart(close: GatewayCloseFn) { return { start, started }; } -async function runLoopWithStart(params: { start: ReturnType; runtime: LoopRuntime }) { +async function runLoopWithStart(params: { + start: ReturnType; + runtime: LoopRuntime; + lockPort?: number; +}) { vi.resetModules(); const { runGatewayLoop } = await import("./run-loop.js"); const loopPromise = runGatewayLoop({ start: params.start as unknown as Parameters[0]["start"], runtime: params.runtime, + lockPort: params.lockPort, }); return { 
loopPromise }; } @@ -276,6 +281,39 @@ describe("runGatewayLoop", () => { }); }); + it("forwards lockPort to initial and restart lock acquisitions", async () => { + vi.clearAllMocks(); + + await withIsolatedSignals(async () => { + const closeFirst = vi.fn(async () => {}); + const closeSecond = vi.fn(async () => {}); + restartGatewayProcessWithFreshPid.mockReturnValueOnce({ mode: "disabled" }); + + const start = vi + .fn() + .mockResolvedValueOnce({ close: closeFirst }) + .mockResolvedValueOnce({ close: closeSecond }) + .mockRejectedValueOnce(new Error("stop-loop")); + const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() }; + const { runGatewayLoop } = await import("./run-loop.js"); + const loopPromise = runGatewayLoop({ + start: start as unknown as Parameters[0]["start"], + runtime: runtime as unknown as Parameters[0]["runtime"], + lockPort: 18789, + }); + + await new Promise((resolve) => setImmediate(resolve)); + process.emit("SIGUSR1"); + await new Promise((resolve) => setImmediate(resolve)); + process.emit("SIGUSR1"); + + await expect(loopPromise).rejects.toThrow("stop-loop"); + expect(acquireGatewayLock).toHaveBeenNthCalledWith(1, { port: 18789 }); + expect(acquireGatewayLock).toHaveBeenNthCalledWith(2, { port: 18789 }); + expect(acquireGatewayLock).toHaveBeenNthCalledWith(3, { port: 18789 }); + }); + }); + it("exits when lock reacquire fails during in-process restart fallback", async () => { vi.clearAllMocks(); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 842b5544f..0e43faed3 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -22,8 +22,9 @@ type GatewayRunSignalAction = "stop" | "restart"; export async function runGatewayLoop(params: { start: () => Promise>>; runtime: typeof defaultRuntime; + lockPort?: number; }) { - let lock = await acquireGatewayLock(); + let lock = await acquireGatewayLock({ port: params.lockPort }); let server: Awaited> | null = null; let shuttingDown = 
false; let restartResolver: (() => void) | null = null; @@ -47,7 +48,7 @@ export async function runGatewayLoop(params: { }; const reacquireLockForInProcessRestart = async (): Promise<boolean> => { try { - lock = await acquireGatewayLock(); + lock = await acquireGatewayLock({ port: params.lockPort }); return true; } catch (err) { gatewayLog.error(`failed to reacquire gateway lock for in-process restart: ${String(err)}`); diff --git a/src/cli/gateway-cli/run.ts b/src/cli/gateway-cli/run.ts index 74c8394b5..0f494812f 100644 --- a/src/cli/gateway-cli/run.ts +++ b/src/cli/gateway-cli/run.ts @@ -317,6 +317,7 @@ async function runGatewayCommand(opts: GatewayRunOpts) { try { await runGatewayLoop({ runtime: defaultRuntime, + lockPort: port, start: async () => await startGatewayServer(port, { bind, diff --git a/src/infra/gateway-lock.test.ts b/src/infra/gateway-lock.test.ts index 6f3826bfc..072a6e866 100644 --- a/src/infra/gateway-lock.test.ts +++ b/src/infra/gateway-lock.test.ts @@ -1,6 +1,7 @@ import { createHash } from "node:crypto"; import fsSync from "node:fs"; import fs from "node:fs/promises"; +import net from "node:net"; import os from "node:os"; import path from "node:path"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; @@ -129,6 +130,35 @@ async function acquireStaleLinuxLock(env: NodeJS.ProcessEnv) { staleProcSpy.mockRestore(); } +async function listenOnLoopbackPort() { + const server = net.createServer(); + await new Promise<void>((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + server.off("error", reject); + resolve(); + }); + }); + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("failed to resolve loopback test port"); + } + return { + port: address.port, + close: async () => { + await new Promise<void>((resolve, reject) => { + server.close((err) => { + if (err) { + reject(err); + return; + } + resolve(); + }); + }); + }, + }; +} + 
describe("gateway lock", () => { beforeAll(async () => { fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gateway-lock-")); @@ -227,6 +257,50 @@ describe("gateway lock", () => { statSpy.mockRestore(); }); + it("treats lock as stale when owner pid is alive but configured port is free", async () => { + vi.useRealTimers(); + const env = await makeEnv(); + await writeLockFile(env, { + startTime: 111, + createdAt: new Date().toISOString(), + }); + const listener = await listenOnLoopbackPort(); + const port = listener.port; + await listener.close(); + + const lock = await acquireForTest(env, { + timeoutMs: 80, + pollIntervalMs: 5, + staleMs: 10_000, + platform: "darwin", + port, + }); + expect(lock).not.toBeNull(); + await lock?.release(); + }); + + it("keeps lock when configured port is busy and owner pid is alive", async () => { + vi.useRealTimers(); + const env = await makeEnv(); + await writeLockFile(env, { + startTime: 111, + createdAt: new Date().toISOString(), + }); + const listener = await listenOnLoopbackPort(); + try { + const pending = acquireForTest(env, { + timeoutMs: 20, + pollIntervalMs: 2, + staleMs: 10_000, + platform: "darwin", + port: listener.port, + }); + await expect(pending).rejects.toBeInstanceOf(GatewayLockError); + } finally { + await listener.close(); + } + }); + it("returns null when multi-gateway override is enabled", async () => { const env = await makeEnv(); const lock = await acquireGatewayLock({ diff --git a/src/infra/gateway-lock.ts b/src/infra/gateway-lock.ts index 34300f954..6e6b71cf2 100644 --- a/src/infra/gateway-lock.ts +++ b/src/infra/gateway-lock.ts @@ -1,6 +1,7 @@ import { createHash } from "node:crypto"; import fsSync from "node:fs"; import fs from "node:fs/promises"; +import net from "node:net"; import path from "node:path"; import { resolveConfigPath, resolveGatewayLockDir, resolveStateDir } from "../config/paths.js"; import { isPidAlive } from "../shared/pid-alive.js"; @@ -8,6 +9,7 @@ import { isPidAlive } from 
"../shared/pid-alive.js"; const DEFAULT_TIMEOUT_MS = 5000; const DEFAULT_POLL_INTERVAL_MS = 100; const DEFAULT_STALE_MS = 30_000; +const DEFAULT_PORT_PROBE_TIMEOUT_MS = 1000; type LockPayload = { pid: number; @@ -29,6 +31,7 @@ export type GatewayLockOptions = { staleMs?: number; allowInTests?: boolean; platform?: NodeJS.Platform; + port?: number; }; export class GatewayLockError extends Error { @@ -100,11 +103,47 @@ function readLinuxStartTime(pid: number): number | null { } } -function resolveGatewayOwnerStatus( +async function checkPortFree(port: number, host = "127.0.0.1"): Promise<boolean> { + return await new Promise((resolve) => { + const socket = net.createConnection({ port, host }); + let settled = false; + const finish = (result: boolean) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + socket.removeAllListeners(); + socket.destroy(); + resolve(result); + }; + const timer = setTimeout(() => { + // Conservative for liveness checks: timeout usually means no responsive + // local listener, so treat the lock owner as stale. + finish(true); + }, DEFAULT_PORT_PROBE_TIMEOUT_MS); + socket.once("connect", () => { + finish(false); + }); + socket.once("error", () => { + finish(true); + }); + }); +} + +async function resolveGatewayOwnerStatus( pid: number, payload: LockPayload | null, platform: NodeJS.Platform, -): LockOwnerStatus { + port: number | undefined, +): Promise<LockOwnerStatus> { + if (port != null) { + const portFree = await checkPortFree(port); + if (portFree) { + return "dead"; + } + } + if (!isPidAlive(pid)) { return "dead"; } @@ -178,6 +217,7 @@ export async function acquireGatewayLock( const pollIntervalMs = opts.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS; const staleMs = opts.staleMs ?? DEFAULT_STALE_MS; const platform = opts.platform ?? 
process.platform; + const port = opts.port; const { lockPath, configPath } = resolveGatewayLockPath(env); await fs.mkdir(path.dirname(lockPath), { recursive: true }); @@ -214,7 +254,7 @@ export async function acquireGatewayLock( lastPayload = await readLockPayload(lockPath); const ownerPid = lastPayload?.pid; const ownerStatus = ownerPid - ? resolveGatewayOwnerStatus(ownerPid, lastPayload, platform) + ? await resolveGatewayOwnerStatus(ownerPid, lastPayload, platform, port) : "unknown"; if (ownerStatus === "dead" && ownerPid) { await fs.rm(lockPath, { force: true });