Files
openclaw/src/telegram/monitor.ts
Gustavo Madeira Santana ecb91bbb1a Add accountId and config support to Telegram webhook
The Telegram webhook and monitor now accept and pass through accountId and config parameters, enabling routing and configuration per Telegram account. Tests have been updated to verify correct bot instantiation and DM routing based on accountId bindings.
2026-01-13 03:34:32 +00:00

154 lines
4.2 KiB
TypeScript

import { type RunOptions, run } from "@grammyjs/runner";
import type { ClawdbotConfig } from "../config/config.js";
import { loadConfig } from "../config/config.js";
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
import { formatDurationMs } from "../infra/format-duration.js";
import type { RuntimeEnv } from "../runtime.js";
import { resolveTelegramAccount } from "./accounts.js";
import { createTelegramBot } from "./bot.js";
import { makeProxyFetch } from "./proxy.js";
import { startTelegramWebhook } from "./webhook.js";
export type MonitorTelegramOpts = {
token?: string;
accountId?: string;
config?: ClawdbotConfig;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
useWebhook?: boolean;
webhookPath?: string;
webhookPort?: number;
webhookSecret?: string;
proxyFetch?: typeof fetch;
webhookUrl?: string;
};
export function createTelegramRunnerOptions(
cfg: ClawdbotConfig,
): RunOptions<unknown> {
return {
sink: {
concurrency: cfg.agents?.defaults?.maxConcurrent ?? 1,
},
runner: {
fetch: {
// Match grammY defaults
timeout: 30,
},
// Suppress grammY getUpdates stack traces; we log concise errors ourselves.
silent: true,
},
};
}
const TELEGRAM_POLL_RESTART_POLICY = {
initialMs: 2000,
maxMs: 30_000,
factor: 1.8,
jitter: 0.25,
};
const isGetUpdatesConflict = (err: unknown) => {
if (!err || typeof err !== "object") return false;
const typed = err as {
error_code?: number;
errorCode?: number;
description?: string;
method?: string;
message?: string;
};
const errorCode = typed.error_code ?? typed.errorCode;
if (errorCode !== 409) return false;
const haystack = [typed.method, typed.description, typed.message]
.filter((value): value is string => typeof value === "string")
.join(" ")
.toLowerCase();
return haystack.includes("getupdates");
};
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
const cfg = opts.config ?? loadConfig();
const account = resolveTelegramAccount({
cfg,
accountId: opts.accountId,
});
const token = opts.token?.trim() || account.token;
if (!token) {
throw new Error(
`Telegram bot token missing for account "${account.accountId}" (set telegram.accounts.${account.accountId}.botToken/tokenFile or TELEGRAM_BOT_TOKEN for default).`,
);
}
const proxyFetch =
opts.proxyFetch ??
(account.config.proxy
? makeProxyFetch(account.config.proxy as string)
: undefined);
const bot = createTelegramBot({
token,
runtime: opts.runtime,
proxyFetch,
config: cfg,
accountId: account.accountId,
});
if (opts.useWebhook) {
await startTelegramWebhook({
token,
accountId: account.accountId,
config: cfg,
path: opts.webhookPath,
port: opts.webhookPort,
secret: opts.webhookSecret,
runtime: opts.runtime as RuntimeEnv,
fetch: proxyFetch,
abortSignal: opts.abortSignal,
publicUrl: opts.webhookUrl,
});
return;
}
// Use grammyjs/runner for concurrent update processing
const log = opts.runtime?.log ?? console.log;
let restartAttempts = 0;
while (!opts.abortSignal?.aborted) {
const runner = run(bot, createTelegramRunnerOptions(cfg));
const stopOnAbort = () => {
if (opts.abortSignal?.aborted) {
void runner.stop();
}
};
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
try {
// runner.task() returns a promise that resolves when the runner stops
await runner.task();
return;
} catch (err) {
if (opts.abortSignal?.aborted) {
throw err;
}
if (!isGetUpdatesConflict(err)) {
throw err;
}
restartAttempts += 1;
const delayMs = computeBackoff(
TELEGRAM_POLL_RESTART_POLICY,
restartAttempts,
);
log(
`Telegram getUpdates conflict; retrying in ${formatDurationMs(delayMs)}.`,
);
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch (sleepErr) {
if (opts.abortSignal?.aborted) return;
throw sleepErr;
}
} finally {
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
}
}
}