477 lines
15 KiB
TypeScript
477 lines
15 KiB
TypeScript
import { chunkTextWithMode, resolveChunkMode, resolveTextChunkLimit } from "../auto-reply/chunk.js";
|
|
import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "../auto-reply/reply/history.js";
|
|
import type { ReplyPayload } from "../auto-reply/types.js";
|
|
import type { OpenClawConfig } from "../config/config.js";
|
|
import { loadConfig } from "../config/config.js";
|
|
import {
|
|
resolveAllowlistProviderRuntimeGroupPolicy,
|
|
resolveDefaultGroupPolicy,
|
|
warnMissingProviderGroupPolicyFallbackOnce,
|
|
} from "../config/runtime-group-policy.js";
|
|
import type { SignalReactionNotificationMode } from "../config/types.js";
|
|
import type { BackoffPolicy } from "../infra/backoff.js";
|
|
import { waitForTransportReady } from "../infra/transport-ready.js";
|
|
import { saveMediaBuffer } from "../media/store.js";
|
|
import { createNonExitingRuntime, type RuntimeEnv } from "../runtime.js";
|
|
import { normalizeStringEntries } from "../shared/string-normalization.js";
|
|
import { normalizeE164 } from "../utils.js";
|
|
import { resolveSignalAccount } from "./accounts.js";
|
|
import { signalCheck, signalRpcRequest } from "./client.js";
|
|
import { formatSignalDaemonExit, spawnSignalDaemon, type SignalDaemonHandle } from "./daemon.js";
|
|
import { isSignalSenderAllowed, type resolveSignalSender } from "./identity.js";
|
|
import { createSignalEventHandler } from "./monitor/event-handler.js";
|
|
import type {
|
|
SignalAttachment,
|
|
SignalReactionMessage,
|
|
SignalReactionTarget,
|
|
} from "./monitor/event-handler.types.js";
|
|
import { sendMessageSignal } from "./send.js";
|
|
import { runSignalSseLoop } from "./sse-reconnect.js";
|
|
|
|
/**
 * Options accepted by `monitorSignalProvider`.
 *
 * Every field is optional. Where an option has a counterpart in the resolved
 * account config, the option is consulted first and the config value is the
 * fallback.
 */
export type MonitorSignalOpts = {
  /** Runtime used for logging; a non-exiting runtime is created when omitted. */
  runtime?: RuntimeEnv;
  /** Aborting this signal requests a daemon stop and is forwarded to the readiness wait and SSE loop. */
  abortSignal?: AbortSignal;
  /** Signal account identifier (trimmed before use); overrides the account config's `account`. */
  account?: string;
  /** Account id handed to `resolveSignalAccount` to select the account config. */
  accountId?: string;
  /** Pre-loaded configuration; `loadConfig()` is called when omitted. */
  config?: OpenClawConfig;
  /** Base URL of the signal HTTP endpoint (trimmed); overrides the resolved account base URL. */
  baseUrl?: string;
  /** Whether to spawn the signal daemon. Defaults to the config value, else `true` when no `httpUrl` is configured. */
  autoStart?: boolean;
  /** Daemon readiness timeout; clamped to 1_000..120_000 ms, default 30_000 ms. */
  startupTimeoutMs?: number;
  /** Daemon executable used when auto-starting; default `"signal-cli"`. */
  cliPath?: string;
  /** Host passed to the spawned daemon; default `"127.0.0.1"`. */
  httpHost?: string;
  /** Port passed to the spawned daemon; default `8080`. */
  httpPort?: number;
  /** Receive mode forwarded to the spawned daemon. */
  receiveMode?: "on-start" | "manual";
  /** Forwarded to the spawned daemon and (defaulting to `false`) to the event handler. */
  ignoreAttachments?: boolean;
  /** Forwarded to the spawned daemon. */
  ignoreStories?: boolean;
  /** Coerced to a boolean and forwarded to the spawned daemon and the event handler. */
  sendReadReceipts?: boolean;
  /** Allowlist entries; normalized to strings before use. */
  allowFrom?: Array<string | number>;
  /** Group allowlist entries; falls back to the config's `groupAllowFrom`, then its `allowFrom`. */
  groupAllowFrom?: Array<string | number>;
  /** Media size cap in MiB (multiplied by 1024 * 1024); default 8. */
  mediaMaxMb?: number;
  /** Backoff policy overrides passed to the SSE reconnect loop. */
  reconnectPolicy?: Partial<BackoffPolicy>;
};
|
|
|
|
/**
 * Picks the runtime to use for this monitor.
 *
 * @returns The caller-supplied `opts.runtime` when present, otherwise a newly
 *   created non-exiting runtime.
 */
