fix(discord): unify reconnect watchdog and land #31025/#30530
Landed follow-up intent from contributor PR #31025 (@theotarr) and PR #30530 (@liuxiaopai-ai). Co-authored-by: theotarr <theotarr@users.noreply.github.com> Co-authored-by: liuxiaopai-ai <liuxiaopai-ai@users.noreply.github.com>
This commit is contained in:
@@ -96,6 +96,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugins/Discovery precedence: load bundled plugins before auto-discovered global extensions so bundled channel plugins win duplicate-ID resolution by default (explicit `plugins.load.paths` overrides remain highest precedence), with loader regression coverage. Landed from contributor PR #29710 by @Sid-Qin. Thanks @Sid-Qin.
|
||||
- Discord/Reconnect integrity: release Discord message listener lane immediately while preserving serialized handler execution, add HELLO-stall resume-first recovery with bounded fresh-identify fallback after repeated stalls, and extend lifecycle/listener regression coverage for forced reconnect scenarios. Landed from contributor PR #29508 by @cgdusek. Thanks @cgdusek.
|
||||
- Matrix/Conduit compatibility: avoid blocking startup on non-resolving Matrix sync start, preserve startup error propagation, prevent duplicate monitor listener registration, remove unreliable 2-member DM heuristics, accept `!room` IDs without alias resolution, and add matrix monitor/client regression coverage. Landed from contributor PR #31023 by @efe-arv. Thanks @efe-arv.
|
||||
- Discord/Reconnect watchdog: add a shared armable transport stall-watchdog and wire Discord gateway lifecycle force-stop semantics for silent close/reconnect zombies, with gateway/lifecycle watchdog regression coverage and runtime status liveness updates. Follow-up to contributor PR #31025 by @theotarr and PR #30530 by @liuxiaopai-ai. Thanks @theotarr and @liuxiaopai-ai.
|
||||
- Security/Skills: harden skill installer metadata parsing by rejecting unsafe installer specs (brew/node/go/uv/download) and constrain plugin-declared skill directories to the plugin root (including symlink-escape checks), with regression coverage.
|
||||
- Discord/DM command auth: unify DM allowlist + pairing-store authorization across message preflight and native command interactions so DM command gating is consistent for `open`/`pairing`/`allowlist` policies.
|
||||
- Sessions/Usage accounting: persist `cacheRead`/`cacheWrite` from the latest call snapshot (`lastCallUsage`) instead of accumulated multi-call totals, preventing inflated token/cost reporting in long tool/compaction runs. (#31005)
|
||||
|
||||
@@ -343,6 +343,11 @@ export const discordPlugin: ChannelPlugin<ResolvedDiscordAccount> = {
|
||||
defaultRuntime: {
|
||||
accountId: DEFAULT_ACCOUNT_ID,
|
||||
running: false,
|
||||
connected: false,
|
||||
reconnectAttempts: 0,
|
||||
lastConnectedAt: null,
|
||||
lastDisconnect: null,
|
||||
lastEventAt: null,
|
||||
lastStartAt: null,
|
||||
lastStopAt: null,
|
||||
lastError: null,
|
||||
@@ -394,6 +399,11 @@ export const discordPlugin: ChannelPlugin<ResolvedDiscordAccount> = {
|
||||
lastStartAt: runtime?.lastStartAt ?? null,
|
||||
lastStopAt: runtime?.lastStopAt ?? null,
|
||||
lastError: runtime?.lastError ?? null,
|
||||
connected: runtime?.connected ?? false,
|
||||
reconnectAttempts: runtime?.reconnectAttempts,
|
||||
lastConnectedAt: runtime?.lastConnectedAt ?? null,
|
||||
lastDisconnect: runtime?.lastDisconnect ?? null,
|
||||
lastEventAt: runtime?.lastEventAt ?? null,
|
||||
application: app ?? undefined,
|
||||
bot: bot ?? undefined,
|
||||
probe,
|
||||
@@ -445,6 +455,7 @@ export const discordPlugin: ChannelPlugin<ResolvedDiscordAccount> = {
|
||||
abortSignal: ctx.abortSignal,
|
||||
mediaMaxMb: account.config.mediaMaxMb,
|
||||
historyLimit: account.config.historyLimit,
|
||||
setStatus: (patch) => ctx.setStatus({ accountId: account.accountId, ...patch }),
|
||||
});
|
||||
},
|
||||
},
|
||||
|
||||
74
src/channels/transport/stall-watchdog.test.ts
Normal file
74
src/channels/transport/stall-watchdog.test.ts
Normal file
@@ -0,0 +1,74 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createArmableStallWatchdog } from "./stall-watchdog.js";
|
||||
|
||||
describe("createArmableStallWatchdog", () => {
|
||||
it("fires onTimeout once when armed and idle exceeds timeout", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const onTimeout = vi.fn();
|
||||
const watchdog = createArmableStallWatchdog({
|
||||
label: "test-watchdog",
|
||||
timeoutMs: 1_000,
|
||||
checkIntervalMs: 100,
|
||||
onTimeout,
|
||||
});
|
||||
|
||||
watchdog.arm();
|
||||
await vi.advanceTimersByTimeAsync(1_500);
|
||||
|
||||
expect(onTimeout).toHaveBeenCalledTimes(1);
|
||||
expect(watchdog.isArmed()).toBe(false);
|
||||
watchdog.stop();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not fire when disarmed before timeout", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const onTimeout = vi.fn();
|
||||
const watchdog = createArmableStallWatchdog({
|
||||
label: "test-watchdog",
|
||||
timeoutMs: 1_000,
|
||||
checkIntervalMs: 100,
|
||||
onTimeout,
|
||||
});
|
||||
|
||||
watchdog.arm();
|
||||
await vi.advanceTimersByTimeAsync(500);
|
||||
watchdog.disarm();
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
|
||||
expect(onTimeout).not.toHaveBeenCalled();
|
||||
watchdog.stop();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("extends timeout window when touched", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const onTimeout = vi.fn();
|
||||
const watchdog = createArmableStallWatchdog({
|
||||
label: "test-watchdog",
|
||||
timeoutMs: 1_000,
|
||||
checkIntervalMs: 100,
|
||||
onTimeout,
|
||||
});
|
||||
|
||||
watchdog.arm();
|
||||
await vi.advanceTimersByTimeAsync(700);
|
||||
watchdog.touch();
|
||||
await vi.advanceTimersByTimeAsync(700);
|
||||
expect(onTimeout).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(400);
|
||||
expect(onTimeout).toHaveBeenCalledTimes(1);
|
||||
watchdog.stop();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
103
src/channels/transport/stall-watchdog.ts
Normal file
103
src/channels/transport/stall-watchdog.ts
Normal file
@@ -0,0 +1,103 @@
|
||||
import type { RuntimeEnv } from "../../runtime.js";
|
||||
|
||||
export type StallWatchdogTimeoutMeta = {
|
||||
idleMs: number;
|
||||
timeoutMs: number;
|
||||
};
|
||||
|
||||
export type ArmableStallWatchdog = {
|
||||
arm: (atMs?: number) => void;
|
||||
touch: (atMs?: number) => void;
|
||||
disarm: () => void;
|
||||
stop: () => void;
|
||||
isArmed: () => boolean;
|
||||
};
|
||||
|
||||
export function createArmableStallWatchdog(params: {
|
||||
label: string;
|
||||
timeoutMs: number;
|
||||
checkIntervalMs?: number;
|
||||
abortSignal?: AbortSignal;
|
||||
runtime?: RuntimeEnv;
|
||||
onTimeout: (meta: StallWatchdogTimeoutMeta) => void;
|
||||
}): ArmableStallWatchdog {
|
||||
const timeoutMs = Math.max(1, Math.floor(params.timeoutMs));
|
||||
const checkIntervalMs = Math.max(
|
||||
100,
|
||||
Math.floor(params.checkIntervalMs ?? Math.min(5_000, Math.max(250, timeoutMs / 6))),
|
||||
);
|
||||
|
||||
let armed = false;
|
||||
let stopped = false;
|
||||
let lastActivityAt = Date.now();
|
||||
let timer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
const clearTimer = () => {
|
||||
if (!timer) {
|
||||
return;
|
||||
}
|
||||
clearInterval(timer);
|
||||
timer = null;
|
||||
};
|
||||
|
||||
const disarm = () => {
|
||||
armed = false;
|
||||
};
|
||||
|
||||
const stop = () => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
stopped = true;
|
||||
disarm();
|
||||
clearTimer();
|
||||
params.abortSignal?.removeEventListener("abort", stop);
|
||||
};
|
||||
|
||||
const arm = (atMs?: number) => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
lastActivityAt = atMs ?? Date.now();
|
||||
armed = true;
|
||||
};
|
||||
|
||||
const touch = (atMs?: number) => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
lastActivityAt = atMs ?? Date.now();
|
||||
};
|
||||
|
||||
const check = () => {
|
||||
if (!armed || stopped) {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
const idleMs = now - lastActivityAt;
|
||||
if (idleMs < timeoutMs) {
|
||||
return;
|
||||
}
|
||||
disarm();
|
||||
params.runtime?.error?.(
|
||||
`[${params.label}] transport watchdog timeout: idle ${Math.round(idleMs / 1000)}s (limit ${Math.round(timeoutMs / 1000)}s)`,
|
||||
);
|
||||
params.onTimeout({ idleMs, timeoutMs });
|
||||
};
|
||||
|
||||
if (params.abortSignal?.aborted) {
|
||||
stop();
|
||||
} else {
|
||||
params.abortSignal?.addEventListener("abort", stop, { once: true });
|
||||
timer = setInterval(check, checkIntervalMs);
|
||||
timer.unref?.();
|
||||
}
|
||||
|
||||
return {
|
||||
arm,
|
||||
touch,
|
||||
disarm,
|
||||
stop,
|
||||
isArmed: () => armed,
|
||||
};
|
||||
}
|
||||
@@ -81,4 +81,48 @@ describe("waitForDiscordGatewayStop", () => {
|
||||
|
||||
await expect(promise).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("rejects via registerForceStop and disconnects gateway", async () => {
|
||||
const emitter = new EventEmitter();
|
||||
const disconnect = vi.fn();
|
||||
const abort = new AbortController();
|
||||
let forceStop: ((err: unknown) => void) | undefined;
|
||||
|
||||
const promise = waitForDiscordGatewayStop({
|
||||
gateway: { emitter, disconnect },
|
||||
abortSignal: abort.signal,
|
||||
registerForceStop: (fn) => {
|
||||
forceStop = fn;
|
||||
},
|
||||
});
|
||||
|
||||
expect(forceStop).toBeDefined();
|
||||
|
||||
forceStop?.(new Error("reconnect watchdog timeout"));
|
||||
|
||||
await expect(promise).rejects.toThrow("reconnect watchdog timeout");
|
||||
expect(disconnect).toHaveBeenCalledTimes(1);
|
||||
expect(emitter.listenerCount("error")).toBe(0);
|
||||
});
|
||||
|
||||
it("ignores forceStop after promise already settled", async () => {
|
||||
const emitter = new EventEmitter();
|
||||
const disconnect = vi.fn();
|
||||
const abort = new AbortController();
|
||||
let forceStop: ((err: unknown) => void) | undefined;
|
||||
|
||||
const promise = waitForDiscordGatewayStop({
|
||||
gateway: { emitter, disconnect },
|
||||
abortSignal: abort.signal,
|
||||
registerForceStop: (fn) => {
|
||||
forceStop = fn;
|
||||
},
|
||||
});
|
||||
|
||||
abort.abort();
|
||||
await expect(promise).resolves.toBeUndefined();
|
||||
|
||||
forceStop?.(new Error("too late"));
|
||||
expect(disconnect).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -14,6 +14,7 @@ export async function waitForDiscordGatewayStop(params: {
|
||||
abortSignal?: AbortSignal;
|
||||
onGatewayError?: (err: unknown) => void;
|
||||
shouldStopOnError?: (err: unknown) => boolean;
|
||||
registerForceStop?: (forceStop: (err: unknown) => void) => void;
|
||||
}): Promise<void> {
|
||||
const { gateway, abortSignal, onGatewayError, shouldStopOnError } = params;
|
||||
const emitter = gateway?.emitter;
|
||||
@@ -57,6 +58,9 @@ export async function waitForDiscordGatewayStop(params: {
|
||||
finishReject(err);
|
||||
}
|
||||
};
|
||||
const onForceStop = (err: unknown) => {
|
||||
finishReject(err);
|
||||
};
|
||||
|
||||
if (abortSignal?.aborted) {
|
||||
onAbort();
|
||||
@@ -65,5 +69,6 @@ export async function waitForDiscordGatewayStop(params: {
|
||||
|
||||
abortSignal?.addEventListener("abort", onAbort, { once: true });
|
||||
emitter?.on("error", onGatewayErrorEvent);
|
||||
params.registerForceStop?.(onForceStop);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@ type DiscordReactionListenerParams = {
|
||||
allowNameMatching: boolean;
|
||||
guildEntries?: Record<string, import("./allow-list.js").DiscordGuildEntryResolved>;
|
||||
logger: Logger;
|
||||
onEvent?: () => void;
|
||||
};
|
||||
|
||||
const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000;
|
||||
@@ -123,11 +124,13 @@ export class DiscordMessageListener extends MessageCreateListener {
|
||||
constructor(
|
||||
private handler: DiscordMessageHandler,
|
||||
private logger?: Logger,
|
||||
private onEvent?: () => void,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async handle(data: DiscordMessageEvent, client: Client) {
|
||||
this.onEvent?.();
|
||||
// Release Carbon's dispatch lane immediately, but keep our message handler
|
||||
// serialized to avoid unbounded parallel model/IO work on traffic bursts.
|
||||
this.messageQueue = this.messageQueue
|
||||
@@ -157,6 +160,7 @@ export class DiscordReactionListener extends MessageReactionAddListener {
|
||||
}
|
||||
|
||||
async handle(data: DiscordReactionEvent, client: Client) {
|
||||
this.params.onEvent?.();
|
||||
await runDiscordReactionHandler({
|
||||
data,
|
||||
client,
|
||||
@@ -174,6 +178,7 @@ export class DiscordReactionRemoveListener extends MessageReactionRemoveListener
|
||||
}
|
||||
|
||||
async handle(data: DiscordReactionEvent, client: Client) {
|
||||
this.params.onEvent?.();
|
||||
await runDiscordReactionHandler({
|
||||
data,
|
||||
client,
|
||||
|
||||
@@ -310,7 +310,7 @@ describe("Discord model picker interactions", () => {
|
||||
.mockResolvedValue();
|
||||
const dispatchSpy = vi
|
||||
.spyOn(dispatcherModule, "dispatchReplyWithDispatcher")
|
||||
.mockImplementation(() => new Promise(() => {}) as never);
|
||||
.mockResolvedValue({} as never);
|
||||
const withTimeoutSpy = vi
|
||||
.spyOn(timeoutModule, "withTimeout")
|
||||
.mockRejectedValue(new Error("timeout"));
|
||||
|
||||
@@ -83,6 +83,7 @@ describe("runDiscordGatewayLifecycle", () => {
|
||||
start,
|
||||
stop,
|
||||
threadStop,
|
||||
runtimeLog,
|
||||
runtimeError,
|
||||
releaseEarlyGatewayErrorGuard,
|
||||
lifecycleParams: {
|
||||
@@ -315,4 +316,77 @@ describe("runDiscordGatewayLifecycle", () => {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("force-stops when reconnect stalls after a close event", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
|
||||
const emitter = new EventEmitter();
|
||||
const gateway = {
|
||||
isConnected: false,
|
||||
options: {},
|
||||
disconnect: vi.fn(),
|
||||
connect: vi.fn(),
|
||||
emitter,
|
||||
};
|
||||
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
|
||||
waitForDiscordGatewayStopMock.mockImplementationOnce(
|
||||
(waitParams: { registerForceStop?: (stop: (err: unknown) => void) => void }) =>
|
||||
new Promise<void>((_resolve, reject) => {
|
||||
waitParams.registerForceStop?.((err) => reject(err));
|
||||
}),
|
||||
);
|
||||
const { lifecycleParams } = createLifecycleHarness({ gateway });
|
||||
|
||||
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
|
||||
lifecyclePromise.catch(() => {});
|
||||
emitter.emit("debug", "WebSocket connection closed with code 1006");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000);
|
||||
await expect(lifecyclePromise).rejects.toThrow("reconnect watchdog timeout");
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not force-stop when reconnect resumes before watchdog timeout", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
|
||||
const emitter = new EventEmitter();
|
||||
const gateway = {
|
||||
isConnected: false,
|
||||
options: {},
|
||||
disconnect: vi.fn(),
|
||||
connect: vi.fn(),
|
||||
emitter,
|
||||
};
|
||||
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
|
||||
let resolveWait: (() => void) | undefined;
|
||||
waitForDiscordGatewayStopMock.mockImplementationOnce(
|
||||
(waitParams: { registerForceStop?: (stop: (err: unknown) => void) => void }) =>
|
||||
new Promise<void>((resolve, reject) => {
|
||||
resolveWait = resolve;
|
||||
waitParams.registerForceStop?.((err) => reject(err));
|
||||
}),
|
||||
);
|
||||
const { lifecycleParams, runtimeLog } = createLifecycleHarness({ gateway });
|
||||
|
||||
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
|
||||
emitter.emit("debug", "WebSocket connection closed with code 1006");
|
||||
await vi.advanceTimersByTimeAsync(60_000);
|
||||
|
||||
gateway.isConnected = true;
|
||||
emitter.emit("debug", "WebSocket connection opened");
|
||||
await vi.advanceTimersByTimeAsync(5 * 60_000 + 1_000);
|
||||
|
||||
expect(runtimeLog).not.toHaveBeenCalledWith(
|
||||
expect.stringContaining("reconnect watchdog timeout"),
|
||||
);
|
||||
resolveWait?.();
|
||||
await expect(lifecyclePromise).resolves.toBeUndefined();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
import type { Client } from "@buape/carbon";
|
||||
import type { GatewayPlugin } from "@buape/carbon/gateway";
|
||||
import { createArmableStallWatchdog } from "../../channels/transport/stall-watchdog.js";
|
||||
import { danger } from "../../globals.js";
|
||||
import type { RuntimeEnv } from "../../runtime.js";
|
||||
import { attachDiscordGatewayLogging } from "../gateway-logging.js";
|
||||
import { getDiscordGatewayEmitter, waitForDiscordGatewayStop } from "../monitor.gateway.js";
|
||||
import type { DiscordVoiceManager } from "../voice/manager.js";
|
||||
import { registerGateway, unregisterGateway } from "./gateway-registry.js";
|
||||
import type { DiscordMonitorStatusSink } from "./status.js";
|
||||
|
||||
type ExecApprovalsHandler = {
|
||||
start: () => Promise<void>;
|
||||
@@ -24,7 +26,12 @@ export async function runDiscordGatewayLifecycle(params: {
|
||||
threadBindings: { stop: () => void };
|
||||
pendingGatewayErrors?: unknown[];
|
||||
releaseEarlyGatewayErrorGuard?: () => void;
|
||||
statusSink?: DiscordMonitorStatusSink;
|
||||
}) {
|
||||
const HELLO_TIMEOUT_MS = 30000;
|
||||
const HELLO_CONNECTED_POLL_MS = 250;
|
||||
const MAX_CONSECUTIVE_HELLO_STALLS = 3;
|
||||
const RECONNECT_STALL_TIMEOUT_MS = 5 * 60_000;
|
||||
const gateway = params.client.getPlugin<GatewayPlugin>("gateway");
|
||||
if (gateway) {
|
||||
registerGateway(params.accountId, gateway);
|
||||
@@ -34,8 +41,58 @@ export async function runDiscordGatewayLifecycle(params: {
|
||||
emitter: gatewayEmitter,
|
||||
runtime: params.runtime,
|
||||
});
|
||||
let lifecycleStopping = false;
|
||||
let forceStopHandler: ((err: unknown) => void) | undefined;
|
||||
let queuedForceStopError: unknown;
|
||||
|
||||
const pushStatus = (patch: Parameters<DiscordMonitorStatusSink>[0]) => {
|
||||
params.statusSink?.(patch);
|
||||
};
|
||||
|
||||
const triggerForceStop = (err: unknown) => {
|
||||
if (forceStopHandler) {
|
||||
forceStopHandler(err);
|
||||
return;
|
||||
}
|
||||
queuedForceStopError = err;
|
||||
};
|
||||
|
||||
const reconnectStallWatchdog = createArmableStallWatchdog({
|
||||
label: `discord:${params.accountId}:reconnect`,
|
||||
timeoutMs: RECONNECT_STALL_TIMEOUT_MS,
|
||||
abortSignal: params.abortSignal,
|
||||
runtime: params.runtime,
|
||||
onTimeout: () => {
|
||||
if (params.abortSignal?.aborted || lifecycleStopping) {
|
||||
return;
|
||||
}
|
||||
const at = Date.now();
|
||||
const error = new Error(
|
||||
`discord reconnect watchdog timeout after ${RECONNECT_STALL_TIMEOUT_MS}ms`,
|
||||
);
|
||||
pushStatus({
|
||||
connected: false,
|
||||
lastEventAt: at,
|
||||
lastDisconnect: {
|
||||
at,
|
||||
error: error.message,
|
||||
},
|
||||
lastError: error.message,
|
||||
});
|
||||
params.runtime.error?.(
|
||||
danger(
|
||||
`discord: reconnect watchdog timeout after ${RECONNECT_STALL_TIMEOUT_MS}ms; force-stopping monitor task`,
|
||||
),
|
||||
);
|
||||
triggerForceStop(error);
|
||||
},
|
||||
});
|
||||
|
||||
const onAbort = () => {
|
||||
lifecycleStopping = true;
|
||||
reconnectStallWatchdog.disarm();
|
||||
const at = Date.now();
|
||||
pushStatus({ connected: false, lastEventAt: at });
|
||||
if (!gateway) {
|
||||
return;
|
||||
}
|
||||
@@ -50,9 +107,6 @@ export async function runDiscordGatewayLifecycle(params: {
|
||||
params.abortSignal?.addEventListener("abort", onAbort, { once: true });
|
||||
}
|
||||
|
||||
const HELLO_TIMEOUT_MS = 30000;
|
||||
const HELLO_CONNECTED_POLL_MS = 250;
|
||||
const MAX_CONSECUTIVE_HELLO_STALLS = 3;
|
||||
let helloTimeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
let helloConnectedPollId: ReturnType<typeof setInterval> | undefined;
|
||||
let consecutiveHelloStalls = 0;
|
||||
@@ -69,6 +123,14 @@ export async function runDiscordGatewayLifecycle(params: {
|
||||
const resetHelloStallCounter = () => {
|
||||
consecutiveHelloStalls = 0;
|
||||
};
|
||||
const parseGatewayCloseCode = (message: string): number | undefined => {
|
||||
const match = /code\s+(\d{3,5})/i.exec(message);
|
||||
if (!match?.[1]) {
|
||||
return undefined;
|
||||
}
|
||||
const code = Number.parseInt(match[1], 10);
|
||||
return Number.isFinite(code) ? code : undefined;
|
||||
};
|
||||
const clearResumeState = () => {
|
||||
const mutableGateway = gateway as
|
||||
| (GatewayPlugin & {
|
||||
@@ -90,27 +152,53 @@ export async function runDiscordGatewayLifecycle(params: {
|
||||
};
|
||||
const onGatewayDebug = (msg: unknown) => {
|
||||
const message = String(msg);
|
||||
const at = Date.now();
|
||||
pushStatus({ lastEventAt: at });
|
||||
if (message.includes("WebSocket connection closed")) {
|
||||
// Carbon marks `isConnected` true only after READY/RESUMED and flips it
|
||||
// false during reconnect handling after this debug line is emitted.
|
||||
if (gateway?.isConnected) {
|
||||
resetHelloStallCounter();
|
||||
}
|
||||
reconnectStallWatchdog.arm(at);
|
||||
pushStatus({
|
||||
connected: false,
|
||||
lastDisconnect: {
|
||||
at,
|
||||
status: parseGatewayCloseCode(message),
|
||||
},
|
||||
});
|
||||
clearHelloWatch();
|
||||
return;
|
||||
}
|
||||
if (!message.includes("WebSocket connection opened")) {
|
||||
return;
|
||||
}
|
||||
reconnectStallWatchdog.disarm();
|
||||
clearHelloWatch();
|
||||
|
||||
let sawConnected = gateway?.isConnected === true;
|
||||
if (sawConnected) {
|
||||
pushStatus({
|
||||
connected: true,
|
||||
lastConnectedAt: at,
|
||||
lastDisconnect: null,
|
||||
});
|
||||
}
|
||||
helloConnectedPollId = setInterval(() => {
|
||||
if (!gateway?.isConnected) {
|
||||
return;
|
||||
}
|
||||
sawConnected = true;
|
||||
resetHelloStallCounter();
|
||||
const connectedAt = Date.now();
|
||||
reconnectStallWatchdog.disarm();
|
||||
pushStatus({
|
||||
connected: true,
|
||||
lastEventAt: connectedAt,
|
||||
lastConnectedAt: connectedAt,
|
||||
lastDisconnect: null,
|
||||
});
|
||||
if (helloConnectedPollId) {
|
||||
clearInterval(helloConnectedPollId);
|
||||
helloConnectedPollId = undefined;
|
||||
@@ -127,6 +215,16 @@ export async function runDiscordGatewayLifecycle(params: {
|
||||
} else {
|
||||
consecutiveHelloStalls += 1;
|
||||
const forceFreshIdentify = consecutiveHelloStalls >= MAX_CONSECUTIVE_HELLO_STALLS;
|
||||
const stalledAt = Date.now();
|
||||
reconnectStallWatchdog.arm(stalledAt);
|
||||
pushStatus({
|
||||
connected: false,
|
||||
lastEventAt: stalledAt,
|
||||
lastDisconnect: {
|
||||
at: stalledAt,
|
||||
error: "hello-timeout",
|
||||
},
|
||||
});
|
||||
params.runtime.log?.(
|
||||
danger(
|
||||
forceFreshIdentify
|
||||
@@ -199,15 +297,25 @@ export async function runDiscordGatewayLifecycle(params: {
|
||||
abortSignal: params.abortSignal,
|
||||
onGatewayError: logGatewayError,
|
||||
shouldStopOnError: shouldStopOnGatewayError,
|
||||
registerForceStop: (forceStop) => {
|
||||
forceStopHandler = forceStop;
|
||||
if (queuedForceStopError !== undefined) {
|
||||
const queued = queuedForceStopError;
|
||||
queuedForceStopError = undefined;
|
||||
forceStop(queued);
|
||||
}
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
if (!sawDisallowedIntents && !params.isDisallowedIntentsError(err)) {
|
||||
throw err;
|
||||
}
|
||||
} finally {
|
||||
lifecycleStopping = true;
|
||||
params.releaseEarlyGatewayErrorGuard?.();
|
||||
unregisterGateway(params.accountId);
|
||||
stopGatewayLogging();
|
||||
reconnectStallWatchdog.stop();
|
||||
clearHelloWatch();
|
||||
gatewayEmitter?.removeListener("debug", onGatewayDebug);
|
||||
params.abortSignal?.removeEventListener("abort", onAbort);
|
||||
|
||||
@@ -71,6 +71,7 @@ import { resolveDiscordPresenceUpdate } from "./presence.js";
|
||||
import { resolveDiscordAllowlistConfig } from "./provider.allowlist.js";
|
||||
import { runDiscordGatewayLifecycle } from "./provider.lifecycle.js";
|
||||
import { resolveDiscordRestFetch } from "./rest-fetch.js";
|
||||
import type { DiscordMonitorStatusSink } from "./status.js";
|
||||
import {
|
||||
createNoopThreadBindingManager,
|
||||
createThreadBindingManager,
|
||||
@@ -87,6 +88,7 @@ export type MonitorDiscordOpts = {
|
||||
mediaMaxMb?: number;
|
||||
historyLimit?: number;
|
||||
replyToMode?: ReplyToMode;
|
||||
setStatus?: DiscordMonitorStatusSink;
|
||||
};
|
||||
|
||||
function summarizeAllowList(list?: string[]) {
|
||||
@@ -590,8 +592,17 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
threadBindings,
|
||||
discordRestFetch,
|
||||
});
|
||||
const trackInboundEvent = opts.setStatus
|
||||
? () => {
|
||||
const at = Date.now();
|
||||
opts.setStatus?.({ lastEventAt: at, lastInboundAt: at });
|
||||
}
|
||||
: undefined;
|
||||
|
||||
registerDiscordListener(client.listeners, new DiscordMessageListener(messageHandler, logger));
|
||||
registerDiscordListener(
|
||||
client.listeners,
|
||||
new DiscordMessageListener(messageHandler, logger, trackInboundEvent),
|
||||
);
|
||||
registerDiscordListener(
|
||||
client.listeners,
|
||||
new DiscordReactionListener({
|
||||
@@ -608,6 +619,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
allowNameMatching: isDangerousNameMatchingEnabled(discordCfg),
|
||||
guildEntries,
|
||||
logger,
|
||||
onEvent: trackInboundEvent,
|
||||
}),
|
||||
);
|
||||
registerDiscordListener(
|
||||
@@ -626,6 +638,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
allowNameMatching: isDangerousNameMatchingEnabled(discordCfg),
|
||||
guildEntries,
|
||||
logger,
|
||||
onEvent: trackInboundEvent,
|
||||
}),
|
||||
);
|
||||
|
||||
@@ -645,6 +658,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
client,
|
||||
runtime,
|
||||
abortSignal: opts.abortSignal,
|
||||
statusSink: opts.setStatus,
|
||||
isDisallowedIntentsError: isDiscordDisallowedIntentsError,
|
||||
voiceManager,
|
||||
voiceManagerRef,
|
||||
|
||||
18
src/discord/monitor/status.ts
Normal file
18
src/discord/monitor/status.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
export type DiscordMonitorStatusPatch = {
|
||||
connected?: boolean;
|
||||
lastEventAt?: number | null;
|
||||
lastConnectedAt?: number | null;
|
||||
lastDisconnect?:
|
||||
| string
|
||||
| {
|
||||
at: number;
|
||||
status?: number;
|
||||
error?: string;
|
||||
loggedOut?: boolean;
|
||||
}
|
||||
| null;
|
||||
lastInboundAt?: number | null;
|
||||
lastError?: string | null;
|
||||
};
|
||||
|
||||
export type DiscordMonitorStatusSink = (patch: DiscordMonitorStatusPatch) => void;
|
||||
Reference in New Issue
Block a user