From b3a7396d32df3c51504f5770f8912141b8712181 Mon Sep 17 00:00:00 2001 From: Fedor Date: Wed, 25 Feb 2026 23:18:45 +0300 Subject: [PATCH] Support: chat, tickets list, SSE Postgres NOTIFY, read/unread --- backend/app/api/support.py | 699 ++++++++++++++++++ backend/app/main.py | 18 +- .../003_support_threads_messages.sql | 32 + .../migrations/004_support_notify_trigger.sql | 37 + backend/db/migrations/005_support_reads.sql | 13 + docs/SUPPORT_FEATURE_SUMMARY.md | 36 + docs/SUPPORT_N8N_WEBHOOK.md | 80 ++ frontend/src/components/BottomBar.css | 44 ++ frontend/src/components/BottomBar.tsx | 67 +- frontend/src/components/SupportChat.tsx | 412 +++++++++++ frontend/src/pages/Support.tsx | 191 +++++ 11 files changed, 1615 insertions(+), 14 deletions(-) create mode 100644 backend/app/api/support.py create mode 100644 backend/db/migrations/003_support_threads_messages.sql create mode 100644 backend/db/migrations/004_support_notify_trigger.sql create mode 100644 backend/db/migrations/005_support_reads.sql create mode 100644 docs/SUPPORT_FEATURE_SUMMARY.md create mode 100644 docs/SUPPORT_N8N_WEBHOOK.md create mode 100644 frontend/src/components/SupportChat.tsx create mode 100644 frontend/src/pages/Support.tsx diff --git a/backend/app/api/support.py b/backend/app/api/support.py new file mode 100644 index 0000000..e354367 --- /dev/null +++ b/backend/app/api/support.py @@ -0,0 +1,699 @@ +""" +Support API: диалог поддержки (треды + сообщения). +POST /api/v1/support — multipart, создание/поиск треда, запись сообщения user, прокси в n8n. +GET /api/v1/support/thread — получить тред и сообщения. +GET /api/v1/support/stream — SSE: один канал на юзера, события из Postgres NOTIFY. +POST /api/v1/support/incoming — webhook для n8n: добавить сообщение от поддержки. +GET /api/v1/support/limits — лимиты вложений. +""" + +import asyncio +import json +import logging +import uuid +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Set, Tuple + +import asyncpg +import httpx +from fastapi import APIRouter, Header, HTTPException, Query, Request +from starlette.responses import StreamingResponse + +from ..config import settings +from ..services.database import db + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/v1/support", tags=["support"]) + +# Реестр SSE по unified_id: кому пушить при NOTIFY (один канал support_events) +_support_stream_registry: Dict[str, Set[asyncio.Queue]] = {} +_support_notify_inbox: asyncio.Queue = asyncio.Queue() +_SUPPORT_EVENTS_CHANNEL = "support_events" + + +def _get_support_webhook() -> str: + url = (getattr(settings, "n8n_support_webhook", None) or "").strip() + if not url: + raise HTTPException( + status_code=503, + detail="N8N_SUPPORT_WEBHOOK не настроен", + ) + return url + + +async def _resolve_session( + session_token: Optional[str] = None, + channel: Optional[str] = None, + channel_user_id: Optional[str] = None, +) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: + """Возвращает (unified_id, phone, email, session_id).""" + unified_id: Optional[str] = None + phone: Optional[str] = None + session_id: Optional[str] = session_token + + if channel and channel_user_id: + try: + from .session import get_session_by_channel_user + data = await get_session_by_channel_user(channel.strip(), str(channel_user_id).strip()) + if data: + unified_id = data.get("unified_id") + phone = data.get("phone") + if session_token is None: + session_id = data.get("session_token") + except Exception as e: + logger.warning("Ошибка чтения сессии по channel: %s", e) + + if not unified_id and session_token: + try: + from .session import SessionVerifyRequest, verify_session + res = await verify_session(SessionVerifyRequest(session_token=session_token)) + if getattr(res, "valid", False): + unified_id = getattr(res, "unified_id", None) + phone = getattr(res, "phone", None) + except HTTPException: + raise + except Exception as e: + logger.warning("Ошибка верификации сессии для support: %s", e) + raise HTTPException(status_code=401, detail="Сессия недействительна") + + if not unified_id: + raise HTTPException( + status_code=401, + detail="Укажите session_token или (channel + channel_user_id)", + ) + + return (unified_id, phone, None, session_id or session_token) + + +def _check_attachment_limits( + files: List[Tuple[str, bytes, Optional[str]]], +) -> None: + """Проверяет лимиты вложений; 0 или пусто = не проверять.""" + max_count = getattr(settings, "support_attachments_max_count", 0) or 0 + max_bytes = settings.support_attachments_max_size_bytes + allowed = (getattr(settings, "support_attachments_allowed_types", None) or "").strip() + + if max_count > 0 and len(files) > max_count: + raise HTTPException( + status_code=400, + detail=f"Слишком много файлов: максимум {max_count}", + ) + + if max_bytes > 0: + for name, content, _ in files: + if len(content) > max_bytes: + raise HTTPException( + status_code=400, + detail=f"Файл {name} превышает допустимый размер ({max_bytes // (1024*1024)} МБ)", + ) + + if allowed: + allowed_list = [x.strip().lower() for x in allowed.split(",") if x.strip()] + for filename, content, mime in files: + mime = (mime or "").strip().lower() + ext = "" + if "." in filename: + ext = "." + filename.rsplit(".", 1)[-1].lower() + ok = False + for a in allowed_list: + if a.startswith("."): + if ext == a: + ok = True + break + elif "/" in a: + if mime == a or (a.endswith("/*") and mime.split("/")[0] == a.split("/")[0]): + ok = True + break + else: + if ext == f".{a}" or mime == a: + ok = True + break + if not ok: + raise HTTPException( + status_code=400, + detail=f"Тип файла «{filename}» не разрешён. Допустимые: {allowed}", + ) + + +def _uuid_str(val: Any) -> str: + if val is None: + return "" + return str(val) + + +async def _get_or_create_thread(unified_id: str, claim_id: Optional[str], source: str) -> str: + """Найти тред по (unified_id, claim_id) или создать. Возвращает thread_id (UUID str).""" + if claim_id and claim_id.strip(): + row = await db.fetch_one( + "SELECT id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id = $2", + unified_id, + claim_id.strip(), + ) + else: + row = await db.fetch_one( + "SELECT id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id IS NULL", + unified_id, + ) + + if row: + return _uuid_str(row["id"]) + + thread_id = uuid.uuid4() + await db.execute( + "INSERT INTO clpr_support_threads (id, unified_id, claim_id, source) VALUES ($1, $2, $3, $4)", + thread_id, + unified_id, + claim_id.strip() if claim_id and claim_id.strip() else None, + source or "bar", + ) + return str(thread_id) + + +def _support_notify_callback(conn: Any, pid: int, channel: str, payload: str) -> None: + """Вызывается asyncpg при NOTIFY support_events. Кладём payload во inbox.""" + try: + _support_notify_inbox.put_nowait(payload) + except Exception as e: + logger.warning("Support notify inbox put: %s", e) + + +async def _run_support_listener() -> None: + """ + Один подписчик на Postgres NOTIFY support_events. + Держит соединение, слушает канал, раскидывает по unified_id в реестр. + """ + conn: Optional[asyncpg.Connection] = None + try: + conn = await asyncpg.connect( + host=settings.postgres_host, + port=settings.postgres_port, + database=settings.postgres_db, + user=settings.postgres_user, + password=settings.postgres_password, + ) + await conn.execute("LISTEN " + _SUPPORT_EVENTS_CHANNEL) + conn.add_listener(_SUPPORT_EVENTS_CHANNEL, _support_notify_callback) + logger.info("Support LISTEN %s started", _SUPPORT_EVENTS_CHANNEL) + while True: + payload = await _support_notify_inbox.get() + try: + data = json.loads(payload) + u_id = data.get("unified_id") + if not u_id: + continue + queues = _support_stream_registry.get(u_id) + if not queues: + continue + for q in list(queues): + try: + q.put_nowait(data) + except asyncio.QueueFull: + pass + except Exception as e: + logger.warning("Support stream put: %s", e) + except json.JSONDecodeError as e: + logger.warning("Support notify payload not JSON: %s", e) + except Exception as e: + logger.exception("Support listener dispatch: %s", e) + except asyncio.CancelledError: + logger.info("Support listener cancelled") + except Exception as e: + logger.exception("Support listener error: %s", e) + finally: + if conn and not conn.is_closed(): + await conn.close() + logger.info("Support LISTEN stopped") + + +@router.get("/limits") +async def get_support_limits(): + """Лимиты вложений (0/пусто = без ограничений).""" + max_count = getattr(settings, "support_attachments_max_count", 0) or 0 + max_bytes = settings.support_attachments_max_size_bytes + allowed = (getattr(settings, "support_attachments_allowed_types", None) or "").strip() + unlimited = max_count == 0 and max_bytes == 0 and not allowed + return { + "max_count": max_count, + "max_size_per_file": max_bytes, + "allowed_types": allowed, + "unlimited": unlimited, + } + + +@router.get("/threads") +async def get_support_threads( + session_token: Optional[str] = Query(None), + channel: Optional[str] = Query(None), + channel_user_id: Optional[str] = Query(None), +): + """ + Список всех тредов пользователя для экрана «Мои обращения». + Сессия: session_token или channel + channel_user_id. + """ + unified_id, _, _, _ = await _resolve_session( + session_token=session_token, channel=channel, channel_user_id=channel_user_id + ) + + rows = await db.fetch_all( + """ + SELECT + t.id, + t.claim_id, + t.source, + t.ticket_id, + t.created_at, + t.updated_at, + (SELECT m.body FROM clpr_support_messages m WHERE m.thread_id = t.id ORDER BY m.created_at DESC LIMIT 1) AS last_body, + (SELECT m.created_at FROM clpr_support_messages m WHERE m.thread_id = t.id ORDER BY m.created_at DESC LIMIT 1) AS last_at, + (SELECT COUNT(*)::int FROM clpr_support_messages m WHERE m.thread_id = t.id) AS messages_count, + (SELECT COUNT(*)::int FROM clpr_support_messages m + WHERE m.thread_id = t.id AND m.direction = 'support' + AND m.created_at > COALESCE( + (SELECT r.last_read_at FROM clpr_support_reads r WHERE r.unified_id = t.unified_id AND r.thread_id = t.id), + '1970-01-01'::timestamptz + )) AS unread_count + FROM clpr_support_threads t + WHERE t.unified_id = $1 + ORDER BY COALESCE( + (SELECT m.created_at FROM clpr_support_messages m WHERE m.thread_id = t.id ORDER BY m.created_at DESC LIMIT 1), + t.updated_at, + t.created_at + ) DESC + """, + unified_id, + ) + threads = [] + for r in rows: + last_at = r.get("last_at") + if hasattr(last_at, "isoformat"): + last_at = last_at.isoformat() + elif last_at is not None: + last_at = str(last_at) + threads.append({ + "thread_id": _uuid_str(r["id"]), + "claim_id": str(r["claim_id"]).strip() if r.get("claim_id") else None, + "source": str(r.get("source") or "bar"), + "ticket_id": str(r["ticket_id"]) if r.get("ticket_id") else None, + "created_at": r["created_at"].isoformat() if hasattr(r.get("created_at"), "isoformat") else str(r.get("created_at") or ""), + "updated_at": r["updated_at"].isoformat() if hasattr(r.get("updated_at"), "isoformat") else str(r.get("updated_at") or ""), + "last_body": (r.get("last_body") or "")[:200] if r.get("last_body") else None, + "last_at": last_at, + "messages_count": r.get("messages_count") or 0, + "unread_count": r.get("unread_count") or 0, + }) + return {"threads": threads} + + +@router.get("/unread-count") +async def get_support_unread_count( + session_token: Optional[str] = Query(None), + channel: Optional[str] = Query(None), + channel_user_id: Optional[str] = Query(None), +): + """Суммарное число непрочитанных сообщений от поддержки (для бейджа в баре).""" + unified_id, _, _, _ = await _resolve_session( + session_token=session_token, channel=channel, channel_user_id=channel_user_id + ) + row = await db.fetch_one( + """ + SELECT COALESCE(SUM(cnt), 0)::int AS total + FROM ( + SELECT COUNT(*)::int AS cnt + FROM clpr_support_threads t + JOIN clpr_support_messages m ON m.thread_id = t.id AND m.direction = 'support' + WHERE t.unified_id = $1 + AND m.created_at > COALESCE( + (SELECT r.last_read_at FROM clpr_support_reads r WHERE r.unified_id = t.unified_id AND r.thread_id = t.id), + '1970-01-01'::timestamptz + ) + GROUP BY t.id + ) s + """, + unified_id, + ) + total = (row and row.get("total")) or 0 + return {"unread_count": total} + + +@router.post("/read") +async def mark_support_thread_read( + request: Request, + session_token: Optional[str] = Query(None), + channel: Optional[str] = Query(None), + channel_user_id: Optional[str] = Query(None), +): + """ + Отметить тред как прочитанный (пользователь открыл чат). + Тело JSON: { "thread_id": "..." } или query thread_id= / claim_id=. + """ + unified_id, _, _, _ = await _resolve_session( + session_token=session_token, channel=channel, channel_user_id=channel_user_id + ) + thread_id = request.query_params.get("thread_id") + claim_id = request.query_params.get("claim_id") + if not thread_id: + try: + body = await request.json() if request.headers.get("content-type", "").startswith("application/json") else {} + except Exception: + body = {} + thread_id = body.get("thread_id") + if not thread_id: + claim_id = claim_id or body.get("claim_id") + if claim_id and not thread_id: + cid = str(claim_id).strip() + if cid: + row = await db.fetch_one( + "SELECT id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id = $2", + unified_id, + cid, + ) + else: + row = await db.fetch_one( + "SELECT id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id IS NULL", + unified_id, + ) + if row: + thread_id = str(row["id"]) + if not thread_id: + raise HTTPException(status_code=400, detail="thread_id или claim_id обязателен") + try: + thread_uuid = uuid.UUID(thread_id) + except Exception: + raise HTTPException(status_code=400, detail="Некорректный thread_id") + # Проверяем, что тред принадлежит пользователю + row = await db.fetch_one( + "SELECT id FROM clpr_support_threads WHERE id = $1 AND unified_id = $2", + thread_uuid, + unified_id, + ) + if not row: + raise HTTPException(status_code=404, detail="Тред не найден") + await db.execute( + """ + INSERT INTO clpr_support_reads (unified_id, thread_id, last_read_at) + VALUES ($1, $2, CURRENT_TIMESTAMP) + ON CONFLICT (unified_id, thread_id) DO UPDATE SET last_read_at = CURRENT_TIMESTAMP + """, + unified_id, + thread_uuid, + ) + return {"success": True} + + +@router.get("/thread") +async def get_support_thread( + claim_id: Optional[str] = Query(None), + session_token: Optional[str] = Query(None), + channel: Optional[str] = Query(None), + channel_user_id: Optional[str] = Query(None), +): + """ + Получить тред поддержки и сообщения. Query: claim_id (опционально). + Сессия: session_token или channel + channel_user_id. + """ + unified_id, _, _, _ = await _resolve_session( + session_token=session_token, channel=channel, channel_user_id=channel_user_id + ) + + cid = claim_id.strip() if claim_id and str(claim_id).strip() else None + if cid: + row = await db.fetch_one( + "SELECT id, ticket_id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id = $2", + unified_id, + cid, + ) + else: + row = await db.fetch_one( + "SELECT id, ticket_id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id IS NULL", + unified_id, + ) + + if not row: + return {"thread_id": None, "messages": [], "ticket_id": None} + + thread_id = _uuid_str(row["id"]) + ticket_id = row.get("ticket_id") + if ticket_id is not None: + ticket_id = str(ticket_id) + + rows = await db.fetch_all( + "SELECT id, direction, body, attachments, created_at FROM clpr_support_messages WHERE thread_id = $1 ORDER BY created_at ASC", + row["id"], + ) + messages = [] + for r in rows: + att = r.get("attachments") + if att is not None and not isinstance(att, list): + try: + att = json.loads(att) if isinstance(att, str) else att + except Exception: + att = [] + messages.append({ + "id": _uuid_str(r["id"]), + "direction": r["direction"], + "body": r["body"] or "", + "attachments": att or [], + "created_at": r["created_at"].isoformat() if hasattr(r["created_at"], "isoformat") else str(r["created_at"]), + }) + + return { + "thread_id": thread_id, + "messages": messages, + "ticket_id": ticket_id, + } + + +@router.get("/stream") +async def support_stream( + session_token: Optional[str] = Query(None), + channel: Optional[str] = Query(None), + channel_user_id: Optional[str] = Query(None), +): + """ + SSE: один поток на пользователя по unified_id. События приходят из Postgres NOTIFY (триггер на clpr_support_messages). + Query: session_token или channel + channel_user_id. + """ + unified_id, _, _, _ = await _resolve_session( + session_token=session_token, channel=channel, channel_user_id=channel_user_id + ) + + queue: asyncio.Queue = asyncio.Queue(maxsize=64) + if unified_id not in _support_stream_registry: + _support_stream_registry[unified_id] = set() + _support_stream_registry[unified_id].add(queue) + + async def event_gen(): + try: + yield f"data: {json.dumps({'event': 'connected', 'unified_id': unified_id}, ensure_ascii=False)}\n\n" + while True: + try: + msg = await asyncio.wait_for(queue.get(), timeout=30.0) + # Формат как в SupportChat: id, direction, body, attachments, created_at + created_at = msg.get("created_at") + if hasattr(created_at, "isoformat"): + created_at = created_at.isoformat() + elif created_at is not None: + created_at = str(created_at) + event = { + "event": "support_message", + "message": { + "id": str(msg.get("message_id", "")), + "direction": msg.get("direction", "support"), + "body": msg.get("body", ""), + "attachments": json.loads(msg.get("attachments", "[]")) if isinstance(msg.get("attachments"), str) else (msg.get("attachments") or []), + "created_at": created_at, + }, + "thread_id": str(msg.get("thread_id", "")), + } + yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" + except asyncio.TimeoutError: + yield ": keepalive\n\n" + finally: + _support_stream_registry.get(unified_id, set()).discard(queue) + if unified_id in _support_stream_registry and not _support_stream_registry[unified_id]: + del _support_stream_registry[unified_id] + + return StreamingResponse( + event_gen(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) + + +@router.post("") +async def submit_support(request: Request): + """ + Отправить сообщение в поддержку. Multipart: message, subject?, claim_id?, source, thread_id?, + session_token (или channel + channel_user_id), файлы. Создаёт/находит тред, пишет сообщение, проксирует в n8n. + """ + form = await request.form() + message = form.get("message") + if not message or not str(message).strip(): + raise HTTPException(status_code=400, detail="Поле message обязательно") + message = str(message).strip() + + subject = form.get("subject") + subject = str(subject).strip() if subject else None + claim_id = form.get("claim_id") + claim_id = str(claim_id).strip() if claim_id else None + source = form.get("source") + source = str(source).strip() if source else "bar" + thread_id_param = form.get("thread_id") + thread_id_param = str(thread_id_param).strip() if thread_id_param else None + session_token = form.get("session_token") + session_token = str(session_token).strip() if session_token else None + channel = form.get("channel") + channel = str(channel).strip() if channel else None + channel_user_id = form.get("channel_user_id") + channel_user_id = str(channel_user_id).strip() if channel_user_id else None + + file_items: List[Tuple[str, bytes, Optional[str]]] = [] + for key, value in form.multi_items(): + if hasattr(value, "read") and hasattr(value, "filename"): + content = await value.read() + file_items.append((value.filename or key, content, getattr(value, "content_type", None))) + + _check_attachment_limits(file_items) + + unified_id, phone, email, session_id = await _resolve_session( + session_token=session_token, + channel=channel, + channel_user_id=channel_user_id, + ) + + thread_id = await _get_or_create_thread(unified_id, claim_id or None, source) + + attachments_json = json.dumps([{"filename": fn} for fn, _, _ in file_items]) + + message_id = uuid.uuid4() + await db.execute( + "INSERT INTO clpr_support_messages (id, thread_id, direction, body, attachments) VALUES ($1, $2, 'user', $3, $4)", + message_id, + uuid.UUID(thread_id), + message, + attachments_json, + ) + + row = await db.fetch_one("SELECT ticket_id FROM clpr_support_threads WHERE id = $1", uuid.UUID(thread_id)) + ticket_id = str(row["ticket_id"]) if row and row.get("ticket_id") else None + + webhook_url = _get_support_webhook() + timestamp = datetime.now(timezone.utc).isoformat() + + data: Dict[str, str] = { + "message": message, + "source": source or "bar", + "unified_id": unified_id or "", + "phone": (phone or "").strip(), + "email": (email or "").strip(), + "session_id": (session_id or "").strip(), + "timestamp": timestamp, + "thread_id": thread_id, + } + if subject: + data["subject"] = subject + if claim_id: + data["claim_id"] = claim_id + if ticket_id: + data["ticket_id"] = ticket_id + + files_for_upload: Dict[str, Tuple[str, bytes, Optional[str]]] = {} + for i, (filename, content, content_type) in enumerate(file_items): + key = f"attachments[{i}]" if len(file_items) > 1 else "attachments" + if key in files_for_upload: + key = f"attachments[{i}]" + files_for_upload[key] = (filename, content, content_type or "application/octet-stream") + + try: + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.post( + webhook_url, + data=data, + files=files_for_upload or None, + ) + except httpx.TimeoutException: + logger.error("Таймаут вызова N8N support webhook") + raise HTTPException(status_code=504, detail="Таймаут подключения к сервису поддержки") + except Exception as e: + logger.exception("Ошибка вызова N8N support webhook: %s", e) + raise HTTPException(status_code=502, detail="Сервис поддержки временно недоступен") + + if response.status_code != 200: + logger.warning("N8N support webhook вернул %s: %s", response.status_code, response.text[:500]) + raise HTTPException(status_code=502, detail="Сервис поддержки вернул ошибку") + + try: + resp_json = response.json() + if isinstance(resp_json, dict) and resp_json.get("ticket_id"): + tid = str(resp_json.get("ticket_id")).strip() + if tid: + await db.execute( + "UPDATE clpr_support_threads SET ticket_id = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2", + tid, + uuid.UUID(thread_id), + ) + except Exception: + pass + + return { + "success": True, + "thread_id": thread_id, + "message_id": str(message_id), + } + + +@router.post("/incoming") +async def support_incoming( + request: Request, + x_support_incoming_secret: Optional[str] = Header(None, alias="X-Support-Incoming-Secret"), +): + """ + Webhook для n8n: добавить сообщение от поддержки в тред. + Тело: JSON { "thread_id" или "ticket_id", "body", "attachments?" }. + Заголовок X-Support-Incoming-Secret должен совпадать с SUPPORT_INCOMING_SECRET (если задан). + """ + secret = (getattr(settings, "support_incoming_secret", None) or "").strip() + if secret: + header_secret = x_support_incoming_secret or request.query_params.get("secret") or "" + if header_secret.strip() != secret: + raise HTTPException(status_code=403, detail="Invalid secret") + + try: + body = await request.json() + except Exception: + raise HTTPException(status_code=400, detail="JSON body required") + + thread_id = body.get("thread_id") + ticket_id = body.get("ticket_id") + msg_body = (body.get("body") or "").strip() + attachments = body.get("attachments") + if isinstance(attachments, list): + attachments = json.dumps(attachments) + else: + attachments = "[]" + + if not thread_id and not ticket_id: + raise HTTPException(status_code=400, detail="thread_id or ticket_id required") + + if ticket_id and not thread_id: + row = await db.fetch_one("SELECT id FROM clpr_support_threads WHERE ticket_id = $1", str(ticket_id)) + if not row: + raise HTTPException(status_code=404, detail="Thread not found by ticket_id") + thread_id = str(row["id"]) + + try: + thread_uuid = uuid.UUID(thread_id) + except Exception: + raise HTTPException(status_code=400, detail="Invalid thread_id") + + msg_id = uuid.uuid4() + await db.execute( + "INSERT INTO clpr_support_messages (id, thread_id, direction, body, attachments) VALUES ($1, $2, 'support', $3, $4)", + msg_id, + thread_uuid, + msg_body, + attachments, + ) + logger.info("Support incoming message added: thread_id=%s", thread_id) + return {"success": True, "message_id": str(msg_id)} diff --git a/backend/app/main.py b/backend/app/main.py index 289795c..e451eed 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -5,6 +5,7 @@ from fastapi import FastAPI, Request import json from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager +import asyncio import logging import time import uuid @@ -18,7 +19,7 @@ from .services.rabbitmq_service import rabbitmq_service from .services.policy_service import policy_service from .services.crm_mysql_service import crm_mysql_service from .services.s3_service import s3_service -from .api import sms, claims, policy, upload, draft, events, n8n_proxy, session, documents, banks, telegram_auth, max_auth, auth2, auth_universal, documents_draft_open, profile +from .api import sms, claims, policy, upload, draft, events, n8n_proxy, session, documents, banks, telegram_auth, max_auth, auth2, auth_universal, documents_draft_open, profile, support from .api import debug_session # Настройка логирования @@ -161,6 +162,14 @@ async def lifespan(app: FastAPI): s3_service.connect() except Exception as e: logger.warning(f"⚠️ S3 storage not available: {e}") + + # Postgres LISTEN support_events для доставки сообщений поддержки в реальном времени (SSE) + support_listener_task = None + try: + support_listener_task = asyncio.create_task(support._run_support_listener()) + logger.info("✅ Support NOTIFY listener task started") + except Exception as e: + logger.warning(f"⚠️ Support listener not started: {e}") logger.info("✅ Ticket Form Intake Platform started successfully!") @@ -168,6 +177,12 @@ async def lifespan(app: FastAPI): # SHUTDOWN logger.info("🛑 Shutting down Ticket Form Intake Platform...") + if support_listener_task and not support_listener_task.done(): + support_listener_task.cancel() + try: + await support_listener_task + except asyncio.CancelledError: + pass await db.disconnect() await redis_service.disconnect() @@ -243,6 +258,7 @@ app.include_router(max_auth.router) # 📱 MAX Mini App auth app.include_router(auth2.router) # 🆕 Alt auth endpoint (tg/max/sms) app.include_router(auth_universal.router) # Универсальный auth: channel + init_data → N8N_AUTH_WEBHOOK, Redis session:{channel}:{channel_user_id} app.include_router(profile.router) # 👤 Профиль: контакты из CRM через N8N_CONTACT_WEBHOOK +app.include_router(support.router) # 📞 Поддержка: форма из бара и карточек жалоб → n8n app.include_router(documents_draft_open.router) # 🆕 Documents draft-open (isolated) app.include_router(debug_session.router) # 🔧 Debug helpers (set session + redirect) diff --git a/backend/db/migrations/003_support_threads_messages.sql b/backend/db/migrations/003_support_threads_messages.sql new file mode 100644 index 0000000..202bd06 --- /dev/null +++ b/backend/db/migrations/003_support_threads_messages.sql @@ -0,0 +1,32 @@ +-- Треды и сообщения поддержки (диалог). Префикс таблиц: clpr_ +-- Один тред на (unified_id, claim_id или null); сообщения user/support + +CREATE TABLE IF NOT EXISTS clpr_support_threads ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + unified_id VARCHAR(255) NOT NULL, + claim_id VARCHAR(255), + source VARCHAR(50) NOT NULL DEFAULT 'bar', + ticket_id VARCHAR(255), + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_clpr_support_threads_unified_claim ON clpr_support_threads(unified_id, claim_id); +CREATE INDEX IF NOT EXISTS idx_clpr_support_threads_ticket ON clpr_support_threads(ticket_id); + +CREATE TABLE IF NOT EXISTS clpr_support_messages ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + thread_id UUID NOT NULL REFERENCES clpr_support_threads(id) ON DELETE CASCADE, + direction VARCHAR(20) NOT NULL CHECK (direction IN ('user', 'support')), + body TEXT NOT NULL DEFAULT '', + attachments JSONB DEFAULT '[]', + external_id VARCHAR(255), + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_clpr_support_messages_thread ON clpr_support_messages(thread_id); +CREATE INDEX IF NOT EXISTS idx_clpr_support_messages_created ON clpr_support_messages(thread_id, created_at); +CREATE UNIQUE INDEX IF NOT EXISTS idx_clpr_support_messages_external ON clpr_support_messages(thread_id, external_id) WHERE external_id IS NOT NULL; + +COMMENT ON TABLE clpr_support_threads IS 'Треды обращений в поддержку: один на пользователя (бар) или по claim_id'; +COMMENT ON TABLE clpr_support_messages IS 'Сообщения в треде: user — от пользователя, support — от оператора'; diff --git a/backend/db/migrations/004_support_notify_trigger.sql b/backend/db/migrations/004_support_notify_trigger.sql new file mode 100644 index 0000000..7bef0aa --- /dev/null +++ b/backend/db/migrations/004_support_notify_trigger.sql @@ -0,0 +1,37 @@ +-- NOTIFY при INSERT в clpr_support_messages для доставки в реальном времени (SSE). +-- Один канал support_events, в payload — unified_id и данные сообщения. +-- Таблицы поддержки с префиксом clpr_ + +CREATE OR REPLACE FUNCTION support_messages_notify() +RETURNS TRIGGER +LANGUAGE plpgsql +AS $$ +DECLARE + u_id VARCHAR(255); + payload TEXT; +BEGIN + SELECT unified_id INTO u_id FROM clpr_support_threads WHERE id = NEW.thread_id; + IF u_id IS NULL THEN + RETURN NEW; + END IF; + payload := json_build_object( + 'unified_id', u_id, + 'thread_id', NEW.thread_id, + 'message_id', NEW.id, + 'direction', NEW.direction, + 'body', NEW.body, + 'attachments', COALESCE(NEW.attachments::TEXT, '[]'), + 'created_at', NEW.created_at + )::TEXT; + PERFORM pg_notify('support_events', payload); + RETURN NEW; +END; +$$; + +DROP TRIGGER IF EXISTS after_support_message_insert ON clpr_support_messages; +CREATE TRIGGER after_support_message_insert + AFTER INSERT ON clpr_support_messages + FOR EACH ROW + EXECUTE PROCEDURE support_messages_notify(); + +COMMENT ON FUNCTION support_messages_notify() IS 'NOTIFY support_events при новом сообщении для SSE'; diff --git a/backend/db/migrations/005_support_reads.sql b/backend/db/migrations/005_support_reads.sql new file mode 100644 index 0000000..b3e0823 --- /dev/null +++ b/backend/db/migrations/005_support_reads.sql @@ -0,0 +1,13 @@ +-- Отметки «прочитано» по тредам: когда пользователь последний раз видел тред. +-- Непрочитанные = сообщения от support с created_at > last_read_at. + +CREATE TABLE IF NOT EXISTS clpr_support_reads ( + unified_id VARCHAR(255) NOT NULL, + thread_id UUID NOT NULL REFERENCES clpr_support_threads(id) ON DELETE CASCADE, + last_read_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (unified_id, thread_id) +); + +CREATE INDEX IF NOT EXISTS idx_clpr_support_reads_unified ON clpr_support_reads(unified_id); + +COMMENT ON TABLE clpr_support_reads IS 'Когда пользователь последний раз «прочитал» тред (открыл чат)'; diff --git a/docs/SUPPORT_FEATURE_SUMMARY.md b/docs/SUPPORT_FEATURE_SUMMARY.md new file mode 100644 index 0000000..e7bfef8 --- /dev/null +++ b/docs/SUPPORT_FEATURE_SUMMARY.md @@ -0,0 +1,36 @@ +# Поддержка: чат, список тикетов, прочитано/непрочитано, SSE + +## Что сделано + +### БД (таблицы с префиксом `clpr_`) +- **clpr_support_threads** — треды обращений (unified_id, claim_id, source, ticket_id). +- **clpr_support_messages** — сообщения (thread_id, direction user/support, body, attachments). +- **clpr_support_reads** — когда пользователь последний раз «прочитал» тред (unified_id, thread_id, last_read_at). +- Триггер на INSERT в clpr_support_messages → NOTIFY `support_events` (payload: unified_id, thread_id, сообщение) для доставки в реальном времени. + +### Backend API +- **POST /api/v1/support** — отправить сообщение (multipart), создание/поиск треда, прокси в n8n. +- **GET /api/v1/support/threads** — список всех тредов пользователя с unread_count. +- **GET /api/v1/support/thread** — один тред и сообщения (по claim_id или бар). +- **GET /api/v1/support/stream** — SSE: один поток на пользователя, события из Postgres NOTIFY. +- **GET /api/v1/support/unread-count** — суммарное число непрочитанных (бейдж в баре). +- **POST /api/v1/support/read** — отметить тред прочитанным (thread_id или claim_id). +- **POST /api/v1/support/incoming** — webhook для n8n: добавить ответ оператора в тред. +- **GET /api/v1/support/limits** — лимиты вложений. + +При старте приложения запускается задача LISTEN на канал `support_events`; при NOTIFY события раскидываются по реестру стримов (unified_id → SSE). + +### Frontend +- **Страница «Поддержка»** — первый экран: список обращений (тикетов) с бейджем непрочитанных; кнопка «Новое обращение»; по клику — чат выбранного треда. +- **SupportChat** — чат с SSE (новые сообщения от поддержки без перезагрузки); при открытии чата вызывается POST /read. +- **Нижний бар** — на иконке «Поддержка» бейдж с общим числом непрочитанных. + +### Документация +- **docs/SUPPORT_N8N_WEBHOOK.md** — переменные окружения, API, миграции, тест SSE, прочитано/непрочитано и сценарии в n8n. + +## Миграции +- 003_support_threads_messages.sql — создание clpr_support_threads, clpr_support_messages. +- 004_support_notify_trigger.sql — триггер NOTIFY support_events. +- 005_support_reads.sql — таблица clpr_support_reads. + +Креды Postgres из .env. Применение: из корня aiform_prod подставить POSTGRES_* и выполнить psql -f для каждой миграции. diff --git a/docs/SUPPORT_N8N_WEBHOOK.md b/docs/SUPPORT_N8N_WEBHOOK.md new file mode 100644 index 0000000..56f72ab --- /dev/null +++ b/docs/SUPPORT_N8N_WEBHOOK.md @@ -0,0 +1,80 @@ +# Поддержка: webhook n8n, диалог (треды), лимиты вложений + +Функционал «Поддержка» реализован как диалог: треды и сообщения хранятся в БД. **Таблицы с префиксом `clpr_`:** `clpr_support_threads`, `clpr_support_messages`. Исходящие сообщения пользователя проксируются в n8n; входящие ответы оператора приходят в backend через webhook POST /api/v1/support/incoming (из n8n при ответе в CRM). + +Подключение к PostgreSQL: креды берутся из `.env` — `POSTGRES_HOST`, `POSTGRES_PORT`, `POSTGRES_DB`, `POSTGRES_USER`, `POSTGRES_PASSWORD`. + +## Переменные окружения + +В `.env` задаются: + +| Переменная | Описание | +|------------|----------| +| `N8N_SUPPORT_WEBHOOK` | URL webhook n8n (multipart). Обязателен. | +| `SUPPORT_ATTACHMENTS_MAX_COUNT` | Макс. количество файлов (0 = без ограничений). | +| `SUPPORT_ATTACHMENTS_MAX_SIZE_MB` | Макс. размер одного файла в МБ (0 = без ограничений). | +| `SUPPORT_ATTACHMENTS_ALLOWED_TYPES` | Допустимые типы (пусто = любые). | +| `SUPPORT_INCOMING_SECRET` | Секрет для POST /api/v1/support/incoming (заголовок `X-Support-Incoming-Secret` или query `secret`). Если задан — только n8n с этим секретом может слать ответы в тред. | + +Значение **0** или **пустая строка** для лимитов означает «без ограничений». + +## Формат запроса от backend к n8n + +Backend отправляет на `N8N_SUPPORT_WEBHOOK` **POST multipart/form-data**: + +- **Поля:** `message`, `subject`, `claim_id`, `source`, `unified_id`, `phone`, `email`, `session_id`, `timestamp`, **`thread_id`** (UUID треда), **`ticket_id`** (если тред уже привязан к тикету в CRM). +- **Файлы:** `attachments[0]`, … или `attachments`. + +Ответ n8n может содержать **`ticket_id`** — backend сохранит его в `clpr_support_threads` для последующих сообщений и для входящего webhook. + +## API backend + +- **POST /api/v1/support** — multipart: message, subject?, claim_id?, source, **thread_id?**, session_token (или channel+channel_user_id), файлы. Создаёт/находит тред по (unified_id, claim_id), записывает сообщение (user), проксирует в n8n. Ответ: `{ "success": true, "thread_id": "...", "message_id": "..." }`. +- **GET /api/v1/support/threads** — список всех тредов пользователя. В каждом элементе есть **`unread_count`** (число непрочитанных сообщений от поддержки). Ответ: `{ "threads": [{ "thread_id", "claim_id" | null, "source", "ticket_id", "created_at", "updated_at", "last_body", "last_at", "messages_count", "unread_count" }] }`. +- **GET /api/v1/support/unread-count** — суммарное число непрочитанных по всем тредам (для бейджа в баре). Ответ: `{ "unread_count": number }`. +- **POST /api/v1/support/read** — отметить тред как прочитанный (пользователь открыл чат). Query или body: `thread_id` или `claim_id`. Обновляет `clpr_support_reads`. +- **GET /api/v1/support/thread** — query: `claim_id?`, `session_token` (или `channel` + `channel_user_id`). Возвращает один тред и сообщения: `{ "thread_id": "...", "messages": [...], "ticket_id": "..." }`. Если треда нет — `thread_id: null`, `messages: []`. +- **POST /api/v1/support/incoming** — для n8n: добавить сообщение от поддержки в тред. Тело JSON: `{ "thread_id" или "ticket_id", "body", "attachments?": [] }`. Заголовок **`X-Support-Incoming-Secret`** или query **`secret`** должен совпадать с `SUPPORT_INCOMING_SECRET` (если задан). По `ticket_id` backend находит thread_id и вставляет сообщение с direction=support. +- **GET /api/v1/support/limits** — лимиты вложений из env. +- **GET /api/v1/support/stream** — SSE: один поток на пользователя (query `session_token` или `channel` + `channel_user_id`). Новые сообщения от поддержки приходят в реальном времени через Postgres NOTIFY (триггер на `clpr_support_messages`). События: `connected`, `support_message` (в теле — `thread_id`, `message`: id, direction, body, attachments, created_at). + +## Доставка в реальном времени (Postgres NOTIFY) + +При INSERT в `clpr_support_messages` срабатывает триггер, который делает `NOTIFY support_events` с payload (unified_id, thread_id, сообщение). Backend при старте подписывается на канал `support_events` одним LISTEN-соединением и раскидывает события по реестру стримов (unified_id → очереди SSE). + +**Прочитано/непрочитано:** таблица `clpr_support_reads` (unified_id, thread_id, last_read_at). Пользователь «прочитал» тред, когда открывает чат — фронт вызывает POST /read. Непрочитанные = сообщения от support с created_at > last_read_at. По этим данным можно в n8n/CRM строить сценарии напоминаний (push, повторная отправка), если пользователь долго не читает. + +**Миграции** (таблицы с префиксом `clpr_`): `003` — треды и сообщения; `004` — триггер NOTIFY; `005_support_reads.sql` — отметки прочтения. Применять к БД вручную. Креды Postgres — из `.env`: + +```bash +# из корня aiform_prod, креды из .env +export $(grep -E '^POSTGRES_' .env | xargs) +psql -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" -U "$POSTGRES_USER" -d "$POSTGRES_DB" -f backend/db/migrations/003_support_threads_messages.sql +psql -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" -U "$POSTGRES_USER" -d "$POSTGRES_DB" -f backend/db/migrations/004_support_notify_trigger.sql +psql -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" -U "$POSTGRES_USER" -d "$POSTGRES_DB" -f backend/db/migrations/005_support_reads.sql +``` + +Если в БД уже есть таблицы без префикса (`support_threads`, `support_messages`), их нужно переименовать в `clpr_support_threads` и `clpr_support_messages` перед применением 004, либо пересоздать схему (миграция 003 с префиксом создаёт таблицы с `IF NOT EXISTS`). + +## n8n + +1. **Webhook приёма обращений** — multipart, при первом сообщении создаёт тикет в CRM, в ответе возвращает `ticket_id`. При последующих (есть thread_id/ticket_id) — добавляет комментарий к тикету. +2. **Вызов нашего incoming** — когда оператор ответил в CRM, workflow n8n должен вызвать **POST https://.../api/v1/support/incoming** с заголовком `X-Support-Incoming-Secret: ` и телом `{ "thread_id": "..." или "ticket_id": "...", "body": "текст ответа" }`, чтобы сообщение появилось в чате мини-аппа. + +--- + +## Как тестировать SSE (ответы в реальном времени) + +1. **В мини-аппе:** зайти в поддержку (бар → «Поддержка» или страница /support), авторизоваться, отправить первое сообщение (или открыть уже существующий тред). Оставить чат открытым. +2. **Узнать `thread_id`:** в DevTools → Network найти запрос `GET .../api/v1/support/thread` и в ответе скопировать `thread_id`, либо после отправки сообщения — ответ `POST .../api/v1/support` содержит `thread_id`. +3. **Имитация ответа поддержки:** вызвать incoming (как будет делать n8n): + + ```bash + # Подставить THREAD_ID и секрет из .env (SUPPORT_INCOMING_SECRET). Если секрет пустой — заголовок можно не передавать. + curl -s -X POST 'https://miniapp.clientright.ru/api/v1/support/incoming' \ + -H 'Content-Type: application/json' \ + -H 'X-Support-Incoming-Secret: ВАШ_SUPPORT_INCOMING_SECRET' \ + -d '{"thread_id":"THREAD_ID","body":"Тестовый ответ от поддержки"}' + ``` + +4. **Ожидание:** в открытом чате в мини-аппе в течение 1–2 секунд должно появиться новое сообщение **без перезагрузки и без повторного запроса** (доставка по SSE). Если сообщение появляется только после обновления страницы — проверить, что фронт пересобран с SSE (`docker compose build frontend && docker compose up -d frontend`) и что в Network есть запрос к `/api/v1/support/stream` со статусом pending (длинное соединение). diff --git a/frontend/src/components/BottomBar.css b/frontend/src/components/BottomBar.css index e84a132..35b21cc 100644 --- a/frontend/src/components/BottomBar.css +++ b/frontend/src/components/BottomBar.css @@ -65,9 +65,53 @@ color: #dc2626; } +.app-bar-item-icon-wrap { + position: relative; + display: inline-flex; + align-items: center; + justify-content: center; +} + .app-bar-avatar { width: 28px; height: 28px; border-radius: 50%; object-fit: cover; } + +.app-bar-profile-badge { + position: absolute; + top: -4px; + right: -6px; + min-width: 16px; + height: 16px; + padding: 0 4px; + font-size: 11px; + font-weight: 700; + line-height: 16px; + color: #fff; + text-align: center; + background: #dc2626; + border: 1.5px solid #fff; + border-radius: 50%; + box-sizing: border-box; +} + +.app-bar-support-badge { + position: absolute; + bottom: -2px; + left: 50%; + transform: translate(-50%, 50%); + min-width: 18px; + height: 18px; + padding: 0 5px; + font-size: 11px; + font-weight: 700; + line-height: 18px; + color: #fff; + text-align: center; + background: #dc2626; + border: 1.5px solid #fff; + border-radius: 9px; + box-sizing: border-box; +} diff --git a/frontend/src/components/BottomBar.tsx b/frontend/src/components/BottomBar.tsx index 3786bfb..86fe420 100644 --- a/frontend/src/components/BottomBar.tsx +++ b/frontend/src/components/BottomBar.tsx @@ -3,27 +3,56 @@ import { Home, Headphones, User, LogOut, ArrowLeft } from 'lucide-react'; import './BottomBar.css'; import { miniappLog } from '../utils/miniappLogger'; +function getSessionToken(): string | null { + if (typeof sessionStorage !== 'undefined') { + const s = sessionStorage.getItem('session_token'); + if (s) return s; + } + if (typeof localStorage !== 'undefined') { + return localStorage.getItem('session_token'); + } + return null; +} + interface BottomBarProps { currentPath: string; avatarUrl?: string; + profileNeedsAttention?: boolean; onNavigate?: (path: string) => void; } -export default function BottomBar({ currentPath, avatarUrl, onNavigate }: BottomBarProps) { +export default function BottomBar({ currentPath, avatarUrl, profileNeedsAttention, onNavigate }: BottomBarProps) { const isHome = currentPath.startsWith('/hello'); const isProfile = currentPath === '/profile'; + const isSupport = currentPath === '/support'; const [backEnabled, setBackEnabled] = useState(false); + const [supportUnreadCount, setSupportUnreadCount] = useState(0); + + // Непрочитанные в поддержке — для бейджа на иконке + useEffect(() => { + const token = getSessionToken(); + if (!token) { + setSupportUnreadCount(0); + return; + } + const params = new URLSearchParams(); + params.set('session_token', token); + fetch(`/api/v1/support/unread-count?${params.toString()}`) + .then((res) => (res.ok ? res.json() : { unread_count: 0 })) + .then((data) => setSupportUnreadCount(data.unread_count ?? 0)) + .catch(() => setSupportUnreadCount(0)); + }, [currentPath]); // В некоторых webview бывает «ghost click» сразу после навигации — даём бару чуть устояться useEffect(() => { - if (isHome || isProfile) { + if (isHome || isProfile || isSupport) { setBackEnabled(false); return; } setBackEnabled(false); const t = window.setTimeout(() => setBackEnabled(true), 1200); return () => window.clearTimeout(t); - }, [isHome, isProfile, currentPath]); + }, [isHome, isProfile, isSupport, currentPath]); const handleBack = (e: React.MouseEvent) => { e.preventDefault(); @@ -153,25 +182,37 @@ export default function BottomBar({ currentPath, avatarUrl, onNavigate }: Bottom onNavigate('/profile'); } }} + aria-label={profileNeedsAttention ? 'Профиль — требуется подтверждение данных' : 'Профиль'} > - {avatarUrl ? ( - - ) : ( - - )} + + {avatarUrl ? ( + + ) : ( + + )} + {profileNeedsAttention && !} + Профиль { - if (onNavigate && !currentPath.startsWith('/hello')) { + if (onNavigate && currentPath !== '/support') { e.preventDefault(); - onNavigate('/hello'); + onNavigate('/support'); } }} + aria-label={supportUnreadCount > 0 ? `Поддержка: ${supportUnreadCount} непрочитанных` : 'Поддержка'} > - + + + {supportUnreadCount > 0 && ( + + {supportUnreadCount > 99 ? '99+' : supportUnreadCount} + + )} + Поддержка