Files
crm.clientright.ru/ticket_form/backend/app/api/events.py
Fedor e114231541 feat: Получение cf_2624 из MySQL при загрузке черновика
- Добавлен сервис CrmMySQLService для подключения к MySQL БД vtiger CRM
- Обновлён get_draft() для прямого SQL запроса к MySQL вместо webservice API
- Получение cf_2624 и всех данных контакта из MySQL
- Обновлена документация и SQL файлы для n8n
- Добавлено логирование для отладки

Преимущества:
- Проще: один SQL запрос вместо цепочки HTTP запросов
- Быстрее: прямой запрос к БД
- Надёжнее: не зависит от webservice API
- Актуальнее: всегда свежие данные из БД
2025-12-03 16:04:25 +03:00

474 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SSE (Server-Sent Events) для real-time обновлений через Redis Pub/Sub
"""
import asyncio
import json
from fastapi import APIRouter, Body
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Dict, Any
from app.services.redis_service import redis_service
from app.services.database import db
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1", tags=["Events"])
class EventPublish(BaseModel):
"""Модель для публикации события"""
event_type: str = "ocr_completed"
status: str
message: str
data: Dict[str, Any] = {}
timestamp: str = None
@router.post("/events/{task_id}")
async def publish_event(task_id: str, event: EventPublish):
"""
Публикация события в Redis канал
Используется n8n для отправки событий (OCR, AI, wizard и т.д.)
Args:
task_id: Session token (например, sess-1763201209156-hyjye5u9h)
Используется для формирования канала ocr_events:{session_token}
event: Данные события
Returns:
Статус публикации
"""
try:
# task_id на самом деле это session_token
channel = f"ocr_events:{task_id}"
event_data = {
"event_type": event.event_type,
"status": event.status,
"message": event.message,
"data": event.data,
"timestamp": event.timestamp
}
# Публикуем в Redis
event_json = json.dumps(event_data, ensure_ascii=False)
await redis_service.publish(channel, event_json)
logger.info(f"📢 Event published to {channel}: {event.status}")
return {
"success": True,
"channel": channel,
"event": event_data
}
except Exception as e:
logger.error(f"❌ Failed to publish event: {e}")
return {
"success": False,
"error": str(e)
}
@router.get("/events/{task_id}")
async def stream_events(task_id: str):
"""
SSE стрим событий обработки OCR, AI, wizard и т.д.
Args:
task_id: Session token (например, sess-1763201209156-hyjye5u9h)
Используется для формирования канала ocr_events:{session_token}
Фронтенд подключается через EventSource к этому эндпоинту
Returns:
StreamingResponse с событиями
"""
logger.info(f"🚀 SSE connection requested for session_token: {task_id}")
async def event_generator():
"""Генератор событий из Redis Pub/Sub"""
# task_id на самом деле это session_token
channel = f"ocr_events:{task_id}"
# Подписываемся на канал Redis
pubsub = redis_service.client.pubsub()
await pubsub.subscribe(channel)
logger.info(f"📡 Client subscribed to {channel}")
# Отправляем начальное событие
yield f"data: {json.dumps({'status': 'connected', 'message': 'Подключено к событиям'})}\n\n"
try:
# Слушаем события
while True:
logger.info(f"⏳ Waiting for message on {channel}...")
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=60.0) # Увеличено для RAG обработки
if message:
logger.info(f"📥 Received message type: {message['type']}")
if message['type'] == 'message':
event_data = message['data'] # Уже строка (decode_responses=True)
logger.info(f"📦 Raw event data: {event_data[:200]}...")
event = json.loads(event_data)
# Обработка формата от n8n Redis ноды (вложенный)
# Формат: {"claim_id": "...", "event": {...}}
if 'event' in event and isinstance(event['event'], dict):
# Извлекаем вложенное событие
actual_event = event['event']
logger.info(f"📦 Unwrapped n8n Redis format for {task_id}")
else:
# Формат уже плоский (от backend API или старых источников)
actual_event = event
# ✅ Логируем полученное событие
event_type = actual_event.get('event_type')
logger.info(f"🔍 Processing event: event_type={event_type}, has claim_id={bool(actual_event.get('claim_id'))}")
# ✅ Обработка нового формата: documents_list_ready
if event_type == 'documents_list_ready':
logger.info(f"📋 Documents list received: {len(actual_event.get('documents_required', []))} documents")
# Просто пропускаем дальше к yield
# ✅ Обработка формата от n8n: если пришёл объект с claim_id, но без event_type
# Это значит, что n8n пушит минимальный payload для wizard_ready
elif not event_type and actual_event.get('claim_id'):
logger.info(f"📦 Detected minimal wizard payload (no event_type), wrapping for claim_id={actual_event.get('claim_id')}")
# Обёртываем в правильный формат
actual_event = {
'event_type': 'wizard_ready',
'status': 'ready',
'message': 'Wizard plan готов',
'data': actual_event, # Весь объект становится data
'timestamp': actual_event.get('timestamp') or None
}
logger.info(f"✅ Wrapped minimal payload into wizard_ready event")
# Обработка события wizard_ready: загружаем данные из PostgreSQL
if actual_event.get('event_type') == 'wizard_ready' and actual_event.get('data', {}).get('claim_id'):
claim_id = actual_event['data']['claim_id']
logger.info(f"🔍 Wizard ready event received, loading data for claim_id={claim_id}")
try:
# Загружаем данные из PostgreSQL
query = """
SELECT
id,
payload->>'claim_id' as claim_id,
session_token,
unified_id,
status_code,
channel,
payload,
created_at,
updated_at
FROM clpr_claims
WHERE (payload->>'claim_id' = $1 OR id::text = $1)
LIMIT 1
"""
row = await db.fetch_one(query, claim_id)
if row:
# Обрабатываем payload - может быть строкой (JSONB) или уже dict
payload_raw = row.get('payload')
if isinstance(payload_raw, str):
try:
payload = json.loads(payload_raw) if payload_raw else {}
except (json.JSONDecodeError, TypeError):
payload = {}
elif isinstance(payload_raw, dict):
payload = payload_raw
else:
payload = {}
# Извлекаем claim_id из payload, если его нет в row
claim_id_from_payload = payload.get('claim_id') if isinstance(payload, dict) else None
final_claim_id = row.get('claim_id') or claim_id_from_payload or str(row['id'])
# Обогащаем событие полными данными из PostgreSQL
# Добавляем данные и в data, и в корень для совместимости с фронтендом
actual_event['data'] = {
**actual_event.get('data', {}),
'wizard_plan': payload.get('wizard_plan'),
'problem_description': payload.get('problem_description'),
'wizard_answers': payload.get('answers'),
'answers_prefill': payload.get('answers_prefill'),
'documents_meta': payload.get('documents_meta', []),
'ai_agent1_facts': payload.get('ai_agent1_facts'),
'ai_agent13_rag': payload.get('ai_agent13_rag'),
'coverage_report': payload.get('coverage_report'),
'phone': payload.get('phone'),
'email': payload.get('email'),
}
# Также добавляем wizard_plan в корень для совместимости с фронтендом
actual_event['wizard_plan'] = payload.get('wizard_plan')
actual_event['answers_prefill'] = payload.get('answers_prefill')
actual_event['coverage_report'] = payload.get('coverage_report')
logger.info(f"✅ Wizard data loaded from PostgreSQL for claim_id={final_claim_id}, has_wizard_plan={payload.get('wizard_plan') is not None}")
else:
logger.warning(f"⚠️ Claim not found in PostgreSQL: claim_id={claim_id}")
except Exception as e:
logger.error(f"❌ Error loading wizard data from PostgreSQL: {e}")
# ✅ Обработка ocr_status ready: загружаем form_draft из PostgreSQL
if actual_event.get('event_type') == 'ocr_status' and actual_event.get('status') == 'ready':
claim_id = actual_event.get('claim_id') or actual_event.get('data', {}).get('claim_id')
# ✅ Получаем cf_2624 из события (Данные подтверждены)
cf_2624 = actual_event.get('cf_2624')
if claim_id:
logger.info(f"🔍 OCR ready event received, loading form_draft for claim_id={claim_id}, cf_2624={cf_2624}")
try:
# ✅ Если есть cf_2624 в событии - сохраняем в черновик
if cf_2624 is not None:
try:
update_query = """
UPDATE clpr_claims
SET payload = jsonb_set(
COALESCE(payload, '{}'::jsonb),
'{cf_2624}',
$1::jsonb
)
WHERE id::text = $2 OR payload->>'claim_id' = $2
RETURNING id;
"""
await db.execute(update_query, json.dumps(cf_2624), claim_id)
logger.info(f"✅ Сохранён cf_2624={cf_2624} в черновик claim_id={claim_id}")
except Exception as e:
logger.warning(f"⚠️ Не удалось сохранить cf_2624 в черновик: {e}")
# Загружаем form_draft и documents из PostgreSQL
query = """
SELECT
c.id,
c.payload->'form_draft' as form_draft,
c.payload->'documents_required' as documents_required,
c.payload->'documents_meta' as documents_meta,
c.payload->>'cf_2624' as cf_2624
FROM clpr_claims c
WHERE c.id::text = $1 OR c.payload->>'claim_id' = $1
LIMIT 1
"""
row = await db.fetch_one(query, claim_id)
if row:
# Парсим JSONB поля (могут быть строками)
form_draft_raw = row.get('form_draft')
documents_required_raw = row.get('documents_required')
documents_meta_raw = row.get('documents_meta')
cf_2624_from_db = row.get('cf_2624') # ✅ Получаем cf_2624 из БД
# Парсим если строка
def parse_json_field(val):
if val is None:
return None
if isinstance(val, str):
try:
return json.loads(val)
except:
return val
return val
form_draft = parse_json_field(form_draft_raw)
documents_required = parse_json_field(documents_required_raw)
documents_meta = parse_json_field(documents_meta_raw)
# Обогащаем событие данными из БД
actual_event['data'] = {
'claim_id': claim_id,
'all_ready': True,
'form_draft': form_draft,
'documents_required': documents_required,
'documents_meta': documents_meta,
}
# ✅ Добавляем cf_2624 в событие (из БД или из события)
actual_event['cf_2624'] = cf_2624_from_db or cf_2624 or "0"
logger.info(f"✅ Form draft loaded from PostgreSQL for claim_id={claim_id}, has_form_draft={form_draft is not None}, cf_2624={actual_event.get('cf_2624')}")
else:
logger.warning(f"⚠️ Claim not found in PostgreSQL: claim_id={claim_id}")
except Exception as e:
logger.error(f"❌ Error loading form_draft from PostgreSQL: {e}")
# Отправляем событие клиенту (плоский формат)
event_json = json.dumps(actual_event, ensure_ascii=False, default=str)
event_type_sent = actual_event.get('event_type', 'unknown')
event_status = actual_event.get('status', 'unknown')
# Логируем размер и наличие данных
data_info = actual_event.get('data', {})
has_form_draft = 'form_draft' in data_info if isinstance(data_info, dict) else False
logger.info(f"📤 Sending event to client: type={event_type_sent}, status={event_status}, json_len={len(event_json)}, has_form_draft={has_form_draft}")
yield f"data: {event_json}\n\n"
# Если обработка завершена - закрываем соединение
# НЕ закрываем для documents_list_ready и document_ocr_completed (ждём ещё события)
if event_status in ['completed', 'error'] and event_type_sent not in ['documents_list_ready', 'document_ocr_completed', 'document_uploaded']:
logger.info(f"✅ Task {task_id} finished, closing SSE")
break
# Закрываем для финальных событий
if event_type_sent in ['claim_ready', 'claim_plan_ready']:
logger.info(f"✅ Final event {event_type_sent} sent, closing SSE")
break
# Закрываем для ocr_status ready (форма заявления готова)
if event_type_sent == 'ocr_status' and event_status == 'ready':
logger.info(f"✅ OCR ready event sent, closing SSE")
break
else:
logger.info(f"⏰ Timeout waiting for message on {channel}")
# Пинг каждые 30 сек чтобы соединение не закрылось
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info(f"❌ Client disconnected from {channel}")
finally:
await pubsub.unsubscribe(channel)
await pubsub.close()
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Отключаем буферизацию nginx
}
)
@router.get("/claim-plan/{session_token}")
async def stream_claim_plan(session_token: str):
"""
SSE стрим для получения данных заявления из канала claim:plan:{session_token}
Используется после отправки формы визарда для получения данных заявления
от n8n workflow, которые затем отображаются в форме подтверждения.
Args:
session_token: Session token (например, sess_c9e7c0c2-de2e-40cd-ab7c-3bdc40282d34)
Используется для формирования канала claim:plan:{session_token}
Returns:
StreamingResponse с данными заявления в формате:
{
"event_type": "claim_plan_ready",
"status": "ready",
"data": {
"propertyName": {...}, // Данные заявления из n8n
...
}
}
"""
logger.info(f"🚀 Claim plan SSE connection requested for session_token: {session_token}")
async def claim_plan_generator():
"""Генератор событий из Redis Pub/Sub для claim:plan канала"""
channel = f"claim:plan:{session_token}"
# Подписываемся на канал Redis
pubsub = redis_service.client.pubsub()
await pubsub.subscribe(channel)
logger.info(f"📡 Client subscribed to {channel}")
# Отправляем начальное событие
yield f"data: {json.dumps({'status': 'connected', 'message': 'Ожидание данных заявления...'})}\n\n"
try:
# Слушаем события (таймаут 5 минут для обработки в n8n)
while True:
logger.info(f"⏳ Waiting for claim plan data on {channel}...")
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=300.0)
if message:
logger.info(f"📥 Received claim plan message type: {message['type']}")
if message['type'] == 'message':
event_data_raw = message['data'] # Уже строка (decode_responses=True)
logger.info(f"📦 Raw claim plan data length: {len(event_data_raw)}")
try:
# Парсим данные от n8n
claim_data = json.loads(event_data_raw)
# Формируем событие в стандартном формате
event = {
"event_type": "claim_plan_ready",
"status": "ready",
"message": "Данные заявления готовы",
"data": claim_data, # Весь объект от n8n
"timestamp": None
}
logger.info(f"✅ Claim plan data received for session {session_token}")
# Отправляем событие клиенту
event_json = json.dumps(event, ensure_ascii=False)
yield f"data: {event_json}\n\n"
# После получения данных закрываем соединение
logger.info(f"✅ Claim plan sent to client, closing SSE")
break
except json.JSONDecodeError as e:
logger.error(f"❌ Failed to parse claim plan JSON: {e}")
error_event = {
"event_type": "claim_plan_error",
"status": "error",
"message": f"Ошибка парсинга данных: {str(e)}",
"data": {},
"timestamp": None
}
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
break
else:
logger.info(f"⏰ Timeout waiting for claim plan on {channel}")
# Отправляем timeout событие
timeout_event = {
"event_type": "claim_plan_timeout",
"status": "timeout",
"message": "Превышено время ожидания данных заявления",
"data": {},
"timestamp": None
}
yield f"data: {json.dumps(timeout_event, ensure_ascii=False)}\n\n"
break
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info(f"❌ Client disconnected from {channel}")
except Exception as e:
logger.error(f"❌ Error in claim plan stream: {e}")
error_event = {
"event_type": "claim_plan_error",
"status": "error",
"message": f"Ошибка получения данных: {str(e)}",
"data": {},
"timestamp": None
}
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
finally:
await pubsub.unsubscribe(channel)
await pubsub.close()
return StreamingResponse(
claim_plan_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Отключаем буферизацию nginx
}
)