function resolveRuntime(opts: MonitorSignalOpts): RuntimeEnv {
  const { runtime } = opts;
  if (runtime != null) {
    return runtime;
  }
  return createNonExitingRuntime();
}
|
|
|
|
/**
 * Combines up to two abort signals into a single signal.
 *
 * - With no inputs, `signal` is `undefined`.
 * - With exactly one input, that input is returned unchanged.
 * - With two inputs, a derived signal is returned that aborts (with the
 *   source's `reason`) as soon as either input aborts. If an input is already
 *   aborted, the derived signal is returned already aborted; `a` wins when
 *   both are.
 *
 * @param a First signal, optional.
 * @param b Second signal, optional.
 * @returns The merged signal plus a `dispose` callback that detaches the
 *   listeners registered on the inputs. `dispose` is safe to call repeatedly
 *   and is a no-op when no listeners were registered.
 */
function mergeAbortSignals(
  a?: AbortSignal,
  b?: AbortSignal,
): { signal?: AbortSignal; dispose: () => void } {
  const noop = () => {};
  if (!a || !b) {
    // Zero or one signal supplied: nothing to merge, nothing to clean up.
    return { signal: a ?? b, dispose: noop };
  }

  // Narrowed aliases so the closures below see non-optional signals.
  const first: AbortSignal = a;
  const second: AbortSignal = b;
  const controller = new AbortController();

  const alreadyAborted = first.aborted ? first : second.aborted ? second : undefined;
  if (alreadyAborted) {
    controller.abort(alreadyAborted.reason);
    return { signal: controller.signal, dispose: noop };
  }

  const detach = () => {
    first.removeEventListener("abort", onAbortFirst);
    second.removeEventListener("abort", onAbortSecond);
  };
  const forward = (source: AbortSignal) => {
    // Once the merged signal fires, the listener on the other source has no
    // further purpose; drop it immediately instead of waiting for dispose().
    detach();
    if (!controller.signal.aborted) {
      controller.abort(source.reason);
    }
  };
  const onAbortFirst = () => forward(first);
  const onAbortSecond = () => forward(second);

  first.addEventListener("abort", onAbortFirst, { once: true });
  second.addEventListener("abort", onAbortSecond, { once: true });

  return { signal: controller.signal, dispose: detach };
}
|
|
|
|
/**
 * Creates the bookkeeping around a spawned signal daemon.
 *
 * The returned `abortSignal` is the merge of the caller's `abortSignal` and an
 * internal signal that is aborted when an attached daemon exits without a stop
 * having been requested and without the caller's signal being aborted.
 *
 * @param params.abortSignal Optional external abort signal to merge in.
 * @returns
 *   - `attach(handle)`: remembers the daemon handle and watches `handle.exited`.
 *   - `stop()`: marks the stop as requested and stops the attached daemon, if any.
 *   - `getExitError()`: the error recorded for an unexpected exit, if one happened.
 *   - `abortSignal`: the merged signal described above.
 *   - `dispose()`: releases the listeners held by the merged signal.
 */
function createSignalDaemonLifecycle(params: { abortSignal?: AbortSignal }) {
  let attachedHandle: SignalDaemonHandle | null = null;
  let stopRequested = false;
  let unexpectedExitError: Error | undefined;

  const exitAbortController = new AbortController();
  const combined = mergeAbortSignals(params.abortSignal, exitAbortController.signal);

  const attach = (handle: SignalDaemonHandle) => {
    attachedHandle = handle;
    void handle.exited.then((exit) => {
      // An exit that follows a requested stop or an external abort is expected.
      const exitWasExpected = stopRequested || params.abortSignal?.aborted;
      if (exitWasExpected) {
        return;
      }
      unexpectedExitError = new Error(formatSignalDaemonExit(exit));
      if (exitAbortController.signal.aborted) {
        return;
      }
      exitAbortController.abort(unexpectedExitError);
    });
  };

  const stop = () => {
    stopRequested = true;
    attachedHandle?.stop();
  };

  return {
    attach,
    stop,
    getExitError: () => unexpectedExitError,
    abortSignal: combined.signal,
    dispose: combined.dispose,
  };
}
|
|
|
|
/**
 * Converts a raw allowlist (strings and/or numbers, possibly absent) into a
 * string array by delegating to `normalizeStringEntries`.
 */
