fix(media): retain inbound media with recursive cleanup TTL (#38292)
* Config: add media retention TTL setting
* Media: recurse persisted media cleanup
* Gateway: add persisted media cleanup timer
* Media: harden retention cleanup sweep
* Media: make recursive retention cleanup opt-in
* Media: retry writes after empty-dir cleanup race
This commit is contained in:
@@ -423,9 +423,11 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
"nodeHost.browserProxy.allowProfiles":
|
||||
"Optional allowlist of browser profile names exposed through node proxy routing. Leave empty to expose all configured profiles, or use a tight list to enforce least-privilege profile access.",
|
||||
media:
|
||||
"Top-level media behavior shared across providers and tools that handle inbound files. Keep defaults unless you need stable filenames for external processing pipelines.",
|
||||
"Top-level media behavior shared across providers and tools that handle inbound files. Keep defaults unless you need stable filenames for external processing pipelines or longer-lived inbound media retention.",
|
||||
"media.preserveFilenames":
|
||||
"When enabled, uploaded media keeps its original filename instead of a generated temp-safe name. Turn this on when downstream automations depend on stable names, and leave off to reduce accidental filename leakage.",
|
||||
"media.ttlHours":
|
||||
"Optional retention window in hours for persisted inbound media cleanup across the full media tree. Leave unset to preserve legacy behavior, or set values like 24 (1 day) or 168 (7 days) when you want automatic cleanup.",
|
||||
audio:
|
||||
"Global audio ingestion settings used before higher-level tools process speech or media content. Configure this when you need deterministic transcription behavior for voice notes and clips.",
|
||||
"audio.transcription":
|
||||
|
||||
@@ -278,6 +278,7 @@ export const FIELD_LABELS: Record<string, string> = {
|
||||
"nodeHost.browserProxy.allowProfiles": "Node Browser Proxy Allowed Profiles",
|
||||
media: "Media",
|
||||
"media.preserveFilenames": "Preserve Media Filenames",
|
||||
"media.ttlHours": "Media Retention TTL (hours)",
|
||||
audio: "Audio",
|
||||
"audio.transcription": "Audio Transcription",
|
||||
"audio.transcription.command": "Audio Transcription Command",
|
||||
|
||||
@@ -101,6 +101,12 @@ export type OpenClawConfig = {
|
||||
bindings?: AgentBinding[];
|
||||
broadcast?: BroadcastConfig;
|
||||
audio?: AudioConfig;
|
||||
media?: {
|
||||
/** Preserve original uploaded filenames when storing inbound media. */
|
||||
preserveFilenames?: boolean;
|
||||
/** Optional retention window for persisted inbound media cleanup. */
|
||||
ttlHours?: number;
|
||||
};
|
||||
messages?: MessagesConfig;
|
||||
commands?: CommandsConfig;
|
||||
approvals?: ApprovalsConfig;
|
||||
|
||||
@@ -423,6 +423,12 @@ export const OpenClawSchema = z
|
||||
media: z
|
||||
.object({
|
||||
preserveFilenames: z.boolean().optional(),
|
||||
ttlHours: z
|
||||
.number()
|
||||
.int()
|
||||
.min(1)
|
||||
.max(24 * 7)
|
||||
.optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
|
||||
@@ -21,6 +21,7 @@ export function createGatewayCloseHandler(params: {
|
||||
tickInterval: ReturnType<typeof setInterval>;
|
||||
healthInterval: ReturnType<typeof setInterval>;
|
||||
dedupeCleanup: ReturnType<typeof setInterval>;
|
||||
mediaCleanup: ReturnType<typeof setInterval> | null;
|
||||
agentUnsub: (() => void) | null;
|
||||
heartbeatUnsub: (() => void) | null;
|
||||
chatRunState: { clear: () => void };
|
||||
@@ -87,6 +88,9 @@ export function createGatewayCloseHandler(params: {
|
||||
clearInterval(params.tickInterval);
|
||||
clearInterval(params.healthInterval);
|
||||
clearInterval(params.dedupeCleanup);
|
||||
if (params.mediaCleanup) {
|
||||
clearInterval(params.mediaCleanup);
|
||||
}
|
||||
if (params.agentUnsub) {
|
||||
try {
|
||||
params.agentUnsub();
|
||||
|
||||
142
src/gateway/server-maintenance.test.ts
Normal file
142
src/gateway/server-maintenance.test.ts
Normal file
@@ -0,0 +1,142 @@
|
||||
// Tests for startGatewayMaintenanceTimers' persisted-media cleanup scheduling:
// the recursive sweep must be opt-in (only when a TTL is configured), run once
// at startup, repeat hourly, and never overlap an in-flight run.
import { afterEach, describe, expect, it, vi } from "vitest";
import type { HealthSummary } from "../commands/health.js";

// Shared spy so each test can inspect how cleanOldMedia was invoked.
const cleanOldMediaMock = vi.fn(async () => {});

// Replace only cleanOldMedia; keep the rest of the real media store module.
vi.mock("../media/store.js", async (importOriginal) => {
  const actual = await importOriginal<typeof import("../media/store.js")>();
  return {
    ...actual,
    cleanOldMedia: cleanOldMediaMock,
  };
});

describe("startGatewayMaintenanceTimers", () => {
  afterEach(() => {
    // Restore real timers and reset recorded calls between tests.
    vi.useRealTimers();
    vi.clearAllMocks();
  });

  it("does not schedule recursive media cleanup unless ttl is configured", async () => {
    vi.useFakeTimers();
    // Dynamic import so the vi.mock above is applied before the module loads.
    const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");

    // Minimal no-op wiring; mediaCleanupTtlMs is deliberately omitted.
    const timers = startGatewayMaintenanceTimers({
      broadcast: () => {},
      nodeSendToAllSubscribed: () => {},
      getPresenceVersion: () => 1,
      getHealthVersion: () => 1,
      refreshGatewayHealthSnapshot: async () => ({ ok: true }) as HealthSummary,
      logHealth: { error: () => {} },
      dedupe: new Map(),
      chatAbortControllers: new Map(),
      chatRunState: { abortedRuns: new Map() },
      chatRunBuffers: new Map(),
      chatDeltaSentAt: new Map(),
      removeChatRun: () => undefined,
      agentRunSeq: new Map(),
      nodeSendToSession: () => {},
    });

    // Without a TTL no cleanup may run and no media timer may be created.
    expect(cleanOldMediaMock).not.toHaveBeenCalled();
    expect(timers.mediaCleanup).toBeNull();

    clearInterval(timers.tickInterval);
    clearInterval(timers.healthInterval);
    clearInterval(timers.dedupeCleanup);
  });

  it("runs startup media cleanup and repeats it hourly", async () => {
    vi.useFakeTimers();
    const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");

    const timers = startGatewayMaintenanceTimers({
      broadcast: () => {},
      nodeSendToAllSubscribed: () => {},
      getPresenceVersion: () => 1,
      getHealthVersion: () => 1,
      refreshGatewayHealthSnapshot: async () => ({ ok: true }) as HealthSummary,
      logHealth: { error: () => {} },
      dedupe: new Map(),
      chatAbortControllers: new Map(),
      chatRunState: { abortedRuns: new Map() },
      chatRunBuffers: new Map(),
      chatDeltaSentAt: new Map(),
      removeChatRun: () => undefined,
      agentRunSeq: new Map(),
      nodeSendToSession: () => {},
      // 24h TTL turns the recursive cleanup on.
      mediaCleanupTtlMs: 24 * 60 * 60_000,
    });

    // Startup sweep fires immediately with the full recursive options.
    expect(cleanOldMediaMock).toHaveBeenCalledWith(24 * 60 * 60_000, {
      recursive: true,
      pruneEmptyDirs: true,
    });

    // Advancing one hour triggers exactly one more identical sweep.
    cleanOldMediaMock.mockClear();
    await vi.advanceTimersByTimeAsync(60 * 60_000);
    expect(cleanOldMediaMock).toHaveBeenCalledWith(24 * 60 * 60_000, {
      recursive: true,
      pruneEmptyDirs: true,
    });

    clearInterval(timers.tickInterval);
    clearInterval(timers.healthInterval);
    clearInterval(timers.dedupeCleanup);
    if (timers.mediaCleanup) {
      clearInterval(timers.mediaCleanup);
    }
  });

  it("skips overlapping media cleanup runs", async () => {
    vi.useFakeTimers();
    // Keep the first cleanup pending so the hourly tick finds it in flight.
    let resolveCleanup = () => {};
    let cleanupReady = false;
    cleanOldMediaMock.mockImplementation(
      () =>
        new Promise<void>((resolve) => {
          resolveCleanup = resolve;
          cleanupReady = true;
        }),
    );
    const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");

    const timers = startGatewayMaintenanceTimers({
      broadcast: () => {},
      nodeSendToAllSubscribed: () => {},
      getPresenceVersion: () => 1,
      getHealthVersion: () => 1,
      refreshGatewayHealthSnapshot: async () => ({ ok: true }) as HealthSummary,
      logHealth: { error: () => {} },
      dedupe: new Map(),
      chatAbortControllers: new Map(),
      chatRunState: { abortedRuns: new Map() },
      chatRunBuffers: new Map(),
      chatDeltaSentAt: new Map(),
      removeChatRun: () => undefined,
      agentRunSeq: new Map(),
      nodeSendToSession: () => {},
      mediaCleanupTtlMs: 24 * 60 * 60_000,
    });

    // Startup sweep started (and is still pending).
    expect(cleanOldMediaMock).toHaveBeenCalledTimes(1);

    // Hourly tick while the first run is in flight must not start a second.
    await vi.advanceTimersByTimeAsync(60 * 60_000);
    expect(cleanOldMediaMock).toHaveBeenCalledTimes(1);

    // Let the first run finish, flush its settlement, then tick again.
    if (cleanupReady) {
      resolveCleanup();
    }
    await Promise.resolve();

    await vi.advanceTimersByTimeAsync(60 * 60_000);
    expect(cleanOldMediaMock).toHaveBeenCalledTimes(2);

    clearInterval(timers.tickInterval);
    clearInterval(timers.healthInterval);
    clearInterval(timers.dedupeCleanup);
    if (timers.mediaCleanup) {
      clearInterval(timers.mediaCleanup);
    }
  });
});
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { HealthSummary } from "../commands/health.js";
|
||||
import { cleanOldMedia } from "../media/store.js";
|
||||
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
|
||||
import type { ChatRunEntry } from "./server-chat.js";
|
||||
import {
|
||||
@@ -37,10 +38,12 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
) => ChatRunEntry | undefined;
|
||||
agentRunSeq: Map<string, number>;
|
||||
nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void;
|
||||
mediaCleanupTtlMs?: number;
|
||||
}): {
|
||||
tickInterval: ReturnType<typeof setInterval>;
|
||||
healthInterval: ReturnType<typeof setInterval>;
|
||||
dedupeCleanup: ReturnType<typeof setInterval>;
|
||||
mediaCleanup: ReturnType<typeof setInterval> | null;
|
||||
} {
|
||||
setBroadcastHealthUpdate((snap: HealthSummary) => {
|
||||
params.broadcast("health", snap, {
|
||||
@@ -129,5 +132,33 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
}
|
||||
}, 60_000);
|
||||
|
||||
return { tickInterval, healthInterval, dedupeCleanup };
|
||||
if (typeof params.mediaCleanupTtlMs !== "number") {
|
||||
return { tickInterval, healthInterval, dedupeCleanup, mediaCleanup: null };
|
||||
}
|
||||
|
||||
let mediaCleanupInFlight: Promise<void> | null = null;
|
||||
const runMediaCleanup = () => {
|
||||
if (mediaCleanupInFlight) {
|
||||
return mediaCleanupInFlight;
|
||||
}
|
||||
mediaCleanupInFlight = cleanOldMedia(params.mediaCleanupTtlMs, {
|
||||
recursive: true,
|
||||
pruneEmptyDirs: true,
|
||||
})
|
||||
.catch((err) => {
|
||||
params.logHealth.error(`media cleanup failed: ${formatError(err)}`);
|
||||
})
|
||||
.finally(() => {
|
||||
mediaCleanupInFlight = null;
|
||||
});
|
||||
return mediaCleanupInFlight;
|
||||
};
|
||||
|
||||
const mediaCleanup = setInterval(() => {
|
||||
void runMediaCleanup();
|
||||
}, 60 * 60_000);
|
||||
|
||||
void runMediaCleanup();
|
||||
|
||||
return { tickInterval, healthInterval, dedupeCleanup, mediaCleanup };
|
||||
}
|
||||
|
||||
@@ -119,6 +119,17 @@ export { __resetModelCatalogCacheForTest } from "./server-model-catalog.js";
|
||||
|
||||
ensureOpenClawCliOnPath();
|
||||
|
||||
const MAX_MEDIA_TTL_HOURS = 24 * 7;
|
||||
|
||||
function resolveMediaCleanupTtlMs(ttlHoursRaw: number): number {
|
||||
const ttlHours = Math.min(Math.max(ttlHoursRaw, 1), MAX_MEDIA_TTL_HOURS);
|
||||
const ttlMs = ttlHours * 60 * 60_000;
|
||||
if (!Number.isFinite(ttlMs) || !Number.isSafeInteger(ttlMs)) {
|
||||
throw new Error(`Invalid media.ttlHours: ${String(ttlHoursRaw)}`);
|
||||
}
|
||||
return ttlMs;
|
||||
}
|
||||
|
||||
const log = createSubsystemLogger("gateway");
|
||||
const logCanvas = log.child("canvas");
|
||||
const logDiscovery = log.child("discovery");
|
||||
@@ -680,8 +691,9 @@ export async function startGatewayServer(
|
||||
let tickInterval = noopInterval();
|
||||
let healthInterval = noopInterval();
|
||||
let dedupeCleanup = noopInterval();
|
||||
let mediaCleanup: ReturnType<typeof setInterval> | null = null;
|
||||
if (!minimalTestGateway) {
|
||||
({ tickInterval, healthInterval, dedupeCleanup } = startGatewayMaintenanceTimers({
|
||||
({ tickInterval, healthInterval, dedupeCleanup, mediaCleanup } = startGatewayMaintenanceTimers({
|
||||
broadcast,
|
||||
nodeSendToAllSubscribed,
|
||||
getPresenceVersion,
|
||||
@@ -696,6 +708,9 @@ export async function startGatewayServer(
|
||||
removeChatRun,
|
||||
agentRunSeq,
|
||||
nodeSendToSession,
|
||||
...(typeof cfgAtStart.media?.ttlHours === "number"
|
||||
? { mediaCleanupTtlMs: resolveMediaCleanupTtlMs(cfgAtStart.media.ttlHours) }
|
||||
: {}),
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -1002,6 +1017,7 @@ export async function startGatewayServer(
|
||||
tickInterval,
|
||||
healthInterval,
|
||||
dedupeCleanup,
|
||||
mediaCleanup,
|
||||
agentUnsub,
|
||||
heartbeatUnsub,
|
||||
chatRunState,
|
||||
|
||||
@@ -96,7 +96,7 @@ export function attachMediaRoutes(
|
||||
|
||||
// periodic cleanup
|
||||
setInterval(() => {
|
||||
void cleanOldMedia(ttlMs);
|
||||
void cleanOldMedia(ttlMs, { recursive: false });
|
||||
}, ttlMs).unref();
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import JSZip from "jszip";
|
||||
import sharp from "sharp";
|
||||
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
|
||||
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest";
|
||||
import { isPathWithinBase } from "../../test/helpers/paths.js";
|
||||
import { createTempHomeEnv, type TempHomeEnv } from "../test-utils/temp-home.js";
|
||||
|
||||
@@ -25,6 +25,10 @@ describe("media store", () => {
|
||||
}
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
async function withTempStore<T>(
|
||||
fn: (store: typeof import("./store.js"), home: string) => Promise<T>,
|
||||
): Promise<T> {
|
||||
@@ -64,6 +68,33 @@ describe("media store", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("retries buffer writes when cleanup prunes the target directory", async () => {
|
||||
await withTempStore(async (store) => {
|
||||
const originalWriteFile = fs.writeFile.bind(fs);
|
||||
let injectedEnoent = false;
|
||||
vi.spyOn(fs, "writeFile").mockImplementation(async (...args) => {
|
||||
const [filePath] = args;
|
||||
if (
|
||||
!injectedEnoent &&
|
||||
typeof filePath === "string" &&
|
||||
filePath.includes(`${path.sep}race-buffer${path.sep}`)
|
||||
) {
|
||||
injectedEnoent = true;
|
||||
await fs.rm(path.dirname(filePath), { recursive: true, force: true });
|
||||
const err = new Error("missing dir") as NodeJS.ErrnoException;
|
||||
err.code = "ENOENT";
|
||||
throw err;
|
||||
}
|
||||
return await originalWriteFile(...args);
|
||||
});
|
||||
|
||||
const saved = await store.saveMediaBuffer(Buffer.from("hello"), "text/plain", "race-buffer");
|
||||
const savedStat = await fs.stat(saved.path);
|
||||
expect(injectedEnoent).toBe(true);
|
||||
expect(savedStat.isFile()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
it("copies local files and cleans old media", async () => {
|
||||
await withTempStore(async (store, home) => {
|
||||
const srcFile = path.join(home, "tmp-src.txt");
|
||||
@@ -83,6 +114,36 @@ describe("media store", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("retries local-source writes when cleanup prunes the target directory", async () => {
|
||||
await withTempStore(async (store, home) => {
|
||||
const srcFile = path.join(home, "tmp-src-race.txt");
|
||||
await fs.writeFile(srcFile, "local file");
|
||||
|
||||
const originalWriteFile = fs.writeFile.bind(fs);
|
||||
let injectedEnoent = false;
|
||||
vi.spyOn(fs, "writeFile").mockImplementation(async (...args) => {
|
||||
const [filePath] = args;
|
||||
if (
|
||||
!injectedEnoent &&
|
||||
typeof filePath === "string" &&
|
||||
filePath.includes(`${path.sep}race-source${path.sep}`)
|
||||
) {
|
||||
injectedEnoent = true;
|
||||
await fs.rm(path.dirname(filePath), { recursive: true, force: true });
|
||||
const err = new Error("missing dir") as NodeJS.ErrnoException;
|
||||
err.code = "ENOENT";
|
||||
throw err;
|
||||
}
|
||||
return await originalWriteFile(...args);
|
||||
});
|
||||
|
||||
const saved = await store.saveMediaSource(srcFile, undefined, "race-source");
|
||||
const savedStat = await fs.stat(saved.path);
|
||||
expect(injectedEnoent).toBe(true);
|
||||
expect(savedStat.isFile()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
it.runIf(process.platform !== "win32")("rejects symlink sources", async () => {
|
||||
await withTempStore(async (store, home) => {
|
||||
const target = path.join(home, "sensitive.txt");
|
||||
@@ -116,6 +177,97 @@ describe("media store", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("cleans old media files in nested subdirectories and preserves fresh siblings", async () => {
|
||||
await withTempStore(async (store) => {
|
||||
const oldNested = await store.saveMediaBuffer(
|
||||
Buffer.from("old nested"),
|
||||
"text/plain",
|
||||
path.join("remote-cache", "session-1", "images"),
|
||||
);
|
||||
const freshNested = await store.saveMediaBuffer(
|
||||
Buffer.from("fresh nested"),
|
||||
"text/plain",
|
||||
path.join("remote-cache", "session-1", "docs"),
|
||||
);
|
||||
const oldFlat = await store.saveMediaBuffer(Buffer.from("old flat"), "text/plain", "inbound");
|
||||
const past = Date.now() - 10_000;
|
||||
await fs.utimes(oldNested.path, past / 1000, past / 1000);
|
||||
await fs.utimes(oldFlat.path, past / 1000, past / 1000);
|
||||
|
||||
await store.cleanOldMedia(1_000, { recursive: true, pruneEmptyDirs: true });
|
||||
|
||||
await expect(fs.stat(oldNested.path)).rejects.toThrow();
|
||||
await expect(fs.stat(oldFlat.path)).rejects.toThrow();
|
||||
const freshStat = await fs.stat(freshNested.path);
|
||||
expect(freshStat.isFile()).toBe(true);
|
||||
await expect(fs.stat(path.dirname(oldNested.path))).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps nested remote-cache files during shallow cleanup", async () => {
|
||||
await withTempStore(async (store) => {
|
||||
const nested = await store.saveMediaBuffer(
|
||||
Buffer.from("old nested"),
|
||||
"text/plain",
|
||||
path.join("remote-cache", "session-1", "images"),
|
||||
);
|
||||
const past = Date.now() - 10_000;
|
||||
await fs.utimes(nested.path, past / 1000, past / 1000);
|
||||
|
||||
await store.cleanOldMedia(1_000);
|
||||
|
||||
const stat = await fs.stat(nested.path);
|
||||
expect(stat.isFile()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
it("prunes empty directory chains after recursive cleanup", async () => {
|
||||
await withTempStore(async (store) => {
|
||||
const nested = await store.saveMediaBuffer(
|
||||
Buffer.from("old nested"),
|
||||
"text/plain",
|
||||
path.join("remote-cache", "session-prune", "images"),
|
||||
);
|
||||
const mediaDir = await store.ensureMediaDir();
|
||||
const sessionDir = path.dirname(path.dirname(nested.path));
|
||||
const remoteCacheDir = path.dirname(sessionDir);
|
||||
const past = Date.now() - 10_000;
|
||||
await fs.utimes(nested.path, past / 1000, past / 1000);
|
||||
|
||||
await store.cleanOldMedia(1_000, { recursive: true, pruneEmptyDirs: true });
|
||||
|
||||
await expect(fs.stat(sessionDir)).rejects.toThrow();
|
||||
const remoteCacheStat = await fs.stat(remoteCacheDir);
|
||||
const mediaStat = await fs.stat(mediaDir);
|
||||
expect(remoteCacheStat.isDirectory()).toBe(true);
|
||||
expect(mediaStat.isDirectory()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
it.runIf(process.platform !== "win32")(
|
||||
"does not follow symlinked top-level directories during recursive cleanup",
|
||||
async () => {
|
||||
await withTempStore(async (store, home) => {
|
||||
const mediaDir = await store.ensureMediaDir();
|
||||
const outsideDir = path.join(home, "outside-media");
|
||||
const outsideFile = path.join(outsideDir, "old.txt");
|
||||
const symlinkPath = path.join(mediaDir, "linked-dir");
|
||||
await fs.mkdir(outsideDir, { recursive: true });
|
||||
await fs.writeFile(outsideFile, "outside");
|
||||
const past = Date.now() - 10_000;
|
||||
await fs.utimes(outsideFile, past / 1000, past / 1000);
|
||||
await fs.symlink(outsideDir, symlinkPath);
|
||||
|
||||
await store.cleanOldMedia(1_000, { recursive: true, pruneEmptyDirs: true });
|
||||
|
||||
const outsideStat = await fs.stat(outsideFile);
|
||||
const symlinkStat = await fs.lstat(symlinkPath);
|
||||
expect(outsideStat.isFile()).toBe(true);
|
||||
expect(symlinkStat.isSymbolicLink()).toBe(true);
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
it("sets correct mime for xlsx by extension", async () => {
|
||||
await withTempStore(async (store, home) => {
|
||||
const xlsxPath = path.join(home, "sheet.xlsx");
|
||||
|
||||
@@ -17,6 +17,10 @@ const DEFAULT_TTL_MS = 2 * 60 * 1000; // 2 minutes
|
||||
// Files are intentionally readable by non-owner UIDs so Docker sandbox containers can access
|
||||
// inbound media. The containing state/media directories remain 0o700, which is the trust boundary.
|
||||
const MEDIA_FILE_MODE = 0o644;
|
||||
type CleanOldMediaOptions = {
|
||||
recursive?: boolean;
|
||||
pruneEmptyDirs?: boolean;
|
||||
};
|
||||
type RequestImpl = typeof httpRequest;
|
||||
type ResolvePinnedHostnameImpl = typeof resolvePinnedHostname;
|
||||
|
||||
@@ -88,42 +92,82 @@ export async function ensureMediaDir() {
|
||||
return mediaDir;
|
||||
}
|
||||
|
||||
export async function cleanOldMedia(ttlMs = DEFAULT_TTL_MS) {
|
||||
const mediaDir = await ensureMediaDir();
|
||||
const entries = await fs.readdir(mediaDir).catch(() => []);
|
||||
const now = Date.now();
|
||||
const removeExpiredFilesInDir = async (dir: string) => {
|
||||
const dirEntries = await fs.readdir(dir).catch(() => []);
|
||||
await Promise.all(
|
||||
dirEntries.map(async (entry) => {
|
||||
const full = path.join(dir, entry);
|
||||
const stat = await fs.stat(full).catch(() => null);
|
||||
if (!stat || !stat.isFile()) {
|
||||
return;
|
||||
}
|
||||
if (now - stat.mtimeMs > ttlMs) {
|
||||
await fs.rm(full).catch(() => {});
|
||||
}
|
||||
}),
|
||||
);
|
||||
};
|
||||
function isMissingPathError(err: unknown): err is NodeJS.ErrnoException {
|
||||
return err instanceof Error && "code" in err && err.code === "ENOENT";
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
entries.map(async (file) => {
|
||||
const full = path.join(mediaDir, file);
|
||||
const stat = await fs.stat(full).catch(() => null);
|
||||
if (!stat) {
|
||||
return;
|
||||
async function retryAfterRecreatingDir<T>(dir: string, run: () => Promise<T>): Promise<T> {
|
||||
try {
|
||||
return await run();
|
||||
} catch (err) {
|
||||
if (!isMissingPathError(err)) {
|
||||
throw err;
|
||||
}
|
||||
// Recursive cleanup can prune an empty directory between mkdir and the later
|
||||
// file open/write. Recreate once and retry the media write path.
|
||||
await fs.mkdir(dir, { recursive: true, mode: 0o700 });
|
||||
return await run();
|
||||
}
|
||||
}
|
||||
|
||||
export async function cleanOldMedia(ttlMs = DEFAULT_TTL_MS, options: CleanOldMediaOptions = {}) {
|
||||
const mediaDir = await ensureMediaDir();
|
||||
const now = Date.now();
|
||||
const recursive = options.recursive ?? false;
|
||||
const pruneEmptyDirs = recursive && (options.pruneEmptyDirs ?? false);
|
||||
|
||||
const removeExpiredFilesInDir = async (dir: string): Promise<boolean> => {
|
||||
const dirEntries = await fs.readdir(dir).catch(() => null);
|
||||
if (!dirEntries) {
|
||||
return false;
|
||||
}
|
||||
for (const entry of dirEntries) {
|
||||
const fullPath = path.join(dir, entry);
|
||||
const stat = await fs.lstat(fullPath).catch(() => null);
|
||||
if (!stat || stat.isSymbolicLink()) {
|
||||
continue;
|
||||
}
|
||||
if (stat.isDirectory()) {
|
||||
await removeExpiredFilesInDir(full);
|
||||
return;
|
||||
if (recursive) {
|
||||
const childIsEmpty = await removeExpiredFilesInDir(fullPath);
|
||||
if (childIsEmpty) {
|
||||
await fs.rmdir(fullPath).catch(() => {});
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (stat.isFile() && now - stat.mtimeMs > ttlMs) {
|
||||
await fs.rm(full).catch(() => {});
|
||||
if (!stat.isFile()) {
|
||||
continue;
|
||||
}
|
||||
}),
|
||||
);
|
||||
if (now - stat.mtimeMs > ttlMs) {
|
||||
await fs.rm(fullPath, { force: true }).catch(() => {});
|
||||
}
|
||||
}
|
||||
if (!pruneEmptyDirs) {
|
||||
return false;
|
||||
}
|
||||
const remainingEntries = await fs.readdir(dir).catch(() => null);
|
||||
return remainingEntries !== null && remainingEntries.length === 0;
|
||||
};
|
||||
|
||||
const entries = await fs.readdir(mediaDir).catch(() => []);
|
||||
for (const file of entries) {
|
||||
const full = path.join(mediaDir, file);
|
||||
const stat = await fs.lstat(full).catch(() => null);
|
||||
if (!stat || stat.isSymbolicLink()) {
|
||||
continue;
|
||||
}
|
||||
if (stat.isDirectory()) {
|
||||
const dirIsEmpty = await removeExpiredFilesInDir(full);
|
||||
if (dirIsEmpty) {
|
||||
await fs.rmdir(full).catch(() => {});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (stat.isFile() && now - stat.mtimeMs > ttlMs) {
|
||||
await fs.rm(full, { force: true }).catch(() => {});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function looksLikeUrl(src: string) {
|
||||
@@ -264,11 +308,13 @@ export async function saveMediaSource(
|
||||
const baseDir = resolveMediaDir();
|
||||
const dir = subdir ? path.join(baseDir, subdir) : baseDir;
|
||||
await fs.mkdir(dir, { recursive: true, mode: 0o700 });
|
||||
await cleanOldMedia();
|
||||
await cleanOldMedia(DEFAULT_TTL_MS, { recursive: false });
|
||||
const baseId = crypto.randomUUID();
|
||||
if (looksLikeUrl(source)) {
|
||||
const tempDest = path.join(dir, `${baseId}.tmp`);
|
||||
const { headerMime, sniffBuffer, size } = await downloadToFile(source, tempDest, headers);
|
||||
const { headerMime, sniffBuffer, size } = await retryAfterRecreatingDir(dir, () =>
|
||||
downloadToFile(source, tempDest, headers),
|
||||
);
|
||||
const mime = await detectMime({
|
||||
buffer: sniffBuffer,
|
||||
headerMime,
|
||||
@@ -287,7 +333,7 @@ export async function saveMediaSource(
|
||||
const ext = extensionForMime(mime) ?? path.extname(source);
|
||||
const id = ext ? `${baseId}${ext}` : baseId;
|
||||
const dest = path.join(dir, id);
|
||||
await fs.writeFile(dest, buffer, { mode: MEDIA_FILE_MODE });
|
||||
await retryAfterRecreatingDir(dir, () => fs.writeFile(dest, buffer, { mode: MEDIA_FILE_MODE }));
|
||||
return { id, path: dest, size: stat.size, contentType: mime };
|
||||
} catch (err) {
|
||||
if (err instanceof SafeOpenError) {
|
||||
@@ -326,6 +372,6 @@ export async function saveMediaBuffer(
|
||||
}
|
||||
|
||||
const dest = path.join(dir, id);
|
||||
await fs.writeFile(dest, buffer, { mode: MEDIA_FILE_MODE });
|
||||
await retryAfterRecreatingDir(dir, () => fs.writeFile(dest, buffer, { mode: MEDIA_FILE_MODE }));
|
||||
return { id, path: dest, size: buffer.byteLength, contentType: mime };
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user