""" SSE (Server-Sent Events) для real-time обновлений через Redis Pub/Sub """ import asyncio import json from fastapi import APIRouter, Body from fastapi.responses import StreamingResponse from pydantic import BaseModel from typing import Dict, Any from app.services.redis_service import redis_service import logging logger = logging.getLogger(__name__) router = APIRouter() class EventPublish(BaseModel): """Модель для публикации события""" event_type: str = "ocr_completed" status: str message: str data: Dict[str, Any] = {} timestamp: str = None @router.post("/events/{task_id}") async def publish_event(task_id: str, event: EventPublish): """ Публикация события в Redis канал Используется n8n для отправки событий (OCR, AI и т.д.) Args: task_id: ID задачи event: Данные события Returns: Статус публикации """ try: channel = f"ocr_events:{task_id}" event_data = { "event_type": event.event_type, "status": event.status, "message": event.message, "data": event.data, "timestamp": event.timestamp } # Публикуем в Redis event_json = json.dumps(event_data, ensure_ascii=False) await redis_service.publish(channel, event_json) logger.info(f"📢 Event published to {channel}: {event.status}") return { "success": True, "channel": channel, "event": event_data } except Exception as e: logger.error(f"❌ Failed to publish event: {e}") return { "success": False, "error": str(e) } @router.get("/events/{task_id}") async def stream_events(task_id: str): """ SSE стрим событий обработки OCR Args: task_id: ID задачи Returns: StreamingResponse с событиями """ logger.info(f"🚀 SSE connection requested for task_id: {task_id}") async def event_generator(): """Генератор событий из Redis Pub/Sub""" channel = f"ocr_events:{task_id}" # Подписываемся на канал Redis pubsub = redis_service.client.pubsub() await pubsub.subscribe(channel) logger.info(f"📡 Client subscribed to {channel}") # Отправляем начальное событие yield f"data: {json.dumps({'status': 'connected', 'message': 'Подключено к событиям'})}\n\n" try: # Слушаем события while True: logger.info(f"⏳ Waiting for message on {channel}...") message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=60.0) # Увеличено для RAG обработки if message: logger.info(f"📥 Received message type: {message['type']}") if message['type'] == 'message': event_data = message['data'] # Уже строка (decode_responses=True) logger.info(f"📦 Raw event data: {event_data[:200]}...") event = json.loads(event_data) # Обработка формата от n8n Redis ноды (вложенный) # Формат: {"claim_id": "...", "event": {...}} if 'event' in event and isinstance(event['event'], dict): # Извлекаем вложенное событие actual_event = event['event'] logger.info(f"📦 Unwrapped n8n Redis format for {task_id}") else: # Формат уже плоский (от backend API или старых источников) actual_event = event # Отправляем событие клиенту (плоский формат) event_json = json.dumps(actual_event, ensure_ascii=False) logger.info(f"📤 Sending event to client: {actual_event.get('status', 'unknown')}") yield f"data: {event_json}\n\n" # Если обработка завершена - закрываем соединение if actual_event.get('status') in ['completed', 'error', 'success']: logger.info(f"✅ Task {task_id} finished, closing SSE") break else: logger.info(f"⏰ Timeout waiting for message on {channel}") # Пинг каждые 30 сек чтобы соединение не закрылось await asyncio.sleep(0.1) except asyncio.CancelledError: logger.info(f"❌ Client disconnected from {channel}") finally: await pubsub.unsubscribe(channel) await pubsub.close() return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Отключаем буферизацию nginx } )