Files
aiform_dev/backend/app/api/events.py
AI Assistant 3621ae6021 feat: Session persistence with Redis + Draft management fixes
- Implement session management API (/api/v1/session/create, verify, logout)
- Add session restoration from localStorage on page reload
- Fix session_id priority when loading drafts (use current, not old from DB)
- Add unified_id and claim_id to wizard payload sent to n8n
- Add Docker volume for frontend HMR (Hot Module Replacement)
- Add comprehensive session logging for debugging

Components updated:
- backend/app/api/session.py (NEW) - Session management endpoints
- backend/app/main.py - Include session router
- frontend/src/components/form/Step1Phone.tsx v2.0 - Create session after SMS
- frontend/src/pages/ClaimForm.tsx v3.8 - Session restoration & priority fix
- frontend/src/components/form/StepWizardPlan.tsx v1.4 - Add unified_id/claim_id
- docker-compose.yml - Add frontend volume for live reload

Session flow:
1. User verifies phone -> session created in Redis (24h TTL)
2. session_token saved to localStorage
3. Page reload -> session restored automatically
4. Draft selected -> current session_id used (not old from DB)
5. Wizard submit -> unified_id, claim_id, session_id sent to n8n
6. Logout -> session removed from Redis & localStorage

Fixes:
- Session token not persisting after page reload
- unified_id missing in n8n webhook payload
- Old session_id from draft overwriting current session
- Frontend changes requiring container rebuild
2025-11-20 18:31:42 +03:00

