Files
aiform_dev/backend/app/services/rabbitmq_service.py
AI Assistant 0f82eef08d 🚀 MVP: FastAPI + React форма с SMS верификацией
 Инфраструктура: PostgreSQL, Redis, RabbitMQ, S3
 Backend: SMS сервис + API endpoints
 Frontend: React форма (3 шага) + SMS верификация
2025-10-24 16:19:58 +03:00

227 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
RabbitMQ Service для асинхронной обработки задач
"""
import aio_pika
from aio_pika import Connection, Channel, Queue, Exchange, Message
from aio_pika.pool import Pool
from typing import Optional, Callable, Dict, Any
import json
import logging
from ..config import settings
logger = logging.getLogger(__name__)
class RabbitMQService:
"""Сервис для работы с RabbitMQ"""
# Названия очередей
QUEUE_OCR_PROCESSING = "erv_ocr_processing"
QUEUE_AI_EXTRACTION = "erv_ai_extraction"
QUEUE_FLIGHT_CHECK = "erv_flight_check"
QUEUE_CRM_INTEGRATION = "erv_crm_integration"
QUEUE_NOTIFICATIONS = "erv_notifications"
def __init__(self):
self.connection: Optional[Connection] = None
self.channel: Optional[Channel] = None
self.queues: Dict[str, Queue] = {}
async def connect(self):
"""Подключение к RabbitMQ"""
try:
self.connection = await aio_pika.connect_robust(
settings.rabbitmq_url,
timeout=30
)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=10)
logger.info(f"✅ RabbitMQ connected: {settings.rabbitmq_host}:{settings.rabbitmq_port}")
# Объявляем очереди
await self._declare_queues()
except Exception as e:
logger.error(f"❌ RabbitMQ connection error: {e}")
raise
async def disconnect(self):
"""Отключение от RabbitMQ"""
if self.connection:
await self.connection.close()
logger.info("RabbitMQ connection closed")
async def _declare_queues(self):
"""Объявляем все рабочие очереди"""
queue_names = [
self.QUEUE_OCR_PROCESSING,
self.QUEUE_AI_EXTRACTION,
self.QUEUE_FLIGHT_CHECK,
self.QUEUE_CRM_INTEGRATION,
self.QUEUE_NOTIFICATIONS,
]
for queue_name in queue_names:
queue = await self.channel.declare_queue(
queue_name,
durable=True, # Очередь переживет перезапуск
arguments={
"x-message-ttl": 3600000, # TTL сообщений 1 час
"x-max-length": 10000, # Максимум сообщений в очереди
}
)
self.queues[queue_name] = queue
logger.info(f"✅ Queue declared: {queue_name}")
async def publish(
self,
queue_name: str,
message: Dict[str, Any],
priority: int = 5,
headers: Optional[Dict[str, Any]] = None
):
"""
Публикация сообщения в очередь
Args:
queue_name: Название очереди
message: Данные сообщения (dict)
priority: Приоритет (0-10, где 10 - максимальный)
headers: Дополнительные заголовки
"""
try:
msg_body = json.dumps(message).encode()
msg = Message(
body=msg_body,
priority=priority,
headers=headers or {},
content_type="application/json",
delivery_mode=aio_pika.DeliveryMode.PERSISTENT # Сохранять на диск
)
# Публикуем в default exchange с routing_key = queue_name
await self.channel.default_exchange.publish(
msg,
routing_key=queue_name
)
logger.debug(f"📤 Message published to {queue_name}: {message.get('task_id', 'unknown')}")
except Exception as e:
logger.error(f"❌ Failed to publish message to {queue_name}: {e}")
raise
async def consume(
self,
queue_name: str,
callback: Callable,
prefetch_count: int = 1
):
"""
Подписка на сообщения из очереди
Args:
queue_name: Название очереди
callback: Асинхронная функция-обработчик
prefetch_count: Количество сообщений для одновременной обработки
"""
try:
queue = self.queues.get(queue_name)
if not queue:
logger.error(f"Queue {queue_name} not found")
return
await self.channel.set_qos(prefetch_count=prefetch_count)
await queue.consume(callback)
logger.info(f"👂 Consuming from {queue_name}")
except Exception as e:
logger.error(f"❌ Failed to consume from {queue_name}: {e}")
raise
async def health_check(self) -> bool:
"""Проверка здоровья RabbitMQ"""
try:
if self.connection and not self.connection.is_closed:
return True
return False
except Exception as e:
logger.error(f"RabbitMQ health check failed: {e}")
return False
# ============================================
# ВСПОМОГАТЕЛЬНЫЕ МЕТОДЫ ДЛЯ ЗАДАЧ
# ============================================
async def publish_ocr_task(self, claim_id: str, file_id: str, file_path: str):
"""Отправка задачи на OCR обработку"""
await self.publish(
self.QUEUE_OCR_PROCESSING,
{
"task_type": "ocr_processing",
"claim_id": claim_id,
"file_id": file_id,
"file_path": file_path
},
priority=8
)
async def publish_ai_extraction_task(self, claim_id: str, file_id: str, ocr_text: str):
"""Отправка задачи на AI извлечение данных"""
await self.publish(
self.QUEUE_AI_EXTRACTION,
{
"task_type": "ai_extraction",
"claim_id": claim_id,
"file_id": file_id,
"ocr_text": ocr_text
},
priority=7
)
async def publish_flight_check_task(self, claim_id: str, flight_number: str, flight_date: str):
"""Отправка задачи на проверку рейса"""
await self.publish(
self.QUEUE_FLIGHT_CHECK,
{
"task_type": "flight_check",
"claim_id": claim_id,
"flight_number": flight_number,
"flight_date": flight_date
},
priority=6
)
async def publish_crm_integration_task(self, claim_id: str, form_data: Dict[str, Any]):
"""Отправка задачи на интеграцию с CRM"""
await self.publish(
self.QUEUE_CRM_INTEGRATION,
{
"task_type": "crm_integration",
"claim_id": claim_id,
"form_data": form_data
},
priority=9 # Высокий приоритет
)
async def publish_notification_task(self, claim_id: str, notification_type: str, data: Dict[str, Any]):
"""Отправка задачи на отправку уведомления"""
await self.publish(
self.QUEUE_NOTIFICATIONS,
{
"task_type": "notification",
"claim_id": claim_id,
"notification_type": notification_type,
"data": data
},
priority=5
)
# Глобальный экземпляр
rabbitmq_service = RabbitMQService()