fix: replace file-based session store lock with in-process Promise chain mutex (#14498)

* fix: replace file-based session store lock with in-process Promise chain mutex

Node.js is single-threaded, so file-based locking (open('wx') + polling +
stale eviction) is unnecessary and causes timeouts under heavy session load.

Replace with a simple per-storePath Promise chain that serializes access
without any filesystem overhead.

In a 1159-session environment over 3 hours:
- Lock timeouts: 25
- Stuck sessions: 157 (max 1031s, avg 388s)
- Slow listeners: 39 (max 265s, avg 70s)

Root cause: during sessions.json file I/O, await yields control and other
lock requests hit the 10s timeout waiting for the .lock file to be released.

* test: add comprehensive tests for Promise chain mutex lock

- Concurrent access serialization (10 parallel writers, counter integrity)
- Error resilience (single & multiple consecutive throws don't poison queue)
- Independent storePath parallelism (different paths run concurrently)
- LOCK_QUEUES cleanup after completion and after errors
- No .lock file created on disk

Also fix: store caught promise in LOCK_QUEUES to avoid unhandled rejection
warnings when queued fn() throws.

* fix: add timeout to Promise chain mutex to prevent infinite hangs on Windows

* fix(session-store): enforce strict queue timeout + cross-process lock

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Kentaro Kuribayashi
2026-02-13 13:12:59 +09:00
committed by GitHub
parent 13bfd9da83
commit c6ecd2a044
2 changed files with 449 additions and 57 deletions

View File

@@ -0,0 +1,296 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import type { SessionEntry } from "./types.js";
import { sleep } from "../../utils.js";
import {
clearSessionStoreCacheForTest,
getSessionStoreLockQueueSizeForTest,
loadSessionStore,
updateSessionStore,
updateSessionStoreEntry,
withSessionStoreLockForTest,
} from "../sessions.js";
describe("session store lock (Promise chain mutex)", () => {
let tmpDirs: string[] = [];
async function makeTmpStore(
initial: Record<string, unknown> = {},
): Promise<{ dir: string; storePath: string }> {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-test-"));
tmpDirs.push(dir);
const storePath = path.join(dir, "sessions.json");
if (Object.keys(initial).length > 0) {
await fs.writeFile(storePath, JSON.stringify(initial, null, 2), "utf-8");
}
return { dir, storePath };
}
afterEach(async () => {
clearSessionStoreCacheForTest();
for (const dir of tmpDirs) {
await fs.rm(dir, { recursive: true, force: true }).catch(() => undefined);
}
tmpDirs = [];
});
// ── 1. Concurrent access does not corrupt data ──────────────────────
it("serializes concurrent updateSessionStore calls without data loss", async () => {
const key = "agent:main:test";
const { storePath } = await makeTmpStore({
[key]: { sessionId: "s1", updatedAt: 100, counter: 0 },
});
// Launch 10 concurrent read-modify-write cycles.
const N = 10;
await Promise.all(
Array.from({ length: N }, (_, i) =>
updateSessionStore(storePath, async (store) => {
const entry = store[key] as Record<string, unknown>;
// Simulate async work so that without proper serialization
// multiple readers would see the same stale value.
await sleep(Math.random() * 20);
entry.counter = (entry.counter as number) + 1;
entry.tag = `writer-${i}`;
}),
),
);
const store = loadSessionStore(storePath);
expect((store[key] as Record<string, unknown>).counter).toBe(N);
});
it("concurrent updateSessionStoreEntry patches all merge correctly", async () => {
const key = "agent:main:merge";
const { storePath } = await makeTmpStore({
[key]: { sessionId: "s1", updatedAt: 100 },
});
await Promise.all([
updateSessionStoreEntry({
storePath,
sessionKey: key,
update: async () => {
await sleep(30);
return { modelOverride: "model-a" };
},
}),
updateSessionStoreEntry({
storePath,
sessionKey: key,
update: async () => {
await sleep(10);
return { thinkingLevel: "high" as const };
},
}),
updateSessionStoreEntry({
storePath,
sessionKey: key,
update: async () => {
await sleep(20);
return { systemPromptOverride: "custom" };
},
}),
]);
const store = loadSessionStore(storePath);
const entry = store[key];
expect(entry.modelOverride).toBe("model-a");
expect(entry.thinkingLevel).toBe("high");
expect(entry.systemPromptOverride).toBe("custom");
});
// ── 2. Error in fn() does not break queue ───────────────────────────
it("continues processing queued tasks after a preceding task throws", async () => {
const key = "agent:main:err";
const { storePath } = await makeTmpStore({
[key]: { sessionId: "s1", updatedAt: 100 },
});
const errorPromise = updateSessionStore(storePath, async () => {
throw new Error("boom");
});
// Queue a second write immediately after the failing one.
const successPromise = updateSessionStore(storePath, async (store) => {
store[key] = { ...store[key], modelOverride: "after-error" } as unknown as SessionEntry;
});
await expect(errorPromise).rejects.toThrow("boom");
await successPromise; // must resolve, not hang or reject
const store = loadSessionStore(storePath);
expect(store[key]?.modelOverride).toBe("after-error");
});
it("multiple consecutive errors do not permanently poison the queue", async () => {
const key = "agent:main:multi-err";
const { storePath } = await makeTmpStore({
[key]: { sessionId: "s1", updatedAt: 100 },
});
const errors = Array.from({ length: 3 }, (_, i) =>
updateSessionStore(storePath, async () => {
throw new Error(`fail-${i}`);
}),
);
const success = updateSessionStore(storePath, async (store) => {
store[key] = { ...store[key], modelOverride: "recovered" } as unknown as SessionEntry;
});
// All error promises reject.
for (const p of errors) {
await expect(p).rejects.toThrow();
}
// The trailing write succeeds.
await success;
const store = loadSessionStore(storePath);
expect(store[key]?.modelOverride).toBe("recovered");
});
// ── 3. Different storePaths run independently / in parallel ─────────
it("operations on different storePaths execute concurrently", async () => {
const { storePath: pathA } = await makeTmpStore({
a: { sessionId: "a", updatedAt: 100 },
});
const { storePath: pathB } = await makeTmpStore({
b: { sessionId: "b", updatedAt: 100 },
});
const order: string[] = [];
const opA = updateSessionStore(pathA, async (store) => {
order.push("a-start");
await sleep(50);
store.a = { ...store.a, modelOverride: "done-a" } as unknown as SessionEntry;
order.push("a-end");
});
const opB = updateSessionStore(pathB, async (store) => {
order.push("b-start");
await sleep(10);
store.b = { ...store.b, modelOverride: "done-b" } as unknown as SessionEntry;
order.push("b-end");
});
await Promise.all([opA, opB]);
// B should finish before A because they run in parallel and B sleeps less.
expect(order.indexOf("b-end")).toBeLessThan(order.indexOf("a-end"));
expect(loadSessionStore(pathA).a?.modelOverride).toBe("done-a");
expect(loadSessionStore(pathB).b?.modelOverride).toBe("done-b");
});
// ── 4. LOCK_QUEUES cleanup ─────────────────────────────────────────
it("cleans up LOCK_QUEUES entry after all tasks complete", async () => {
const { storePath } = await makeTmpStore({
x: { sessionId: "x", updatedAt: 100 },
});
await updateSessionStore(storePath, async (store) => {
store.x = { ...store.x, modelOverride: "done" } as unknown as SessionEntry;
});
// Allow microtask (finally) to run.
await sleep(0);
expect(getSessionStoreLockQueueSizeForTest()).toBe(0);
});
it("cleans up LOCK_QUEUES entry even after errors", async () => {
const { storePath } = await makeTmpStore({});
await updateSessionStore(storePath, async () => {
throw new Error("fail");
}).catch(() => undefined);
await sleep(0);
expect(getSessionStoreLockQueueSizeForTest()).toBe(0);
});
// ── 5. FIFO order guarantee ──────────────────────────────────────────
it("executes queued operations in FIFO order", async () => {
const key = "agent:main:fifo";
const { storePath } = await makeTmpStore({
[key]: { sessionId: "s1", updatedAt: 100, order: "" },
});
const executionOrder: number[] = [];
// Queue 5 operations sequentially (no awaiting in between).
const promises = Array.from({ length: 5 }, (_, i) =>
updateSessionStore(storePath, async (store) => {
executionOrder.push(i);
const entry = store[key] as Record<string, unknown>;
entry.order = ((entry.order as string) || "") + String(i);
}),
);
await Promise.all(promises);
// Execution order must be 0, 1, 2, 3, 4 (FIFO).
expect(executionOrder).toEqual([0, 1, 2, 3, 4]);
// The store should reflect sequential application.
const store = loadSessionStore(storePath);
expect((store[key] as Record<string, unknown>).order).toBe("01234");
});
it("times out queued operations strictly and does not run them later", async () => {
const { storePath } = await makeTmpStore({
x: { sessionId: "x", updatedAt: 100 },
});
let timedOutRan = false;
const lockHolder = withSessionStoreLockForTest(
storePath,
async () => {
await sleep(80);
},
{ timeoutMs: 2_000 },
);
const timedOut = withSessionStoreLockForTest(
storePath,
async () => {
timedOutRan = true;
},
{ timeoutMs: 20 },
);
await expect(timedOut).rejects.toThrow("timeout waiting for session store lock");
await lockHolder;
await sleep(30);
expect(timedOutRan).toBe(false);
});
it("creates and removes lock file while operation runs", async () => {
const key = "agent:main:no-lock-file";
const { dir, storePath } = await makeTmpStore({
[key]: { sessionId: "s1", updatedAt: 100 },
});
const write = updateSessionStore(storePath, async (store) => {
await sleep(60);
store[key] = { ...store[key], modelOverride: "v" } as unknown as SessionEntry;
});
await sleep(10);
await expect(fs.access(`${storePath}.lock`)).resolves.toBeUndefined();
await write;
const files = await fs.readdir(dir);
const lockFiles = files.filter((f) => f.endsWith(".lock"));
expect(lockFiles).toHaveLength(0);
});
});

View File

@@ -3,6 +3,7 @@ import fs from "node:fs";
import path from "node:path";
import type { MsgContext } from "../../auto-reply/templating.js";
import type { SessionMaintenanceConfig, SessionMaintenanceMode } from "../types.base.js";
import { acquireSessionWriteLock } from "../../agents/session-write-lock.js";
import { parseByteSize } from "../../cli/parse-bytes.js";
import { parseDurationMs } from "../../cli/parse-duration.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
@@ -115,6 +116,28 @@ function normalizeSessionStore(store: Record<string, SessionEntry>): void {
export function clearSessionStoreCacheForTest(): void {
SESSION_STORE_CACHE.clear();
for (const queue of LOCK_QUEUES.values()) {
for (const task of queue.pending) {
task.timedOut = true;
if (task.timer) {
clearTimeout(task.timer);
}
}
}
LOCK_QUEUES.clear();
}
/** Expose lock queue size for tests. */
export function getSessionStoreLockQueueSizeForTest(): number {
return LOCK_QUEUES.size;
}
export async function withSessionStoreLockForTest<T>(
storePath: string,
fn: () => Promise<T>,
opts: SessionStoreLockOptions = {},
): Promise<T> {
return await withSessionStoreLock(storePath, fn, opts);
}
type LoadSessionStoreOptions = {
@@ -584,76 +607,149 @@ type SessionStoreLockOptions = {
staleMs?: number;
};
type SessionStoreLockTask = {
fn: () => Promise<unknown>;
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
timeoutAt?: number;
staleMs: number;
timer?: ReturnType<typeof setTimeout>;
started: boolean;
timedOut: boolean;
};
type SessionStoreLockQueue = {
running: boolean;
pending: SessionStoreLockTask[];
};
const LOCK_QUEUES = new Map<string, SessionStoreLockQueue>();
function lockTimeoutError(storePath: string): Error {
return new Error(`timeout waiting for session store lock: ${storePath}`);
}
function getOrCreateLockQueue(storePath: string): SessionStoreLockQueue {
const existing = LOCK_QUEUES.get(storePath);
if (existing) {
return existing;
}
const created: SessionStoreLockQueue = { running: false, pending: [] };
LOCK_QUEUES.set(storePath, created);
return created;
}
function removePendingTask(queue: SessionStoreLockQueue, task: SessionStoreLockTask): void {
const idx = queue.pending.indexOf(task);
if (idx >= 0) {
queue.pending.splice(idx, 1);
}
}
async function drainSessionStoreLockQueue(storePath: string): Promise<void> {
const queue = LOCK_QUEUES.get(storePath);
if (!queue || queue.running) {
return;
}
queue.running = true;
try {
while (queue.pending.length > 0) {
const task = queue.pending.shift();
if (!task || task.timedOut) {
continue;
}
if (task.timer) {
clearTimeout(task.timer);
}
task.started = true;
const remainingTimeoutMs =
task.timeoutAt != null
? Math.max(0, task.timeoutAt - Date.now())
: Number.POSITIVE_INFINITY;
if (task.timeoutAt != null && remainingTimeoutMs <= 0) {
task.timedOut = true;
task.reject(lockTimeoutError(storePath));
continue;
}
let lock: { release: () => Promise<void> } | undefined;
let result: unknown;
let failed: unknown;
let hasFailure = false;
try {
lock = await acquireSessionWriteLock({
sessionFile: storePath,
timeoutMs: remainingTimeoutMs,
staleMs: task.staleMs,
});
result = await task.fn();
} catch (err) {
hasFailure = true;
failed = err;
} finally {
await lock?.release().catch(() => undefined);
}
if (hasFailure) {
task.reject(failed);
continue;
}
task.resolve(result);
}
} finally {
queue.running = false;
if (queue.pending.length === 0) {
LOCK_QUEUES.delete(storePath);
} else {
queueMicrotask(() => {
void drainSessionStoreLockQueue(storePath);
});
}
}
}
async function withSessionStoreLock<T>(
storePath: string,
fn: () => Promise<T>,
opts: SessionStoreLockOptions = {},
): Promise<T> {
const timeoutMs = opts.timeoutMs ?? 10_000;
const pollIntervalMs = opts.pollIntervalMs ?? 25;
const staleMs = opts.staleMs ?? 30_000;
const lockPath = `${storePath}.lock`;
const startedAt = Date.now();
// `pollIntervalMs` is retained for API compatibility with older lock options.
void opts.pollIntervalMs;
await fs.promises.mkdir(path.dirname(storePath), { recursive: true });
const hasTimeout = timeoutMs > 0 && Number.isFinite(timeoutMs);
const timeoutAt = hasTimeout ? Date.now() + timeoutMs : undefined;
const queue = getOrCreateLockQueue(storePath);
while (true) {
try {
const handle = await fs.promises.open(lockPath, "wx");
try {
await handle.writeFile(
JSON.stringify({ pid: process.pid, startedAt: Date.now() }),
"utf-8",
);
} catch {
// best-effort
}
await handle.close();
break;
} catch (err) {
const code =
err && typeof err === "object" && "code" in err
? String((err as { code?: unknown }).code)
: null;
if (code === "ENOENT") {
// Store directory may be deleted/recreated in tests while writes are in-flight.
// Best-effort: recreate the parent dir and retry until timeout.
await fs.promises
.mkdir(path.dirname(storePath), { recursive: true })
.catch(() => undefined);
await new Promise((r) => setTimeout(r, pollIntervalMs));
continue;
}
if (code !== "EEXIST") {
throw err;
}
const promise = new Promise<T>((resolve, reject) => {
const task: SessionStoreLockTask = {
fn: async () => await fn(),
resolve: (value) => resolve(value as T),
reject,
timeoutAt,
staleMs,
started: false,
timedOut: false,
};
const now = Date.now();
if (now - startedAt > timeoutMs) {
throw new Error(`timeout acquiring session store lock: ${lockPath}`, { cause: err });
}
// Best-effort stale lock eviction (e.g. crashed process).
try {
const st = await fs.promises.stat(lockPath);
const ageMs = now - st.mtimeMs;
if (ageMs > staleMs) {
await fs.promises.unlink(lockPath);
continue;
if (hasTimeout) {
task.timer = setTimeout(() => {
if (task.started || task.timedOut) {
return;
}
} catch {
// ignore
}
await new Promise((r) => setTimeout(r, pollIntervalMs));
task.timedOut = true;
removePendingTask(queue, task);
reject(lockTimeoutError(storePath));
}, timeoutMs);
}
}
try {
return await fn();
} finally {
await fs.promises.unlink(lockPath).catch(() => undefined);
}
queue.pending.push(task);
void drainSessionStoreLockQueue(storePath);
});
return await promise;
}
export async function updateSessionStoreEntry(params: {