fix: Исправление загрузки документов и SQL запросов
- Исправлена потеря документов при обновлении черновика (SQL объединяет вместо перезаписи) - Исправлено определение типа документа (приоритет field_label над field_name) - Исправлены дубликаты в documents_meta и documents_uploaded - Добавлена передача group_index с фронтенда для правильного field_name - Исправлены все документы в таблице clpr_claim_documents с правильными field_name - Обновлены SQL запросы: claimsave и claimsave_final для нового флоу - Добавлена поддержка multi-file upload для одного документа - Исправлены дубликаты в списке загруженных документов на фронтенде Файлы: - SQL: SQL_CLAIMSAVE_FIXED_NEW_FLOW.sql, SQL_CLAIMSAVE_FINAL_FIXED_NEW_FLOW_WITH_UPLOADED.sql - n8n: N8N_CODE_PROCESS_UPLOADED_FILES_FIXED.js (поддержка group_index) - Backend: documents.py (передача group_index в n8n) - Frontend: StepWizardPlan.tsx (передача group_index, исправление дубликатов) - Скрипты: fix_claim_documents_field_names.py, fix_documents_meta_duplicates.py Результат: документы больше не теряются, имеют правильные типы и field_name
This commit is contained in:
@@ -400,6 +400,12 @@ async def get_draft(claim_id: str):
|
||||
|
||||
logger.info(f"🔍 Загружен черновик: id={row.get('id')}, claim_id={final_claim_id}, channel={row.get('channel')}")
|
||||
|
||||
# 🔍 ОТЛАДКА: Логируем наличие documents_required
|
||||
documents_required = payload.get('documents_required', []) if isinstance(payload, dict) else []
|
||||
logger.info(f"🔍 Черновик {final_claim_id}: status_code={row.get('status_code')}, documents_required count={len(documents_required) if isinstance(documents_required, list) else 0}")
|
||||
if documents_required:
|
||||
logger.info(f"🔍 documents_required: {documents_required[:2]}...") # Первые 2 для примера
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"claim": {
|
||||
@@ -426,14 +432,13 @@ async def delete_draft(claim_id: str):
|
||||
"""
|
||||
Удалить черновик по claim_id
|
||||
|
||||
Удаляет только черновики (status_code = 'draft')
|
||||
Удаляет черновики с любым статусом (кроме submitted/completed)
|
||||
"""
|
||||
try:
|
||||
query = """
|
||||
DELETE FROM clpr_claims
|
||||
WHERE payload->>'claim_id' = $1
|
||||
AND status_code = 'draft'
|
||||
AND channel = 'web_form'
|
||||
WHERE (payload->>'claim_id' = $1 OR id::text = $1)
|
||||
AND status_code NOT IN ('submitted', 'completed', 'rejected')
|
||||
RETURNING id
|
||||
"""
|
||||
|
||||
@@ -688,6 +693,8 @@ async def publish_ticket_form_description(payload: TicketFormDescriptionRequest)
|
||||
"claim_id": payload.claim_id, # Опционально - может быть None
|
||||
"phone": payload.phone,
|
||||
"email": payload.email,
|
||||
"unified_id": payload.unified_id, # ✅ Unified ID пользователя
|
||||
"contact_id": payload.contact_id, # ✅ Contact ID пользователя
|
||||
"description": payload.problem_description.strip(),
|
||||
"source": payload.source,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
@@ -701,6 +708,8 @@ async def publish_ticket_form_description(payload: TicketFormDescriptionRequest)
|
||||
"session_id": payload.session_id,
|
||||
"claim_id": payload.claim_id or "not_set",
|
||||
"phone": payload.phone,
|
||||
"unified_id": payload.unified_id or "not_set",
|
||||
"contact_id": payload.contact_id or "not_set",
|
||||
"description_length": len(payload.problem_description),
|
||||
"channel": channel,
|
||||
},
|
||||
@@ -716,18 +725,25 @@ async def publish_ticket_form_description(payload: TicketFormDescriptionRequest)
|
||||
},
|
||||
)
|
||||
|
||||
await redis_service.publish(channel, event_json)
|
||||
subscribers_count = await redis_service.publish(channel, event_json)
|
||||
|
||||
logger.info(
|
||||
"✅ TicketForm description published to Redis",
|
||||
extra={
|
||||
"channel": channel,
|
||||
"session_id": payload.session_id,
|
||||
"subscribers_notified": True,
|
||||
"subscribers_count": subscribers_count,
|
||||
"event_json_preview": event_json[:500],
|
||||
},
|
||||
)
|
||||
|
||||
if subscribers_count == 0:
|
||||
logger.warning(
|
||||
f"⚠️ WARNING: No subscribers on channel {channel}! "
|
||||
f"n8n workflow is not listening to this channel. "
|
||||
f"Event was published but will be lost."
|
||||
)
|
||||
|
||||
# Дополнительная проверка: логируем полный event для отладки
|
||||
logger.debug(
|
||||
"🔍 Full event data published",
|
||||
|
||||
909
backend/app/api/documents.py
Normal file
909
backend/app/api/documents.py
Normal file
@@ -0,0 +1,909 @@
|
||||
"""
|
||||
Documents API Routes - Загрузка и обработка документов
|
||||
|
||||
Новый флоу: поэкранная загрузка документов
|
||||
"""
|
||||
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Request
|
||||
from typing import Optional, List
|
||||
import httpx
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
import logging
|
||||
from ..services.redis_service import redis_service
|
||||
from ..config import settings
|
||||
|
||||
router = APIRouter(prefix="/api/v1/documents", tags=["Documents"])
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# n8n webhook для загрузки документов
|
||||
N8N_DOCUMENT_UPLOAD_WEBHOOK = "https://n8n.clientright.pro/webhook/webform_document_upload"
|
||||
|
||||
|
||||
@router.post("/upload")
|
||||
async def upload_document(
|
||||
request: Request,
|
||||
file: UploadFile = File(...),
|
||||
claim_id: str = Form(...),
|
||||
session_id: str = Form(...),
|
||||
document_type: str = Form(...),
|
||||
document_name: Optional[str] = Form(None),
|
||||
document_description: Optional[str] = Form(None),
|
||||
unified_id: Optional[str] = Form(None),
|
||||
contact_id: Optional[str] = Form(None),
|
||||
phone: Optional[str] = Form(None),
|
||||
):
|
||||
"""
|
||||
Загрузка одного документа.
|
||||
|
||||
Принимает файл и метаданные, отправляет в n8n для:
|
||||
1. Сохранения в S3
|
||||
2. OCR обработки
|
||||
3. Обновления черновика в PostgreSQL
|
||||
|
||||
После успешной обработки n8n публикует событие document_ocr_completed в Redis.
|
||||
"""
|
||||
try:
|
||||
# Генерируем уникальный ID файла
|
||||
file_id = f"doc_{uuid.uuid4().hex[:12]}"
|
||||
|
||||
logger.info(
|
||||
"📤 Document upload received",
|
||||
extra={
|
||||
"claim_id": claim_id,
|
||||
"session_id": session_id,
|
||||
"document_type": document_type,
|
||||
"file_name": file.filename,
|
||||
"file_size": file.size if hasattr(file, 'size') else 'unknown',
|
||||
"content_type": file.content_type,
|
||||
},
|
||||
)
|
||||
|
||||
# Читаем содержимое файла
|
||||
file_content = await file.read()
|
||||
file_size = len(file_content)
|
||||
|
||||
# Получаем IP клиента
|
||||
client_ip = request.client.host if request.client else "unknown"
|
||||
forwarded_for = request.headers.get("x-forwarded-for", "").split(",")[0].strip()
|
||||
if forwarded_for:
|
||||
client_ip = forwarded_for
|
||||
|
||||
# Формируем данные в формате совместимом с существующим n8n воркфлоу
|
||||
form_data = {
|
||||
# Основные идентификаторы
|
||||
"form_id": "ticket_form",
|
||||
"stage": "document_upload",
|
||||
"session_id": session_id,
|
||||
"claim_id": claim_id,
|
||||
"client_ip": client_ip,
|
||||
|
||||
# Идентификаторы пользователя
|
||||
"unified_id": unified_id or "",
|
||||
"contact_id": contact_id or "",
|
||||
"phone": phone or "",
|
||||
|
||||
# Информация о документе
|
||||
"document_type": document_type,
|
||||
"file_id": file_id,
|
||||
"original_filename": file.filename,
|
||||
"content_type": file.content_type or "application/octet-stream",
|
||||
"file_size": str(file_size),
|
||||
"upload_timestamp": datetime.utcnow().isoformat(),
|
||||
|
||||
# Формат uploads_* для совместимости
|
||||
"uploads_field_names[0]": document_type,
|
||||
"uploads_field_labels[0]": document_name or document_type,
|
||||
"uploads_descriptions[0]": document_description or "",
|
||||
}
|
||||
|
||||
# Файл для multipart (ключ uploads[0] для совместимости)
|
||||
files = {
|
||||
"uploads[0]": (file.filename, file_content, file.content_type or "application/octet-stream")
|
||||
}
|
||||
|
||||
# Отправляем в n8n
|
||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||
response = await client.post(
|
||||
N8N_DOCUMENT_UPLOAD_WEBHOOK,
|
||||
data=form_data,
|
||||
files=files,
|
||||
)
|
||||
|
||||
response_text = response.text or ""
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.info(
|
||||
"✅ Document uploaded to n8n",
|
||||
extra={
|
||||
"claim_id": claim_id,
|
||||
"document_type": document_type,
|
||||
"file_id": file_id,
|
||||
"response_preview": response_text[:200],
|
||||
},
|
||||
)
|
||||
|
||||
# Парсим ответ от n8n
|
||||
try:
|
||||
n8n_response = json.loads(response_text)
|
||||
except json.JSONDecodeError:
|
||||
n8n_response = {"raw": response_text}
|
||||
|
||||
# Публикуем событие в Redis для фронтенда
|
||||
event_data = {
|
||||
"event_type": "document_uploaded",
|
||||
"status": "processing",
|
||||
"claim_id": claim_id,
|
||||
"session_id": session_id,
|
||||
"document_type": document_type,
|
||||
"file_id": file_id,
|
||||
"original_filename": file.filename,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
await redis_service.publish(
|
||||
f"ocr_events:{session_id}",
|
||||
json.dumps(event_data, ensure_ascii=False)
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"file_id": file_id,
|
||||
"document_type": document_type,
|
||||
"ocr_status": "processing",
|
||||
"message": "Документ загружен и отправлен на обработку",
|
||||
"n8n_response": n8n_response,
|
||||
}
|
||||
|
||||
else:
|
||||
logger.error(
|
||||
"❌ n8n document upload error",
|
||||
extra={
|
||||
"status_code": response.status_code,
|
||||
"body": response_text[:500],
|
||||
},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=response.status_code,
|
||||
detail=f"Ошибка n8n: {response_text}",
|
||||
)
|
||||
|
||||
except httpx.TimeoutException:
|
||||
logger.error("⏱️ n8n document upload timeout")
|
||||
raise HTTPException(status_code=504, detail="Таймаут загрузки документа")
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("❌ Document upload error")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Ошибка загрузки документа: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/upload-multiple")
|
||||
async def upload_multiple_documents(
|
||||
request: Request,
|
||||
files: List[UploadFile] = File(...),
|
||||
claim_id: str = Form(...),
|
||||
session_id: str = Form(...),
|
||||
document_type: str = Form(...),
|
||||
document_name: Optional[str] = Form(None),
|
||||
document_description: Optional[str] = Form(None),
|
||||
unified_id: Optional[str] = Form(None),
|
||||
contact_id: Optional[str] = Form(None),
|
||||
phone: Optional[str] = Form(None),
|
||||
):
|
||||
"""
|
||||
Загрузка нескольких файлов для одного документа (например, несколько страниц паспорта).
|
||||
Все файлы отправляются одним запросом в n8n.
|
||||
"""
|
||||
try:
|
||||
logger.info(
|
||||
"📤 Multiple documents upload received",
|
||||
extra={
|
||||
"claim_id": claim_id,
|
||||
"session_id": session_id,
|
||||
"document_type": document_type,
|
||||
"files_count": len(files),
|
||||
"file_names": [f.filename for f in files],
|
||||
},
|
||||
)
|
||||
|
||||
# Получаем IP клиента
|
||||
client_ip = request.client.host if request.client else "unknown"
|
||||
forwarded_for = request.headers.get("x-forwarded-for", "").split(",")[0].strip()
|
||||
if forwarded_for:
|
||||
client_ip = forwarded_for
|
||||
|
||||
# Генерируем ID для каждого файла и читаем контент
|
||||
file_ids = []
|
||||
files_multipart = {}
|
||||
|
||||
for i, file in enumerate(files):
|
||||
file_id = f"doc_{uuid.uuid4().hex[:12]}"
|
||||
file_ids.append(file_id)
|
||||
|
||||
file_content = await file.read()
|
||||
files_multipart[f"uploads[{i}]"] = (
|
||||
file.filename,
|
||||
file_content,
|
||||
file.content_type or "application/octet-stream"
|
||||
)
|
||||
|
||||
# Формируем данные формы
|
||||
form_data = {
|
||||
# Основные идентификаторы
|
||||
"form_id": "ticket_form",
|
||||
"stage": "document_upload",
|
||||
"session_id": session_id,
|
||||
"claim_id": claim_id,
|
||||
"client_ip": client_ip,
|
||||
|
||||
# Идентификаторы пользователя
|
||||
"unified_id": unified_id or "",
|
||||
"contact_id": contact_id or "",
|
||||
"phone": phone or "",
|
||||
|
||||
# Информация о документе
|
||||
"document_type": document_type,
|
||||
"files_count": str(len(files)),
|
||||
"upload_timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
# ✅ Получаем group_index из Form (индекс документа в documents_required)
|
||||
form_params = await request.form()
|
||||
group_index = form_params.get("group_index")
|
||||
if group_index:
|
||||
form_data["group_index"] = group_index
|
||||
logger.info(f"📋 group_index передан в n8n: {group_index}")
|
||||
|
||||
# Добавляем информацию о каждом файле
|
||||
for i, (file, file_id) in enumerate(zip(files, file_ids)):
|
||||
form_data[f"file_ids[{i}]"] = file_id
|
||||
form_data[f"uploads_field_names[{i}]"] = document_type
|
||||
form_data[f"uploads_field_labels[{i}]"] = document_name or document_type
|
||||
form_data[f"uploads_descriptions[{i}]"] = document_description or ""
|
||||
form_data[f"original_filenames[{i}]"] = file.filename
|
||||
|
||||
# Отправляем в n8n одним запросом
|
||||
async with httpx.AsyncClient(timeout=180.0) as client:
|
||||
response = await client.post(
|
||||
N8N_DOCUMENT_UPLOAD_WEBHOOK,
|
||||
data=form_data,
|
||||
files=files_multipart,
|
||||
)
|
||||
|
||||
response_text = response.text or ""
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.info(
|
||||
"✅ Multiple documents uploaded to n8n",
|
||||
extra={
|
||||
"claim_id": claim_id,
|
||||
"document_type": document_type,
|
||||
"file_ids": file_ids,
|
||||
"files_count": len(files),
|
||||
},
|
||||
)
|
||||
|
||||
# Парсим ответ от n8n
|
||||
try:
|
||||
n8n_response = json.loads(response_text)
|
||||
except json.JSONDecodeError:
|
||||
n8n_response = {"raw": response_text}
|
||||
|
||||
# Публикуем событие в Redis
|
||||
event_data = {
|
||||
"event_type": "documents_uploaded",
|
||||
"status": "processing",
|
||||
"claim_id": claim_id,
|
||||
"session_id": session_id,
|
||||
"document_type": document_type,
|
||||
"file_ids": file_ids,
|
||||
"files_count": len(files),
|
||||
"original_filenames": [f.filename for f in files],
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
await redis_service.publish(
|
||||
f"ocr_events:{session_id}",
|
||||
json.dumps(event_data, ensure_ascii=False)
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"file_ids": file_ids,
|
||||
"files_count": len(files),
|
||||
"document_type": document_type,
|
||||
"ocr_status": "processing",
|
||||
"message": f"Загружено {len(files)} файл(ов)",
|
||||
"n8n_response": n8n_response,
|
||||
}
|
||||
|
||||
else:
|
||||
logger.error(
|
||||
"❌ n8n multiple upload error",
|
||||
extra={
|
||||
"status_code": response.status_code,
|
||||
"body": response_text[:500],
|
||||
},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=response.status_code,
|
||||
detail=f"Ошибка n8n: {response_text}",
|
||||
)
|
||||
|
||||
except httpx.TimeoutException:
|
||||
logger.error("⏱️ n8n multiple upload timeout")
|
||||
raise HTTPException(status_code=504, detail="Таймаут загрузки документов")
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("❌ Multiple upload error")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Ошибка загрузки документов: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.get("/status/{claim_id}")
|
||||
async def get_documents_status(claim_id: str):
|
||||
"""
|
||||
Получить статус обработки документов для заявки.
|
||||
|
||||
Возвращает:
|
||||
- Список загруженных документов и их OCR статус
|
||||
- Общий прогресс обработки
|
||||
"""
|
||||
try:
|
||||
# TODO: Запрос в PostgreSQL для получения статуса документов
|
||||
# Пока возвращаем mock данные
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"claim_id": claim_id,
|
||||
"documents": [],
|
||||
"ocr_progress": {
|
||||
"total": 0,
|
||||
"completed": 0,
|
||||
"processing": 0,
|
||||
"failed": 0,
|
||||
},
|
||||
"wizard_ready": False,
|
||||
"claim_ready": False,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("❌ Error getting documents status")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Ошибка получения статуса: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/generate-list")
|
||||
async def generate_documents_list(request: Request):
|
||||
"""
|
||||
Запрос на генерацию списка документов для проблемы.
|
||||
|
||||
Принимает описание проблемы, отправляет в n8n для быстрого AI-анализа.
|
||||
n8n публикует результат в Redis канал ocr_events:{session_id} с event_type=documents_list_ready.
|
||||
"""
|
||||
try:
|
||||
body = await request.json()
|
||||
|
||||
session_id = body.get("session_id")
|
||||
problem_description = body.get("problem_description")
|
||||
|
||||
if not session_id or not problem_description:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="session_id и problem_description обязательны",
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"📝 Generate documents list request",
|
||||
extra={
|
||||
"session_id": session_id,
|
||||
"description_length": len(problem_description),
|
||||
},
|
||||
)
|
||||
|
||||
# Публикуем событие в Redis для n8n
|
||||
event_data = {
|
||||
"type": "generate_documents_list",
|
||||
"session_id": session_id,
|
||||
"claim_id": body.get("claim_id"),
|
||||
"unified_id": body.get("unified_id"),
|
||||
"phone": body.get("phone"),
|
||||
"problem_description": problem_description,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
channel = f"{settings.redis_prefix}documents_list"
|
||||
|
||||
subscribers = await redis_service.publish(
|
||||
channel,
|
||||
json.dumps(event_data, ensure_ascii=False)
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"✅ Documents list request published",
|
||||
extra={
|
||||
"channel": channel,
|
||||
"subscribers": subscribers,
|
||||
},
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Запрос на генерацию списка документов отправлен",
|
||||
"channel": channel,
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("❌ Error generating documents list")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Ошибка генерации списка: {str(e)}",
|
||||
)
|
||||
from typing import Optional, List
|
||||
import httpx
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
import logging
|
||||
from ..services.redis_service import redis_service
|
||||
from ..config import settings
|
||||
|
||||
router = APIRouter(prefix="/api/v1/documents", tags=["Documents"])
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# n8n webhook для загрузки документов
|
||||
N8N_DOCUMENT_UPLOAD_WEBHOOK = "https://n8n.clientright.pro/webhook/webform_document_upload"
|
||||
|
||||
|
||||
@router.post("/upload")
|
||||
async def upload_document(
|
||||
request: Request,
|
||||
file: UploadFile = File(...),
|
||||
claim_id: str = Form(...),
|
||||
session_id: str = Form(...),
|
||||
document_type: str = Form(...),
|
||||
document_name: Optional[str] = Form(None),
|
||||
document_description: Optional[str] = Form(None),
|
||||
unified_id: Optional[str] = Form(None),
|
||||
contact_id: Optional[str] = Form(None),
|
||||
phone: Optional[str] = Form(None),
|
||||
):
|
||||
"""
|
||||
Загрузка одного документа.
|
||||
|
||||
Принимает файл и метаданные, отправляет в n8n для:
|
||||
1. Сохранения в S3
|
||||
2. OCR обработки
|
||||
3. Обновления черновика в PostgreSQL
|
||||
|
||||
После успешной обработки n8n публикует событие document_ocr_completed в Redis.
|
||||
"""
|
||||
try:
|
||||
# Генерируем уникальный ID файла
|
||||
file_id = f"doc_{uuid.uuid4().hex[:12]}"
|
||||
|
||||
logger.info(
|
||||
"📤 Document upload received",
|
||||
extra={
|
||||
"claim_id": claim_id,
|
||||
"session_id": session_id,
|
||||
"document_type": document_type,
|
||||
"file_name": file.filename,
|
||||
"file_size": file.size if hasattr(file, 'size') else 'unknown',
|
||||
"content_type": file.content_type,
|
||||
},
|
||||
)
|
||||
|
||||
# Читаем содержимое файла
|
||||
file_content = await file.read()
|
||||
file_size = len(file_content)
|
||||
|
||||
# Получаем IP клиента
|
||||
client_ip = request.client.host if request.client else "unknown"
|
||||
forwarded_for = request.headers.get("x-forwarded-for", "").split(",")[0].strip()
|
||||
if forwarded_for:
|
||||
client_ip = forwarded_for
|
||||
|
||||
# Формируем данные в формате совместимом с существующим n8n воркфлоу
|
||||
form_data = {
|
||||
# Основные идентификаторы
|
||||
"form_id": "ticket_form",
|
||||
"stage": "document_upload",
|
||||
"session_id": session_id,
|
||||
"claim_id": claim_id,
|
||||
"client_ip": client_ip,
|
||||
|
||||
# Идентификаторы пользователя
|
||||
"unified_id": unified_id or "",
|
||||
"contact_id": contact_id or "",
|
||||
"phone": phone or "",
|
||||
|
||||
# Информация о документе
|
||||
"document_type": document_type,
|
||||
"file_id": file_id,
|
||||
"original_filename": file.filename,
|
||||
"content_type": file.content_type or "application/octet-stream",
|
||||
"file_size": str(file_size),
|
||||
"upload_timestamp": datetime.utcnow().isoformat(),
|
||||
|
||||
# Формат uploads_* для совместимости
|
||||
"uploads_field_names[0]": document_type,
|
||||
"uploads_field_labels[0]": document_name or document_type,
|
||||
"uploads_descriptions[0]": document_description or "",
|
||||
}
|
||||
|
||||
# Файл для multipart (ключ uploads[0] для совместимости)
|
||||
files = {
|
||||
"uploads[0]": (file.filename, file_content, file.content_type or "application/octet-stream")
|
||||
}
|
||||
|
||||
# Отправляем в n8n
|
||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||
response = await client.post(
|
||||
N8N_DOCUMENT_UPLOAD_WEBHOOK,
|
||||
data=form_data,
|
||||
files=files,
|
||||
)
|
||||
|
||||
response_text = response.text or ""
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.info(
|
||||
"✅ Document uploaded to n8n",
|
||||
extra={
|
||||
"claim_id": claim_id,
|
||||
"document_type": document_type,
|
||||
"file_id": file_id,
|
||||
"response_preview": response_text[:200],
|
||||
},
|
||||
)
|
||||
|
||||
# Парсим ответ от n8n
|
||||
try:
|
||||
n8n_response = json.loads(response_text)
|
||||
except json.JSONDecodeError:
|
||||
n8n_response = {"raw": response_text}
|
||||
|
||||
# Публикуем событие в Redis для фронтенда
|
||||
event_data = {
|
||||
"event_type": "document_uploaded",
|
||||
"status": "processing",
|
||||
"claim_id": claim_id,
|
||||
"session_id": session_id,
|
||||
"document_type": document_type,
|
||||
"file_id": file_id,
|
||||
"original_filename": file.filename,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
await redis_service.publish(
|
||||
f"ocr_events:{session_id}",
|
||||
json.dumps(event_data, ensure_ascii=False)
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"file_id": file_id,
|
||||
"document_type": document_type,
|
||||
"ocr_status": "processing",
|
||||
"message": "Документ загружен и отправлен на обработку",
|
||||
"n8n_response": n8n_response,
|
||||
}
|
||||
|
||||
else:
|
||||
logger.error(
|
||||
"❌ n8n document upload error",
|
||||
extra={
|
||||
"status_code": response.status_code,
|
||||
"body": response_text[:500],
|
||||
},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=response.status_code,
|
||||
detail=f"Ошибка n8n: {response_text}",
|
||||
)
|
||||
|
||||
except httpx.TimeoutException:
|
||||
logger.error("⏱️ n8n document upload timeout")
|
||||
raise HTTPException(status_code=504, detail="Таймаут загрузки документа")
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("❌ Document upload error")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Ошибка загрузки документа: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/upload-multiple")
|
||||
async def upload_multiple_documents(
|
||||
request: Request,
|
||||
files: List[UploadFile] = File(...),
|
||||
claim_id: str = Form(...),
|
||||
session_id: str = Form(...),
|
||||
document_type: str = Form(...),
|
||||
document_name: Optional[str] = Form(None),
|
||||
document_description: Optional[str] = Form(None),
|
||||
unified_id: Optional[str] = Form(None),
|
||||
contact_id: Optional[str] = Form(None),
|
||||
phone: Optional[str] = Form(None),
|
||||
):
|
||||
"""
|
||||
Загрузка нескольких файлов для одного документа (например, несколько страниц паспорта).
|
||||
Все файлы отправляются одним запросом в n8n.
|
||||
"""
|
||||
try:
|
||||
logger.info(
|
||||
"📤 Multiple documents upload received",
|
||||
extra={
|
||||
"claim_id": claim_id,
|
||||
"session_id": session_id,
|
||||
"document_type": document_type,
|
||||
"files_count": len(files),
|
||||
"file_names": [f.filename for f in files],
|
||||
},
|
||||
)
|
||||
|
||||
# Получаем IP клиента
|
||||
client_ip = request.client.host if request.client else "unknown"
|
||||
forwarded_for = request.headers.get("x-forwarded-for", "").split(",")[0].strip()
|
||||
if forwarded_for:
|
||||
client_ip = forwarded_for
|
||||
|
||||
# Генерируем ID для каждого файла и читаем контент
|
||||
file_ids = []
|
||||
files_multipart = {}
|
||||
|
||||
for i, file in enumerate(files):
|
||||
file_id = f"doc_{uuid.uuid4().hex[:12]}"
|
||||
file_ids.append(file_id)
|
||||
|
||||
file_content = await file.read()
|
||||
files_multipart[f"uploads[{i}]"] = (
|
||||
file.filename,
|
||||
file_content,
|
||||
file.content_type or "application/octet-stream"
|
||||
)
|
||||
|
||||
# Формируем данные формы
|
||||
form_data = {
|
||||
# Основные идентификаторы
|
||||
"form_id": "ticket_form",
|
||||
"stage": "document_upload",
|
||||
"session_id": session_id,
|
||||
"claim_id": claim_id,
|
||||
"client_ip": client_ip,
|
||||
|
||||
# Идентификаторы пользователя
|
||||
"unified_id": unified_id or "",
|
||||
"contact_id": contact_id or "",
|
||||
"phone": phone or "",
|
||||
|
||||
# Информация о документе
|
||||
"document_type": document_type,
|
||||
"files_count": str(len(files)),
|
||||
"upload_timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
# ✅ Получаем group_index из Form (индекс документа в documents_required)
|
||||
form_params = await request.form()
|
||||
group_index = form_params.get("group_index")
|
||||
if group_index:
|
||||
form_data["group_index"] = group_index
|
||||
logger.info(f"📋 group_index передан в n8n: {group_index}")
|
||||
|
||||
# Добавляем информацию о каждом файле
|
||||
for i, (file, file_id) in enumerate(zip(files, file_ids)):
|
||||
form_data[f"file_ids[{i}]"] = file_id
|
||||
form_data[f"uploads_field_names[{i}]"] = document_type
|
||||
form_data[f"uploads_field_labels[{i}]"] = document_name or document_type
|
||||
form_data[f"uploads_descriptions[{i}]"] = document_description or ""
|
||||
form_data[f"original_filenames[{i}]"] = file.filename
|
||||
|
||||
# Отправляем в n8n одним запросом
|
||||
async with httpx.AsyncClient(timeout=180.0) as client:
|
||||
response = await client.post(
|
||||
N8N_DOCUMENT_UPLOAD_WEBHOOK,
|
||||
data=form_data,
|
||||
files=files_multipart,
|
||||
)
|
||||
|
||||
response_text = response.text or ""
|
||||
|
||||
if response.status_code == 200:
|
||||
logger.info(
|
||||
"✅ Multiple documents uploaded to n8n",
|
||||
extra={
|
||||
"claim_id": claim_id,
|
||||
"document_type": document_type,
|
||||
"file_ids": file_ids,
|
||||
"files_count": len(files),
|
||||
},
|
||||
)
|
||||
|
||||
# Парсим ответ от n8n
|
||||
try:
|
||||
n8n_response = json.loads(response_text)
|
||||
except json.JSONDecodeError:
|
||||
n8n_response = {"raw": response_text}
|
||||
|
||||
# Публикуем событие в Redis
|
||||
event_data = {
|
||||
"event_type": "documents_uploaded",
|
||||
"status": "processing",
|
||||
"claim_id": claim_id,
|
||||
"session_id": session_id,
|
||||
"document_type": document_type,
|
||||
"file_ids": file_ids,
|
||||
"files_count": len(files),
|
||||
"original_filenames": [f.filename for f in files],
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
await redis_service.publish(
|
||||
f"ocr_events:{session_id}",
|
||||
json.dumps(event_data, ensure_ascii=False)
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"file_ids": file_ids,
|
||||
"files_count": len(files),
|
||||
"document_type": document_type,
|
||||
"ocr_status": "processing",
|
||||
"message": f"Загружено {len(files)} файл(ов)",
|
||||
"n8n_response": n8n_response,
|
||||
}
|
||||
|
||||
else:
|
||||
logger.error(
|
||||
"❌ n8n multiple upload error",
|
||||
extra={
|
||||
"status_code": response.status_code,
|
||||
"body": response_text[:500],
|
||||
},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=response.status_code,
|
||||
detail=f"Ошибка n8n: {response_text}",
|
||||
)
|
||||
|
||||
except httpx.TimeoutException:
|
||||
logger.error("⏱️ n8n multiple upload timeout")
|
||||
raise HTTPException(status_code=504, detail="Таймаут загрузки документов")
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("❌ Multiple upload error")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Ошибка загрузки документов: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.get("/status/{claim_id}")
|
||||
async def get_documents_status(claim_id: str):
|
||||
"""
|
||||
Получить статус обработки документов для заявки.
|
||||
|
||||
Возвращает:
|
||||
- Список загруженных документов и их OCR статус
|
||||
- Общий прогресс обработки
|
||||
"""
|
||||
try:
|
||||
# TODO: Запрос в PostgreSQL для получения статуса документов
|
||||
# Пока возвращаем mock данные
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"claim_id": claim_id,
|
||||
"documents": [],
|
||||
"ocr_progress": {
|
||||
"total": 0,
|
||||
"completed": 0,
|
||||
"processing": 0,
|
||||
"failed": 0,
|
||||
},
|
||||
"wizard_ready": False,
|
||||
"claim_ready": False,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("❌ Error getting documents status")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Ошибка получения статуса: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/generate-list")
|
||||
async def generate_documents_list(request: Request):
|
||||
"""
|
||||
Запрос на генерацию списка документов для проблемы.
|
||||
|
||||
Принимает описание проблемы, отправляет в n8n для быстрого AI-анализа.
|
||||
n8n публикует результат в Redis канал ocr_events:{session_id} с event_type=documents_list_ready.
|
||||
"""
|
||||
try:
|
||||
body = await request.json()
|
||||
|
||||
session_id = body.get("session_id")
|
||||
problem_description = body.get("problem_description")
|
||||
|
||||
if not session_id or not problem_description:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="session_id и problem_description обязательны",
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"📝 Generate documents list request",
|
||||
extra={
|
||||
"session_id": session_id,
|
||||
"description_length": len(problem_description),
|
||||
},
|
||||
)
|
||||
|
||||
# Публикуем событие в Redis для n8n
|
||||
event_data = {
|
||||
"type": "generate_documents_list",
|
||||
"session_id": session_id,
|
||||
"claim_id": body.get("claim_id"),
|
||||
"unified_id": body.get("unified_id"),
|
||||
"phone": body.get("phone"),
|
||||
"problem_description": problem_description,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
channel = f"{settings.redis_prefix}documents_list"
|
||||
|
||||
subscribers = await redis_service.publish(
|
||||
channel,
|
||||
json.dumps(event_data, ensure_ascii=False)
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"✅ Documents list request published",
|
||||
extra={
|
||||
"channel": channel,
|
||||
"subscribers": subscribers,
|
||||
},
|
||||
)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Запрос на генерацию списка документов отправлен",
|
||||
"channel": channel,
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("❌ Error generating documents list")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Ошибка генерации списка: {str(e)}",
|
||||
)
|
||||
|
||||
@@ -123,10 +123,18 @@ async def stream_events(task_id: str):
|
||||
# Формат уже плоский (от backend API или старых источников)
|
||||
actual_event = event
|
||||
|
||||
# ✅ Логируем полученное событие
|
||||
event_type = actual_event.get('event_type')
|
||||
logger.info(f"🔍 Processing event: event_type={event_type}, has claim_id={bool(actual_event.get('claim_id'))}")
|
||||
|
||||
# ✅ Обработка нового формата: documents_list_ready
|
||||
if event_type == 'documents_list_ready':
|
||||
logger.info(f"📋 Documents list received: {len(actual_event.get('documents_required', []))} documents")
|
||||
# Просто пропускаем дальше к yield
|
||||
|
||||
# ✅ Обработка формата от n8n: если пришёл объект с claim_id, но без event_type
|
||||
# Это значит, что n8n пушит минимальный payload для wizard_ready
|
||||
logger.info(f"🔍 Checking event: has event_type={bool(actual_event.get('event_type'))}, has claim_id={bool(actual_event.get('claim_id'))}")
|
||||
if not actual_event.get('event_type') and actual_event.get('claim_id'):
|
||||
elif not event_type and actual_event.get('claim_id'):
|
||||
logger.info(f"📦 Detected minimal wizard payload (no event_type), wrapping for claim_id={actual_event.get('claim_id')}")
|
||||
# Обёртываем в правильный формат
|
||||
actual_event = {
|
||||
@@ -209,13 +217,21 @@ async def stream_events(task_id: str):
|
||||
|
||||
# Отправляем событие клиенту (плоский формат)
|
||||
event_json = json.dumps(actual_event, ensure_ascii=False)
|
||||
logger.info(f"📤 Sending event to client: {actual_event.get('status', 'unknown')}")
|
||||
event_type_sent = actual_event.get('event_type', 'unknown')
|
||||
event_status = actual_event.get('status', 'unknown')
|
||||
logger.info(f"📤 Sending event to client: type={event_type_sent}, status={event_status}")
|
||||
yield f"data: {event_json}\n\n"
|
||||
|
||||
# Если обработка завершена - закрываем соединение
|
||||
if actual_event.get('status') in ['completed', 'error', 'success']:
|
||||
# НЕ закрываем для documents_list_ready и document_ocr_completed (ждём ещё события)
|
||||
if event_status in ['completed', 'error'] and event_type_sent not in ['documents_list_ready', 'document_ocr_completed', 'document_uploaded']:
|
||||
logger.info(f"✅ Task {task_id} finished, closing SSE")
|
||||
break
|
||||
|
||||
# Закрываем для финальных событий
|
||||
if event_type_sent in ['claim_ready', 'claim_plan_ready']:
|
||||
logger.info(f"✅ Final event {event_type_sent} sent, closing SSE")
|
||||
break
|
||||
else:
|
||||
logger.info(f"⏰ Timeout waiting for message on {channel}")
|
||||
|
||||
|
||||
@@ -69,6 +69,8 @@ class TicketFormDescriptionRequest(BaseModel):
|
||||
claim_id: Optional[str] = Field(None, description="ID заявки (если уже создана)")
|
||||
phone: Optional[str] = Field(None, description="Номер телефона заявителя")
|
||||
email: Optional[str] = Field(None, description="Email заявителя")
|
||||
unified_id: Optional[str] = Field(None, description="Unified ID пользователя из PostgreSQL")
|
||||
contact_id: Optional[str] = Field(None, description="Contact ID пользователя в CRM")
|
||||
problem_description: str = Field(..., min_length=10, description="Свободное описание ситуации")
|
||||
source: str = Field("ticket_form", description="Источник события")
|
||||
channel: Optional[str] = Field(None, description="Переопределение Redis канала (опционально)")
|
||||
|
||||
@@ -12,7 +12,7 @@ from .services.redis_service import redis_service
|
||||
from .services.rabbitmq_service import rabbitmq_service
|
||||
from .services.policy_service import policy_service
|
||||
from .services.s3_service import s3_service
|
||||
from .api import sms, claims, policy, upload, draft, events, n8n_proxy, session
|
||||
from .api import sms, claims, policy, upload, draft, events, n8n_proxy, session, documents
|
||||
|
||||
# Настройка логирования
|
||||
logging.basicConfig(
|
||||
@@ -103,6 +103,7 @@ app.include_router(draft.router)
|
||||
app.include_router(events.router)
|
||||
app.include_router(n8n_proxy.router) # 🔒 Безопасный proxy к n8n webhooks
|
||||
app.include_router(session.router) # 🔑 Session management через Redis
|
||||
app.include_router(documents.router) # 📄 Documents upload and processing
|
||||
|
||||
|
||||
@app.get("/")
|
||||
@@ -228,3 +229,4 @@ async def info():
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8200)
|
||||
|
||||
|
||||
@@ -54,9 +54,18 @@ class RedisService:
|
||||
async def publish(self, channel: str, message: str):
|
||||
"""Публикация сообщения в канал Redis Pub/Sub"""
|
||||
try:
|
||||
await self.client.publish(channel, message)
|
||||
subscribers_count = await self.client.publish(channel, message)
|
||||
logger.info(
|
||||
f"📢 Redis publish: channel={channel}, message_length={len(message)}, subscribers={subscribers_count}"
|
||||
)
|
||||
if subscribers_count == 0:
|
||||
logger.warning(
|
||||
f"⚠️ No subscribers on channel {channel}. Message published but no one is listening!"
|
||||
)
|
||||
return subscribers_count
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Redis publish error: {e}")
|
||||
raise
|
||||
|
||||
async def delete(self, key: str) -> bool:
|
||||
"""Удалить ключ"""
|
||||
|
||||
Reference in New Issue
Block a user