""" SSE (Server-Sent Events) для real-time обновлений через Redis Pub/Sub """ import asyncio import json from fastapi import APIRouter, Body from fastapi.responses import StreamingResponse from pydantic import BaseModel from typing import Dict, Any from app.services.redis_service import redis_service from app.services.database import db import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1", tags=["Events"]) class EventPublish(BaseModel): """Модель для публикации события""" event_type: str = "ocr_completed" status: str message: str data: Dict[str, Any] = {} timestamp: str = None @router.post("/events/{task_id}") async def publish_event(task_id: str, event: EventPublish): """ Публикация события в Redis канал Используется n8n для отправки событий (OCR, AI, wizard и т.д.) Args: task_id: Session token (например, sess-1763201209156-hyjye5u9h) Используется для формирования канала ocr_events:{session_token} event: Данные события Returns: Статус публикации """ try: # task_id на самом деле это session_token channel = f"ocr_events:{task_id}" event_data = { "event_type": event.event_type, "status": event.status, "message": event.message, "data": event.data, "timestamp": event.timestamp } # Публикуем в Redis event_json = json.dumps(event_data, ensure_ascii=False) await redis_service.publish(channel, event_json) logger.info(f"📢 Event published to {channel}: {event.status}") return { "success": True, "channel": channel, "event": event_data } except Exception as e: logger.error(f"❌ Failed to publish event: {e}") return { "success": False, "error": str(e) } @router.get("/events/{task_id}") async def stream_events(task_id: str): """ SSE стрим событий обработки OCR, AI, wizard и т.д. Args: task_id: Session token (например, sess-1763201209156-hyjye5u9h) Используется для формирования канала ocr_events:{session_token} Фронтенд подключается через EventSource к этому эндпоинту Returns: StreamingResponse с событиями """ logger.info(f"🚀 SSE connection requested for session_token: {task_id}") async def event_generator(): """Генератор событий из Redis Pub/Sub""" # task_id на самом деле это session_token channel = f"ocr_events:{task_id}" # Подписываемся на канал Redis pubsub = redis_service.client.pubsub() await pubsub.subscribe(channel) logger.info(f"📡 Client subscribed to {channel}") # Отправляем начальное событие yield f"data: {json.dumps({'status': 'connected', 'message': 'Подключено к событиям'})}\n\n" try: # Слушаем события while True: logger.info(f"⏳ Waiting for message on {channel}...") message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=60.0) # Увеличено для RAG обработки if message: logger.info(f"📥 Received message type: {message['type']}") if message['type'] == 'message': event_data = message['data'] # Уже строка (decode_responses=True) logger.info(f"📦 Raw event data: {event_data[:200]}...") event = json.loads(event_data) # Обработка формата от n8n Redis ноды (вложенный) # Формат: {"claim_id": "...", "event": {...}} if 'event' in event and isinstance(event['event'], dict): # Извлекаем вложенное событие actual_event = event['event'] logger.info(f"📦 Unwrapped n8n Redis format for {task_id}") else: # Формат уже плоский (от backend API или старых источников) actual_event = event # ✅ Логируем полученное событие event_type = actual_event.get('event_type') logger.info(f"🔍 Processing event: event_type={event_type}, has claim_id={bool(actual_event.get('claim_id'))}") # ✅ Обработка нового формата: documents_list_ready if event_type == 'documents_list_ready': logger.info(f"📋 Documents list received: {len(actual_event.get('documents_required', []))} documents") # Просто пропускаем дальше к yield # ✅ Обработка формата от n8n: если пришёл объект с claim_id, но без event_type # Это значит, что n8n пушит минимальный payload для wizard_ready elif not event_type and actual_event.get('claim_id'): logger.info(f"📦 Detected minimal wizard payload (no event_type), wrapping for claim_id={actual_event.get('claim_id')}") # Обёртываем в правильный формат actual_event = { 'event_type': 'wizard_ready', 'status': 'ready', 'message': 'Wizard plan готов', 'data': actual_event, # Весь объект становится data 'timestamp': actual_event.get('timestamp') or None } logger.info(f"✅ Wrapped minimal payload into wizard_ready event") # Обработка события wizard_ready: загружаем данные из PostgreSQL if actual_event.get('event_type') == 'wizard_ready' and actual_event.get('data', {}).get('claim_id'): claim_id = actual_event['data']['claim_id'] logger.info(f"🔍 Wizard ready event received, loading data for claim_id={claim_id}") try: # Загружаем данные из PostgreSQL query = """ SELECT id, payload->>'claim_id' as claim_id, session_token, unified_id, status_code, channel, payload, created_at, updated_at FROM clpr_claims WHERE (payload->>'claim_id' = $1 OR id::text = $1) LIMIT 1 """ row = await db.fetch_one(query, claim_id) if row: # Обрабатываем payload - может быть строкой (JSONB) или уже dict payload_raw = row.get('payload') if isinstance(payload_raw, str): try: payload = json.loads(payload_raw) if payload_raw else {} except (json.JSONDecodeError, TypeError): payload = {} elif isinstance(payload_raw, dict): payload = payload_raw else: payload = {} # Извлекаем claim_id из payload, если его нет в row claim_id_from_payload = payload.get('claim_id') if isinstance(payload, dict) else None final_claim_id = row.get('claim_id') or claim_id_from_payload or str(row['id']) # Обогащаем событие полными данными из PostgreSQL # Добавляем данные и в data, и в корень для совместимости с фронтендом actual_event['data'] = { **actual_event.get('data', {}), 'wizard_plan': payload.get('wizard_plan'), 'problem_description': payload.get('problem_description'), 'wizard_answers': payload.get('answers'), 'answers_prefill': payload.get('answers_prefill'), 'documents_meta': payload.get('documents_meta', []), 'ai_agent1_facts': payload.get('ai_agent1_facts'), 'ai_agent13_rag': payload.get('ai_agent13_rag'), 'coverage_report': payload.get('coverage_report'), 'phone': payload.get('phone'), 'email': payload.get('email'), } # Также добавляем wizard_plan в корень для совместимости с фронтендом actual_event['wizard_plan'] = payload.get('wizard_plan') actual_event['answers_prefill'] = payload.get('answers_prefill') actual_event['coverage_report'] = payload.get('coverage_report') logger.info(f"✅ Wizard data loaded from PostgreSQL for claim_id={final_claim_id}, has_wizard_plan={payload.get('wizard_plan') is not None}") else: logger.warning(f"⚠️ Claim not found in PostgreSQL: claim_id={claim_id}") except Exception as e: logger.error(f"❌ Error loading wizard data from PostgreSQL: {e}") # ✅ Обработка ocr_status ready: загружаем form_draft из PostgreSQL if actual_event.get('event_type') == 'ocr_status' and actual_event.get('status') == 'ready': claim_id = actual_event.get('claim_id') or actual_event.get('data', {}).get('claim_id') # ✅ Получаем cf_2624 из события (Данные подтверждены) cf_2624 = actual_event.get('cf_2624') if claim_id: logger.info(f"🔍 OCR ready event received, loading form_draft for claim_id={claim_id}, cf_2624={cf_2624}") try: # ✅ Если есть cf_2624 в событии - сохраняем в черновик if cf_2624 is not None: try: update_query = """ UPDATE clpr_claims SET payload = jsonb_set( COALESCE(payload, '{}'::jsonb), '{cf_2624}', $1::jsonb ) WHERE id::text = $2 OR payload->>'claim_id' = $2 RETURNING id; """ await db.execute(update_query, json.dumps(cf_2624), claim_id) logger.info(f"✅ Сохранён cf_2624={cf_2624} в черновик claim_id={claim_id}") except Exception as e: logger.warning(f"⚠️ Не удалось сохранить cf_2624 в черновик: {e}") # Загружаем form_draft и documents из PostgreSQL query = """ SELECT c.id, c.payload->'form_draft' as form_draft, c.payload->'documents_required' as documents_required, c.payload->'documents_meta' as documents_meta, c.payload->>'cf_2624' as cf_2624 FROM clpr_claims c WHERE c.id::text = $1 OR c.payload->>'claim_id' = $1 LIMIT 1 """ row = await db.fetch_one(query, claim_id) if row: # Парсим JSONB поля (могут быть строками) form_draft_raw = row.get('form_draft') documents_required_raw = row.get('documents_required') documents_meta_raw = row.get('documents_meta') cf_2624_from_db = row.get('cf_2624') # ✅ Получаем cf_2624 из БД # Парсим если строка def parse_json_field(val): if val is None: return None if isinstance(val, str): try: return json.loads(val) except: return val return val form_draft = parse_json_field(form_draft_raw) documents_required = parse_json_field(documents_required_raw) documents_meta = parse_json_field(documents_meta_raw) # Обогащаем событие данными из БД actual_event['data'] = { 'claim_id': claim_id, 'all_ready': True, 'form_draft': form_draft, 'documents_required': documents_required, 'documents_meta': documents_meta, } # ✅ Добавляем cf_2624 в событие (из БД или из события) actual_event['cf_2624'] = cf_2624_from_db or cf_2624 or "0" logger.info(f"✅ Form draft loaded from PostgreSQL for claim_id={claim_id}, has_form_draft={form_draft is not None}, cf_2624={actual_event.get('cf_2624')}") else: logger.warning(f"⚠️ Claim not found in PostgreSQL: claim_id={claim_id}") except Exception as e: logger.error(f"❌ Error loading form_draft from PostgreSQL: {e}") # Отправляем событие клиенту (плоский формат) event_json = json.dumps(actual_event, ensure_ascii=False, default=str) event_type_sent = actual_event.get('event_type', 'unknown') event_status = actual_event.get('status', 'unknown') # Логируем размер и наличие данных data_info = actual_event.get('data', {}) has_form_draft = 'form_draft' in data_info if isinstance(data_info, dict) else False logger.info(f"📤 Sending event to client: type={event_type_sent}, status={event_status}, json_len={len(event_json)}, has_form_draft={has_form_draft}") yield f"data: {event_json}\n\n" # Если обработка завершена - закрываем соединение # НЕ закрываем для documents_list_ready и document_ocr_completed (ждём ещё события) if event_status in ['completed', 'error'] and event_type_sent not in ['documents_list_ready', 'document_ocr_completed', 'document_uploaded']: logger.info(f"✅ Task {task_id} finished, closing SSE") break # Закрываем для финальных событий if event_type_sent in ['claim_ready', 'claim_plan_ready']: logger.info(f"✅ Final event {event_type_sent} sent, closing SSE") break # Закрываем для ocr_status ready (форма заявления готова) if event_type_sent == 'ocr_status' and event_status == 'ready': logger.info(f"✅ OCR ready event sent, closing SSE") break else: logger.info(f"⏰ Timeout waiting for message on {channel}") # Пинг каждые 30 сек чтобы соединение не закрылось await asyncio.sleep(0.1) except asyncio.CancelledError: logger.info(f"❌ Client disconnected from {channel}") finally: await pubsub.unsubscribe(channel) await pubsub.close() return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Отключаем буферизацию nginx } ) @router.get("/claim-plan/{session_token}") async def stream_claim_plan(session_token: str): """ SSE стрим для получения данных заявления из канала claim:plan:{session_token} Используется после отправки формы визарда для получения данных заявления от n8n workflow, которые затем отображаются в форме подтверждения. Args: session_token: Session token (например, sess_c9e7c0c2-de2e-40cd-ab7c-3bdc40282d34) Используется для формирования канала claim:plan:{session_token} Returns: StreamingResponse с данными заявления в формате: { "event_type": "claim_plan_ready", "status": "ready", "data": { "propertyName": {...}, // Данные заявления из n8n ... } } """ logger.info(f"🚀 Claim plan SSE connection requested for session_token: {session_token}") async def claim_plan_generator(): """Генератор событий из Redis Pub/Sub для claim:plan канала""" channel = f"claim:plan:{session_token}" # Подписываемся на канал Redis pubsub = redis_service.client.pubsub() await pubsub.subscribe(channel) logger.info(f"📡 Client subscribed to {channel}") # Отправляем начальное событие yield f"data: {json.dumps({'status': 'connected', 'message': 'Ожидание данных заявления...'})}\n\n" try: # Слушаем события (таймаут 5 минут для обработки в n8n) while True: logger.info(f"⏳ Waiting for claim plan data on {channel}...") message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=300.0) if message: logger.info(f"📥 Received claim plan message type: {message['type']}") if message['type'] == 'message': event_data_raw = message['data'] # Уже строка (decode_responses=True) logger.info(f"📦 Raw claim plan data length: {len(event_data_raw)}") try: # Парсим данные от n8n claim_data = json.loads(event_data_raw) # Формируем событие в стандартном формате event = { "event_type": "claim_plan_ready", "status": "ready", "message": "Данные заявления готовы", "data": claim_data, # Весь объект от n8n "timestamp": None } logger.info(f"✅ Claim plan data received for session {session_token}") # Отправляем событие клиенту event_json = json.dumps(event, ensure_ascii=False) yield f"data: {event_json}\n\n" # После получения данных закрываем соединение logger.info(f"✅ Claim plan sent to client, closing SSE") break except json.JSONDecodeError as e: logger.error(f"❌ Failed to parse claim plan JSON: {e}") error_event = { "event_type": "claim_plan_error", "status": "error", "message": f"Ошибка парсинга данных: {str(e)}", "data": {}, "timestamp": None } yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n" break else: logger.info(f"⏰ Timeout waiting for claim plan on {channel}") # Отправляем timeout событие timeout_event = { "event_type": "claim_plan_timeout", "status": "timeout", "message": "Превышено время ожидания данных заявления", "data": {}, "timestamp": None } yield f"data: {json.dumps(timeout_event, ensure_ascii=False)}\n\n" break await asyncio.sleep(0.1) except asyncio.CancelledError: logger.info(f"❌ Client disconnected from {channel}") except Exception as e: logger.error(f"❌ Error in claim plan stream: {e}") error_event = { "event_type": "claim_plan_error", "status": "error", "message": f"Ошибка получения данных: {str(e)}", "data": {}, "timestamp": None } yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n" finally: await pubsub.unsubscribe(channel) await pubsub.close() return StreamingResponse( claim_plan_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Отключаем буферизацию nginx } )