feat: Интеграция n8n + Redis Pub/Sub + SSE для real-time обработки заявок
🎯 Основные изменения: Backend: - ✅ Добавлен SSE endpoint для real-time событий (/api/v1/events/{task_id}) - ✅ Redis Pub/Sub для публикации/подписки на события OCR/Vision - ✅ Удален aioboto3 из requirements.txt (конфликт зависимостей) - ✅ Добавлен OCR worker (deprecated, логика перенесена в n8n) Frontend (React): - ✅ Автогенерация claim_id и session_id - ✅ Клиентская конвертация файлов в PDF (JPG/PNG/HEIC/WEBP) - ✅ Сжатие изображений до 2MB перед конвертацией - ✅ SSE подписка на события OCR/Vision в Step1Policy - ✅ Валидация документов (полис vs неподходящий контент) - ✅ Real-time прогресс загрузки и обработки файлов - ✅ Интеграция с n8n webhooks для проверки полиса и загрузки файлов n8n Workflows: - ✅ Проверка полиса в MySQL + запись в PostgreSQL - ✅ Загрузка файлов в S3 + OCR + Vision AI - ✅ Публикация событий в Redis через backend API - ✅ Валидация документов (распознавание полисов ERV) Документация: - 📝 N8N_INTEGRATION.md - интеграция с n8n - 📝 N8N_SQL_QUERIES.md - SQL запросы для workflows - 📝 N8N_PDF_COMPRESS.md - сжатие PDF - 📝 N8N_STIRLING_COMPRESS.md - интеграция Stirling-PDF Утилиты: - 🔧 monitor_redis.py/sh - мониторинг Redis Pub/Sub - 🔧 test_redis_events.sh - тестирование событий - 🔧 pdfConverter.ts - клиентская конвертация в PDF Архитектура: React → n8n webhooks (sync) → MySQL/PostgreSQL/S3 → n8n workflows (async) → OCR/Vision → Redis Pub/Sub → SSE → React
This commit is contained in:
131
backend/app/api/events.py
Normal file
131
backend/app/api/events.py
Normal file
@@ -0,0 +1,131 @@
|
||||
"""
|
||||
SSE (Server-Sent Events) для real-time обновлений через Redis Pub/Sub
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
from fastapi import APIRouter, Body
|
||||
from fastapi.responses import StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
from typing import Dict, Any
|
||||
from app.services.redis_service import redis_service
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
class EventPublish(BaseModel):
|
||||
"""Модель для публикации события"""
|
||||
event_type: str = "ocr_completed"
|
||||
status: str
|
||||
message: str
|
||||
data: Dict[str, Any] = {}
|
||||
timestamp: str = None
|
||||
|
||||
|
||||
@router.post("/events/{task_id}")
|
||||
async def publish_event(task_id: str, event: EventPublish):
|
||||
"""
|
||||
Публикация события в Redis канал
|
||||
|
||||
Используется n8n для отправки событий (OCR, AI и т.д.)
|
||||
|
||||
Args:
|
||||
task_id: ID задачи
|
||||
event: Данные события
|
||||
|
||||
Returns:
|
||||
Статус публикации
|
||||
"""
|
||||
try:
|
||||
channel = f"ocr_events:{task_id}"
|
||||
event_data = {
|
||||
"event_type": event.event_type,
|
||||
"status": event.status,
|
||||
"message": event.message,
|
||||
"data": event.data,
|
||||
"timestamp": event.timestamp
|
||||
}
|
||||
|
||||
# Публикуем в Redis
|
||||
event_json = json.dumps(event_data, ensure_ascii=False)
|
||||
await redis_service.publish(channel, event_json)
|
||||
|
||||
logger.info(f"📢 Event published to {channel}: {event.status}")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"channel": channel,
|
||||
"event": event_data
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to publish event: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
@router.get("/events/{task_id}")
|
||||
async def stream_events(task_id: str):
|
||||
"""
|
||||
SSE стрим событий обработки OCR
|
||||
|
||||
Args:
|
||||
task_id: ID задачи
|
||||
|
||||
Returns:
|
||||
StreamingResponse с событиями
|
||||
"""
|
||||
|
||||
async def event_generator():
|
||||
"""Генератор событий из Redis Pub/Sub"""
|
||||
channel = f"ocr_events:{task_id}"
|
||||
|
||||
# Подписываемся на канал Redis
|
||||
pubsub = redis_service.redis.pubsub()
|
||||
await pubsub.subscribe(channel)
|
||||
|
||||
logger.info(f"📡 Client subscribed to {channel}")
|
||||
|
||||
# Отправляем начальное событие
|
||||
yield f"data: {json.dumps({'status': 'connected', 'message': 'Подключено к событиям'})}\n\n"
|
||||
|
||||
try:
|
||||
# Слушаем события
|
||||
while True:
|
||||
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=30.0)
|
||||
|
||||
if message and message['type'] == 'message':
|
||||
event_data = message['data'].decode('utf-8')
|
||||
|
||||
# Отправляем событие клиенту
|
||||
yield f"data: {event_data}\n\n"
|
||||
|
||||
# Если обработка завершена - закрываем соединение
|
||||
event = json.loads(event_data)
|
||||
if event.get('status') in ['completed', 'error']:
|
||||
logger.info(f"✅ Task {task_id} finished, closing SSE")
|
||||
break
|
||||
|
||||
# Пинг каждые 30 сек чтобы соединение не закрылось
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"❌ Client disconnected from {channel}")
|
||||
finally:
|
||||
await pubsub.unsubscribe(channel)
|
||||
await pubsub.close()
|
||||
|
||||
return StreamingResponse(
|
||||
event_generator(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no" # Отключаем буферизацию nginx
|
||||
}
|
||||
)
|
||||
|
||||
@@ -12,7 +12,7 @@ from .services.redis_service import redis_service
|
||||
from .services.rabbitmq_service import rabbitmq_service
|
||||
from .services.policy_service import policy_service
|
||||
from .services.s3_service import s3_service
|
||||
from .api import sms, claims, policy, upload, draft
|
||||
from .api import sms, claims, policy, upload, draft, events
|
||||
|
||||
# Настройка логирования
|
||||
logging.basicConfig(
|
||||
@@ -98,6 +98,7 @@ app.include_router(claims.router)
|
||||
app.include_router(policy.router)
|
||||
app.include_router(upload.router)
|
||||
app.include_router(draft.router)
|
||||
app.include_router(events.router, prefix="/api/v1")
|
||||
|
||||
|
||||
@app.get("/")
|
||||
|
||||
@@ -6,6 +6,7 @@ import logging
|
||||
from typing import Optional, Dict, Any
|
||||
from ..config import settings
|
||||
import json
|
||||
from .s3_service import s3_service
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -39,25 +40,104 @@ class OCRService:
|
||||
}
|
||||
|
||||
try:
|
||||
# Шаг 1: OCR распознавание текста
|
||||
# Шаг 0: Загружаем файл в S3 и получаем presigned URL
|
||||
logger.info(f"📤 Uploading file to S3: {filename}")
|
||||
|
||||
# Определяем content_type
|
||||
content_type = "image/jpeg"
|
||||
if filename.lower().endswith('.pdf'):
|
||||
content_type = "application/pdf"
|
||||
elif filename.lower().endswith('.png'):
|
||||
content_type = "image/png"
|
||||
elif filename.lower().endswith(('.heic', '.heif')):
|
||||
content_type = "image/heic"
|
||||
|
||||
# Загружаем в S3
|
||||
s3_url = await s3_service.upload_file(
|
||||
file_content=file_content,
|
||||
filename=filename,
|
||||
content_type=content_type,
|
||||
folder="ocr_temp"
|
||||
)
|
||||
|
||||
if not s3_url:
|
||||
logger.error("❌ Failed to upload file to S3")
|
||||
return result
|
||||
|
||||
# Используем простой публичный URL
|
||||
# Файлы в ocr_temp/ загружаются с ACL=public-read
|
||||
ocr_file_url = s3_url # Уже публичный URL!
|
||||
|
||||
logger.info(f"✅ File uploaded to S3, using public URL for OCR")
|
||||
|
||||
# Шаг 1: OCR распознавание текста через URL
|
||||
logger.info(f"🔍 Starting OCR for: {filename}")
|
||||
|
||||
async with httpx.AsyncClient(timeout=60.0) as client:
|
||||
files = {"file": (filename, file_content, "image/jpeg")}
|
||||
# Определяем file_type по расширению (OCR API требует строку!)
|
||||
file_ext = filename.lower().split('.')[-1]
|
||||
file_type_map = {
|
||||
'pdf': 'pdf',
|
||||
'jpg': 'jpeg',
|
||||
'jpeg': 'jpeg',
|
||||
'png': 'png',
|
||||
'heic': 'heic',
|
||||
'heif': 'heic',
|
||||
'docx': 'docx',
|
||||
'doc': 'doc'
|
||||
}
|
||||
file_type = file_type_map.get(file_ext, 'pdf') # По умолчанию pdf
|
||||
|
||||
logger.info(f"📄 File type detected: {file_type}")
|
||||
|
||||
async with httpx.AsyncClient(timeout=90.0) as client:
|
||||
# OCR API ожидает JSON с file_url
|
||||
response = await client.post(
|
||||
f"{self.ocr_url}/analyze-file",
|
||||
files=files
|
||||
json={
|
||||
"file_url": ocr_file_url, # Публичный URL
|
||||
"file_name": filename,
|
||||
"file_type": file_type # ✅ Теперь строка, не None!
|
||||
}
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
ocr_result = response.json()
|
||||
ocr_text = ocr_result.get("text", "")
|
||||
|
||||
# OCR API возвращает массив: [{text: "", pages_data: [...]}]
|
||||
ocr_text = ""
|
||||
|
||||
if isinstance(ocr_result, list) and len(ocr_result) > 0:
|
||||
data = ocr_result[0]
|
||||
|
||||
# Пробуем извлечь текст из pages_data
|
||||
if "pages_data" in data and len(data["pages_data"]) > 0:
|
||||
# Собираем текст со всех страниц
|
||||
texts = []
|
||||
for page in data["pages_data"]:
|
||||
page_text = page.get("ocr_text", "")
|
||||
if page_text:
|
||||
texts.append(page_text)
|
||||
ocr_text = "\n\n".join(texts)
|
||||
|
||||
# Если нет pages_data, пробуем text или full_text
|
||||
if not ocr_text:
|
||||
ocr_text = data.get("text", "") or data.get("full_text", "")
|
||||
|
||||
elif isinstance(ocr_result, dict):
|
||||
# Старый формат (на всякий случай)
|
||||
ocr_text = ocr_result.get("text", "") or ocr_result.get("full_text", "")
|
||||
|
||||
result["ocr_text"] = ocr_text
|
||||
|
||||
logger.info(f"📄 OCR completed: {len(ocr_text)} chars")
|
||||
logger.debug(f"OCR Text preview: {ocr_text[:200]}...")
|
||||
if ocr_text:
|
||||
logger.info(f"OCR Text preview: {ocr_text[:200]}...")
|
||||
else:
|
||||
logger.warning("⚠️ OCR returned empty text!")
|
||||
logger.debug(f"OCR response structure: {list(ocr_result.keys()) if isinstance(ocr_result, dict) else type(ocr_result)}")
|
||||
else:
|
||||
logger.error(f"❌ OCR failed: {response.status_code}")
|
||||
logger.error(f"Response: {response.text[:500]}")
|
||||
return result
|
||||
|
||||
# Шаг 2: AI анализ - что это за документ?
|
||||
|
||||
@@ -51,6 +51,13 @@ class RedisService:
|
||||
else:
|
||||
await self.client.set(full_key, value)
|
||||
|
||||
async def publish(self, channel: str, message: str):
|
||||
"""Публикация сообщения в канал Redis Pub/Sub"""
|
||||
try:
|
||||
await self.client.publish(channel, message)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Redis publish error: {e}")
|
||||
|
||||
async def delete(self, key: str) -> bool:
|
||||
"""Удалить ключ"""
|
||||
full_key = f"{settings.redis_prefix}{key}"
|
||||
|
||||
@@ -64,18 +64,22 @@ class S3Service:
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
safe_filename = f"{folder}/{timestamp}_{unique_id}_{filename}"
|
||||
|
||||
# Загружаем файл
|
||||
# Загружаем файл с публичным доступом (для OCR)
|
||||
# ВРЕМЕННОЕ РЕШЕНИЕ: делаем файлы публичными пока presigned URL не работает
|
||||
acl = 'public-read' if folder == 'ocr_temp' else 'private'
|
||||
|
||||
self.client.put_object(
|
||||
Bucket=self.bucket,
|
||||
Key=safe_filename,
|
||||
Body=file_content,
|
||||
ContentType=content_type
|
||||
ContentType=content_type,
|
||||
ACL=acl # Делаем ocr_temp файлы публичными
|
||||
)
|
||||
|
||||
# Генерируем URL
|
||||
file_url = f"{settings.s3_endpoint}/{self.bucket}/{safe_filename}"
|
||||
|
||||
logger.info(f"✅ File uploaded to S3: {safe_filename}")
|
||||
logger.info(f"✅ File uploaded to S3: {safe_filename} (ACL: {acl})")
|
||||
return file_url
|
||||
|
||||
except Exception as e:
|
||||
@@ -97,6 +101,51 @@ class S3Service:
|
||||
except Exception as e:
|
||||
logger.error(f"❌ S3 delete error: {e}")
|
||||
return False
|
||||
|
||||
def generate_presigned_url(self, file_key: str, expiration: int = 3600) -> Optional[str]:
|
||||
"""
|
||||
Генерация временного публичного URL для файла
|
||||
|
||||
Args:
|
||||
file_key: Ключ файла в S3 (путь)
|
||||
expiration: Время жизни URL в секундах (по умолчанию 1 час)
|
||||
|
||||
Returns:
|
||||
Presigned URL или None при ошибке
|
||||
"""
|
||||
if not self.client:
|
||||
self.connect()
|
||||
|
||||
try:
|
||||
# Для Timeweb Cloud Storage нужно использовать ClientMethod вместо обычного метода
|
||||
# И добавить HttpMethod явно
|
||||
url = self.client.generate_presigned_url(
|
||||
ClientMethod='get_object',
|
||||
Params={
|
||||
'Bucket': self.bucket,
|
||||
'Key': file_key
|
||||
},
|
||||
ExpiresIn=expiration,
|
||||
HttpMethod='GET'
|
||||
)
|
||||
logger.info(f"✅ Presigned URL generated for: {file_key} (expires in {expiration}s)")
|
||||
return url
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Presigned URL generation error: {e}")
|
||||
return None
|
||||
|
||||
def get_public_url(self, file_key: str) -> str:
|
||||
"""
|
||||
Простой публичный URL (без подписи)
|
||||
ВНИМАНИЕ: Работает только если bucket публичный!
|
||||
|
||||
Args:
|
||||
file_key: Ключ файла в S3
|
||||
|
||||
Returns:
|
||||
Публичный URL
|
||||
"""
|
||||
return f"{settings.s3_endpoint}/{self.bucket}/{file_key}"
|
||||
|
||||
|
||||
# Глобальный экземпляр
|
||||
|
||||
158
backend/app/workers/ocr_worker.py
Normal file
158
backend/app/workers/ocr_worker.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""
|
||||
OCR Worker - обработка файлов в фоне через RabbitMQ + Redis Pub/Sub
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
from aio_pika import connect_robust, IncomingMessage
|
||||
from app.config import settings
|
||||
from app.services.ocr_service import ocr_service
|
||||
from app.services.redis_service import redis_service
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OCRWorker:
|
||||
"""Worker для обработки OCR задач в фоне"""
|
||||
|
||||
def __init__(self):
|
||||
self.connection = None
|
||||
self.channel = None
|
||||
self.queue_name = "erv_ocr_processing"
|
||||
|
||||
async def connect(self):
|
||||
"""Подключение к RabbitMQ"""
|
||||
self.connection = await connect_robust(settings.rabbitmq_url)
|
||||
self.channel = await self.connection.channel()
|
||||
await self.channel.set_qos(prefetch_count=1) # По одной задаче
|
||||
|
||||
self.queue = await self.channel.declare_queue(
|
||||
self.queue_name,
|
||||
durable=True
|
||||
)
|
||||
|
||||
logger.info(f"✅ Worker connected to RabbitMQ: {self.queue_name}")
|
||||
|
||||
async def publish_event(self, task_id: str, event: Dict[str, Any]):
|
||||
"""
|
||||
Публикация события в Redis для real-time обновлений
|
||||
|
||||
Args:
|
||||
task_id: ID задачи
|
||||
event: Данные события
|
||||
"""
|
||||
channel = f"ocr_events:{task_id}"
|
||||
event_json = json.dumps(event, ensure_ascii=False)
|
||||
|
||||
try:
|
||||
await redis_service.publish(channel, event_json)
|
||||
logger.info(f"📢 Event published to {channel}: {event['status']}")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to publish event: {e}")
|
||||
|
||||
async def process_task(self, message: IncomingMessage):
|
||||
"""
|
||||
Обработка задачи OCR
|
||||
|
||||
Args:
|
||||
message: Сообщение из RabbitMQ
|
||||
"""
|
||||
async with message.process():
|
||||
try:
|
||||
# Парсим задачу
|
||||
task = json.loads(message.body.decode())
|
||||
task_id = task["task_id"]
|
||||
file_content = bytes.fromhex(task["file_content_hex"])
|
||||
filename = task["filename"]
|
||||
|
||||
logger.info(f"🔄 Processing task {task_id}: {filename}")
|
||||
|
||||
# Событие: начало обработки
|
||||
await self.publish_event(task_id, {
|
||||
"status": "processing",
|
||||
"message": "Начата обработка файла",
|
||||
"filename": filename
|
||||
})
|
||||
|
||||
# Шаг 1: OCR обработка
|
||||
await self.publish_event(task_id, {
|
||||
"status": "ocr_started",
|
||||
"message": "Запущено распознавание текста"
|
||||
})
|
||||
|
||||
result = await ocr_service.process_document(file_content, filename)
|
||||
|
||||
# Событие: OCR завершён
|
||||
await self.publish_event(task_id, {
|
||||
"status": "ocr_completed",
|
||||
"message": f"Распознано {len(result['ocr_text'])} символов",
|
||||
"chars": len(result['ocr_text'])
|
||||
})
|
||||
|
||||
# Шаг 2: AI анализ (если есть текст)
|
||||
if result['ocr_text']:
|
||||
await self.publish_event(task_id, {
|
||||
"status": "ai_started",
|
||||
"message": "Запущен AI анализ документа"
|
||||
})
|
||||
|
||||
# Событие: всё готово
|
||||
await self.publish_event(task_id, {
|
||||
"status": "completed",
|
||||
"message": "Обработка завершена",
|
||||
"result": {
|
||||
"document_type": result["document_type"],
|
||||
"is_valid": result["is_valid"],
|
||||
"confidence": result["confidence"],
|
||||
"extracted_data": result["extracted_data"],
|
||||
"ocr_text_length": len(result["ocr_text"])
|
||||
}
|
||||
})
|
||||
|
||||
# Сохраняем результат в Redis (TTL 1 час)
|
||||
cache_key = f"ocr_result:{task_id}"
|
||||
await redis_service.set_json(cache_key, result, ttl=3600)
|
||||
|
||||
logger.info(f"✅ Task {task_id} completed successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Task processing error: {e}")
|
||||
|
||||
# Событие: ошибка
|
||||
await self.publish_event(task_id, {
|
||||
"status": "error",
|
||||
"message": f"Ошибка обработки: {str(e)}"
|
||||
})
|
||||
|
||||
async def start(self):
|
||||
"""Запуск worker"""
|
||||
await self.connect()
|
||||
|
||||
logger.info(f"🚀 OCR Worker started, waiting for tasks...")
|
||||
|
||||
# Слушаем очередь
|
||||
await self.queue.consume(self.process_task)
|
||||
|
||||
# Держим worker живым
|
||||
try:
|
||||
await asyncio.Future()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("👋 Worker stopped")
|
||||
|
||||
|
||||
async def main():
|
||||
"""Точка входа"""
|
||||
worker = OCRWorker()
|
||||
await worker.start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ aiofiles==24.1.0
|
||||
|
||||
# S3
|
||||
boto3==1.35.56
|
||||
aioboto3==13.2.0
|
||||
|
||||
# Validation
|
||||
pydantic==2.10.0
|
||||
|
||||
Reference in New Issue
Block a user