✅ Интеграция SSE + Redis Pub/Sub для real-time OCR результатов
🎯 Основные изменения: Backend: - Реализован SSE endpoint /events/{task_id} для real-time стриминга событий - Интеграция Redis Pub/Sub для получения событий от n8n - Исправлен путь к .env файлу (абсолютный путь) - Убран префикс /api/v1 для events router - Добавлено подробное логирование событий Frontend: - Переключён на Vite dev mode для работы proxy - Настроен proxy /events -> backend:8100 - Реализована модалка с крутилкой при загрузке файла - SSE клиент для получения OCR результатов в real-time - Отображение результатов AI анализа в модалке Docker: - Frontend: изменён на npm run dev (Vite dev server) - Добавлен host.docker.internal для доступа к backend - Настроен proxy в docker-compose Утилиты: - monitor_redis_direct.py - мониторинг Redis Pub/Sub - test_redis_publish_direct.py - тестирование публикации в Redis 🚀 Полная цепочка работает: Frontend → Backend SSE → Redis Pub/Sub ← n8n → OCR/AI → Result
This commit is contained in:
@@ -79,13 +79,14 @@ async def stream_events(task_id: str):
|
||||
Returns:
|
||||
StreamingResponse с событиями
|
||||
"""
|
||||
logger.info(f"🚀 SSE connection requested for task_id: {task_id}")
|
||||
|
||||
async def event_generator():
|
||||
"""Генератор событий из Redis Pub/Sub"""
|
||||
channel = f"ocr_events:{task_id}"
|
||||
|
||||
# Подписываемся на канал Redis
|
||||
pubsub = redis_service.redis.pubsub()
|
||||
pubsub = redis_service.client.pubsub()
|
||||
await pubsub.subscribe(channel)
|
||||
|
||||
logger.info(f"📡 Client subscribed to {channel}")
|
||||
@@ -96,19 +97,37 @@ async def stream_events(task_id: str):
|
||||
try:
|
||||
# Слушаем события
|
||||
while True:
|
||||
logger.info(f"⏳ Waiting for message on {channel}...")
|
||||
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=30.0)
|
||||
|
||||
if message and message['type'] == 'message':
|
||||
event_data = message['data'].decode('utf-8')
|
||||
|
||||
# Отправляем событие клиенту
|
||||
yield f"data: {event_data}\n\n"
|
||||
|
||||
# Если обработка завершена - закрываем соединение
|
||||
event = json.loads(event_data)
|
||||
if event.get('status') in ['completed', 'error']:
|
||||
logger.info(f"✅ Task {task_id} finished, closing SSE")
|
||||
break
|
||||
if message:
|
||||
logger.info(f"📥 Received message type: {message['type']}")
|
||||
if message['type'] == 'message':
|
||||
event_data = message['data'] # Уже строка (decode_responses=True)
|
||||
logger.info(f"📦 Raw event data: {event_data[:200]}...")
|
||||
event = json.loads(event_data)
|
||||
|
||||
# Обработка формата от n8n Redis ноды (вложенный)
|
||||
# Формат: {"claim_id": "...", "event": {...}}
|
||||
if 'event' in event and isinstance(event['event'], dict):
|
||||
# Извлекаем вложенное событие
|
||||
actual_event = event['event']
|
||||
logger.info(f"📦 Unwrapped n8n Redis format for {task_id}")
|
||||
else:
|
||||
# Формат уже плоский (от backend API или старых источников)
|
||||
actual_event = event
|
||||
|
||||
# Отправляем событие клиенту (плоский формат)
|
||||
event_json = json.dumps(actual_event, ensure_ascii=False)
|
||||
logger.info(f"📤 Sending event to client: {actual_event.get('status', 'unknown')}")
|
||||
yield f"data: {event_json}\n\n"
|
||||
|
||||
# Если обработка завершена - закрываем соединение
|
||||
if actual_event.get('status') in ['completed', 'error', 'success']:
|
||||
logger.info(f"✅ Task {task_id} finished, closing SSE")
|
||||
break
|
||||
else:
|
||||
logger.info(f"⏰ Timeout waiting for message on {channel}")
|
||||
|
||||
# Пинг каждые 30 сек чтобы соединение не закрылось
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
@@ -163,7 +163,7 @@ class Settings(BaseSettings):
|
||||
log_file: str = "/app/logs/erv_platform.log"
|
||||
|
||||
class Config:
|
||||
env_file = "../.env"
|
||||
env_file = "/var/www/fastuser/data/www/crm.clientright.ru/erv_platform/.env"
|
||||
case_sensitive = False
|
||||
extra = "ignore" # Игнорируем лишние поля из .env
|
||||
|
||||
|
||||
@@ -98,7 +98,7 @@ app.include_router(claims.router)
|
||||
app.include_router(policy.router)
|
||||
app.include_router(upload.router)
|
||||
app.include_router(draft.router)
|
||||
app.include_router(events.router, prefix="/api/v1")
|
||||
app.include_router(events.router)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
|
||||
Reference in New Issue
Block a user