From 2015ab3194d908944c8f3f62d1da0a9d0370cd8a Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 7 Mar 2026 20:47:33 +0000 Subject: [PATCH] fix(telegram): harden persisted offset confirmation and stall recovery Landed from #39111 by @MumuTW. Co-authored-by: MumuTW --- CHANGELOG.md | 1 + src/telegram/monitor.test.ts | 136 ++++++++++++++++++++++- src/telegram/monitor.ts | 82 ++++++++++++-- src/telegram/update-offset-store.test.ts | 28 +++++ src/telegram/update-offset-store.ts | 9 +- 5 files changed, 242 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f92ff04c6..e1d6c20cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -274,6 +274,7 @@ Docs: https://docs.openclaw.ai - Usage/token count formatting: round near-million token counts to millions (`1.0m`) instead of `1000k`, with explicit boundary coverage for `999_499` and `999_500`. (#39129) Thanks @CurryMessi. - Gateway/session bootstrap cache invalidation ordering: clear bootstrap snapshots only after active embedded-run shutdown wait completes, preventing dying runs from repopulating stale cache between `/new`/`sessions.reset` turns. (#38873) Thanks @MumuTW. - Browser/dispatcher error clarity: preserve dispatcher-side failure context in browser fetch errors while still appending operator guidance and explicit no-retry model hints, preventing misleading `"Can't reach service"` wrapping and avoiding LLM retry loops. (#39090) Thanks @NewdlDewdl. +- Telegram/polling offset safety: confirm persisted offsets before polling startup while validating stored `lastUpdateId` values as non-negative safe integers (with overflow guards) so malformed offset state cannot cause update skipping/dropping. (#39111) Thanks @MumuTW. ## 2026.3.2 diff --git a/src/telegram/monitor.test.ts b/src/telegram/monitor.test.ts index 4fe32147e..b5f072ebf 100644 --- a/src/telegram/monitor.test.ts +++ b/src/telegram/monitor.test.ts @@ -22,6 +22,10 @@ const api = { sendDocument: vi.fn(), setWebhook: vi.fn(), deleteWebhook: vi.fn(), + getUpdates: vi.fn(async () => []), + config: { + use: vi.fn(), + }, }; const { initSpy, runSpy, loadConfig } = vi.hoisted(() => ({ initSpy: vi.fn(async () => undefined), @@ -67,6 +71,9 @@ const { computeBackoff, sleepWithAbort } = vi.hoisted(() => ({ computeBackoff: vi.fn(() => 0), sleepWithAbort: vi.fn(async () => undefined), })); +const { readTelegramUpdateOffsetSpy } = vi.hoisted(() => ({ + readTelegramUpdateOffsetSpy: vi.fn(async () => null as number | null), +})); const { startTelegramWebhookSpy } = vi.hoisted(() => ({ startTelegramWebhookSpy: vi.fn(async () => ({ server: { close: vi.fn() }, stop: vi.fn() })), })); @@ -183,6 +190,11 @@ vi.mock("./webhook.js", () => ({ startTelegramWebhook: startTelegramWebhookSpy, })); +vi.mock("./update-offset-store.js", () => ({ + readTelegramUpdateOffset: readTelegramUpdateOffsetSpy, + writeTelegramUpdateOffset: vi.fn(async () => undefined), +})); + vi.mock("../auto-reply/reply.js", () => ({ getReplyFromConfig: async (ctx: { Body?: string }) => ({ text: `echo:${ctx.Body}`, @@ -198,6 +210,8 @@ describe("monitorTelegramProvider (grammY)", () => { channels: { telegram: {} }, }); initSpy.mockClear(); + readTelegramUpdateOffsetSpy.mockReset().mockResolvedValue(null); + api.getUpdates.mockReset().mockResolvedValue([]); runSpy.mockReset().mockImplementation(() => makeRunnerStub({ task: () => Promise.reject(new Error("runSpy called without explicit test stub")), @@ -218,9 +232,11 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("processes a DM and sends reply", async () => { - Object.values(api).forEach((fn) => { - fn?.mockReset?.(); - }); + for (const v of Object.values(api)) { + if (typeof v === "function" && "mockReset" in v) { + (v as ReturnType).mockReset(); + } + } await monitorWithAutoAbort(); expect(handlers.message).toBeDefined(); await handlers.message?.({ @@ -260,9 +276,11 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("requires mention in groups by default", async () => { - Object.values(api).forEach((fn) => { - fn?.mockReset?.(); - }); + for (const v of Object.values(api)) { + if (typeof v === "function" && "mockReset" in v) { + (v as ReturnType).mockReset(); + } + } await monitorWithAutoAbort(); await handlers.message?.({ message: { @@ -467,6 +485,112 @@ describe("monitorTelegramProvider (grammY)", () => { expect(settled).toHaveBeenCalledTimes(1); }); + it("force-restarts polling when getUpdates stalls (watchdog)", async () => { + vi.useFakeTimers({ shouldAdvanceTime: true }); + const abort = new AbortController(); + let running = true; + let releaseTask: (() => void) | undefined; + const stop = vi.fn(async () => { + running = false; + releaseTask?.(); + }); + + runSpy + .mockImplementationOnce(() => + makeRunnerStub({ + task: () => + new Promise((resolve) => { + releaseTask = resolve; + }), + stop, + isRunning: () => running, + }), + ) + .mockImplementationOnce(() => + makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }), + ); + + const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await vi.waitFor(() => expect(runSpy).toHaveBeenCalledTimes(1)); + + // Advance time past the stall threshold (90s) + watchdog interval (30s) + vi.advanceTimersByTime(120_000); + await monitor; + + expect(stop.mock.calls.length).toBeGreaterThanOrEqual(1); + expect(computeBackoff).toHaveBeenCalled(); + expect(runSpy).toHaveBeenCalledTimes(2); + vi.useRealTimers(); + }); + + it("confirms persisted offset with Telegram before starting runner", async () => { + readTelegramUpdateOffsetSpy.mockResolvedValueOnce(549076203); + const abort = new AbortController(); + const order: string[] = []; + api.getUpdates.mockReset(); + api.getUpdates.mockImplementationOnce(async () => { + order.push("getUpdates"); + return []; + }); + api.deleteWebhook.mockReset(); + api.deleteWebhook.mockImplementationOnce(async () => { + order.push("deleteWebhook"); + return true; + }); + runSpy.mockImplementationOnce(() => { + order.push("run"); + return makeAbortRunner(abort); + }); + + await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + + expect(api.getUpdates).toHaveBeenCalledWith({ offset: 549076204, limit: 1, timeout: 0 }); + expect(order).toEqual(["deleteWebhook", "getUpdates", "run"]); + }); + + it("skips offset confirmation when no persisted offset exists", async () => { + readTelegramUpdateOffsetSpy.mockResolvedValueOnce(null); + const abort = new AbortController(); + api.getUpdates.mockReset(); + api.deleteWebhook.mockReset(); + api.deleteWebhook.mockResolvedValueOnce(true); + mockRunOnceAndAbort(abort); + + await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + + expect(api.getUpdates).not.toHaveBeenCalled(); + }); + + it("skips offset confirmation when persisted offset is invalid", async () => { + readTelegramUpdateOffsetSpy.mockResolvedValueOnce(-1 as number); + const abort = new AbortController(); + api.getUpdates.mockReset(); + api.deleteWebhook.mockReset(); + api.deleteWebhook.mockResolvedValueOnce(true); + mockRunOnceAndAbort(abort); + + await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + + expect(api.getUpdates).not.toHaveBeenCalled(); + }); + + it("skips offset confirmation when persisted offset cannot be safely incremented", async () => { + readTelegramUpdateOffsetSpy.mockResolvedValueOnce(Number.MAX_SAFE_INTEGER); + const abort = new AbortController(); + api.getUpdates.mockReset(); + api.deleteWebhook.mockReset(); + api.deleteWebhook.mockResolvedValueOnce(true); + mockRunOnceAndAbort(abort); + + await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + + expect(api.getUpdates).not.toHaveBeenCalled(); + }); + it("falls back to configured webhookSecret when not passed explicitly", async () => { await monitorTelegramProvider({ token: "tok", diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index 7b252cf6b..2b5bdae8f 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -61,8 +61,21 @@ const TELEGRAM_POLL_RESTART_POLICY = { jitter: 0.25, }; +// Polling stall detection: if no getUpdates call is seen for this long, +// assume the runner is stuck and force-restart it. +// Default fetch timeout is 30s, so 3x gives ample margin for slow responses. +const POLL_STALL_THRESHOLD_MS = 90_000; +const POLL_WATCHDOG_INTERVAL_MS = 30_000; + type TelegramBot = ReturnType; +function normalizePersistedUpdateId(value: number | null): number | null { + if (!Number.isSafeInteger(value) || value < 0) { + return null; + } + return value; +} + const isGetUpdatesConflict = (err: unknown) => { if (!err || typeof err !== "object") { return false; @@ -137,19 +150,30 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const proxyFetch = opts.proxyFetch ?? (account.config.proxy ? makeProxyFetch(account.config.proxy) : undefined); - let lastUpdateId = await readTelegramUpdateOffset({ + const persistedOffsetRaw = await readTelegramUpdateOffset({ accountId: account.accountId, botToken: token, }); + let lastUpdateId = normalizePersistedUpdateId(persistedOffsetRaw); + if (persistedOffsetRaw !== null && lastUpdateId === null) { + log( + `[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`, + ); + } const persistUpdateId = async (updateId: number) => { - if (lastUpdateId !== null && updateId <= lastUpdateId) { + const normalizedUpdateId = normalizePersistedUpdateId(updateId); + if (normalizedUpdateId === null) { + log(`[telegram] Ignoring invalid update_id value: ${String(updateId)}`); return; } - lastUpdateId = updateId; + if (lastUpdateId !== null && normalizedUpdateId <= lastUpdateId) { + return; + } + lastUpdateId = normalizedUpdateId; try { await writeTelegramUpdateOffset({ accountId: account.accountId, - updateId, + updateId: normalizedUpdateId, botToken: token, }); } catch (err) { @@ -258,10 +282,35 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { } }; + const confirmPersistedOffset = async (bot: TelegramBot): Promise => { + if (lastUpdateId === null || lastUpdateId >= Number.MAX_SAFE_INTEGER) { + return; + } + try { + await bot.api.getUpdates({ offset: lastUpdateId + 1, limit: 1, timeout: 0 }); + } catch { + // Non-fatal: runner middleware still skips duplicates via shouldSkipUpdate. + } + }; + const runPollingCycle = async (bot: TelegramBot): Promise<"continue" | "exit"> => { + // Confirm the persisted offset with Telegram so the runner (which starts + // at offset 0) does not re-fetch already-processed updates on restart. + await confirmPersistedOffset(bot); + + // Track getUpdates calls to detect polling stalls. + let lastGetUpdatesAt = Date.now(); + bot.api.config.use((prev, method, payload, signal) => { + if (method === "getUpdates") { + lastGetUpdatesAt = Date.now(); + } + return prev(method, payload, signal); + }); + const runner = run(bot, runnerOptions); activeRunner = runner; let stopPromise: Promise | undefined; + let stalledRestart = false; const stopRunner = () => { stopPromise ??= Promise.resolve(runner.stop()) .then(() => undefined) @@ -282,6 +331,22 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { void stopRunner(); } }; + + // Watchdog: detect when getUpdates calls have stalled and force-restart. + const watchdog = setInterval(() => { + if (opts.abortSignal?.aborted) { + return; + } + const elapsed = Date.now() - lastGetUpdatesAt; + if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) { + stalledRestart = true; + log( + `[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`, + ); + void stopRunner(); + } + }, POLL_WATCHDOG_INTERVAL_MS); + opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); try { // runner.task() returns a promise that resolves when the runner stops @@ -289,9 +354,11 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { if (opts.abortSignal?.aborted) { return "exit"; } - const reason = forceRestarted - ? "unhandled network error" - : "runner stopped (maxRetryTime exceeded or graceful stop)"; + const reason = stalledRestart + ? "polling stall detected" + : forceRestarted + ? "unhandled network error" + : "runner stopped (maxRetryTime exceeded or graceful stop)"; forceRestarted = false; const shouldRestart = await waitBeforeRestart( (delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`, @@ -314,6 +381,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { ); return shouldRestart ? "continue" : "exit"; } finally { + clearInterval(watchdog); opts.abortSignal?.removeEventListener("abort", stopOnAbort); await stopRunner(); await stopBot(); diff --git a/src/telegram/update-offset-store.test.ts b/src/telegram/update-offset-store.test.ts index 96b0ec039..8c00c3a15 100644 --- a/src/telegram/update-offset-store.test.ts +++ b/src/telegram/update-offset-store.test.ts @@ -78,4 +78,32 @@ describe("deleteTelegramUpdateOffset", () => { ).toBeNull(); }); }); + + it("ignores invalid persisted update IDs from disk", async () => { + await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => { + const offsetPath = path.join(stateDir, "telegram", "update-offset-default.json"); + await fs.mkdir(path.dirname(offsetPath), { recursive: true }); + await fs.writeFile( + offsetPath, + `${JSON.stringify({ version: 2, lastUpdateId: -1, botId: "111111" }, null, 2)}\n`, + "utf-8", + ); + expect(await readTelegramUpdateOffset({ accountId: "default" })).toBeNull(); + + await fs.writeFile( + offsetPath, + `${JSON.stringify({ version: 2, lastUpdateId: Number.POSITIVE_INFINITY, botId: "111111" }, null, 2)}\n`, + "utf-8", + ); + expect(await readTelegramUpdateOffset({ accountId: "default" })).toBeNull(); + }); + }); + + it("rejects writing invalid update IDs", async () => { + await withStateDirEnv("openclaw-tg-offset-", async () => { + await expect( + writeTelegramUpdateOffset({ accountId: "default", updateId: -1 as number }), + ).rejects.toThrow(/non-negative safe integer/i); + }); + }); }); diff --git a/src/telegram/update-offset-store.ts b/src/telegram/update-offset-store.ts index b6ed5eb6b..8a511788c 100644 --- a/src/telegram/update-offset-store.ts +++ b/src/telegram/update-offset-store.ts @@ -12,6 +12,10 @@ type TelegramUpdateOffsetState = { botId: string | null; }; +function isValidUpdateId(value: unknown): value is number { + return typeof value === "number" && Number.isSafeInteger(value) && value >= 0; +} + function normalizeAccountId(accountId?: string) { const trimmed = accountId?.trim(); if (!trimmed) { @@ -51,7 +55,7 @@ function safeParseState(raw: string): TelegramUpdateOffsetState | null { if (parsed?.version !== STORE_VERSION && parsed?.version !== 1) { return null; } - if (parsed.lastUpdateId !== null && typeof parsed.lastUpdateId !== "number") { + if (parsed.lastUpdateId !== null && !isValidUpdateId(parsed.lastUpdateId)) { return null; } if ( @@ -103,6 +107,9 @@ export async function writeTelegramUpdateOffset(params: { botToken?: string; env?: NodeJS.ProcessEnv; }): Promise { + if (!isValidUpdateId(params.updateId)) { + throw new Error("Telegram update offset must be a non-negative safe integer."); + } const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env); const payload: TelegramUpdateOffsetState = { version: STORE_VERSION,