Files
openclaw/src/slack/streaming.ts
David Szarzynski bbcb3ac6e0 fix(slack): pass recipient_team_id to streaming API calls (#20988)
* fix(slack): pass recipient_team_id and recipient_user_id to streaming API calls

The Slack Agents & AI Apps streaming API (chat.startStream / chat.stopStream)
requires recipient_team_id and recipient_user_id parameters. Without them,
stopStream fails with 'missing_recipient_team_id' (all contexts) or
'missing_recipient_user_id' (DM contexts), causing streamed messages to
disappear after generation completes.

This passes:
- team_id (from auth.test at provider startup, stored in monitor context)
- user_id (from the incoming message sender, for DM recipient identification)

through to the ChatStreamer via recipient_team_id and recipient_user_id options.

Fixes #19839, #20847, #20299, #19791, #20337

AI-assisted: Written with Claude (Opus 4.6) via OpenClaw. Lightly tested
(unit tests pass, live workspace verification in progress).

* fix(slack): disable block streaming when native streaming is active

When Slack native streaming (`chat.startStream`/`stopStream`) is enabled,
`disableBlockStreaming` was set to `false`, which activated the app-level
block streaming pipeline. This pipeline intercepted agent output, sent it
via block replies, then dropped the final payloads that would have flowed
through `deliverWithStreaming` to the Slack streaming API — resulting in
zero replies delivered.

Set `disableBlockStreaming: true` when native streaming is active so the
final reply flows through the Slack streaming API path as intended.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
2026-02-19 14:44:34 -08:00

154 lines
4.5 KiB
TypeScript

/**
* Slack native text streaming helpers.
*
* Uses the Slack SDK's `ChatStreamer` (via `client.chatStream()`) to stream
* text responses word-by-word in a single updating message, matching Slack's
* "Agents & AI Apps" streaming UX.
*
* @see https://docs.slack.dev/ai/developing-ai-apps#streaming
* @see https://docs.slack.dev/reference/methods/chat.startStream
* @see https://docs.slack.dev/reference/methods/chat.appendStream
* @see https://docs.slack.dev/reference/methods/chat.stopStream
*/
import type { WebClient } from "@slack/web-api";
import type { ChatStreamer } from "@slack/web-api/dist/chat-stream.js";
import { logVerbose } from "../globals.js";
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
export type SlackStreamSession = {
/** The SDK ChatStreamer instance managing this stream. */
streamer: ChatStreamer;
/** Channel this stream lives in. */
channel: string;
/** Thread timestamp (required for streaming). */
threadTs: string;
/** True once stop() has been called. */
stopped: boolean;
};
export type StartSlackStreamParams = {
client: WebClient;
channel: string;
threadTs: string;
/** Optional initial markdown text to include in the stream start. */
text?: string;
/**
* The team ID of the workspace this stream belongs to.
* Required by the Slack API for `chat.startStream` / `chat.stopStream`.
* Obtain from `auth.test` response (`team_id`).
*/
teamId?: string;
/**
* The user ID of the message recipient (required for DM streaming).
* Without this, `chat.stopStream` fails with `missing_recipient_user_id`
* in direct message conversations.
*/
userId?: string;
};
export type AppendSlackStreamParams = {
session: SlackStreamSession;
text: string;
};
export type StopSlackStreamParams = {
session: SlackStreamSession;
/** Optional final markdown text to append before stopping. */
text?: string;
};
// ---------------------------------------------------------------------------
// Stream lifecycle
// ---------------------------------------------------------------------------
/**
* Start a new Slack text stream.
*
* Returns a {@link SlackStreamSession} that should be passed to
* {@link appendSlackStream} and {@link stopSlackStream}.
*
* The first chunk of text can optionally be included via `text`.
*/
export async function startSlackStream(
params: StartSlackStreamParams,
): Promise<SlackStreamSession> {
const { client, channel, threadTs, text, teamId, userId } = params;
logVerbose(
`slack-stream: starting stream in ${channel} thread=${threadTs}${teamId ? ` team=${teamId}` : ""}${userId ? ` user=${userId}` : ""}`,
);
const streamer = client.chatStream({
channel,
thread_ts: threadTs,
...(teamId ? { recipient_team_id: teamId } : {}),
...(userId ? { recipient_user_id: userId } : {}),
});
const session: SlackStreamSession = {
streamer,
channel,
threadTs,
stopped: false,
};
// If initial text is provided, send it as the first append which will
// trigger the ChatStreamer to call chat.startStream under the hood.
if (text) {
await streamer.append({ markdown_text: text });
logVerbose(`slack-stream: appended initial text (${text.length} chars)`);
}
return session;
}
/**
* Append markdown text to an active Slack stream.
*/
export async function appendSlackStream(params: AppendSlackStreamParams): Promise<void> {
const { session, text } = params;
if (session.stopped) {
logVerbose("slack-stream: attempted to append to a stopped stream, ignoring");
return;
}
if (!text) {
return;
}
await session.streamer.append({ markdown_text: text });
logVerbose(`slack-stream: appended ${text.length} chars`);
}
/**
* Stop (finalize) a Slack stream.
*
* After calling this the stream message becomes a normal Slack message.
* Optionally include final text to append before stopping.
*/
export async function stopSlackStream(params: StopSlackStreamParams): Promise<void> {
const { session, text } = params;
if (session.stopped) {
logVerbose("slack-stream: stream already stopped, ignoring duplicate stop");
return;
}
session.stopped = true;
logVerbose(
`slack-stream: stopping stream in ${session.channel} thread=${session.threadTs}${
text ? ` (final text: ${text.length} chars)` : ""
}`,
);
await session.streamer.stop(text ? { markdown_text: text } : undefined);
logVerbose("slack-stream: stream stopped");
}