From 4ca84acf240ddb72c68f66ea43a50204372ebd5d Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Thu, 12 Mar 2026 14:59:27 -0400 Subject: [PATCH] fix(runtime): duplicate messages, share singleton state across bundled chunks (#43683) * Tests: add fresh module import helper * Process: share command queue runtime state * Agents: share embedded run runtime state * Reply: share followup queue runtime state * Reply: share followup drain callback state * Reply: share queued message dedupe state * Reply: share inbound dedupe state * Tests: cover shared command queue runtime state * Tests: cover shared embedded run runtime state * Tests: cover shared followup queue runtime state * Tests: cover shared inbound dedupe state * Tests: cover shared Slack thread participation state * Slack: share sent thread participation state * Tests: document fresh import helper * Telegram: share draft stream runtime state * Tests: cover shared Telegram draft stream state * Telegram: share sent message cache state * Tests: cover shared Telegram sent message cache * Telegram: share thread binding runtime state * Tests: cover shared Telegram thread binding state * Tests: avoid duplicate shared queue reset * refactor(runtime): centralize global singleton access * refactor(runtime): preserve undefined global singleton values * test(runtime): cover undefined global singleton values --------- Co-authored-by: Nimrod Gutman --- src/agents/pi-embedded-runner/runs.test.ts | 32 ++++++ src/agents/pi-embedded-runner/runs.ts | 16 ++- src/auto-reply/reply/inbound-dedupe.test.ts | 43 ++++++++ src/auto-reply/reply/inbound-dedupe.ts | 17 ++- src/auto-reply/reply/queue/drain.ts | 7 +- src/auto-reply/reply/queue/enqueue.ts | 17 ++- src/auto-reply/reply/queue/state.ts | 9 +- src/auto-reply/reply/reply-flow.test.ts | 115 ++++++++++++++++++++ src/process/command-queue.test.ts | 39 +++++++ src/process/command-queue.ts | 43 +++++--- src/shared/global-singleton.test.ts | 39 +++++++ src/shared/global-singleton.ts | 14 +++ src/slack/sent-thread-cache.test.ts | 24 ++++ src/slack/sent-thread-cache.ts | 10 +- src/telegram/draft-stream.test.ts | 47 +++++++- src/telegram/draft-stream.ts | 22 +++- src/telegram/send.test.ts | 24 ++++ src/telegram/sent-message-cache.ts | 10 +- src/telegram/thread-bindings.test.ts | 48 ++++++++ src/telegram/thread-bindings.ts | 23 +++- test/helpers/import-fresh.ts | 8 ++ 21 files changed, 569 insertions(+), 38 deletions(-) create mode 100644 src/auto-reply/reply/inbound-dedupe.test.ts create mode 100644 src/shared/global-singleton.test.ts create mode 100644 src/shared/global-singleton.ts create mode 100644 test/helpers/import-fresh.ts diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts index 732017493..d9bf90f96 100644 --- a/src/agents/pi-embedded-runner/runs.test.ts +++ b/src/agents/pi-embedded-runner/runs.test.ts @@ -1,4 +1,5 @@ import { afterEach, describe, expect, it, vi } from "vitest"; +import { importFreshModule } from "../../../test/helpers/import-fresh.js"; import { __testing, abortEmbeddedPiRun, @@ -105,4 +106,35 @@ describe("pi-embedded runner run registry", () => { vi.useRealTimers(); } }); + + it("shares active run state across distinct module instances", async () => { + const runsA = await importFreshModule( + import.meta.url, + "./runs.js?scope=shared-a", + ); + const runsB = await importFreshModule( + import.meta.url, + "./runs.js?scope=shared-b", + ); + const handle = { + queueMessage: async () => {}, + isStreaming: () => true, + isCompacting: () => false, + abort: vi.fn(), + }; + + runsA.__testing.resetActiveEmbeddedRuns(); + runsB.__testing.resetActiveEmbeddedRuns(); + + try { + runsA.setActiveEmbeddedRun("session-shared", handle); + expect(runsB.isEmbeddedPiRunActive("session-shared")).toBe(true); + + runsB.clearActiveEmbeddedRun("session-shared", handle); + expect(runsA.isEmbeddedPiRunActive("session-shared")).toBe(false); + } finally { + runsA.__testing.resetActiveEmbeddedRuns(); + runsB.__testing.resetActiveEmbeddedRuns(); + } + }); }); diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index 6b62b9b59..0d4cecc83 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -3,6 +3,7 @@ import { logMessageQueued, logSessionStateChange, } from "../../logging/diagnostic.js"; +import { resolveGlobalSingleton } from "../../shared/global-singleton.js"; type EmbeddedPiQueueHandle = { queueMessage: (text: string) => Promise; @@ -11,12 +12,23 @@ type EmbeddedPiQueueHandle = { abort: () => void; }; -const ACTIVE_EMBEDDED_RUNS = new Map(); type EmbeddedRunWaiter = { resolve: (ended: boolean) => void; timer: NodeJS.Timeout; }; -const EMBEDDED_RUN_WAITERS = new Map>(); + +/** + * Use global singleton state so busy/streaming checks stay consistent even + * when the bundler emits multiple copies of this module into separate chunks. + */ +const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState"); + +const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({ + activeRuns: new Map(), + waiters: new Map>(), +})); +const ACTIVE_EMBEDDED_RUNS = embeddedRunState.activeRuns; +const EMBEDDED_RUN_WAITERS = embeddedRunState.waiters; export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean { const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); diff --git a/src/auto-reply/reply/inbound-dedupe.test.ts b/src/auto-reply/reply/inbound-dedupe.test.ts new file mode 100644 index 000000000..c71aeb598 --- /dev/null +++ b/src/auto-reply/reply/inbound-dedupe.test.ts @@ -0,0 +1,43 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { importFreshModule } from "../../../test/helpers/import-fresh.js"; +import type { MsgContext } from "../templating.js"; +import { resetInboundDedupe } from "./inbound-dedupe.js"; + +const sharedInboundContext: MsgContext = { + Provider: "discord", + Surface: "discord", + From: "discord:user-1", + To: "channel:c1", + OriginatingChannel: "discord", + OriginatingTo: "channel:c1", + SessionKey: "agent:main:discord:channel:c1", + MessageSid: "msg-1", +}; + +describe("inbound dedupe", () => { + afterEach(() => { + resetInboundDedupe(); + }); + + it("shares dedupe state across distinct module instances", async () => { + const inboundA = await importFreshModule( + import.meta.url, + "./inbound-dedupe.js?scope=shared-a", + ); + const inboundB = await importFreshModule( + import.meta.url, + "./inbound-dedupe.js?scope=shared-b", + ); + + inboundA.resetInboundDedupe(); + inboundB.resetInboundDedupe(); + + try { + expect(inboundA.shouldSkipDuplicateInbound(sharedInboundContext)).toBe(false); + expect(inboundB.shouldSkipDuplicateInbound(sharedInboundContext)).toBe(true); + } finally { + inboundA.resetInboundDedupe(); + inboundB.resetInboundDedupe(); + } + }); +}); diff --git a/src/auto-reply/reply/inbound-dedupe.ts b/src/auto-reply/reply/inbound-dedupe.ts index 0e4740261..04744217c 100644 --- a/src/auto-reply/reply/inbound-dedupe.ts +++ b/src/auto-reply/reply/inbound-dedupe.ts @@ -1,15 +1,24 @@ import { logVerbose, shouldLogVerbose } from "../../globals.js"; import { createDedupeCache, type DedupeCache } from "../../infra/dedupe.js"; import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; +import { resolveGlobalSingleton } from "../../shared/global-singleton.js"; import type { MsgContext } from "../templating.js"; const DEFAULT_INBOUND_DEDUPE_TTL_MS = 20 * 60_000; const DEFAULT_INBOUND_DEDUPE_MAX = 5000; -const inboundDedupeCache = createDedupeCache({ - ttlMs: DEFAULT_INBOUND_DEDUPE_TTL_MS, - maxSize: DEFAULT_INBOUND_DEDUPE_MAX, -}); +/** + * Keep inbound dedupe shared across bundled chunks so the same provider + * message cannot bypass dedupe by entering through a different chunk copy. + */ +const INBOUND_DEDUPE_CACHE_KEY = Symbol.for("openclaw.inboundDedupeCache"); + +const inboundDedupeCache = resolveGlobalSingleton(INBOUND_DEDUPE_CACHE_KEY, () => + createDedupeCache({ + ttlMs: DEFAULT_INBOUND_DEDUPE_TTL_MS, + maxSize: DEFAULT_INBOUND_DEDUPE_MAX, + }), +); const normalizeProvider = (value?: string | null) => value?.trim().toLowerCase() || ""; diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index e8e93b3dd..1e2fb33e4 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -1,4 +1,5 @@ import { defaultRuntime } from "../../../runtime.js"; +import { resolveGlobalMap } from "../../../shared/global-singleton.js"; import { buildCollectPrompt, beginQueueDrain, @@ -15,7 +16,11 @@ import type { FollowupRun } from "./types.js"; // Persists the most recent runFollowup callback per queue key so that // enqueueFollowupRun can restart a drain that finished and deleted the queue. -const FOLLOWUP_RUN_CALLBACKS = new Map Promise>(); +const FOLLOWUP_DRAIN_CALLBACKS_KEY = Symbol.for("openclaw.followupDrainCallbacks"); + +const FOLLOWUP_RUN_CALLBACKS = resolveGlobalMap Promise>( + FOLLOWUP_DRAIN_CALLBACKS_KEY, +); export function clearFollowupDrainCallback(key: string): void { FOLLOWUP_RUN_CALLBACKS.delete(key); diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index 7743048a7..11da0db98 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,13 +1,22 @@ import { createDedupeCache } from "../../../infra/dedupe.js"; +import { resolveGlobalSingleton } from "../../../shared/global-singleton.js"; import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; import { kickFollowupDrainIfIdle } from "./drain.js"; import { getExistingFollowupQueue, getFollowupQueue } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; -const RECENT_QUEUE_MESSAGE_IDS = createDedupeCache({ - ttlMs: 5 * 60 * 1000, - maxSize: 10_000, -}); +/** + * Keep queued message-id dedupe shared across bundled chunks so redeliveries + * are rejected no matter which chunk receives the enqueue call. + */ +const RECENT_QUEUE_MESSAGE_IDS_KEY = Symbol.for("openclaw.recentQueueMessageIds"); + +const RECENT_QUEUE_MESSAGE_IDS = resolveGlobalSingleton(RECENT_QUEUE_MESSAGE_IDS_KEY, () => + createDedupeCache({ + ttlMs: 5 * 60 * 1000, + maxSize: 10_000, + }), +); function buildRecentMessageIdKey(run: FollowupRun, queueKey: string): string | undefined { const messageId = run.messageId?.trim(); diff --git a/src/auto-reply/reply/queue/state.ts b/src/auto-reply/reply/queue/state.ts index 73f7ed946..44208e727 100644 --- a/src/auto-reply/reply/queue/state.ts +++ b/src/auto-reply/reply/queue/state.ts @@ -1,3 +1,4 @@ +import { resolveGlobalMap } from "../../../shared/global-singleton.js"; import { applyQueueRuntimeSettings } from "../../../utils/queue-helpers.js"; import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./types.js"; @@ -18,7 +19,13 @@ export const DEFAULT_QUEUE_DEBOUNCE_MS = 1000; export const DEFAULT_QUEUE_CAP = 20; export const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize"; -export const FOLLOWUP_QUEUES = new Map(); +/** + * Share followup queues across bundled chunks so busy-session enqueue/drain + * logic observes one queue registry per process. + */ +const FOLLOWUP_QUEUES_KEY = Symbol.for("openclaw.followupQueues"); + +export const FOLLOWUP_QUEUES = resolveGlobalMap(FOLLOWUP_QUEUES_KEY); export function getExistingFollowupQueue(key: string): FollowupQueueState | undefined { const cleaned = key.trim(); diff --git a/src/auto-reply/reply/reply-flow.test.ts b/src/auto-reply/reply/reply-flow.test.ts index 575ac7f17..d0fd692c2 100644 --- a/src/auto-reply/reply/reply-flow.test.ts +++ b/src/auto-reply/reply/reply-flow.test.ts @@ -1,4 +1,5 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { importFreshModule } from "../../../test/helpers/import-fresh.js"; import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js"; import type { OpenClawConfig } from "../../config/config.js"; import { defaultRuntime } from "../../runtime.js"; @@ -743,6 +744,71 @@ describe("followup queue deduplication", () => { expect(calls).toHaveLength(1); }); + it("deduplicates same message_id across distinct enqueue module instances", async () => { + const enqueueA = await importFreshModule( + import.meta.url, + "./queue/enqueue.js?scope=dedupe-a", + ); + const enqueueB = await importFreshModule( + import.meta.url, + "./queue/enqueue.js?scope=dedupe-b", + ); + const { clearSessionQueues } = await import("./queue.js"); + const key = `test-dedup-cross-module-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + done.resolve(); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + enqueueA.resetRecentQueuedMessageIdDedupe(); + enqueueB.resetRecentQueuedMessageIdDedupe(); + + try { + expect( + enqueueA.enqueueFollowupRun( + key, + createRun({ + prompt: "first", + messageId: "same-id", + originatingChannel: "signal", + originatingTo: "+10000000000", + }), + settings, + ), + ).toBe(true); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + await new Promise((resolve) => setImmediate(resolve)); + + expect( + enqueueB.enqueueFollowupRun( + key, + createRun({ + prompt: "first-redelivery", + messageId: "same-id", + originatingChannel: "signal", + originatingTo: "+10000000000", + }), + settings, + ), + ).toBe(false); + expect(calls).toHaveLength(1); + } finally { + clearSessionQueues([key]); + enqueueA.resetRecentQueuedMessageIdDedupe(); + enqueueB.resetRecentQueuedMessageIdDedupe(); + } + }); + it("does not collide recent message-id keys when routing contains delimiters", async () => { const key = `test-dedup-key-collision-${Date.now()}`; const calls: FollowupRun[] = []; @@ -1264,6 +1330,55 @@ describe("followup queue drain restart after idle window", () => { expect(calls[1]?.prompt).toBe("after-idle"); }); + it("restarts an idle drain across distinct enqueue and drain module instances", async () => { + const drainA = await importFreshModule( + import.meta.url, + "./queue/drain.js?scope=restart-a", + ); + const enqueueB = await importFreshModule( + import.meta.url, + "./queue/enqueue.js?scope=restart-b", + ); + const { clearSessionQueues } = await import("./queue.js"); + const key = `test-idle-window-cross-module-${Date.now()}`; + const calls: FollowupRun[] = []; + const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 }; + const firstProcessed = createDeferred(); + + enqueueB.resetRecentQueuedMessageIdDedupe(); + + try { + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + if (calls.length === 1) { + firstProcessed.resolve(); + } + }; + + enqueueB.enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings); + drainA.scheduleFollowupDrain(key, runFollowup); + await firstProcessed.promise; + + await new Promise((resolve) => setImmediate(resolve)); + + enqueueB.enqueueFollowupRun(key, createRun({ prompt: "after-idle" }), settings); + + await vi.waitFor( + () => { + expect(calls).toHaveLength(2); + }, + { timeout: 1_000 }, + ); + + expect(calls[0]?.prompt).toBe("before-idle"); + expect(calls[1]?.prompt).toBe("after-idle"); + } finally { + clearSessionQueues([key]); + drainA.clearFollowupDrainCallback(key); + enqueueB.resetRecentQueuedMessageIdDedupe(); + } + }); + it("does not double-drain when a message arrives while drain is still running", async () => { const key = `test-no-double-drain-${Date.now()}`; const calls: FollowupRun[] = []; diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index 16766eabc..b6e6f17cd 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -1,4 +1,5 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { importFreshModule } from "../../test/helpers/import-fresh.js"; const diagnosticMocks = vi.hoisted(() => ({ logLaneEnqueue: vi.fn(), @@ -334,4 +335,42 @@ describe("command queue", () => { resetAllLanes(); await expect(enqueueCommand(async () => "ok")).resolves.toBe("ok"); }); + + it("shares lane state across distinct module instances", async () => { + const commandQueueA = await importFreshModule( + import.meta.url, + "./command-queue.js?scope=shared-a", + ); + const commandQueueB = await importFreshModule( + import.meta.url, + "./command-queue.js?scope=shared-b", + ); + const lane = `shared-state-${Date.now()}-${Math.random().toString(16).slice(2)}`; + + let release!: () => void; + const blocker = new Promise((resolve) => { + release = resolve; + }); + + commandQueueA.resetAllLanes(); + + try { + const task = commandQueueA.enqueueCommandInLane(lane, async () => { + await blocker; + return "done"; + }); + + await vi.waitFor(() => { + expect(commandQueueB.getQueueSize(lane)).toBe(1); + expect(commandQueueB.getActiveTaskCount()).toBe(1); + }); + + release(); + await expect(task).resolves.toBe("done"); + expect(commandQueueB.getQueueSize(lane)).toBe(0); + } finally { + release(); + commandQueueA.resetAllLanes(); + } + }); }); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 7b4a386bd..956b386a6 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -1,4 +1,5 @@ import { diagnosticLogger as diag, logLaneDequeue, logLaneEnqueue } from "../logging/diagnostic.js"; +import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { CommandLane } from "./lanes.js"; /** * Dedicated error type thrown when a queued command is rejected because @@ -23,9 +24,6 @@ export class GatewayDrainingError extends Error { } } -// Set while gateway is draining for restart; new enqueues are rejected. -let gatewayDraining = false; - // Minimal in-process queue to serialize command executions. // Default lane ("main") preserves the existing behavior. Additional lanes allow // low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for @@ -49,11 +47,20 @@ type LaneState = { generation: number; }; -const lanes = new Map(); -let nextTaskId = 1; +/** + * Keep queue runtime state on globalThis so every bundled entry/chunk shares + * the same lanes, counters, and draining flag in production builds. + */ +const COMMAND_QUEUE_STATE_KEY = Symbol.for("openclaw.commandQueueState"); + +const queueState = resolveGlobalSingleton(COMMAND_QUEUE_STATE_KEY, () => ({ + gatewayDraining: false, + lanes: new Map(), + nextTaskId: 1, +})); function getLaneState(lane: string): LaneState { - const existing = lanes.get(lane); + const existing = queueState.lanes.get(lane); if (existing) { return existing; } @@ -65,7 +72,7 @@ function getLaneState(lane: string): LaneState { draining: false, generation: 0, }; - lanes.set(lane, created); + queueState.lanes.set(lane, created); return created; } @@ -105,7 +112,7 @@ function drainLane(lane: string) { ); } logLaneDequeue(lane, waitedMs, state.queue.length); - const taskId = nextTaskId++; + const taskId = queueState.nextTaskId++; const taskGeneration = state.generation; state.activeTaskIds.add(taskId); void (async () => { @@ -148,7 +155,7 @@ function drainLane(lane: string) { * `GatewayDrainingError` instead of being silently killed on shutdown. */ export function markGatewayDraining(): void { - gatewayDraining = true; + queueState.gatewayDraining = true; } export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) { @@ -166,7 +173,7 @@ export function enqueueCommandInLane( onWait?: (waitMs: number, queuedAhead: number) => void; }, ): Promise { - if (gatewayDraining) { + if (queueState.gatewayDraining) { return Promise.reject(new GatewayDrainingError()); } const cleaned = lane.trim() || CommandLane.Main; @@ -198,7 +205,7 @@ export function enqueueCommand( export function getQueueSize(lane: string = CommandLane.Main) { const resolved = lane.trim() || CommandLane.Main; - const state = lanes.get(resolved); + const state = queueState.lanes.get(resolved); if (!state) { return 0; } @@ -207,7 +214,7 @@ export function getQueueSize(lane: string = CommandLane.Main) { export function getTotalQueueSize() { let total = 0; - for (const s of lanes.values()) { + for (const s of queueState.lanes.values()) { total += s.queue.length + s.activeTaskIds.size; } return total; @@ -215,7 +222,7 @@ export function getTotalQueueSize() { export function clearCommandLane(lane: string = CommandLane.Main) { const cleaned = lane.trim() || CommandLane.Main; - const state = lanes.get(cleaned); + const state = queueState.lanes.get(cleaned); if (!state) { return 0; } @@ -242,9 +249,9 @@ export function clearCommandLane(lane: string = CommandLane.Main) { * `enqueueCommandInLane()` call (which may never come). */ export function resetAllLanes(): void { - gatewayDraining = false; + queueState.gatewayDraining = false; const lanesToDrain: string[] = []; - for (const state of lanes.values()) { + for (const state of queueState.lanes.values()) { state.generation += 1; state.activeTaskIds.clear(); state.draining = false; @@ -264,7 +271,7 @@ export function resetAllLanes(): void { */ export function getActiveTaskCount(): number { let total = 0; - for (const s of lanes.values()) { + for (const s of queueState.lanes.values()) { total += s.activeTaskIds.size; } return total; @@ -283,7 +290,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea const POLL_INTERVAL_MS = 50; const deadline = Date.now() + timeoutMs; const activeAtStart = new Set(); - for (const state of lanes.values()) { + for (const state of queueState.lanes.values()) { for (const taskId of state.activeTaskIds) { activeAtStart.add(taskId); } @@ -297,7 +304,7 @@ export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolea } let hasPending = false; - for (const state of lanes.values()) { + for (const state of queueState.lanes.values()) { for (const taskId of state.activeTaskIds) { if (activeAtStart.has(taskId)) { hasPending = true; diff --git a/src/shared/global-singleton.test.ts b/src/shared/global-singleton.test.ts new file mode 100644 index 000000000..0f0a29c50 --- /dev/null +++ b/src/shared/global-singleton.test.ts @@ -0,0 +1,39 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { resolveGlobalMap, resolveGlobalSingleton } from "./global-singleton.js"; + +const TEST_KEY = Symbol("global-singleton:test"); +const TEST_MAP_KEY = Symbol("global-singleton:test-map"); + +afterEach(() => { + delete (globalThis as Record)[TEST_KEY]; + delete (globalThis as Record)[TEST_MAP_KEY]; +}); + +describe("resolveGlobalSingleton", () => { + it("reuses an initialized singleton", () => { + const create = vi.fn(() => ({ value: 1 })); + + const first = resolveGlobalSingleton(TEST_KEY, create); + const second = resolveGlobalSingleton(TEST_KEY, create); + + expect(first).toBe(second); + expect(create).toHaveBeenCalledTimes(1); + }); + + it("does not re-run the factory when undefined was already stored", () => { + const create = vi.fn(() => undefined); + + expect(resolveGlobalSingleton(TEST_KEY, create)).toBeUndefined(); + expect(resolveGlobalSingleton(TEST_KEY, create)).toBeUndefined(); + expect(create).toHaveBeenCalledTimes(1); + }); +}); + +describe("resolveGlobalMap", () => { + it("reuses the same map instance", () => { + const first = resolveGlobalMap(TEST_MAP_KEY); + const second = resolveGlobalMap(TEST_MAP_KEY); + + expect(first).toBe(second); + }); +}); diff --git a/src/shared/global-singleton.ts b/src/shared/global-singleton.ts new file mode 100644 index 000000000..2d3ea3820 --- /dev/null +++ b/src/shared/global-singleton.ts @@ -0,0 +1,14 @@ +export function resolveGlobalSingleton(key: symbol, create: () => T): T { + const globalStore = globalThis as Record; + const existing = globalStore[key] as T | undefined; + if (Object.prototype.hasOwnProperty.call(globalStore, key)) { + return existing; + } + const created = create(); + globalStore[key] = created; + return created; +} + +export function resolveGlobalMap(key: symbol): Map { + return resolveGlobalSingleton(key, () => new Map()); +} diff --git a/src/slack/sent-thread-cache.test.ts b/src/slack/sent-thread-cache.test.ts index 05af19588..7421a7277 100644 --- a/src/slack/sent-thread-cache.test.ts +++ b/src/slack/sent-thread-cache.test.ts @@ -1,4 +1,5 @@ import { afterEach, describe, expect, it, vi } from "vitest"; +import { importFreshModule } from "../../test/helpers/import-fresh.js"; import { clearSlackThreadParticipationCache, hasSlackThreadParticipation, @@ -49,6 +50,29 @@ describe("slack sent-thread-cache", () => { expect(hasSlackThreadParticipation("A1", "C456", "1700000000.000002")).toBe(false); }); + it("shares thread participation across distinct module instances", async () => { + const cacheA = await importFreshModule( + import.meta.url, + "./sent-thread-cache.js?scope=shared-a", + ); + const cacheB = await importFreshModule( + import.meta.url, + "./sent-thread-cache.js?scope=shared-b", + ); + + cacheA.clearSlackThreadParticipationCache(); + + try { + cacheA.recordSlackThreadParticipation("A1", "C123", "1700000000.000001"); + expect(cacheB.hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(true); + + cacheB.clearSlackThreadParticipationCache(); + expect(cacheA.hasSlackThreadParticipation("A1", "C123", "1700000000.000001")).toBe(false); + } finally { + cacheA.clearSlackThreadParticipationCache(); + } + }); + it("expired entries return false and are cleaned up on read", () => { recordSlackThreadParticipation("A1", "C123", "1700000000.000001"); // Advance time past the 24-hour TTL diff --git a/src/slack/sent-thread-cache.ts b/src/slack/sent-thread-cache.ts index 7fe8037c7..b3c2a3c24 100644 --- a/src/slack/sent-thread-cache.ts +++ b/src/slack/sent-thread-cache.ts @@ -1,3 +1,5 @@ +import { resolveGlobalMap } from "../shared/global-singleton.js"; + /** * In-memory cache of Slack threads the bot has participated in. * Used to auto-respond in threads without requiring @mention after the first reply. @@ -7,7 +9,13 @@ const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours const MAX_ENTRIES = 5000; -const threadParticipation = new Map(); +/** + * Keep Slack thread participation shared across bundled chunks so thread + * auto-reply gating does not diverge between prepare/dispatch call paths. + */ +const SLACK_THREAD_PARTICIPATION_KEY = Symbol.for("openclaw.slackThreadParticipation"); + +const threadParticipation = resolveGlobalMap(SLACK_THREAD_PARTICIPATION_KEY); function makeKey(accountId: string, channelId: string, threadTs: string): string { return `${accountId}:${channelId}:${threadTs}`; diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index 58990c41a..07221ccc6 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -1,6 +1,7 @@ import type { Bot } from "grammy"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { createTelegramDraftStream } from "./draft-stream.js"; +import { importFreshModule } from "../../test/helpers/import-fresh.js"; +import { __testing, createTelegramDraftStream } from "./draft-stream.js"; type TelegramDraftStreamParams = Parameters[0]; @@ -65,6 +66,10 @@ function createForceNewMessageHarness(params: { throttleMs?: number } = {}) { } describe("createTelegramDraftStream", () => { + afterEach(() => { + __testing.resetTelegramDraftStreamForTests(); + }); + it("sends stream preview message with message_thread_id when provided", async () => { const api = createMockDraftApi(); const stream = createForumDraftStream(api); @@ -355,6 +360,46 @@ describe("createTelegramDraftStream", () => { expect(api.editMessageText).not.toHaveBeenCalled(); }); + it("shares draft-id allocation across distinct module instances", async () => { + const draftA = await importFreshModule( + import.meta.url, + "./draft-stream.js?scope=shared-a", + ); + const draftB = await importFreshModule( + import.meta.url, + "./draft-stream.js?scope=shared-b", + ); + const apiA = createMockDraftApi(); + const apiB = createMockDraftApi(); + + draftA.__testing.resetTelegramDraftStreamForTests(); + + try { + const streamA = draftA.createTelegramDraftStream({ + api: apiA as unknown as Bot["api"], + chatId: 123, + thread: { id: 42, scope: "dm" }, + previewTransport: "draft", + }); + const streamB = draftB.createTelegramDraftStream({ + api: apiB as unknown as Bot["api"], + chatId: 123, + thread: { id: 42, scope: "dm" }, + previewTransport: "draft", + }); + + streamA.update("Message A"); + await streamA.flush(); + streamB.update("Message B"); + await streamB.flush(); + + expect(apiA.sendMessageDraft.mock.calls[0]?.[1]).toBe(1); + expect(apiB.sendMessageDraft.mock.calls[0]?.[1]).toBe(2); + } finally { + draftA.__testing.resetTelegramDraftStreamForTests(); + } + }); + it("creates new message after forceNewMessage is called", async () => { const { api, stream } = createForceNewMessageHarness(); diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index ddb059531..afab4680e 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -1,5 +1,6 @@ import type { Bot } from "grammy"; import { createFinalizableDraftLifecycle } from "../channels/draft-stream-controls.js"; +import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js"; import { isSafeToRetrySendError, isTelegramClientRejection } from "./network-errors.js"; @@ -21,11 +22,20 @@ type TelegramSendMessageDraft = ( }, ) => Promise; -let nextDraftId = 0; +/** + * Keep draft-id allocation shared across bundled chunks so concurrent preview + * lanes do not accidentally reuse draft ids when code-split entries coexist. + */ +const TELEGRAM_DRAFT_STREAM_STATE_KEY = Symbol.for("openclaw.telegramDraftStreamState"); + +const draftStreamState = resolveGlobalSingleton(TELEGRAM_DRAFT_STREAM_STATE_KEY, () => ({ + nextDraftId: 0, +})); function allocateTelegramDraftId(): number { - nextDraftId = nextDraftId >= TELEGRAM_DRAFT_ID_MAX ? 1 : nextDraftId + 1; - return nextDraftId; + draftStreamState.nextDraftId = + draftStreamState.nextDraftId >= TELEGRAM_DRAFT_ID_MAX ? 1 : draftStreamState.nextDraftId + 1; + return draftStreamState.nextDraftId; } function resolveSendMessageDraftApi(api: Bot["api"]): TelegramSendMessageDraft | undefined { @@ -441,3 +451,9 @@ export function createTelegramDraftStream(params: { sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number", }; } + +export const __testing = { + resetTelegramDraftStreamForTests() { + draftStreamState.nextDraftId = 0; + }, +}; diff --git a/src/telegram/send.test.ts b/src/telegram/send.test.ts index 2bd6556ee..f2875af1d 100644 --- a/src/telegram/send.test.ts +++ b/src/telegram/send.test.ts @@ -1,5 +1,6 @@ import type { Bot } from "grammy"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { importFreshModule } from "../../test/helpers/import-fresh.js"; import { getTelegramSendTestMocks, importTelegramSendModule, @@ -88,6 +89,29 @@ describe("sent-message-cache", () => { clearSentMessageCache(); expect(wasSentByBot(123, 1)).toBe(false); }); + + it("shares sent-message state across distinct module instances", async () => { + const cacheA = await importFreshModule( + import.meta.url, + "./sent-message-cache.js?scope=shared-a", + ); + const cacheB = await importFreshModule( + import.meta.url, + "./sent-message-cache.js?scope=shared-b", + ); + + cacheA.clearSentMessageCache(); + + try { + cacheA.recordSentMessage(123, 1); + expect(cacheB.wasSentByBot(123, 1)).toBe(true); + + cacheB.clearSentMessageCache(); + expect(cacheA.wasSentByBot(123, 1)).toBe(false); + } finally { + cacheA.clearSentMessageCache(); + } + }); }); describe("buildInlineKeyboard", () => { diff --git a/src/telegram/sent-message-cache.ts b/src/telegram/sent-message-cache.ts index 0380f2454..974510669 100644 --- a/src/telegram/sent-message-cache.ts +++ b/src/telegram/sent-message-cache.ts @@ -1,3 +1,5 @@ +import { resolveGlobalMap } from "../shared/global-singleton.js"; + /** * In-memory cache of sent message IDs per chat. * Used to identify bot's own messages for reaction filtering ("own" mode). @@ -9,7 +11,13 @@ type CacheEntry = { timestamps: Map; }; -const sentMessages = new Map(); +/** + * Keep sent-message tracking shared across bundled chunks so Telegram reaction + * filters see the same sent-message history regardless of which chunk recorded it. + */ +const TELEGRAM_SENT_MESSAGES_KEY = Symbol.for("openclaw.telegramSentMessages"); + +const sentMessages = resolveGlobalMap(TELEGRAM_SENT_MESSAGES_KEY); function getChatKey(chatId: number | string): string { return String(chatId); diff --git a/src/telegram/thread-bindings.test.ts b/src/telegram/thread-bindings.test.ts index 4479fc786..fc32ace25 100644 --- a/src/telegram/thread-bindings.test.ts +++ b/src/telegram/thread-bindings.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { importFreshModule } from "../../test/helpers/import-fresh.js"; import { resolveStateDir } from "../config/paths.js"; import { getSessionBindingService } from "../infra/outbound/session-binding-service.js"; import { @@ -79,6 +80,53 @@ describe("telegram thread bindings", () => { }); }); + it("shares binding state across distinct module instances", async () => { + const bindingsA = await importFreshModule( + import.meta.url, + "./thread-bindings.js?scope=shared-a", + ); + const bindingsB = await importFreshModule( + import.meta.url, + "./thread-bindings.js?scope=shared-b", + ); + + bindingsA.__testing.resetTelegramThreadBindingsForTests(); + + try { + const managerA = bindingsA.createTelegramThreadBindingManager({ + accountId: "shared-runtime", + persist: false, + enableSweeper: false, + }); + const managerB = bindingsB.createTelegramThreadBindingManager({ + accountId: "shared-runtime", + persist: false, + enableSweeper: false, + }); + + expect(managerB).toBe(managerA); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-shared", + targetKind: "subagent", + conversation: { + channel: "telegram", + accountId: "shared-runtime", + conversationId: "-100200300:topic:44", + }, + placement: "current", + }); + + expect( + bindingsB + .getTelegramThreadBindingManager("shared-runtime") + ?.getByConversationId("-100200300:topic:44")?.targetSessionKey, + ).toBe("agent:main:subagent:child-shared"); + } finally { + bindingsA.__testing.resetTelegramThreadBindingsForTests(); + } + }); + it("updates lifecycle windows by session key", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z")); diff --git a/src/telegram/thread-bindings.ts b/src/telegram/thread-bindings.ts index 68218e904..ea2fd11ac 100644 --- a/src/telegram/thread-bindings.ts +++ b/src/telegram/thread-bindings.ts @@ -13,6 +13,7 @@ import { type SessionBindingRecord, } from "../infra/outbound/session-binding-service.js"; import { normalizeAccountId } from "../routing/session-key.js"; +import { resolveGlobalSingleton } from "../shared/global-singleton.js"; const DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS = 24 * 60 * 60 * 1000; const DEFAULT_THREAD_BINDING_MAX_AGE_MS = 0; @@ -62,8 +63,26 @@ export type TelegramThreadBindingManager = { stop: () => void; }; -const MANAGERS_BY_ACCOUNT_ID = new Map(); -const BINDINGS_BY_ACCOUNT_CONVERSATION = new Map(); +type TelegramThreadBindingsState = { + managersByAccountId: Map; + bindingsByAccountConversation: Map; +}; + +/** + * Keep Telegram thread binding state shared across bundled chunks so routing, + * binding lookups, and binding mutations all observe the same live registry. + */ +const TELEGRAM_THREAD_BINDINGS_STATE_KEY = Symbol.for("openclaw.telegramThreadBindingsState"); + +const threadBindingsState = resolveGlobalSingleton( + TELEGRAM_THREAD_BINDINGS_STATE_KEY, + () => ({ + managersByAccountId: new Map(), + bindingsByAccountConversation: new Map(), + }), +); +const MANAGERS_BY_ACCOUNT_ID = threadBindingsState.managersByAccountId; +const BINDINGS_BY_ACCOUNT_CONVERSATION = threadBindingsState.bindingsByAccountConversation; function normalizeDurationMs(raw: unknown, fallback: number): number { if (typeof raw !== "number" || !Number.isFinite(raw)) { diff --git a/test/helpers/import-fresh.ts b/test/helpers/import-fresh.ts new file mode 100644 index 000000000..577e25cd8 --- /dev/null +++ b/test/helpers/import-fresh.ts @@ -0,0 +1,8 @@ +export async function importFreshModule( + from: string, + specifier: string, +): Promise { + // Vitest keys module instances by the full URL string, including the query + // suffix. These tests rely on that behavior to emulate code-split chunks. + return (await import(/* @vite-ignore */ new URL(specifier, from).href)) as TModule; +}