Support: chat, tickets list, SSE Postgres NOTIFY, read/unread

This commit is contained in:
Fedor
2026-02-25 23:18:45 +03:00
parent d8fe0b605b
commit b3a7396d32
11 changed files with 1615 additions and 14 deletions

699
backend/app/api/support.py Normal file
View File

@@ -0,0 +1,699 @@
"""
Support API: диалог поддержки (треды + сообщения).
POST /api/v1/support — multipart, создание/поиск треда, запись сообщения user, прокси в n8n.
GET /api/v1/support/thread — получить тред и сообщения.
GET /api/v1/support/stream — SSE: один канал на юзера, события из Postgres NOTIFY.
POST /api/v1/support/incoming — webhook для n8n: добавить сообщение от поддержки.
GET /api/v1/support/limits — лимиты вложений.
"""
import asyncio
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set, Tuple
import asyncpg
import httpx
from fastapi import APIRouter, Header, HTTPException, Query, Request
from starlette.responses import StreamingResponse
from ..config import settings
from ..services.database import db
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/support", tags=["support"])
# Реестр SSE по unified_id: кому пушить при NOTIFY (один канал support_events)
_support_stream_registry: Dict[str, Set[asyncio.Queue]] = {}
_support_notify_inbox: asyncio.Queue = asyncio.Queue()
_SUPPORT_EVENTS_CHANNEL = "support_events"
def _get_support_webhook() -> str:
url = (getattr(settings, "n8n_support_webhook", None) or "").strip()
if not url:
raise HTTPException(
status_code=503,
detail="N8N_SUPPORT_WEBHOOK не настроен",
)
return url
async def _resolve_session(
session_token: Optional[str] = None,
channel: Optional[str] = None,
channel_user_id: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
"""Возвращает (unified_id, phone, email, session_id)."""
unified_id: Optional[str] = None
phone: Optional[str] = None
session_id: Optional[str] = session_token
if channel and channel_user_id:
try:
from .session import get_session_by_channel_user
data = await get_session_by_channel_user(channel.strip(), str(channel_user_id).strip())
if data:
unified_id = data.get("unified_id")
phone = data.get("phone")
if session_token is None:
session_id = data.get("session_token")
except Exception as e:
logger.warning("Ошибка чтения сессии по channel: %s", e)
if not unified_id and session_token:
try:
from .session import SessionVerifyRequest, verify_session
res = await verify_session(SessionVerifyRequest(session_token=session_token))
if getattr(res, "valid", False):
unified_id = getattr(res, "unified_id", None)
phone = getattr(res, "phone", None)
except HTTPException:
raise
except Exception as e:
logger.warning("Ошибка верификации сессии для support: %s", e)
raise HTTPException(status_code=401, detail="Сессия недействительна")
if not unified_id:
raise HTTPException(
status_code=401,
detail="Укажите session_token или (channel + channel_user_id)",
)
return (unified_id, phone, None, session_id or session_token)
def _check_attachment_limits(
files: List[Tuple[str, bytes, Optional[str]]],
) -> None:
"""Проверяет лимиты вложений; 0 или пусто = не проверять."""
max_count = getattr(settings, "support_attachments_max_count", 0) or 0
max_bytes = settings.support_attachments_max_size_bytes
allowed = (getattr(settings, "support_attachments_allowed_types", None) or "").strip()
if max_count > 0 and len(files) > max_count:
raise HTTPException(
status_code=400,
detail=f"Слишком много файлов: максимум {max_count}",
)
if max_bytes > 0:
for name, content, _ in files:
if len(content) > max_bytes:
raise HTTPException(
status_code=400,
detail=f"Файл {name} превышает допустимый размер ({max_bytes // (1024*1024)} МБ)",
)
if allowed:
allowed_list = [x.strip().lower() for x in allowed.split(",") if x.strip()]
for filename, content, mime in files:
mime = (mime or "").strip().lower()
ext = ""
if "." in filename:
ext = "." + filename.rsplit(".", 1)[-1].lower()
ok = False
for a in allowed_list:
if a.startswith("."):
if ext == a:
ok = True
break
elif "/" in a:
if mime == a or (a.endswith("/*") and mime.split("/")[0] == a.split("/")[0]):
ok = True
break
else:
if ext == f".{a}" or mime == a:
ok = True
break
if not ok:
raise HTTPException(
status_code=400,
detail=f"Тип файла «{filename}» не разрешён. Допустимые: {allowed}",
)
def _uuid_str(val: Any) -> str:
if val is None:
return ""
return str(val)
async def _get_or_create_thread(unified_id: str, claim_id: Optional[str], source: str) -> str:
"""Найти тред по (unified_id, claim_id) или создать. Возвращает thread_id (UUID str)."""
if claim_id and claim_id.strip():
row = await db.fetch_one(
"SELECT id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id = $2",
unified_id,
claim_id.strip(),
)
else:
row = await db.fetch_one(
"SELECT id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id IS NULL",
unified_id,
)
if row:
return _uuid_str(row["id"])
thread_id = uuid.uuid4()
await db.execute(
"INSERT INTO clpr_support_threads (id, unified_id, claim_id, source) VALUES ($1, $2, $3, $4)",
thread_id,
unified_id,
claim_id.strip() if claim_id and claim_id.strip() else None,
source or "bar",
)
return str(thread_id)
def _support_notify_callback(conn: Any, pid: int, channel: str, payload: str) -> None:
"""Вызывается asyncpg при NOTIFY support_events. Кладём payload во inbox."""
try:
_support_notify_inbox.put_nowait(payload)
except Exception as e:
logger.warning("Support notify inbox put: %s", e)
async def _run_support_listener() -> None:
"""
Один подписчик на Postgres NOTIFY support_events.
Держит соединение, слушает канал, раскидывает по unified_id в реестр.
"""
conn: Optional[asyncpg.Connection] = None
try:
conn = await asyncpg.connect(
host=settings.postgres_host,
port=settings.postgres_port,
database=settings.postgres_db,
user=settings.postgres_user,
password=settings.postgres_password,
)
await conn.execute("LISTEN " + _SUPPORT_EVENTS_CHANNEL)
conn.add_listener(_SUPPORT_EVENTS_CHANNEL, _support_notify_callback)
logger.info("Support LISTEN %s started", _SUPPORT_EVENTS_CHANNEL)
while True:
payload = await _support_notify_inbox.get()
try:
data = json.loads(payload)
u_id = data.get("unified_id")
if not u_id:
continue
queues = _support_stream_registry.get(u_id)
if not queues:
continue
for q in list(queues):
try:
q.put_nowait(data)
except asyncio.QueueFull:
pass
except Exception as e:
logger.warning("Support stream put: %s", e)
except json.JSONDecodeError as e:
logger.warning("Support notify payload not JSON: %s", e)
except Exception as e:
logger.exception("Support listener dispatch: %s", e)
except asyncio.CancelledError:
logger.info("Support listener cancelled")
except Exception as e:
logger.exception("Support listener error: %s", e)
finally:
if conn and not conn.is_closed():
await conn.close()
logger.info("Support LISTEN stopped")
@router.get("/limits")
async def get_support_limits():
"""Лимиты вложений (0/пусто = без ограничений)."""
max_count = getattr(settings, "support_attachments_max_count", 0) or 0
max_bytes = settings.support_attachments_max_size_bytes
allowed = (getattr(settings, "support_attachments_allowed_types", None) or "").strip()
unlimited = max_count == 0 and max_bytes == 0 and not allowed
return {
"max_count": max_count,
"max_size_per_file": max_bytes,
"allowed_types": allowed,
"unlimited": unlimited,
}
@router.get("/threads")
async def get_support_threads(
session_token: Optional[str] = Query(None),
channel: Optional[str] = Query(None),
channel_user_id: Optional[str] = Query(None),
):
"""
Список всех тредов пользователя для экрана «Мои обращения».
Сессия: session_token или channel + channel_user_id.
"""
unified_id, _, _, _ = await _resolve_session(
session_token=session_token, channel=channel, channel_user_id=channel_user_id
)
rows = await db.fetch_all(
"""
SELECT
t.id,
t.claim_id,
t.source,
t.ticket_id,
t.created_at,
t.updated_at,
(SELECT m.body FROM clpr_support_messages m WHERE m.thread_id = t.id ORDER BY m.created_at DESC LIMIT 1) AS last_body,
(SELECT m.created_at FROM clpr_support_messages m WHERE m.thread_id = t.id ORDER BY m.created_at DESC LIMIT 1) AS last_at,
(SELECT COUNT(*)::int FROM clpr_support_messages m WHERE m.thread_id = t.id) AS messages_count,
(SELECT COUNT(*)::int FROM clpr_support_messages m
WHERE m.thread_id = t.id AND m.direction = 'support'
AND m.created_at > COALESCE(
(SELECT r.last_read_at FROM clpr_support_reads r WHERE r.unified_id = t.unified_id AND r.thread_id = t.id),
'1970-01-01'::timestamptz
)) AS unread_count
FROM clpr_support_threads t
WHERE t.unified_id = $1
ORDER BY COALESCE(
(SELECT m.created_at FROM clpr_support_messages m WHERE m.thread_id = t.id ORDER BY m.created_at DESC LIMIT 1),
t.updated_at,
t.created_at
) DESC
""",
unified_id,
)
threads = []
for r in rows:
last_at = r.get("last_at")
if hasattr(last_at, "isoformat"):
last_at = last_at.isoformat()
elif last_at is not None:
last_at = str(last_at)
threads.append({
"thread_id": _uuid_str(r["id"]),
"claim_id": str(r["claim_id"]).strip() if r.get("claim_id") else None,
"source": str(r.get("source") or "bar"),
"ticket_id": str(r["ticket_id"]) if r.get("ticket_id") else None,
"created_at": r["created_at"].isoformat() if hasattr(r.get("created_at"), "isoformat") else str(r.get("created_at") or ""),
"updated_at": r["updated_at"].isoformat() if hasattr(r.get("updated_at"), "isoformat") else str(r.get("updated_at") or ""),
"last_body": (r.get("last_body") or "")[:200] if r.get("last_body") else None,
"last_at": last_at,
"messages_count": r.get("messages_count") or 0,
"unread_count": r.get("unread_count") or 0,
})
return {"threads": threads}
@router.get("/unread-count")
async def get_support_unread_count(
session_token: Optional[str] = Query(None),
channel: Optional[str] = Query(None),
channel_user_id: Optional[str] = Query(None),
):
"""Суммарное число непрочитанных сообщений от поддержки (для бейджа в баре)."""
unified_id, _, _, _ = await _resolve_session(
session_token=session_token, channel=channel, channel_user_id=channel_user_id
)
row = await db.fetch_one(
"""
SELECT COALESCE(SUM(cnt), 0)::int AS total
FROM (
SELECT COUNT(*)::int AS cnt
FROM clpr_support_threads t
JOIN clpr_support_messages m ON m.thread_id = t.id AND m.direction = 'support'
WHERE t.unified_id = $1
AND m.created_at > COALESCE(
(SELECT r.last_read_at FROM clpr_support_reads r WHERE r.unified_id = t.unified_id AND r.thread_id = t.id),
'1970-01-01'::timestamptz
)
GROUP BY t.id
) s
""",
unified_id,
)
total = (row and row.get("total")) or 0
return {"unread_count": total}
@router.post("/read")
async def mark_support_thread_read(
request: Request,
session_token: Optional[str] = Query(None),
channel: Optional[str] = Query(None),
channel_user_id: Optional[str] = Query(None),
):
"""
Отметить тред как прочитанный (пользователь открыл чат).
Тело JSON: { "thread_id": "..." } или query thread_id= / claim_id=.
"""
unified_id, _, _, _ = await _resolve_session(
session_token=session_token, channel=channel, channel_user_id=channel_user_id
)
thread_id = request.query_params.get("thread_id")
claim_id = request.query_params.get("claim_id")
if not thread_id:
try:
body = await request.json() if request.headers.get("content-type", "").startswith("application/json") else {}
except Exception:
body = {}
thread_id = body.get("thread_id")
if not thread_id:
claim_id = claim_id or body.get("claim_id")
if claim_id and not thread_id:
cid = str(claim_id).strip()
if cid:
row = await db.fetch_one(
"SELECT id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id = $2",
unified_id,
cid,
)
else:
row = await db.fetch_one(
"SELECT id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id IS NULL",
unified_id,
)
if row:
thread_id = str(row["id"])
if not thread_id:
raise HTTPException(status_code=400, detail="thread_id или claim_id обязателен")
try:
thread_uuid = uuid.UUID(thread_id)
except Exception:
raise HTTPException(status_code=400, detail="Некорректный thread_id")
# Проверяем, что тред принадлежит пользователю
row = await db.fetch_one(
"SELECT id FROM clpr_support_threads WHERE id = $1 AND unified_id = $2",
thread_uuid,
unified_id,
)
if not row:
raise HTTPException(status_code=404, detail="Тред не найден")
await db.execute(
"""
INSERT INTO clpr_support_reads (unified_id, thread_id, last_read_at)
VALUES ($1, $2, CURRENT_TIMESTAMP)
ON CONFLICT (unified_id, thread_id) DO UPDATE SET last_read_at = CURRENT_TIMESTAMP
""",
unified_id,
thread_uuid,
)
return {"success": True}
@router.get("/thread")
async def get_support_thread(
claim_id: Optional[str] = Query(None),
session_token: Optional[str] = Query(None),
channel: Optional[str] = Query(None),
channel_user_id: Optional[str] = Query(None),
):
"""
Получить тред поддержки и сообщения. Query: claim_id (опционально).
Сессия: session_token или channel + channel_user_id.
"""
unified_id, _, _, _ = await _resolve_session(
session_token=session_token, channel=channel, channel_user_id=channel_user_id
)
cid = claim_id.strip() if claim_id and str(claim_id).strip() else None
if cid:
row = await db.fetch_one(
"SELECT id, ticket_id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id = $2",
unified_id,
cid,
)
else:
row = await db.fetch_one(
"SELECT id, ticket_id FROM clpr_support_threads WHERE unified_id = $1 AND claim_id IS NULL",
unified_id,
)
if not row:
return {"thread_id": None, "messages": [], "ticket_id": None}
thread_id = _uuid_str(row["id"])
ticket_id = row.get("ticket_id")
if ticket_id is not None:
ticket_id = str(ticket_id)
rows = await db.fetch_all(
"SELECT id, direction, body, attachments, created_at FROM clpr_support_messages WHERE thread_id = $1 ORDER BY created_at ASC",
row["id"],
)
messages = []
for r in rows:
att = r.get("attachments")
if att is not None and not isinstance(att, list):
try:
att = json.loads(att) if isinstance(att, str) else att
except Exception:
att = []
messages.append({
"id": _uuid_str(r["id"]),
"direction": r["direction"],
"body": r["body"] or "",
"attachments": att or [],
"created_at": r["created_at"].isoformat() if hasattr(r["created_at"], "isoformat") else str(r["created_at"]),
})
return {
"thread_id": thread_id,
"messages": messages,
"ticket_id": ticket_id,
}
@router.get("/stream")
async def support_stream(
session_token: Optional[str] = Query(None),
channel: Optional[str] = Query(None),
channel_user_id: Optional[str] = Query(None),
):
"""
SSE: один поток на пользователя по unified_id. События приходят из Postgres NOTIFY (триггер на clpr_support_messages).
Query: session_token или channel + channel_user_id.
"""
unified_id, _, _, _ = await _resolve_session(
session_token=session_token, channel=channel, channel_user_id=channel_user_id
)
queue: asyncio.Queue = asyncio.Queue(maxsize=64)
if unified_id not in _support_stream_registry:
_support_stream_registry[unified_id] = set()
_support_stream_registry[unified_id].add(queue)
async def event_gen():
try:
yield f"data: {json.dumps({'event': 'connected', 'unified_id': unified_id}, ensure_ascii=False)}\n\n"
while True:
try:
msg = await asyncio.wait_for(queue.get(), timeout=30.0)
# Формат как в SupportChat: id, direction, body, attachments, created_at
created_at = msg.get("created_at")
if hasattr(created_at, "isoformat"):
created_at = created_at.isoformat()
elif created_at is not None:
created_at = str(created_at)
event = {
"event": "support_message",
"message": {
"id": str(msg.get("message_id", "")),
"direction": msg.get("direction", "support"),
"body": msg.get("body", ""),
"attachments": json.loads(msg.get("attachments", "[]")) if isinstance(msg.get("attachments"), str) else (msg.get("attachments") or []),
"created_at": created_at,
},
"thread_id": str(msg.get("thread_id", "")),
}
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
except asyncio.TimeoutError:
yield ": keepalive\n\n"
finally:
_support_stream_registry.get(unified_id, set()).discard(queue)
if unified_id in _support_stream_registry and not _support_stream_registry[unified_id]:
del _support_stream_registry[unified_id]
return StreamingResponse(
event_gen(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
@router.post("")
async def submit_support(request: Request):
"""
Отправить сообщение в поддержку. Multipart: message, subject?, claim_id?, source, thread_id?,
session_token (или channel + channel_user_id), файлы. Создаёт/находит тред, пишет сообщение, проксирует в n8n.
"""
form = await request.form()
message = form.get("message")
if not message or not str(message).strip():
raise HTTPException(status_code=400, detail="Поле message обязательно")
message = str(message).strip()
subject = form.get("subject")
subject = str(subject).strip() if subject else None
claim_id = form.get("claim_id")
claim_id = str(claim_id).strip() if claim_id else None
source = form.get("source")
source = str(source).strip() if source else "bar"
thread_id_param = form.get("thread_id")
thread_id_param = str(thread_id_param).strip() if thread_id_param else None
session_token = form.get("session_token")
session_token = str(session_token).strip() if session_token else None
channel = form.get("channel")
channel = str(channel).strip() if channel else None
channel_user_id = form.get("channel_user_id")
channel_user_id = str(channel_user_id).strip() if channel_user_id else None
file_items: List[Tuple[str, bytes, Optional[str]]] = []
for key, value in form.multi_items():
if hasattr(value, "read") and hasattr(value, "filename"):
content = await value.read()
file_items.append((value.filename or key, content, getattr(value, "content_type", None)))
_check_attachment_limits(file_items)
unified_id, phone, email, session_id = await _resolve_session(
session_token=session_token,
channel=channel,
channel_user_id=channel_user_id,
)
thread_id = await _get_or_create_thread(unified_id, claim_id or None, source)
attachments_json = json.dumps([{"filename": fn} for fn, _, _ in file_items])
message_id = uuid.uuid4()
await db.execute(
"INSERT INTO clpr_support_messages (id, thread_id, direction, body, attachments) VALUES ($1, $2, 'user', $3, $4)",
message_id,
uuid.UUID(thread_id),
message,
attachments_json,
)
row = await db.fetch_one("SELECT ticket_id FROM clpr_support_threads WHERE id = $1", uuid.UUID(thread_id))
ticket_id = str(row["ticket_id"]) if row and row.get("ticket_id") else None
webhook_url = _get_support_webhook()
timestamp = datetime.now(timezone.utc).isoformat()
data: Dict[str, str] = {
"message": message,
"source": source or "bar",
"unified_id": unified_id or "",
"phone": (phone or "").strip(),
"email": (email or "").strip(),
"session_id": (session_id or "").strip(),
"timestamp": timestamp,
"thread_id": thread_id,
}
if subject:
data["subject"] = subject
if claim_id:
data["claim_id"] = claim_id
if ticket_id:
data["ticket_id"] = ticket_id
files_for_upload: Dict[str, Tuple[str, bytes, Optional[str]]] = {}
for i, (filename, content, content_type) in enumerate(file_items):
key = f"attachments[{i}]" if len(file_items) > 1 else "attachments"
if key in files_for_upload:
key = f"attachments[{i}]"
files_for_upload[key] = (filename, content, content_type or "application/octet-stream")
try:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(
webhook_url,
data=data,
files=files_for_upload or None,
)
except httpx.TimeoutException:
logger.error("Таймаут вызова N8N support webhook")
raise HTTPException(status_code=504, detail="Таймаут подключения к сервису поддержки")
except Exception as e:
logger.exception("Ошибка вызова N8N support webhook: %s", e)
raise HTTPException(status_code=502, detail="Сервис поддержки временно недоступен")
if response.status_code != 200:
logger.warning("N8N support webhook вернул %s: %s", response.status_code, response.text[:500])
raise HTTPException(status_code=502, detail="Сервис поддержки вернул ошибку")
try:
resp_json = response.json()
if isinstance(resp_json, dict) and resp_json.get("ticket_id"):
tid = str(resp_json.get("ticket_id")).strip()
if tid:
await db.execute(
"UPDATE clpr_support_threads SET ticket_id = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2",
tid,
uuid.UUID(thread_id),
)
except Exception:
pass
return {
"success": True,
"thread_id": thread_id,
"message_id": str(message_id),
}
@router.post("/incoming")
async def support_incoming(
request: Request,
x_support_incoming_secret: Optional[str] = Header(None, alias="X-Support-Incoming-Secret"),
):
"""
Webhook для n8n: добавить сообщение от поддержки в тред.
Тело: JSON { "thread_id" или "ticket_id", "body", "attachments?" }.
Заголовок X-Support-Incoming-Secret должен совпадать с SUPPORT_INCOMING_SECRET (если задан).
"""
secret = (getattr(settings, "support_incoming_secret", None) or "").strip()
if secret:
header_secret = x_support_incoming_secret or request.query_params.get("secret") or ""
if header_secret.strip() != secret:
raise HTTPException(status_code=403, detail="Invalid secret")
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="JSON body required")
thread_id = body.get("thread_id")
ticket_id = body.get("ticket_id")
msg_body = (body.get("body") or "").strip()
attachments = body.get("attachments")
if isinstance(attachments, list):
attachments = json.dumps(attachments)
else:
attachments = "[]"
if not thread_id and not ticket_id:
raise HTTPException(status_code=400, detail="thread_id or ticket_id required")
if ticket_id and not thread_id:
row = await db.fetch_one("SELECT id FROM clpr_support_threads WHERE ticket_id = $1", str(ticket_id))
if not row:
raise HTTPException(status_code=404, detail="Thread not found by ticket_id")
thread_id = str(row["id"])
try:
thread_uuid = uuid.UUID(thread_id)
except Exception:
raise HTTPException(status_code=400, detail="Invalid thread_id")
msg_id = uuid.uuid4()
await db.execute(
"INSERT INTO clpr_support_messages (id, thread_id, direction, body, attachments) VALUES ($1, $2, 'support', $3, $4)",
msg_id,
thread_uuid,
msg_body,
attachments,
)
logger.info("Support incoming message added: thread_id=%s", thread_id)
return {"success": True, "message_id": str(msg_id)}

View File

@@ -5,6 +5,7 @@ from fastapi import FastAPI, Request
import json
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import asyncio
import logging
import time
import uuid
@@ -18,7 +19,7 @@ from .services.rabbitmq_service import rabbitmq_service
from .services.policy_service import policy_service
from .services.crm_mysql_service import crm_mysql_service
from .services.s3_service import s3_service
from .api import sms, claims, policy, upload, draft, events, n8n_proxy, session, documents, banks, telegram_auth, max_auth, auth2, auth_universal, documents_draft_open, profile
from .api import sms, claims, policy, upload, draft, events, n8n_proxy, session, documents, banks, telegram_auth, max_auth, auth2, auth_universal, documents_draft_open, profile, support
from .api import debug_session
# Настройка логирования
@@ -161,6 +162,14 @@ async def lifespan(app: FastAPI):
s3_service.connect()
except Exception as e:
logger.warning(f"⚠️ S3 storage not available: {e}")
# Postgres LISTEN support_events для доставки сообщений поддержки в реальном времени (SSE)
support_listener_task = None
try:
support_listener_task = asyncio.create_task(support._run_support_listener())
logger.info("✅ Support NOTIFY listener task started")
except Exception as e:
logger.warning(f"⚠️ Support listener not started: {e}")
logger.info("✅ Ticket Form Intake Platform started successfully!")
@@ -168,6 +177,12 @@ async def lifespan(app: FastAPI):
# SHUTDOWN
logger.info("🛑 Shutting down Ticket Form Intake Platform...")
if support_listener_task and not support_listener_task.done():
support_listener_task.cancel()
try:
await support_listener_task
except asyncio.CancelledError:
pass
await db.disconnect()
await redis_service.disconnect()
@@ -243,6 +258,7 @@ app.include_router(max_auth.router) # 📱 MAX Mini App auth
app.include_router(auth2.router) # 🆕 Alt auth endpoint (tg/max/sms)
app.include_router(auth_universal.router) # Универсальный auth: channel + init_data → N8N_AUTH_WEBHOOK, Redis session:{channel}:{channel_user_id}
app.include_router(profile.router) # 👤 Профиль: контакты из CRM через N8N_CONTACT_WEBHOOK
app.include_router(support.router) # 📞 Поддержка: форма из бара и карточек жалоб → n8n
app.include_router(documents_draft_open.router) # 🆕 Documents draft-open (isolated)
app.include_router(debug_session.router) # 🔧 Debug helpers (set session + redirect)

View File

@@ -0,0 +1,32 @@
-- Треды и сообщения поддержки (диалог). Префикс таблиц: clpr_
-- Один тред на (unified_id, claim_id или null); сообщения user/support
CREATE TABLE IF NOT EXISTS clpr_support_threads (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
unified_id VARCHAR(255) NOT NULL,
claim_id VARCHAR(255),
source VARCHAR(50) NOT NULL DEFAULT 'bar',
ticket_id VARCHAR(255),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_clpr_support_threads_unified_claim ON clpr_support_threads(unified_id, claim_id);
CREATE INDEX IF NOT EXISTS idx_clpr_support_threads_ticket ON clpr_support_threads(ticket_id);
CREATE TABLE IF NOT EXISTS clpr_support_messages (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
thread_id UUID NOT NULL REFERENCES clpr_support_threads(id) ON DELETE CASCADE,
direction VARCHAR(20) NOT NULL CHECK (direction IN ('user', 'support')),
body TEXT NOT NULL DEFAULT '',
attachments JSONB DEFAULT '[]',
external_id VARCHAR(255),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_clpr_support_messages_thread ON clpr_support_messages(thread_id);
CREATE INDEX IF NOT EXISTS idx_clpr_support_messages_created ON clpr_support_messages(thread_id, created_at);
CREATE UNIQUE INDEX IF NOT EXISTS idx_clpr_support_messages_external ON clpr_support_messages(thread_id, external_id) WHERE external_id IS NOT NULL;
COMMENT ON TABLE clpr_support_threads IS 'Треды обращений в поддержку: один на пользователя (бар) или по claim_id';
COMMENT ON TABLE clpr_support_messages IS 'Сообщения в треде: user — от пользователя, support — от оператора';

View File

@@ -0,0 +1,37 @@
-- NOTIFY при INSERT в clpr_support_messages для доставки в реальном времени (SSE).
-- Один канал support_events, в payload — unified_id и данные сообщения.
-- Таблицы поддержки с префиксом clpr_
CREATE OR REPLACE FUNCTION support_messages_notify()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
u_id VARCHAR(255);
payload TEXT;
BEGIN
SELECT unified_id INTO u_id FROM clpr_support_threads WHERE id = NEW.thread_id;
IF u_id IS NULL THEN
RETURN NEW;
END IF;
payload := json_build_object(
'unified_id', u_id,
'thread_id', NEW.thread_id,
'message_id', NEW.id,
'direction', NEW.direction,
'body', NEW.body,
'attachments', COALESCE(NEW.attachments::TEXT, '[]'),
'created_at', NEW.created_at
)::TEXT;
PERFORM pg_notify('support_events', payload);
RETURN NEW;
END;
$$;
DROP TRIGGER IF EXISTS after_support_message_insert ON clpr_support_messages;
CREATE TRIGGER after_support_message_insert
AFTER INSERT ON clpr_support_messages
FOR EACH ROW
EXECUTE PROCEDURE support_messages_notify();
COMMENT ON FUNCTION support_messages_notify() IS 'NOTIFY support_events при новом сообщении для SSE';

View File

@@ -0,0 +1,13 @@
-- Отметки «прочитано» по тредам: когда пользователь последний раз видел тред.
-- Непрочитанные = сообщения от support с created_at > last_read_at.
CREATE TABLE IF NOT EXISTS clpr_support_reads (
unified_id VARCHAR(255) NOT NULL,
thread_id UUID NOT NULL REFERENCES clpr_support_threads(id) ON DELETE CASCADE,
last_read_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (unified_id, thread_id)
);
CREATE INDEX IF NOT EXISTS idx_clpr_support_reads_unified ON clpr_support_reads(unified_id);
COMMENT ON TABLE clpr_support_reads IS 'Когда пользователь последний раз «прочитал» тред (открыл чат)';