fix(telegram): add download timeout to prevent polling loop hang (#40098)
Merged via squash. Prepared head SHA: abdfa1a35f41615aaa52e06b5ba9581eeb8f8a6a Co-authored-by: tysoncung <45380903+tysoncung@users.noreply.github.com> Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Reviewed-by: @obviyus
This commit is contained in:
@@ -49,6 +49,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/Control UI: resolve bundled dashboard assets through symlinked global wrappers and auto-detected package roots, while keeping configured and custom roots on the strict hardlink boundary. (#40385) Thanks @LarytheLord.
|
||||
- Docs/Changelog: correct the contributor credit for the bundled Control UI global-install fix to @LarytheLord. (#40420) Thanks @velvet-shark.
|
||||
- Models/openai-codex GPT-5.4 forward-compat: use the GPT-5.4 1,050,000-token context window and 128,000 max tokens for `openai-codex/gpt-5.4` instead of inheriting stale legacy Codex limits in resolver fallbacks and model listing. (#37876) thanks @yuweuii.
|
||||
- Telegram/media downloads: time out only stalled body reads so polling recovers from hung file downloads without aborting slow downloads that are still streaming data. (#40098) thanks @tysoncung.
|
||||
|
||||
## 2026.3.7
|
||||
|
||||
|
||||
@@ -36,17 +36,16 @@ const renderGatewayPortHealthDiagnostics = vi.fn(() => ["diag: unhealthy port"])
|
||||
const renderRestartDiagnostics = vi.fn(() => ["diag: unhealthy runtime"]);
|
||||
const resolveGatewayPort = vi.fn(() => 18789);
|
||||
const findGatewayPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []);
|
||||
const probeGateway =
|
||||
vi.fn<
|
||||
(opts: {
|
||||
url: string;
|
||||
auth?: { token?: string; password?: string };
|
||||
timeoutMs: number;
|
||||
}) => Promise<{
|
||||
ok: boolean;
|
||||
configSnapshot: unknown;
|
||||
}>
|
||||
>();
|
||||
const probeGateway = vi.fn<
|
||||
(opts: {
|
||||
url: string;
|
||||
auth?: { token?: string; password?: string };
|
||||
timeoutMs: number;
|
||||
}) => Promise<{
|
||||
ok: boolean;
|
||||
configSnapshot: unknown;
|
||||
}>
|
||||
>();
|
||||
const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true);
|
||||
const loadConfig = vi.fn(() => ({}));
|
||||
|
||||
|
||||
@@ -12,6 +12,19 @@ function makeStream(chunks: Uint8Array[]) {
|
||||
});
|
||||
}
|
||||
|
||||
function makeStallingFetch(firstChunk: Uint8Array) {
|
||||
return vi.fn(async () => {
|
||||
return new Response(
|
||||
new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
controller.enqueue(firstChunk);
|
||||
},
|
||||
}),
|
||||
{ status: 200 },
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
describe("fetchRemoteMedia", () => {
|
||||
type LookupFn = NonNullable<Parameters<typeof fetchRemoteMedia>[0]["lookupFn"]>;
|
||||
|
||||
@@ -54,6 +67,26 @@ describe("fetchRemoteMedia", () => {
|
||||
).rejects.toThrow("exceeds maxBytes");
|
||||
});
|
||||
|
||||
it("aborts stalled body reads when idle timeout expires", async () => {
|
||||
const lookupFn = vi.fn(async () => [
|
||||
{ address: "93.184.216.34", family: 4 },
|
||||
]) as unknown as LookupFn;
|
||||
const fetchImpl = makeStallingFetch(new Uint8Array([1, 2]));
|
||||
|
||||
await expect(
|
||||
fetchRemoteMedia({
|
||||
url: "https://example.com/file.bin",
|
||||
fetchImpl,
|
||||
lookupFn,
|
||||
maxBytes: 1024,
|
||||
readIdleTimeoutMs: 20,
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
code: "fetch_failed",
|
||||
name: "MediaFetchError",
|
||||
});
|
||||
}, 5_000);
|
||||
|
||||
it("blocks private IP literals before fetching", async () => {
|
||||
const fetchImpl = vi.fn();
|
||||
await expect(
|
||||
|
||||
@@ -31,6 +31,8 @@ type FetchMediaOptions = {
|
||||
filePathHint?: string;
|
||||
maxBytes?: number;
|
||||
maxRedirects?: number;
|
||||
/** Abort if the response body stops yielding data for this long (ms). */
|
||||
readIdleTimeoutMs?: number;
|
||||
ssrfPolicy?: SsrFPolicy;
|
||||
lookupFn?: LookupFn;
|
||||
};
|
||||
@@ -87,6 +89,7 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise<Fetc
|
||||
filePathHint,
|
||||
maxBytes,
|
||||
maxRedirects,
|
||||
readIdleTimeoutMs,
|
||||
ssrfPolicy,
|
||||
lookupFn,
|
||||
} = options;
|
||||
@@ -142,15 +145,27 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise<Fetc
|
||||
}
|
||||
}
|
||||
|
||||
const buffer = maxBytes
|
||||
? await readResponseWithLimit(res, maxBytes, {
|
||||
onOverflow: ({ maxBytes, res }) =>
|
||||
new MediaFetchError(
|
||||
"max_bytes",
|
||||
`Failed to fetch media from ${res.url || url}: payload exceeds maxBytes ${maxBytes}`,
|
||||
),
|
||||
})
|
||||
: Buffer.from(await res.arrayBuffer());
|
||||
let buffer: Buffer;
|
||||
try {
|
||||
buffer = maxBytes
|
||||
? await readResponseWithLimit(res, maxBytes, {
|
||||
onOverflow: ({ maxBytes, res }) =>
|
||||
new MediaFetchError(
|
||||
"max_bytes",
|
||||
`Failed to fetch media from ${res.url || url}: payload exceeds maxBytes ${maxBytes}`,
|
||||
),
|
||||
chunkTimeoutMs: readIdleTimeoutMs,
|
||||
})
|
||||
: Buffer.from(await res.arrayBuffer());
|
||||
} catch (err) {
|
||||
if (err instanceof MediaFetchError) {
|
||||
throw err;
|
||||
}
|
||||
throw new MediaFetchError(
|
||||
"fetch_failed",
|
||||
`Failed to fetch media from ${res.url || url}: ${String(err)}`,
|
||||
);
|
||||
}
|
||||
let fileNameFromUrl: string | undefined;
|
||||
try {
|
||||
const parsed = new URL(finalUrl);
|
||||
|
||||
66
src/media/read-response-with-limit.test.ts
Normal file
66
src/media/read-response-with-limit.test.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { readResponseWithLimit } from "./read-response-with-limit.js";
|
||||
|
||||
function makeStream(chunks: Uint8Array[], delayMs?: number) {
|
||||
return new ReadableStream<Uint8Array>({
|
||||
async start(controller) {
|
||||
for (const chunk of chunks) {
|
||||
if (delayMs) {
|
||||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||||
}
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function makeStallingStream(initialChunks: Uint8Array[]) {
|
||||
return new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
for (const chunk of initialChunks) {
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
describe("readResponseWithLimit", () => {
|
||||
it("reads all chunks within the limit", async () => {
|
||||
const body = makeStream([new Uint8Array([1, 2]), new Uint8Array([3, 4])]);
|
||||
const res = new Response(body);
|
||||
const buf = await readResponseWithLimit(res, 100);
|
||||
expect(buf).toEqual(Buffer.from([1, 2, 3, 4]));
|
||||
});
|
||||
|
||||
it("throws when total exceeds maxBytes", async () => {
|
||||
const body = makeStream([new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])]);
|
||||
const res = new Response(body);
|
||||
await expect(readResponseWithLimit(res, 4)).rejects.toThrow(/too large/i);
|
||||
});
|
||||
|
||||
it("calls custom onOverflow", async () => {
|
||||
const body = makeStream([new Uint8Array(10)]);
|
||||
const res = new Response(body);
|
||||
await expect(
|
||||
readResponseWithLimit(res, 5, {
|
||||
onOverflow: ({ size, maxBytes }) => new Error(`custom: ${size} > ${maxBytes}`),
|
||||
}),
|
||||
).rejects.toThrow("custom: 10 > 5");
|
||||
});
|
||||
|
||||
it("times out when no new chunk arrives before idle timeout", async () => {
|
||||
const body = makeStallingStream([new Uint8Array([1, 2])]);
|
||||
const res = new Response(body);
|
||||
await expect(readResponseWithLimit(res, 1024, { chunkTimeoutMs: 50 })).rejects.toThrow(
|
||||
/stalled/i,
|
||||
);
|
||||
}, 5_000);
|
||||
|
||||
it("does not time out while chunks keep arriving", async () => {
|
||||
const body = makeStream([new Uint8Array([1]), new Uint8Array([2])], 10);
|
||||
const res = new Response(body);
|
||||
const buf = await readResponseWithLimit(res, 100, { chunkTimeoutMs: 500 });
|
||||
expect(buf).toEqual(Buffer.from([1, 2]));
|
||||
});
|
||||
});
|
||||
@@ -1,14 +1,55 @@
|
||||
async function readChunkWithIdleTimeout(
|
||||
reader: ReadableStreamDefaultReader<Uint8Array>,
|
||||
chunkTimeoutMs: number,
|
||||
): Promise<ReadableStreamReadResult<Uint8Array>> {
|
||||
let timeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
let timedOut = false;
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
const clear = () => {
|
||||
if (timeoutId !== undefined) {
|
||||
clearTimeout(timeoutId);
|
||||
timeoutId = undefined;
|
||||
}
|
||||
};
|
||||
|
||||
timeoutId = setTimeout(() => {
|
||||
timedOut = true;
|
||||
clear();
|
||||
void reader.cancel().catch(() => undefined);
|
||||
reject(new Error(`Media download stalled: no data received for ${chunkTimeoutMs}ms`));
|
||||
}, chunkTimeoutMs);
|
||||
|
||||
void reader.read().then(
|
||||
(result) => {
|
||||
clear();
|
||||
if (!timedOut) {
|
||||
resolve(result);
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
clear();
|
||||
if (!timedOut) {
|
||||
reject(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
export async function readResponseWithLimit(
|
||||
res: Response,
|
||||
maxBytes: number,
|
||||
opts?: {
|
||||
onOverflow?: (params: { size: number; maxBytes: number; res: Response }) => Error;
|
||||
chunkTimeoutMs?: number;
|
||||
},
|
||||
): Promise<Buffer> {
|
||||
const onOverflow =
|
||||
opts?.onOverflow ??
|
||||
((params: { size: number; maxBytes: number }) =>
|
||||
new Error(`Content too large: ${params.size} bytes (limit: ${params.maxBytes} bytes)`));
|
||||
const chunkTimeoutMs = opts?.chunkTimeoutMs;
|
||||
|
||||
const body = res.body;
|
||||
if (!body || typeof body.getReader !== "function") {
|
||||
@@ -24,7 +65,9 @@ export async function readResponseWithLimit(
|
||||
let total = 0;
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
const { done, value } = chunkTimeoutMs
|
||||
? await readChunkWithIdleTimeout(reader, chunkTimeoutMs)
|
||||
: await reader.read();
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -100,6 +100,9 @@ function resolveRequiredFetchImpl(proxyFetch?: typeof fetch): typeof fetch {
|
||||
return fetchImpl;
|
||||
}
|
||||
|
||||
/** Default idle timeout for Telegram media downloads (30 seconds). */
|
||||
const TELEGRAM_DOWNLOAD_IDLE_TIMEOUT_MS = 30_000;
|
||||
|
||||
async function downloadAndSaveTelegramFile(params: {
|
||||
filePath: string;
|
||||
token: string;
|
||||
@@ -113,6 +116,7 @@ async function downloadAndSaveTelegramFile(params: {
|
||||
fetchImpl: params.fetchImpl,
|
||||
filePathHint: params.filePath,
|
||||
maxBytes: params.maxBytes,
|
||||
readIdleTimeoutMs: TELEGRAM_DOWNLOAD_IDLE_TIMEOUT_MS,
|
||||
ssrfPolicy: TELEGRAM_MEDIA_SSRF_POLICY,
|
||||
});
|
||||
const originalName = params.telegramFileName ?? fetched.fileName ?? params.filePath;
|
||||
|
||||
Reference in New Issue
Block a user