""" Documents API Routes - Загрузка и обработка документов Новый флоу: поэкранная загрузка документов """ from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Request from typing import Optional, List import httpx import json import uuid import hashlib from datetime import datetime import logging from ..services.redis_service import redis_service from ..services.database import db from ..config import settings router = APIRouter(prefix="/api/v1/documents", tags=["Documents"]) logger = logging.getLogger(__name__) # n8n webhook для загрузки документов N8N_DOCUMENT_UPLOAD_WEBHOOK = "https://n8n.clientright.pro/webhook/webform_document_upload" def get_client_ip(request: Request) -> str: """Получить реальный IP клиента (с учётом proxy заголовков)""" # Сначала проверяем заголовки от reverse proxy forwarded_for = request.headers.get("x-forwarded-for", "").split(",")[0].strip() real_ip = request.headers.get("x-real-ip", "").strip() # X-Forwarded-For имеет приоритет if forwarded_for and forwarded_for not in ("127.0.0.1", "192.168.0.1", "::1"): return forwarded_for if real_ip and real_ip not in ("127.0.0.1", "192.168.0.1", "::1"): return real_ip # Fallback на request.client return request.client.host if request.client else "unknown" @router.post("/upload") async def upload_document( request: Request, file: UploadFile = File(...), claim_id: str = Form(...), session_id: str = Form(...), document_type: str = Form(...), document_name: Optional[str] = Form(None), document_description: Optional[str] = Form(None), group_index: Optional[str] = Form(None), unified_id: Optional[str] = Form(None), contact_id: Optional[str] = Form(None), phone: Optional[str] = Form(None), ): """ Загрузка одного документа. Принимает файл и метаданные, отправляет в n8n для: 1. Сохранения в S3 2. OCR обработки 3. Обновления черновика в PostgreSQL После успешной обработки n8n публикует событие document_ocr_completed в Redis. """ try: # Генерируем уникальный ID файла file_id = f"doc_{uuid.uuid4().hex[:12]}" logger.info( "📤 Document upload received", extra={ "claim_id": claim_id, "session_id": session_id, "document_type": document_type, "file_name": file.filename, "file_size": file.size if hasattr(file, 'size') else 'unknown', "content_type": file.content_type, }, ) # Читаем содержимое файла file_content = await file.read() file_size = len(file_content) # Получаем IP клиента client_ip = get_client_ip(request) # Формируем данные в формате совместимом с существующим n8n воркфлоу form_data = { # Основные идентификаторы "form_id": "ticket_form", "stage": "document_upload", "session_id": session_id, "claim_id": claim_id, "client_ip": client_ip, # Идентификаторы пользователя "unified_id": unified_id or "", "contact_id": contact_id or "", "phone": phone or "", # Информация о документе "document_type": document_type, "file_id": file_id, "original_filename": file.filename, "content_type": file.content_type or "application/octet-stream", "file_size": str(file_size), "upload_timestamp": datetime.utcnow().isoformat(), # Формат uploads_* для совместимости # ✅ Используем group_index для правильной индексации (по умолчанию 0) "uploads_field_names[{idx}]".format(idx=group_index or "0"): document_type, "uploads_field_labels[{idx}]".format(idx=group_index or "0"): document_name or document_type, "uploads_descriptions[{idx}]".format(idx=group_index or "0"): document_description or "", } # ✅ Добавляем group_index в данные формы if group_index: form_data["group_index"] = group_index logger.info(f"📋 group_index передан в n8n: {group_index}") # Файл для multipart (ключ uploads[group_index] для совместимости) idx = group_index or "0" files = { f"uploads[{idx}]": (file.filename, file_content, file.content_type or "application/octet-stream") } # Отправляем в n8n async with httpx.AsyncClient(timeout=120.0) as client: response = await client.post( N8N_DOCUMENT_UPLOAD_WEBHOOK, data=form_data, files=files, ) response_text = response.text or "" if response.status_code == 200: logger.info( "✅ Document uploaded to n8n", extra={ "claim_id": claim_id, "document_type": document_type, "file_id": file_id, "response_preview": response_text[:200], }, ) # Парсим ответ от n8n try: n8n_response = json.loads(response_text) except json.JSONDecodeError: n8n_response = {"raw": response_text} # Публикуем событие в Redis для фронтенда event_data = { "event_type": "document_uploaded", "status": "processing", "claim_id": claim_id, "session_id": session_id, "document_type": document_type, "file_id": file_id, "original_filename": file.filename, "timestamp": datetime.utcnow().isoformat(), } await redis_service.publish( f"ocr_events:{session_id}", json.dumps(event_data, ensure_ascii=False) ) return { "success": True, "file_id": file_id, "document_type": document_type, "ocr_status": "processing", "message": "Документ загружен и отправлен на обработку", "n8n_response": n8n_response, } else: logger.error( "❌ n8n document upload error", extra={ "status_code": response.status_code, "body": response_text[:500], }, ) raise HTTPException( status_code=response.status_code, detail=f"Ошибка n8n: {response_text}", ) except httpx.TimeoutException: logger.error("⏱️ n8n document upload timeout") raise HTTPException(status_code=504, detail="Таймаут загрузки документа") except HTTPException: raise except Exception as e: logger.exception("❌ Document upload error") raise HTTPException( status_code=500, detail=f"Ошибка загрузки документа: {str(e)}", ) @router.post("/upload-multiple") async def upload_multiple_documents( request: Request, files: List[UploadFile] = File(...), claim_id: str = Form(...), session_id: str = Form(...), document_type: str = Form(...), document_name: Optional[str] = Form(None), document_description: Optional[str] = Form(None), unified_id: Optional[str] = Form(None), contact_id: Optional[str] = Form(None), phone: Optional[str] = Form(None), ): """ Загрузка нескольких файлов для одного документа (например, несколько страниц паспорта). Все файлы отправляются одним запросом в n8n. """ try: logger.info( "📤 Multiple documents upload received", extra={ "claim_id": claim_id, "session_id": session_id, "document_type": document_type, "files_count": len(files), "file_names": [f.filename for f in files], }, ) # Получаем IP клиента client_ip = get_client_ip(request) # Генерируем ID для каждого файла и читаем контент file_ids = [] files_multipart = {} for i, file in enumerate(files): file_id = f"doc_{uuid.uuid4().hex[:12]}" file_ids.append(file_id) file_content = await file.read() files_multipart[f"uploads[{i}]"] = ( file.filename, file_content, file.content_type or "application/octet-stream" ) # Формируем данные формы form_data = { # Основные идентификаторы "form_id": "ticket_form", "stage": "document_upload", "session_id": session_id, "claim_id": claim_id, "client_ip": client_ip, # Идентификаторы пользователя "unified_id": unified_id or "", "contact_id": contact_id or "", "phone": phone or "", # Информация о документе "document_type": document_type, "files_count": str(len(files)), "upload_timestamp": datetime.utcnow().isoformat(), } # ✅ Получаем group_index из Form (индекс документа в documents_required) form_params = await request.form() group_index = form_params.get("group_index") if group_index: form_data["group_index"] = group_index logger.info(f"📋 group_index передан в n8n: {group_index}") # Добавляем информацию о каждом файле for i, (file, file_id) in enumerate(zip(files, file_ids)): form_data[f"file_ids[{i}]"] = file_id form_data[f"uploads_field_names[{i}]"] = document_type form_data[f"uploads_field_labels[{i}]"] = document_name or document_type form_data[f"uploads_descriptions[{i}]"] = document_description or "" form_data[f"original_filenames[{i}]"] = file.filename # Отправляем в n8n одним запросом async with httpx.AsyncClient(timeout=180.0) as client: response = await client.post( N8N_DOCUMENT_UPLOAD_WEBHOOK, data=form_data, files=files_multipart, ) response_text = response.text or "" if response.status_code == 200: logger.info( "✅ Multiple documents uploaded to n8n", extra={ "claim_id": claim_id, "document_type": document_type, "file_ids": file_ids, "files_count": len(files), }, ) # Парсим ответ от n8n try: n8n_response = json.loads(response_text) except json.JSONDecodeError: n8n_response = {"raw": response_text} # Публикуем событие в Redis event_data = { "event_type": "documents_uploaded", "status": "processing", "claim_id": claim_id, "session_id": session_id, "document_type": document_type, "file_ids": file_ids, "files_count": len(files), "original_filenames": [f.filename for f in files], "timestamp": datetime.utcnow().isoformat(), } await redis_service.publish( f"ocr_events:{session_id}", json.dumps(event_data, ensure_ascii=False) ) return { "success": True, "file_ids": file_ids, "files_count": len(files), "document_type": document_type, "ocr_status": "processing", "message": f"Загружено {len(files)} файл(ов)", "n8n_response": n8n_response, } else: logger.error( "❌ n8n multiple upload error", extra={ "status_code": response.status_code, "body": response_text[:500], }, ) raise HTTPException( status_code=response.status_code, detail=f"Ошибка n8n: {response_text}", ) except httpx.TimeoutException: logger.error("⏱️ n8n multiple upload timeout") raise HTTPException(status_code=504, detail="Таймаут загрузки документов") except HTTPException: raise except Exception as e: logger.exception("❌ Multiple upload error") raise HTTPException( status_code=500, detail=f"Ошибка загрузки документов: {str(e)}", ) @router.get("/status/{claim_id}") async def get_documents_status(claim_id: str): """ Получить статус обработки документов для заявки. Возвращает: - Список загруженных документов и их OCR статус - Общий прогресс обработки """ try: # TODO: Запрос в PostgreSQL для получения статуса документов # Пока возвращаем mock данные return { "success": True, "claim_id": claim_id, "documents": [], "ocr_progress": { "total": 0, "completed": 0, "processing": 0, "failed": 0, }, "wizard_ready": False, "claim_ready": False, } except Exception as e: logger.exception("❌ Error getting documents status") raise HTTPException( status_code=500, detail=f"Ошибка получения статуса: {str(e)}", ) async def skip_document( request: Request, claim_id: str = Form(...), session_id: str = Form(...), document_type: str = Form(...), document_name: Optional[str] = Form(None), group_index: Optional[str] = Form(None), unified_id: Optional[str] = Form(None), contact_id: Optional[str] = Form(None), phone: Optional[str] = Form(None), ): """ Пропуск документа (пользователь указал, что документа нет). Отправляет событие в n8n на тот же webhook, что и загрузка файлов, но с флагом skipped=true для обработки пропуска. """ try: logger.info( "⏭️ Document skip received", extra={ "claim_id": claim_id, "session_id": session_id, "document_type": document_type, "group_index": group_index, }, ) # Получаем IP клиента client_ip = get_client_ip(request) # Формируем данные в формате совместимом с существующим n8n воркфлоу form_data = { # Основные идентификаторы "form_id": "ticket_form", "stage": "document_skip", "session_id": session_id, "claim_id": claim_id, "client_ip": client_ip, # Идентификаторы пользователя "unified_id": unified_id or "", "contact_id": contact_id or "", "phone": phone or "", # Информация о документе "document_type": document_type, "document_name": document_name or document_type, "skipped": "true", # ✅ Флаг пропуска документа "action": "skip", # ✅ Действие: пропуск "skip_timestamp": datetime.utcnow().isoformat(), # Формат uploads_* для совместимости (без файлов) # ✅ Используем group_index для правильной индексации (по умолчанию 0) "uploads_field_names[{idx}]".format(idx=group_index or "0"): document_type, "uploads_field_labels[{idx}]".format(idx=group_index or "0"): document_name or document_type, "uploads_descriptions[{idx}]".format(idx=group_index or "0"): "", "files_count": "0", # ✅ Нет файлов } # ✅ Добавляем group_index в данные формы if group_index: form_data["group_index"] = group_index logger.info(f"📋 group_index передан в n8n: {group_index}") # Отправляем в n8n на тот же webhook (без файлов) async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post( N8N_DOCUMENT_UPLOAD_WEBHOOK, data=form_data, ) response_text = response.text or "" if response.status_code == 200: logger.info( "✅ Document skip sent to n8n", extra={ "claim_id": claim_id, "document_type": document_type, "response_preview": response_text[:200], }, ) # Парсим ответ от n8n try: n8n_response = json.loads(response_text) except json.JSONDecodeError: n8n_response = {"raw": response_text} # Публикуем событие в Redis для фронтенда event_data = { "event_type": "document_skipped", "status": "skipped", "claim_id": claim_id, "session_id": session_id, "document_type": document_type, "document_name": document_name or document_type, "timestamp": datetime.utcnow().isoformat(), } await redis_service.publish( f"ocr_events:{session_id}", json.dumps(event_data, ensure_ascii=False) ) return { "success": True, "document_type": document_type, "status": "skipped", "message": "Документ пропущен и сохранён", "n8n_response": n8n_response, } else: logger.error( "❌ n8n document skip error", extra={ "status_code": response.status_code, "body": response_text[:500], }, ) raise HTTPException( status_code=response.status_code, detail=f"Ошибка n8n: {response_text}", ) except httpx.TimeoutException: logger.error("⏱️ n8n document skip timeout") raise HTTPException(status_code=504, detail="Таймаут отправки пропуска документа") except HTTPException: raise except Exception as e: logger.exception("❌ Document skip error") raise HTTPException(status_code=500, detail=f"Ошибка пропуска документа: {str(e)}") @router.post("/generate-list") async def generate_documents_list(request: Request): """ Запрос на генерацию списка документов для проблемы. Принимает описание проблемы, отправляет в n8n для быстрого AI-анализа. n8n публикует результат в Redis канал ocr_events:{session_id} с event_type=documents_list_ready. """ try: body = await request.json() session_id = body.get("session_id") problem_description = body.get("problem_description") if not session_id or not problem_description: raise HTTPException( status_code=400, detail="session_id и problem_description обязательны", ) logger.info( "📝 Generate documents list request", extra={ "session_id": session_id, "description_length": len(problem_description), }, ) # Публикуем событие в Redis для n8n event_data = { "type": "generate_documents_list", "session_id": session_id, "claim_id": body.get("claim_id"), "unified_id": body.get("unified_id"), "phone": body.get("phone"), "problem_description": problem_description, "timestamp": datetime.utcnow().isoformat(), } channel = f"{settings.redis_prefix}documents_list" subscribers = await redis_service.publish( channel, json.dumps(event_data, ensure_ascii=False) ) logger.info( "✅ Documents list request published", extra={ "channel": channel, "subscribers": subscribers, }, ) return { "success": True, "message": "Запрос на генерацию списка документов отправлен", "channel": channel, } except HTTPException: raise except Exception as e: logger.exception("❌ Error generating documents list") raise HTTPException( status_code=500, detail=f"Ошибка генерации списка: {str(e)}", ) def compute_documents_hash(doc_ids: List[str]) -> str: """ Вычисляет hash от списка document_id для проверки актуальности черновика. Должен совпадать с JS алгоритмом в n8n build_form_draft. """ import ctypes sorted_ids = sorted([d for d in doc_ids if d]) hash_input = ','.join(sorted_ids) # djb2 hash — эмуляция JS поведения # В JS: (hash << 5) возвращает 32-битный signed int hash_val = 5381 for char in hash_input: # ctypes.c_int32 эмулирует JS 32-битный signed int при сдвиге shifted = ctypes.c_int32(hash_val << 5).value hash_val = shifted + hash_val + ord(char) # В JS: Math.abs(hash).toString(16).padStart(8, '0') return format(abs(hash_val), 'x').zfill(8) @router.post("/check-ocr-status") async def check_ocr_status(request: Request): """ Проверка статуса OCR обработки документов. Вызывается при нажатии "Продолжить" после загрузки документов. Логика: 1. Проверяем наличие form_draft в payload 2. Если черновик есть и documents_hash совпадает — возвращаем его 3. Если черновика нет или он устарел — запускаем RAG workflow """ try: body = await request.json() claim_id = body.get("claim_id") session_id = body.get("session_id") force_refresh = body.get("force_refresh", False) # Принудительное обновление if not claim_id or not session_id: raise HTTPException( status_code=400, detail="claim_id и session_id обязательны", ) logger.info( "🔍 Check OCR status request", extra={ "claim_id": claim_id, "session_id": session_id, "force_refresh": force_refresh, }, ) # ===================================================== # ШАГ 1: Проверяем наличие черновика в БД # ===================================================== if not force_refresh: try: # Получаем form_draft и список документов claim_data = await db.fetch_one(""" SELECT c.payload->'form_draft' AS form_draft, ( SELECT array_agg(cd.id::text ORDER BY cd.id) FROM clpr_claim_documents cd WHERE cd.claim_id::uuid = c.id ) AS document_ids FROM clpr_claims c WHERE c.id = $1::uuid """, claim_id) if claim_data and claim_data.get('form_draft'): form_draft = claim_data['form_draft'] # Если form_draft — строка, парсим JSON if isinstance(form_draft, str): form_draft = json.loads(form_draft) saved_hash = form_draft.get('documents_hash', '') document_ids = claim_data.get('document_ids') or [] current_hash = compute_documents_hash(document_ids) logger.info( "📋 Draft check", extra={ "saved_hash": saved_hash, "current_hash": current_hash, "docs_count": len(document_ids), }, ) # ✅ Черновик актуален — возвращаем его! if saved_hash == current_hash: logger.info( "✅ Using cached form_draft", extra={ "claim_id": claim_id, "hash": saved_hash, }, ) # Публикуем событие что данные готовы event_data = { "event_type": "form_draft_ready", "status": "ready", "message": "Черновик формы готов", "claim_id": claim_id, "session_id": session_id, "form_draft": form_draft, "from_cache": True, "timestamp": datetime.utcnow().isoformat(), } await redis_service.publish( f"ocr_events:{session_id}", json.dumps(event_data, ensure_ascii=False) ) return { "success": True, "status": "ready", "message": "Черновик формы готов (из кэша)", "from_cache": True, "form_draft": form_draft, "listen_channel": f"ocr_events:{session_id}", } else: logger.info( "🔄 Draft outdated, running RAG", extra={ "reason": "documents_hash mismatch", "saved_hash": saved_hash, "current_hash": current_hash, }, ) except Exception as e: logger.warning(f"⚠️ Draft check failed: {e}, proceeding with RAG") # ===================================================== # ШАГ 2: Черновика нет или устарел — запускаем RAG # ===================================================== event_data = { "claim_id": claim_id, "session_token": session_id, "timestamp": datetime.utcnow().isoformat(), } channel = "clpr:check:ocr_status" subscribers = await redis_service.publish( channel, json.dumps(event_data, ensure_ascii=False) ) logger.info( "✅ OCR status check published (running RAG)", extra={ "channel": channel, "subscribers": subscribers, "claim_id": claim_id, }, ) return { "success": True, "status": "processing", "message": "Запрос на обработку документов отправлен", "from_cache": False, "channel": channel, "listen_channel": f"ocr_events:{session_id}", } except HTTPException: raise except Exception as e: logger.exception("❌ Error checking OCR status") raise HTTPException( status_code=500, detail=f"Ошибка проверки статуса: {str(e)}", ) router.add_api_route("/skip", skip_document, methods=["POST"], tags=["Documents"])