diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d74a2c81..3a7d7552a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,7 @@ Docs: https://docs.openclaw.ai - Plugins/Discovery precedence: load bundled plugins before auto-discovered global extensions so bundled channel plugins win duplicate-ID resolution by default (explicit `plugins.load.paths` overrides remain highest precedence), with loader regression coverage. Landed from contributor PR #29710 by @Sid-Qin. Thanks @Sid-Qin. - Discord/Reconnect integrity: release Discord message listener lane immediately while preserving serialized handler execution, add HELLO-stall resume-first recovery with bounded fresh-identify fallback after repeated stalls, and extend lifecycle/listener regression coverage for forced reconnect scenarios. Landed from contributor PR #29508 by @cgdusek. Thanks @cgdusek. - Matrix/Conduit compatibility: avoid blocking startup on non-resolving Matrix sync start, preserve startup error propagation, prevent duplicate monitor listener registration, remove unreliable 2-member DM heuristics, accept `!room` IDs without alias resolution, and add matrix monitor/client regression coverage. Landed from contributor PR #31023 by @efe-arv. Thanks @efe-arv. +- Discord/Reconnect watchdog: add a shared armable transport stall-watchdog and wire Discord gateway lifecycle force-stop semantics for silent close/reconnect zombies, with gateway/lifecycle watchdog regression coverage and runtime status liveness updates. Follow-up to contributor PR #31025 by @theotarr and PR #30530 by @liuxiaopai-ai. Thanks @theotarr and @liuxiaopai-ai. - Security/Skills: harden skill installer metadata parsing by rejecting unsafe installer specs (brew/node/go/uv/download) and constrain plugin-declared skill directories to the plugin root (including symlink-escape checks), with regression coverage. - Discord/DM command auth: unify DM allowlist + pairing-store authorization across message preflight and native command interactions so DM command gating is consistent for `open`/`pairing`/`allowlist` policies. - Sessions/Usage accounting: persist `cacheRead`/`cacheWrite` from the latest call snapshot (`lastCallUsage`) instead of accumulated multi-call totals, preventing inflated token/cost reporting in long tool/compaction runs. (#31005) diff --git a/extensions/discord/src/channel.ts b/extensions/discord/src/channel.ts index 5ef3ab09c..3a36a6117 100644 --- a/extensions/discord/src/channel.ts +++ b/extensions/discord/src/channel.ts @@ -343,6 +343,11 @@ export const discordPlugin: ChannelPlugin = { defaultRuntime: { accountId: DEFAULT_ACCOUNT_ID, running: false, + connected: false, + reconnectAttempts: 0, + lastConnectedAt: null, + lastDisconnect: null, + lastEventAt: null, lastStartAt: null, lastStopAt: null, lastError: null, @@ -394,6 +399,11 @@ export const discordPlugin: ChannelPlugin = { lastStartAt: runtime?.lastStartAt ?? null, lastStopAt: runtime?.lastStopAt ?? null, lastError: runtime?.lastError ?? null, + connected: runtime?.connected ?? false, + reconnectAttempts: runtime?.reconnectAttempts, + lastConnectedAt: runtime?.lastConnectedAt ?? null, + lastDisconnect: runtime?.lastDisconnect ?? null, + lastEventAt: runtime?.lastEventAt ?? null, application: app ?? undefined, bot: bot ?? undefined, probe, @@ -445,6 +455,7 @@ export const discordPlugin: ChannelPlugin = { abortSignal: ctx.abortSignal, mediaMaxMb: account.config.mediaMaxMb, historyLimit: account.config.historyLimit, + setStatus: (patch) => ctx.setStatus({ accountId: account.accountId, ...patch }), }); }, }, diff --git a/src/channels/transport/stall-watchdog.test.ts b/src/channels/transport/stall-watchdog.test.ts new file mode 100644 index 000000000..1dfbb6d8d --- /dev/null +++ b/src/channels/transport/stall-watchdog.test.ts @@ -0,0 +1,74 @@ +import { describe, expect, it, vi } from "vitest"; +import { createArmableStallWatchdog } from "./stall-watchdog.js"; + +describe("createArmableStallWatchdog", () => { + it("fires onTimeout once when armed and idle exceeds timeout", async () => { + vi.useFakeTimers(); + try { + const onTimeout = vi.fn(); + const watchdog = createArmableStallWatchdog({ + label: "test-watchdog", + timeoutMs: 1_000, + checkIntervalMs: 100, + onTimeout, + }); + + watchdog.arm(); + await vi.advanceTimersByTimeAsync(1_500); + + expect(onTimeout).toHaveBeenCalledTimes(1); + expect(watchdog.isArmed()).toBe(false); + watchdog.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("does not fire when disarmed before timeout", async () => { + vi.useFakeTimers(); + try { + const onTimeout = vi.fn(); + const watchdog = createArmableStallWatchdog({ + label: "test-watchdog", + timeoutMs: 1_000, + checkIntervalMs: 100, + onTimeout, + }); + + watchdog.arm(); + await vi.advanceTimersByTimeAsync(500); + watchdog.disarm(); + await vi.advanceTimersByTimeAsync(2_000); + + expect(onTimeout).not.toHaveBeenCalled(); + watchdog.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("extends timeout window when touched", async () => { + vi.useFakeTimers(); + try { + const onTimeout = vi.fn(); + const watchdog = createArmableStallWatchdog({ + label: "test-watchdog", + timeoutMs: 1_000, + checkIntervalMs: 100, + onTimeout, + }); + + watchdog.arm(); + await vi.advanceTimersByTimeAsync(700); + watchdog.touch(); + await vi.advanceTimersByTimeAsync(700); + expect(onTimeout).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(400); + expect(onTimeout).toHaveBeenCalledTimes(1); + watchdog.stop(); + } finally { + vi.useRealTimers(); + } + }); +}); diff --git a/src/channels/transport/stall-watchdog.ts b/src/channels/transport/stall-watchdog.ts new file mode 100644 index 000000000..273e1330c --- /dev/null +++ b/src/channels/transport/stall-watchdog.ts @@ -0,0 +1,103 @@ +import type { RuntimeEnv } from "../../runtime.js"; + +export type StallWatchdogTimeoutMeta = { + idleMs: number; + timeoutMs: number; +}; + +export type ArmableStallWatchdog = { + arm: (atMs?: number) => void; + touch: (atMs?: number) => void; + disarm: () => void; + stop: () => void; + isArmed: () => boolean; +}; + +export function createArmableStallWatchdog(params: { + label: string; + timeoutMs: number; + checkIntervalMs?: number; + abortSignal?: AbortSignal; + runtime?: RuntimeEnv; + onTimeout: (meta: StallWatchdogTimeoutMeta) => void; +}): ArmableStallWatchdog { + const timeoutMs = Math.max(1, Math.floor(params.timeoutMs)); + const checkIntervalMs = Math.max( + 100, + Math.floor(params.checkIntervalMs ?? Math.min(5_000, Math.max(250, timeoutMs / 6))), + ); + + let armed = false; + let stopped = false; + let lastActivityAt = Date.now(); + let timer: ReturnType | null = null; + + const clearTimer = () => { + if (!timer) { + return; + } + clearInterval(timer); + timer = null; + }; + + const disarm = () => { + armed = false; + }; + + const stop = () => { + if (stopped) { + return; + } + stopped = true; + disarm(); + clearTimer(); + params.abortSignal?.removeEventListener("abort", stop); + }; + + const arm = (atMs?: number) => { + if (stopped) { + return; + } + lastActivityAt = atMs ?? Date.now(); + armed = true; + }; + + const touch = (atMs?: number) => { + if (stopped) { + return; + } + lastActivityAt = atMs ?? Date.now(); + }; + + const check = () => { + if (!armed || stopped) { + return; + } + const now = Date.now(); + const idleMs = now - lastActivityAt; + if (idleMs < timeoutMs) { + return; + } + disarm(); + params.runtime?.error?.( + `[${params.label}] transport watchdog timeout: idle ${Math.round(idleMs / 1000)}s (limit ${Math.round(timeoutMs / 1000)}s)`, + ); + params.onTimeout({ idleMs, timeoutMs }); + }; + + if (params.abortSignal?.aborted) { + stop(); + } else { + params.abortSignal?.addEventListener("abort", stop, { once: true }); + timer = setInterval(check, checkIntervalMs); + timer.unref?.(); + } + + return { + arm, + touch, + disarm, + stop, + isArmed: () => armed, + }; +} diff --git a/src/discord/monitor.gateway.test.ts b/src/discord/monitor.gateway.test.ts index 95c3f9e23..d349edd4c 100644 --- a/src/discord/monitor.gateway.test.ts +++ b/src/discord/monitor.gateway.test.ts @@ -81,4 +81,48 @@ describe("waitForDiscordGatewayStop", () => { await expect(promise).resolves.toBeUndefined(); }); + + it("rejects via registerForceStop and disconnects gateway", async () => { + const emitter = new EventEmitter(); + const disconnect = vi.fn(); + const abort = new AbortController(); + let forceStop: ((err: unknown) => void) | undefined; + + const promise = waitForDiscordGatewayStop({ + gateway: { emitter, disconnect }, + abortSignal: abort.signal, + registerForceStop: (fn) => { + forceStop = fn; + }, + }); + + expect(forceStop).toBeDefined(); + + forceStop?.(new Error("reconnect watchdog timeout")); + + await expect(promise).rejects.toThrow("reconnect watchdog timeout"); + expect(disconnect).toHaveBeenCalledTimes(1); + expect(emitter.listenerCount("error")).toBe(0); + }); + + it("ignores forceStop after promise already settled", async () => { + const emitter = new EventEmitter(); + const disconnect = vi.fn(); + const abort = new AbortController(); + let forceStop: ((err: unknown) => void) | undefined; + + const promise = waitForDiscordGatewayStop({ + gateway: { emitter, disconnect }, + abortSignal: abort.signal, + registerForceStop: (fn) => { + forceStop = fn; + }, + }); + + abort.abort(); + await expect(promise).resolves.toBeUndefined(); + + forceStop?.(new Error("too late")); + expect(disconnect).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/discord/monitor.gateway.ts b/src/discord/monitor.gateway.ts index 5cb190e8d..624153cad 100644 --- a/src/discord/monitor.gateway.ts +++ b/src/discord/monitor.gateway.ts @@ -14,6 +14,7 @@ export async function waitForDiscordGatewayStop(params: { abortSignal?: AbortSignal; onGatewayError?: (err: unknown) => void; shouldStopOnError?: (err: unknown) => boolean; + registerForceStop?: (forceStop: (err: unknown) => void) => void; }): Promise { const { gateway, abortSignal, onGatewayError, shouldStopOnError } = params; const emitter = gateway?.emitter; @@ -57,6 +58,9 @@ export async function waitForDiscordGatewayStop(params: { finishReject(err); } }; + const onForceStop = (err: unknown) => { + finishReject(err); + }; if (abortSignal?.aborted) { onAbort(); @@ -65,5 +69,6 @@ export async function waitForDiscordGatewayStop(params: { abortSignal?.addEventListener("abort", onAbort, { once: true }); emitter?.on("error", onGatewayErrorEvent); + params.registerForceStop?.(onForceStop); }); } diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index edbbb7f2c..484876861 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -54,6 +54,7 @@ type DiscordReactionListenerParams = { allowNameMatching: boolean; guildEntries?: Record; logger: Logger; + onEvent?: () => void; }; const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000; @@ -123,11 +124,13 @@ export class DiscordMessageListener extends MessageCreateListener { constructor( private handler: DiscordMessageHandler, private logger?: Logger, + private onEvent?: () => void, ) { super(); } async handle(data: DiscordMessageEvent, client: Client) { + this.onEvent?.(); // Release Carbon's dispatch lane immediately, but keep our message handler // serialized to avoid unbounded parallel model/IO work on traffic bursts. this.messageQueue = this.messageQueue @@ -157,6 +160,7 @@ export class DiscordReactionListener extends MessageReactionAddListener { } async handle(data: DiscordReactionEvent, client: Client) { + this.params.onEvent?.(); await runDiscordReactionHandler({ data, client, @@ -174,6 +178,7 @@ export class DiscordReactionRemoveListener extends MessageReactionRemoveListener } async handle(data: DiscordReactionEvent, client: Client) { + this.params.onEvent?.(); await runDiscordReactionHandler({ data, client, diff --git a/src/discord/monitor/native-command.model-picker.test.ts b/src/discord/monitor/native-command.model-picker.test.ts index 59598b39f..e82777576 100644 --- a/src/discord/monitor/native-command.model-picker.test.ts +++ b/src/discord/monitor/native-command.model-picker.test.ts @@ -310,7 +310,7 @@ describe("Discord model picker interactions", () => { .mockResolvedValue(); const dispatchSpy = vi .spyOn(dispatcherModule, "dispatchReplyWithDispatcher") - .mockImplementation(() => new Promise(() => {}) as never); + .mockResolvedValue({} as never); const withTimeoutSpy = vi .spyOn(timeoutModule, "withTimeout") .mockRejectedValue(new Error("timeout")); diff --git a/src/discord/monitor/provider.lifecycle.test.ts b/src/discord/monitor/provider.lifecycle.test.ts index 5e1986f18..81610424a 100644 --- a/src/discord/monitor/provider.lifecycle.test.ts +++ b/src/discord/monitor/provider.lifecycle.test.ts @@ -83,6 +83,7 @@ describe("runDiscordGatewayLifecycle", () => { start, stop, threadStop, + runtimeLog, runtimeError, releaseEarlyGatewayErrorGuard, lifecycleParams: { @@ -315,4 +316,77 @@ describe("runDiscordGatewayLifecycle", () => { vi.useRealTimers(); } }); + + it("force-stops when reconnect stalls after a close event", async () => { + vi.useFakeTimers(); + try { + const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js"); + const emitter = new EventEmitter(); + const gateway = { + isConnected: false, + options: {}, + disconnect: vi.fn(), + connect: vi.fn(), + emitter, + }; + getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); + waitForDiscordGatewayStopMock.mockImplementationOnce( + (waitParams: { registerForceStop?: (stop: (err: unknown) => void) => void }) => + new Promise((_resolve, reject) => { + waitParams.registerForceStop?.((err) => reject(err)); + }), + ); + const { lifecycleParams } = createLifecycleHarness({ gateway }); + + const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); + lifecyclePromise.catch(() => {}); + emitter.emit("debug", "WebSocket connection closed with code 1006"); + + await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000); + await expect(lifecyclePromise).rejects.toThrow("reconnect watchdog timeout"); + } finally { + vi.useRealTimers(); + } + }); + + it("does not force-stop when reconnect resumes before watchdog timeout", async () => { + vi.useFakeTimers(); + try { + const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js"); + const emitter = new EventEmitter(); + const gateway = { + isConnected: false, + options: {}, + disconnect: vi.fn(), + connect: vi.fn(), + emitter, + }; + getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); + let resolveWait: (() => void) | undefined; + waitForDiscordGatewayStopMock.mockImplementationOnce( + (waitParams: { registerForceStop?: (stop: (err: unknown) => void) => void }) => + new Promise((resolve, reject) => { + resolveWait = resolve; + waitParams.registerForceStop?.((err) => reject(err)); + }), + ); + const { lifecycleParams, runtimeLog } = createLifecycleHarness({ gateway }); + + const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams); + emitter.emit("debug", "WebSocket connection closed with code 1006"); + await vi.advanceTimersByTimeAsync(60_000); + + gateway.isConnected = true; + emitter.emit("debug", "WebSocket connection opened"); + await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000); + + expect(runtimeLog).not.toHaveBeenCalledWith( + expect.stringContaining("reconnect watchdog timeout"), + ); + resolveWait?.(); + await expect(lifecyclePromise).resolves.toBeUndefined(); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/src/discord/monitor/provider.lifecycle.ts b/src/discord/monitor/provider.lifecycle.ts index e208114c2..4504f6d03 100644 --- a/src/discord/monitor/provider.lifecycle.ts +++ b/src/discord/monitor/provider.lifecycle.ts @@ -1,11 +1,13 @@ import type { Client } from "@buape/carbon"; import type { GatewayPlugin } from "@buape/carbon/gateway"; +import { createArmableStallWatchdog } from "../../channels/transport/stall-watchdog.js"; import { danger } from "../../globals.js"; import type { RuntimeEnv } from "../../runtime.js"; import { attachDiscordGatewayLogging } from "../gateway-logging.js"; import { getDiscordGatewayEmitter, waitForDiscordGatewayStop } from "../monitor.gateway.js"; import type { DiscordVoiceManager } from "../voice/manager.js"; import { registerGateway, unregisterGateway } from "./gateway-registry.js"; +import type { DiscordMonitorStatusSink } from "./status.js"; type ExecApprovalsHandler = { start: () => Promise; @@ -24,7 +26,12 @@ export async function runDiscordGatewayLifecycle(params: { threadBindings: { stop: () => void }; pendingGatewayErrors?: unknown[]; releaseEarlyGatewayErrorGuard?: () => void; + statusSink?: DiscordMonitorStatusSink; }) { + const HELLO_TIMEOUT_MS = 30000; + const HELLO_CONNECTED_POLL_MS = 250; + const MAX_CONSECUTIVE_HELLO_STALLS = 3; + const RECONNECT_STALL_TIMEOUT_MS = 5 * 60_000; const gateway = params.client.getPlugin("gateway"); if (gateway) { registerGateway(params.accountId, gateway); @@ -34,8 +41,58 @@ export async function runDiscordGatewayLifecycle(params: { emitter: gatewayEmitter, runtime: params.runtime, }); + let lifecycleStopping = false; + let forceStopHandler: ((err: unknown) => void) | undefined; + let queuedForceStopError: unknown; + + const pushStatus = (patch: Parameters[0]) => { + params.statusSink?.(patch); + }; + + const triggerForceStop = (err: unknown) => { + if (forceStopHandler) { + forceStopHandler(err); + return; + } + queuedForceStopError = err; + }; + + const reconnectStallWatchdog = createArmableStallWatchdog({ + label: `discord:${params.accountId}:reconnect`, + timeoutMs: RECONNECT_STALL_TIMEOUT_MS, + abortSignal: params.abortSignal, + runtime: params.runtime, + onTimeout: () => { + if (params.abortSignal?.aborted || lifecycleStopping) { + return; + } + const at = Date.now(); + const error = new Error( + `discord reconnect watchdog timeout after ${RECONNECT_STALL_TIMEOUT_MS}ms`, + ); + pushStatus({ + connected: false, + lastEventAt: at, + lastDisconnect: { + at, + error: error.message, + }, + lastError: error.message, + }); + params.runtime.error?.( + danger( + `discord: reconnect watchdog timeout after ${RECONNECT_STALL_TIMEOUT_MS}ms; force-stopping monitor task`, + ), + ); + triggerForceStop(error); + }, + }); const onAbort = () => { + lifecycleStopping = true; + reconnectStallWatchdog.disarm(); + const at = Date.now(); + pushStatus({ connected: false, lastEventAt: at }); if (!gateway) { return; } @@ -50,9 +107,6 @@ export async function runDiscordGatewayLifecycle(params: { params.abortSignal?.addEventListener("abort", onAbort, { once: true }); } - const HELLO_TIMEOUT_MS = 30000; - const HELLO_CONNECTED_POLL_MS = 250; - const MAX_CONSECUTIVE_HELLO_STALLS = 3; let helloTimeoutId: ReturnType | undefined; let helloConnectedPollId: ReturnType | undefined; let consecutiveHelloStalls = 0; @@ -69,6 +123,14 @@ export async function runDiscordGatewayLifecycle(params: { const resetHelloStallCounter = () => { consecutiveHelloStalls = 0; }; + const parseGatewayCloseCode = (message: string): number | undefined => { + const match = /code\s+(\d{3,5})/i.exec(message); + if (!match?.[1]) { + return undefined; + } + const code = Number.parseInt(match[1], 10); + return Number.isFinite(code) ? code : undefined; + }; const clearResumeState = () => { const mutableGateway = gateway as | (GatewayPlugin & { @@ -90,27 +152,53 @@ export async function runDiscordGatewayLifecycle(params: { }; const onGatewayDebug = (msg: unknown) => { const message = String(msg); + const at = Date.now(); + pushStatus({ lastEventAt: at }); if (message.includes("WebSocket connection closed")) { // Carbon marks `isConnected` true only after READY/RESUMED and flips it // false during reconnect handling after this debug line is emitted. if (gateway?.isConnected) { resetHelloStallCounter(); } + reconnectStallWatchdog.arm(at); + pushStatus({ + connected: false, + lastDisconnect: { + at, + status: parseGatewayCloseCode(message), + }, + }); clearHelloWatch(); return; } if (!message.includes("WebSocket connection opened")) { return; } + reconnectStallWatchdog.disarm(); clearHelloWatch(); let sawConnected = gateway?.isConnected === true; + if (sawConnected) { + pushStatus({ + connected: true, + lastConnectedAt: at, + lastDisconnect: null, + }); + } helloConnectedPollId = setInterval(() => { if (!gateway?.isConnected) { return; } sawConnected = true; resetHelloStallCounter(); + const connectedAt = Date.now(); + reconnectStallWatchdog.disarm(); + pushStatus({ + connected: true, + lastEventAt: connectedAt, + lastConnectedAt: connectedAt, + lastDisconnect: null, + }); if (helloConnectedPollId) { clearInterval(helloConnectedPollId); helloConnectedPollId = undefined; @@ -127,6 +215,16 @@ export async function runDiscordGatewayLifecycle(params: { } else { consecutiveHelloStalls += 1; const forceFreshIdentify = consecutiveHelloStalls >= MAX_CONSECUTIVE_HELLO_STALLS; + const stalledAt = Date.now(); + reconnectStallWatchdog.arm(stalledAt); + pushStatus({ + connected: false, + lastEventAt: stalledAt, + lastDisconnect: { + at: stalledAt, + error: "hello-timeout", + }, + }); params.runtime.log?.( danger( forceFreshIdentify @@ -199,15 +297,25 @@ export async function runDiscordGatewayLifecycle(params: { abortSignal: params.abortSignal, onGatewayError: logGatewayError, shouldStopOnError: shouldStopOnGatewayError, + registerForceStop: (forceStop) => { + forceStopHandler = forceStop; + if (queuedForceStopError !== undefined) { + const queued = queuedForceStopError; + queuedForceStopError = undefined; + forceStop(queued); + } + }, }); } catch (err) { if (!sawDisallowedIntents && !params.isDisallowedIntentsError(err)) { throw err; } } finally { + lifecycleStopping = true; params.releaseEarlyGatewayErrorGuard?.(); unregisterGateway(params.accountId); stopGatewayLogging(); + reconnectStallWatchdog.stop(); clearHelloWatch(); gatewayEmitter?.removeListener("debug", onGatewayDebug); params.abortSignal?.removeEventListener("abort", onAbort); diff --git a/src/discord/monitor/provider.ts b/src/discord/monitor/provider.ts index 942651525..f8614419c 100644 --- a/src/discord/monitor/provider.ts +++ b/src/discord/monitor/provider.ts @@ -71,6 +71,7 @@ import { resolveDiscordPresenceUpdate } from "./presence.js"; import { resolveDiscordAllowlistConfig } from "./provider.allowlist.js"; import { runDiscordGatewayLifecycle } from "./provider.lifecycle.js"; import { resolveDiscordRestFetch } from "./rest-fetch.js"; +import type { DiscordMonitorStatusSink } from "./status.js"; import { createNoopThreadBindingManager, createThreadBindingManager, @@ -87,6 +88,7 @@ export type MonitorDiscordOpts = { mediaMaxMb?: number; historyLimit?: number; replyToMode?: ReplyToMode; + setStatus?: DiscordMonitorStatusSink; }; function summarizeAllowList(list?: string[]) { @@ -590,8 +592,17 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { threadBindings, discordRestFetch, }); + const trackInboundEvent = opts.setStatus + ? () => { + const at = Date.now(); + opts.setStatus?.({ lastEventAt: at, lastInboundAt: at }); + } + : undefined; - registerDiscordListener(client.listeners, new DiscordMessageListener(messageHandler, logger)); + registerDiscordListener( + client.listeners, + new DiscordMessageListener(messageHandler, logger, trackInboundEvent), + ); registerDiscordListener( client.listeners, new DiscordReactionListener({ @@ -608,6 +619,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { allowNameMatching: isDangerousNameMatchingEnabled(discordCfg), guildEntries, logger, + onEvent: trackInboundEvent, }), ); registerDiscordListener( @@ -626,6 +638,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { allowNameMatching: isDangerousNameMatchingEnabled(discordCfg), guildEntries, logger, + onEvent: trackInboundEvent, }), ); @@ -645,6 +658,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { client, runtime, abortSignal: opts.abortSignal, + statusSink: opts.setStatus, isDisallowedIntentsError: isDiscordDisallowedIntentsError, voiceManager, voiceManagerRef, diff --git a/src/discord/monitor/status.ts b/src/discord/monitor/status.ts new file mode 100644 index 000000000..403fc7eee --- /dev/null +++ b/src/discord/monitor/status.ts @@ -0,0 +1,18 @@ +export type DiscordMonitorStatusPatch = { + connected?: boolean; + lastEventAt?: number | null; + lastConnectedAt?: number | null; + lastDisconnect?: + | string + | { + at: number; + status?: number; + error?: string; + loggedOut?: boolean; + } + | null; + lastInboundAt?: number | null; + lastError?: string | null; +}; + +export type DiscordMonitorStatusSink = (patch: DiscordMonitorStatusPatch) => void;