241 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SSE (Server-Sent Events) для real-time обновлений через Redis Pub/Sub
"""
import asyncio
import json
from fastapi import APIRouter, Body
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Dict, Any
from app.services.redis_service import redis_service
from app.services.database import db
import logging
logger = logging.getLogger(__name__)
router = APIRouter()
class EventPublish(BaseModel):
"""Модель для публикации события"""
event_type: str = "ocr_completed"
status: str
message: str
data: Dict[str, Any] = {}
timestamp: str = None
@router.post("/events/{task_id}")
async def publish_event(task_id: str, event: EventPublish):
"""
Публикация события в Redis канал
Используется n8n для отправки событий (OCR, AI, wizard и т.д.)
Args:
task_id: Session token (например, sess-1763201209156-hyjye5u9h)
Используется для формирования канала ocr_events:{session_token}
event: Данные события
Returns:
Статус публикации
"""
try:
# task_id на самом деле это session_token
channel = f"ocr_events:{task_id}"
event_data = {
"event_type": event.event_type,
"status": event.status,
"message": event.message,
"data": event.data,
"timestamp": event.timestamp
}
# Публикуем в Redis
event_json = json.dumps(event_data, ensure_ascii=False)
await redis_service.publish(channel, event_json)
logger.info(f"📢 Event published to {channel}: {event.status}")
return {
"success": True,
"channel": channel,
"event": event_data
}
except Exception as e:
logger.error(f"❌ Failed to publish event: {e}")
return {
"success": False,
"error": str(e)
}
@router.get("/events/{task_id}")
async def stream_events(task_id: str):
"""
SSE стрим событий обработки OCR, AI, wizard и т.д.
Args:
task_id: Session token (например, sess-1763201209156-hyjye5u9h)
Используется для формирования канала ocr_events:{session_token}
Фронтенд подключается через EventSource к этому эндпоинту
Returns:
StreamingResponse с событиями
"""
logger.info(f"🚀 SSE connection requested for session_token: {task_id}")
async def event_generator():
"""Генератор событий из Redis Pub/Sub"""
# task_id на самом деле это session_token
channel = f"ocr_events:{task_id}"
# Подписываемся на канал Redis
pubsub = redis_service.client.pubsub()
await pubsub.subscribe(channel)
logger.info(f"📡 Client subscribed to {channel}")
# Отправляем начальное событие
yield f"data: {json.dumps({'status': 'connected', 'message': 'Подключено к событиям'})}\n\n"
try:
# Слушаем события
while True:
logger.info(f"⏳ Waiting for message on {channel}...")
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=60.0) # Увеличено для RAG обработки
if message:
logger.info(f"📥 Received message type: {message['type']}")
if message['type'] == 'message':
event_data = message['data'] # Уже строка (decode_responses=True)
logger.info(f"📦 Raw event data: {event_data[:200]}...")
event = json.loads(event_data)
# Обработка формата от n8n Redis ноды (вложенный)
# Формат: {"claim_id": "...", "event": {...}}
if 'event' in event and isinstance(event['event'], dict):
# Извлекаем вложенное событие
actual_event = event['event']
logger.info(f"📦 Unwrapped n8n Redis format for {task_id}")
else:
# Формат уже плоский (от backend API или старых источников)
actual_event = event
# ✅ Обработка формата от n8n: если пришёл объект с claim_id, но без event_type
# Это значит, что n8n пушит минимальный payload для wizard_ready
logger.info(f"🔍 Checking event: has event_type={bool(actual_event.get('event_type'))}, has claim_id={bool(actual_event.get('claim_id'))}")
if not actual_event.get('event_type') and actual_event.get('claim_id'):
logger.info(f"📦 Detected minimal wizard payload (no event_type), wrapping for claim_id={actual_event.get('claim_id')}")
# Обёртываем в правильный формат
actual_event = {
'event_type': 'wizard_ready',
'status': 'ready',
'message': 'Wizard plan готов',
'data': actual_event, # Весь объект становится data
'timestamp': actual_event.get('timestamp') or None
}
logger.info(f"✅ Wrapped minimal payload into wizard_ready event")
# Обработка события wizard_ready: загружаем данные из PostgreSQL
if actual_event.get('event_type') == 'wizard_ready' and actual_event.get('data', {}).get('claim_id'):
claim_id = actual_event['data']['claim_id']
logger.info(f"🔍 Wizard ready event received, loading data for claim_id={claim_id}")
try:
# Загружаем данные из PostgreSQL
query = """
SELECT
id,
payload->>'claim_id' as claim_id,
session_token,
unified_id,
status_code,
channel,
payload,
created_at,
updated_at
FROM clpr_claims
WHERE (payload->>'claim_id' = $1 OR id::text = $1)
LIMIT 1
"""
row = await db.fetch_one(query, claim_id)
if row:
# Обрабатываем payload - может быть строкой (JSONB) или уже dict
payload_raw = row.get('payload')
if isinstance(payload_raw, str):
try:
payload = json.loads(payload_raw) if payload_raw else {}
except (json.JSONDecodeError, TypeError):
payload = {}
elif isinstance(payload_raw, dict):
payload = payload_raw
else:
payload = {}
# Извлекаем claim_id из payload, если его нет в row
claim_id_from_payload = payload.get('claim_id') if isinstance(payload, dict) else None
final_claim_id = row.get('claim_id') or claim_id_from_payload or str(row['id'])
# Обогащаем событие полными данными из PostgreSQL
# Добавляем данные и в data, и в корень для совместимости с фронтендом
actual_event['data'] = {
**actual_event.get('data', {}),
'wizard_plan': payload.get('wizard_plan'),
'problem_description': payload.get('problem_description'),
'wizard_answers': payload.get('answers'),
'answers_prefill': payload.get('answers_prefill'),
'documents_meta': payload.get('documents_meta', []),
'ai_agent1_facts': payload.get('ai_agent1_facts'),
'ai_agent13_rag': payload.get('ai_agent13_rag'),
'coverage_report': payload.get('coverage_report'),
'phone': payload.get('phone'),
'email': payload.get('email'),
}
# Также добавляем wizard_plan в корень для совместимости с фронтендом
actual_event['wizard_plan'] = payload.get('wizard_plan')
actual_event['answers_prefill'] = payload.get('answers_prefill')
actual_event['coverage_report'] = payload.get('coverage_report')
logger.info(f"✅ Wizard data loaded from PostgreSQL for claim_id={final_claim_id}, has_wizard_plan={payload.get('wizard_plan') is not None}")
else:
logger.warning(f"⚠️ Claim not found in PostgreSQL: claim_id={claim_id}")
except Exception as e:
logger.error(f"❌ Error loading wizard data from PostgreSQL: {e}")
# Отправляем событие клиенту (плоский формат)
event_json = json.dumps(actual_event, ensure_ascii=False)
logger.info(f"📤 Sending event to client: {actual_event.get('status', 'unknown')}")
yield f"data: {event_json}\n\n"
# Если обработка завершена - закрываем соединение
if actual_event.get('status') in ['completed', 'error', 'success']:
logger.info(f"✅ Task {task_id} finished, closing SSE")
break
else:
logger.info(f"⏰ Timeout waiting for message on {channel}")
# Пинг каждые 30 сек чтобы соединение не закрылось
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info(f"❌ Client disconnected from {channel}")
finally:
await pubsub.unsubscribe(channel)
await pubsub.close()
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Отключаем буферизацию nginx
}
)