function normalizeAllowList(raw?: Array<string | number>): string[] {
  const entries = normalizeStringEntries(raw);
  return entries;
}
|
|
|
|
/**
 * Lists the identities a reaction points at.
 *
 * A non-blank `targetAuthorUuid` yields a `"uuid"` target; a non-blank
 * `targetAuthor` yields a `"phone"` target whose id and display are the
 * `normalizeE164` result. The uuid target, when present, comes first.
 *
 * @returns Zero, one, or two targets.
 */
function resolveSignalReactionTargets(reaction: SignalReactionMessage): SignalReactionTarget[] {
  const authorUuid = reaction.targetAuthorUuid?.trim();
  const authorPhone = reaction.targetAuthor?.trim();
  const resolved: SignalReactionTarget[] = [];

  if (authorUuid) {
    resolved.push({ kind: "uuid", id: authorUuid, display: `uuid:${authorUuid}` });
  }
  if (authorPhone) {
    const e164 = normalizeE164(authorPhone);
    resolved.push({ kind: "phone", id: e164, display: e164 });
  }
  return resolved;
}
|
|
|
|
/**
 * Type guard for a usable reaction payload.
 *
 * A reaction qualifies when it has a non-blank emoji, a numeric
 * `targetSentTimestamp` greater than zero, and at least one non-blank target
 * identity (`targetAuthor` or `targetAuthorUuid`).
 */
function isSignalReactionMessage(
  reaction: SignalReactionMessage | null | undefined,
): reaction is SignalReactionMessage {
  if (!reaction) {
    return false;
  }
  const { emoji, targetSentTimestamp, targetAuthor, targetAuthorUuid } = reaction;
  if (!emoji?.trim()) {
    return false;
  }
  if (typeof targetSentTimestamp !== "number" || !(targetSentTimestamp > 0)) {
    return false;
  }
  return Boolean(targetAuthor?.trim() || targetAuthorUuid?.trim());
}
|
|
|
|
/**
 * Decides whether a reaction should produce a notification.
 *
 * Modes (an omitted mode is treated as `"own"`):
 * - `"off"`: never.
 * - `"own"`: only when one of `targets` matches `account`. A uuid target
 *   matches the trimmed account either bare or with a `uuid:` prefix; any
 *   other target is compared against the `normalizeE164` form of the account.
 * - `"allowlist"`: only when `sender` passes `isSignalSenderAllowed` against a
 *   non-empty `allowlist`.
 * - anything else: always.
 */
function shouldEmitSignalReactionNotification(params: {
  mode?: SignalReactionNotificationMode;
  account?: string | null;
  targets?: SignalReactionTarget[];
  sender?: ReturnType<typeof resolveSignalSender> | null;
  allowlist?: string[];
}) {
  switch (params.mode ?? "own") {
    case "off":
      return false;

    case "own": {
      const ownId = params.account?.trim();
      const candidates = params.targets;
      if (!ownId || !candidates || candidates.length === 0) {
        return false;
      }
      const ownPhone = normalizeE164(ownId);
      return candidates.some((candidate) =>
        candidate.kind === "uuid"
          ? ownId === candidate.id || ownId === `uuid:${candidate.id}`
          : ownPhone === candidate.id,
      );
    }

    case "allowlist": {
      const { sender, allowlist } = params;
      if (!sender || !allowlist || allowlist.length === 0) {
        return false;
      }
      return isSignalSenderAllowed(sender, allowlist);
    }

    default:
      return true;
  }
}
|
|
|
|
/**
 * Builds the one-line system event text for an added reaction.
 *
 * Format: `Signal reaction added: <emoji> by <actor> msg <id>`, followed by
 * ` from <target>` and then ` in <group>` for whichever of those labels is
 * non-empty.
 */
