feat: Add claim plan confirmation flow via Redis SSE
Problem:
- After wizard form submission, need to wait for claim data from n8n
- Claim data comes via Redis channel claim:plan:{session_token}
- Need to display confirmation form with claim data
Solution:
1. Backend: Added SSE endpoint /api/v1/claim-plan/{session_token}
- Subscribes to Redis channel claim:plan:{session_token}
- Streams claim data from n8n to frontend
- Handles timeouts and errors gracefully
2. Frontend: Added subscription to claim:plan channel
- StepWizardPlan: After form submission, subscribes to SSE
- Waits for claim_plan_ready event
- Shows loading message while waiting
- On success: saves claimPlanData and shows confirmation step
3. New component: StepClaimConfirmation
- Displays claim confirmation form in iframe
- Receives claimPlanData from parent
- Generates HTML form (placeholder - should call n8n for real HTML)
- Handles confirmation/cancellation via postMessage
4. ClaimForm: Added conditional step for confirmation
- Shows StepClaimConfirmation when showClaimConfirmation=true
- Step appears after StepWizardPlan
- Only visible when claimPlanData is available
Flow:
1. User fills wizard form → submits
2. Form data sent to n8n via /api/v1/claims/wizard
3. Frontend subscribes to SSE /api/v1/claim-plan/{session_token}
4. n8n processes data → publishes to Redis claim:plan:{session_token}
5. Backend receives → streams to frontend via SSE
6. Frontend receives → shows StepClaimConfirmation
7. User confirms → proceeds to next step
Files:
- backend/app/api/events.py: Added stream_claim_plan endpoint
- frontend/src/components/form/StepWizardPlan.tsx: Added subscribeToClaimPlan
- frontend/src/components/form/StepClaimConfirmation.tsx: New component
- frontend/src/pages/ClaimForm.tsx: Added confirmation step to steps array
This commit is contained in:
@@ -238,3 +238,129 @@ async def stream_events(task_id: str):
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.get("/claim-plan/{session_token}")
|
||||
async def stream_claim_plan(session_token: str):
|
||||
"""
|
||||
SSE стрим для получения данных заявления из канала claim:plan:{session_token}
|
||||
|
||||
Используется после отправки формы визарда для получения данных заявления
|
||||
от n8n workflow, которые затем отображаются в форме подтверждения.
|
||||
|
||||
Args:
|
||||
session_token: Session token (например, sess_c9e7c0c2-de2e-40cd-ab7c-3bdc40282d34)
|
||||
Используется для формирования канала claim:plan:{session_token}
|
||||
|
||||
Returns:
|
||||
StreamingResponse с данными заявления в формате:
|
||||
{
|
||||
"event_type": "claim_plan_ready",
|
||||
"status": "ready",
|
||||
"data": {
|
||||
"propertyName": {...}, // Данные заявления из n8n
|
||||
...
|
||||
}
|
||||
}
|
||||
"""
|
||||
logger.info(f"🚀 Claim plan SSE connection requested for session_token: {session_token}")
|
||||
|
||||
async def claim_plan_generator():
|
||||
"""Генератор событий из Redis Pub/Sub для claim:plan канала"""
|
||||
channel = f"claim:plan:{session_token}"
|
||||
|
||||
# Подписываемся на канал Redis
|
||||
pubsub = redis_service.client.pubsub()
|
||||
await pubsub.subscribe(channel)
|
||||
|
||||
logger.info(f"📡 Client subscribed to {channel}")
|
||||
|
||||
# Отправляем начальное событие
|
||||
yield f"data: {json.dumps({'status': 'connected', 'message': 'Ожидание данных заявления...'})}\n\n"
|
||||
|
||||
try:
|
||||
# Слушаем события (таймаут 5 минут для обработки в n8n)
|
||||
while True:
|
||||
logger.info(f"⏳ Waiting for claim plan data on {channel}...")
|
||||
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=300.0)
|
||||
|
||||
if message:
|
||||
logger.info(f"📥 Received claim plan message type: {message['type']}")
|
||||
if message['type'] == 'message':
|
||||
event_data_raw = message['data'] # Уже строка (decode_responses=True)
|
||||
logger.info(f"📦 Raw claim plan data length: {len(event_data_raw)}")
|
||||
|
||||
try:
|
||||
# Парсим данные от n8n
|
||||
claim_data = json.loads(event_data_raw)
|
||||
|
||||
# Формируем событие в стандартном формате
|
||||
event = {
|
||||
"event_type": "claim_plan_ready",
|
||||
"status": "ready",
|
||||
"message": "Данные заявления готовы",
|
||||
"data": claim_data, # Весь объект от n8n
|
||||
"timestamp": None
|
||||
}
|
||||
|
||||
logger.info(f"✅ Claim plan data received for session {session_token}")
|
||||
|
||||
# Отправляем событие клиенту
|
||||
event_json = json.dumps(event, ensure_ascii=False)
|
||||
yield f"data: {event_json}\n\n"
|
||||
|
||||
# После получения данных закрываем соединение
|
||||
logger.info(f"✅ Claim plan sent to client, closing SSE")
|
||||
break
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"❌ Failed to parse claim plan JSON: {e}")
|
||||
error_event = {
|
||||
"event_type": "claim_plan_error",
|
||||
"status": "error",
|
||||
"message": f"Ошибка парсинга данных: {str(e)}",
|
||||
"data": {},
|
||||
"timestamp": None
|
||||
}
|
||||
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
|
||||
break
|
||||
else:
|
||||
logger.info(f"⏰ Timeout waiting for claim plan on {channel}")
|
||||
# Отправляем timeout событие
|
||||
timeout_event = {
|
||||
"event_type": "claim_plan_timeout",
|
||||
"status": "timeout",
|
||||
"message": "Превышено время ожидания данных заявления",
|
||||
"data": {},
|
||||
"timestamp": None
|
||||
}
|
||||
yield f"data: {json.dumps(timeout_event, ensure_ascii=False)}\n\n"
|
||||
break
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"❌ Client disconnected from {channel}")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error in claim plan stream: {e}")
|
||||
error_event = {
|
||||
"event_type": "claim_plan_error",
|
||||
"status": "error",
|
||||
"message": f"Ошибка получения данных: {str(e)}",
|
||||
"data": {},
|
||||
"timestamp": None
|
||||
}
|
||||
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
|
||||
finally:
|
||||
await pubsub.unsubscribe(channel)
|
||||
await pubsub.close()
|
||||
|
||||
return StreamingResponse(
|
||||
claim_plan_generator(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no" # Отключаем буферизацию nginx
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user