function buildSignalReactionSystemEventText(params: {
  emojiLabel: string;
  actorLabel: string;
  messageId: string;
  targetLabel?: string;
  groupLabel?: string;
}) {
  const { emojiLabel, actorLabel, messageId, targetLabel, groupLabel } = params;
  let text = `Signal reaction added: ${emojiLabel} by ${actorLabel} msg ${messageId}`;
  if (targetLabel) {
    text = `${text} from ${targetLabel}`;
  }
  if (groupLabel) {
    text = `${text} in ${groupLabel}`;
  }
  return text;
}
|
|
|
|
/**
 * Waits for the signal daemon at `baseUrl` to answer its health check.
 *
 * Delegates to `waitForTransportReady` with a 150 ms poll interval; each poll
 * calls `signalCheck` with a 1000 ms argument. A failed poll reports the
 * check's `error`, else `HTTP <status>` when a status is present, else
 * `"unreachable"`.
 *
 * @param params.baseUrl Daemon endpoint to probe.
 * @param params.abortSignal Forwarded to `waitForTransportReady`.
 * @param params.timeoutMs Forwarded to `waitForTransportReady`.
 * @param params.logAfterMs Forwarded to `waitForTransportReady`.
 * @param params.logIntervalMs Forwarded to `waitForTransportReady`.
 * @param params.runtime Forwarded to `waitForTransportReady`.
 */
async function waitForSignalDaemonReady(params: {
  baseUrl: string;
  abortSignal?: AbortSignal;
  timeoutMs: number;
  logAfterMs: number;
  logIntervalMs?: number;
  runtime: RuntimeEnv;
}): Promise<void> {
  const { baseUrl, abortSignal, timeoutMs, logAfterMs, logIntervalMs, runtime } = params;
  await waitForTransportReady({
    label: "signal daemon",
    timeoutMs,
    logAfterMs,
    logIntervalMs,
    pollIntervalMs: 150,
    abortSignal,
    runtime,
    check: async () => {
      const probe = await signalCheck(baseUrl, 1000);
      if (!probe.ok) {
        const reason = probe.error ?? (probe.status ? `HTTP ${probe.status}` : "unreachable");
        return { ok: false, error: reason };
      }
      return { ok: true };
    },
  });
}
|
|
|
|
/**
 * Downloads one attachment through the `getAttachment` RPC and stores it.
 *
 * @param params.baseUrl Endpoint used for the RPC request.
 * @param params.account Included in the RPC params when non-empty.
 * @param params.attachment Attachment descriptor; its `id` is required.
 * @param params.sender Used as the RPC `recipient` when no `groupId` is given.
 * @param params.groupId Takes precedence over `sender` when both are given.
 * @param params.maxBytes Size cap; also passed through to `saveMediaBuffer`.
 * @returns The saved path and content type, or `null` when the attachment has
 *   no id, neither `groupId` nor `sender` is available, or the RPC returned no
 *   data.
 * @throws Error when the attachment's declared `size` exceeds `maxBytes`.
 */
async function fetchAttachment(params: {
  baseUrl: string;
  account?: string;
  attachment: SignalAttachment;
  sender?: string;
  groupId?: string;
  maxBytes: number;
}): Promise<{ path: string; contentType?: string } | null> {
  const { attachment, baseUrl, account, sender, groupId, maxBytes } = params;
  if (!attachment?.id) {
    return null;
  }
  if (attachment.size && attachment.size > maxBytes) {
    throw new Error(
      `Signal attachment ${attachment.id} exceeds ${(maxBytes / (1024 * 1024)).toFixed(0)}MB limit`,
    );
  }

  // The RPC needs a conversation scope: the group wins over the direct sender.
  const scope = groupId ? { groupId } : sender ? { recipient: sender } : null;
  if (!scope) {
    return null;
  }
  const rpcParams: Record<string, unknown> = {
    id: attachment.id,
    ...(account ? { account } : {}),
    ...scope,
  };

  const response = await signalRpcRequest<{ data?: string }>("getAttachment", rpcParams, {
    baseUrl,
  });
  const encoded = response?.data;
  if (!encoded) {
    return null;
  }

  const stored = await saveMediaBuffer(
    Buffer.from(encoded, "base64"),
    attachment.contentType ?? undefined,
    "inbound",
    maxBytes,
  );
  return { path: stored.path, contentType: stored.contentType };
}
|
|
|
|
/**
 * Sends each reply payload to `target`, one message at a time.
 *
 * - Payloads with neither text nor media are skipped without logging.
 * - Text-only payloads are split with `chunkTextWithMode` and sent per chunk.
 * - Payloads with media send one message per media URL; the text accompanies
 *   the first media message only, later ones are sent with an empty string.
 *
 * After each delivered payload, `delivered reply to <target>` is logged via
 * `runtime.log` when that function exists.
 */
async function deliverReplies(params: {
  replies: ReplyPayload[];
  target: string;
  baseUrl: string;
  account?: string;
  accountId?: string;
  runtime: RuntimeEnv;
  maxBytes: number;
  textLimit: number;
  chunkMode: "length" | "newline";
}) {
  const { target, baseUrl, account, accountId, runtime, maxBytes } = params;

  for (const reply of params.replies) {
    const body = reply.text ?? "";
    const attachments = reply.mediaUrls ?? (reply.mediaUrl ? [reply.mediaUrl] : []);
    const hasMedia = attachments.length !== 0;
    if (!body && !hasMedia) {
      continue;
    }

    if (hasMedia) {
      for (const [position, mediaUrl] of attachments.entries()) {
        const caption = position === 0 ? body : "";
        await sendMessageSignal(target, caption, {
          baseUrl,
          account,
          mediaUrl,
          maxBytes,
          accountId,
        });
      }
    } else {
      const pieces = chunkTextWithMode(body, params.textLimit, params.chunkMode);
      for (const piece of pieces) {
        await sendMessageSignal(target, piece, {
          baseUrl,
          account,
          maxBytes,
          accountId,
        });
      }
    }

    runtime.log?.(`delivered reply to ${target}`);
  }
}
|
|
|
|
/**
 * Runs the Signal provider monitor until its SSE loop ends or it is aborted.
 *
 * Steps:
 * 1. Resolve runtime, config, and account settings; explicit `opts` values are
 *    consulted before account-config values.
 * 2. When auto-start is enabled, spawn the signal daemon and wait for it to
 *    answer its readiness check.
 * 3. Build the event handler and feed it from `runSignalSseLoop`.
 * 4. On the way out, always release abort listeners and stop the daemon.
 *
 * @param opts Monitor options; all optional.
 * @returns Resolves when the SSE loop returns, or when an error is raised while
 *   `opts.abortSignal` is aborted and no unexpected daemon exit was recorded.
 * @throws The recorded daemon exit error when the daemon exited unexpectedly,
 *   or any other error raised while `opts.abortSignal` is not aborted.
 */
export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promise<void> {
  const runtime = resolveRuntime(opts);
  const cfg = opts.config ?? loadConfig();
  const accountInfo = resolveSignalAccount({ cfg, accountId: opts.accountId });
  const accountCfg = accountInfo.config;

  // History and chunking.
  const configuredHistoryLimit =
    accountCfg.historyLimit ?? cfg.messages?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT;
  const historyLimit = Math.max(0, configuredHistoryLimit);
  const groupHistories = new Map<string, HistoryEntry[]>();
  const textLimit = resolveTextChunkLimit(cfg, "signal", accountInfo.accountId);
  const chunkMode = resolveChunkMode(cfg, "signal", accountInfo.accountId);

  // Endpoint and identity. Blank option strings fall through to the config value.
  const baseUrl = opts.baseUrl?.trim() || accountInfo.baseUrl;
  const account = opts.account?.trim() || accountCfg.account?.trim();

  // Access policy.
  const dmPolicy = accountCfg.dmPolicy ?? "pairing";
  const allowFrom = normalizeAllowList(opts.allowFrom ?? accountCfg.allowFrom);
  // The group allowlist inherits the configured DM allowlist when nothing more specific is set.
  const inheritedGroupAllowFrom =
    accountCfg.allowFrom && accountCfg.allowFrom.length > 0 ? accountCfg.allowFrom : [];
  const groupAllowFrom = normalizeAllowList(
    opts.groupAllowFrom ?? accountCfg.groupAllowFrom ?? inheritedGroupAllowFrom,
  );
  const defaultGroupPolicy = resolveDefaultGroupPolicy(cfg);
  const policyResolution = resolveAllowlistProviderRuntimeGroupPolicy({
    providerConfigPresent: cfg.channels?.signal !== undefined,
    groupPolicy: accountCfg.groupPolicy,
    defaultGroupPolicy,
  });
  const groupPolicy = policyResolution.groupPolicy;
  warnMissingProviderGroupPolicyFallbackOnce({
    providerMissingFallbackApplied: policyResolution.providerMissingFallbackApplied,
    providerKey: "signal",
    accountId: accountInfo.accountId,
    log: (message) => runtime.log?.(message),
  });

  // Reactions, media, receipts.
  const reactionMode = accountCfg.reactionNotifications ?? "own";
  const reactionAllowlist = normalizeAllowList(accountCfg.reactionAllowlist);
  const mediaMaxMb = opts.mediaMaxMb ?? accountCfg.mediaMaxMb ?? 8;
  const mediaMaxBytes = mediaMaxMb * 1024 * 1024;
  const ignoreAttachments = opts.ignoreAttachments ?? accountCfg.ignoreAttachments ?? false;
  const sendReadReceipts = Boolean(opts.sendReadReceipts ?? accountCfg.sendReadReceipts);

  // Daemon start-up settings. Auto-start defaults to on unless an httpUrl is configured.
  const autoStart = opts.autoStart ?? accountCfg.autoStart ?? !accountCfg.httpUrl;
  const requestedStartupTimeoutMs = opts.startupTimeoutMs ?? accountCfg.startupTimeoutMs ?? 30_000;
  const startupTimeoutMs = Math.min(120_000, Math.max(1_000, requestedStartupTimeoutMs));
  const readReceiptsViaDaemon = Boolean(autoStart && sendReadReceipts);

  const daemonLifecycle = createSignalDaemonLifecycle({ abortSignal: opts.abortSignal });
  let daemonHandle: SignalDaemonHandle | null = null;
  if (autoStart) {
    daemonHandle = spawnSignalDaemon({
      cliPath: opts.cliPath ?? accountCfg.cliPath ?? "signal-cli",
      account,
      httpHost: opts.httpHost ?? accountCfg.httpHost ?? "127.0.0.1",
      httpPort: opts.httpPort ?? accountCfg.httpPort ?? 8080,
      receiveMode: opts.receiveMode ?? accountCfg.receiveMode,
      ignoreAttachments: opts.ignoreAttachments ?? accountCfg.ignoreAttachments,
      ignoreStories: opts.ignoreStories ?? accountCfg.ignoreStories,
      sendReadReceipts,
      runtime,
    });
    daemonLifecycle.attach(daemonHandle);
  }

  // Surfaces an unexpected daemon exit recorded by the lifecycle, if any.
  const throwIfDaemonExited = () => {
    const exitError = daemonLifecycle.getExitError();
    if (exitError) {
      throw exitError;
    }
  };

  const onAbort = () => {
    daemonLifecycle.stop();
  };
  opts.abortSignal?.addEventListener("abort", onAbort, { once: true });

  try {
    if (daemonHandle) {
      await waitForSignalDaemonReady({
        baseUrl,
        abortSignal: daemonLifecycle.abortSignal,
        timeoutMs: startupTimeoutMs,
        logAfterMs: 10_000,
        logIntervalMs: 10_000,
        runtime,
      });
      throwIfDaemonExited();
    }

    const handleEvent = createSignalEventHandler({
      runtime,
      cfg,
      baseUrl,
      account,
      accountId: accountInfo.accountId,
      blockStreaming: accountCfg.blockStreaming,
      historyLimit,
      groupHistories,
      textLimit,
      dmPolicy,
      allowFrom,
      groupAllowFrom,
      groupPolicy,
      reactionMode,
      reactionAllowlist,
      mediaMaxBytes,
      ignoreAttachments,
      sendReadReceipts,
      readReceiptsViaDaemon,
      fetchAttachment,
      deliverReplies: (params) => deliverReplies({ ...params, chunkMode }),
      resolveSignalReactionTargets,
      isSignalReactionMessage,
      shouldEmitSignalReactionNotification,
      buildSignalReactionSystemEventText,
    });

    await runSignalSseLoop({
      baseUrl,
      account,
      abortSignal: daemonLifecycle.abortSignal,
      runtime,
      policy: opts.reconnectPolicy,
      onEvent: (event) => {
        // Handler failures are logged rather than propagated into the SSE loop.
        void handleEvent(event).catch((err) => {
          runtime.error?.(`event handler failed: ${String(err)}`);
        });
      },
    });
    throwIfDaemonExited();
  } catch (err) {
    const recordedExitError = daemonLifecycle.getExitError();
    const abortedByCaller = Boolean(opts.abortSignal?.aborted);
    // A caller-initiated abort is a normal shutdown unless the daemon also died unexpectedly.
    if (abortedByCaller && !recordedExitError) {
      return;
    }
    throw err;
  } finally {
    daemonLifecycle.dispose();
    opts.abortSignal?.removeEventListener("abort", onAbort);
    daemonLifecycle.stop();
  }
}
|