""" RabbitMQ Service для асинхронной обработки задач """ import aio_pika from aio_pika import Connection, Channel, Queue, Exchange, Message from aio_pika.pool import Pool from typing import Optional, Callable, Dict, Any import json import logging from ..config import settings logger = logging.getLogger(__name__) class RabbitMQService: """Сервис для работы с RabbitMQ""" # Названия очередей QUEUE_OCR_PROCESSING = "erv_ocr_processing" QUEUE_AI_EXTRACTION = "erv_ai_extraction" QUEUE_FLIGHT_CHECK = "erv_flight_check" QUEUE_CRM_INTEGRATION = "erv_crm_integration" QUEUE_NOTIFICATIONS = "erv_notifications" def __init__(self): self.connection: Optional[Connection] = None self.channel: Optional[Channel] = None self.queues: Dict[str, Queue] = {} async def connect(self): """Подключение к RabbitMQ""" try: self.connection = await aio_pika.connect_robust( settings.rabbitmq_url, timeout=30 ) self.channel = await self.connection.channel() await self.channel.set_qos(prefetch_count=10) logger.info(f"✅ RabbitMQ connected: {settings.rabbitmq_host}:{settings.rabbitmq_port}") # Объявляем очереди await self._declare_queues() except Exception as e: logger.error(f"❌ RabbitMQ connection error: {e}") raise async def disconnect(self): """Отключение от RabbitMQ""" if self.connection: await self.connection.close() logger.info("RabbitMQ connection closed") async def _declare_queues(self): """Объявляем все рабочие очереди""" queue_names = [ self.QUEUE_OCR_PROCESSING, self.QUEUE_AI_EXTRACTION, self.QUEUE_FLIGHT_CHECK, self.QUEUE_CRM_INTEGRATION, self.QUEUE_NOTIFICATIONS, ] for queue_name in queue_names: queue = await self.channel.declare_queue( queue_name, durable=True, # Очередь переживет перезапуск arguments={ "x-message-ttl": 3600000, # TTL сообщений 1 час "x-max-length": 10000, # Максимум сообщений в очереди } ) self.queues[queue_name] = queue logger.info(f"✅ Queue declared: {queue_name}") async def publish( self, queue_name: str, message: Dict[str, Any], priority: int = 5, headers: Optional[Dict[str, Any]] = None ): """ Публикация сообщения в очередь Args: queue_name: Название очереди message: Данные сообщения (dict) priority: Приоритет (0-10, где 10 - максимальный) headers: Дополнительные заголовки """ try: msg_body = json.dumps(message).encode() msg = Message( body=msg_body, priority=priority, headers=headers or {}, content_type="application/json", delivery_mode=aio_pika.DeliveryMode.PERSISTENT # Сохранять на диск ) # Публикуем в default exchange с routing_key = queue_name await self.channel.default_exchange.publish( msg, routing_key=queue_name ) logger.debug(f"📤 Message published to {queue_name}: {message.get('task_id', 'unknown')}") except Exception as e: logger.error(f"❌ Failed to publish message to {queue_name}: {e}") raise async def consume( self, queue_name: str, callback: Callable, prefetch_count: int = 1 ): """ Подписка на сообщения из очереди Args: queue_name: Название очереди callback: Асинхронная функция-обработчик prefetch_count: Количество сообщений для одновременной обработки """ try: queue = self.queues.get(queue_name) if not queue: logger.error(f"Queue {queue_name} not found") return await self.channel.set_qos(prefetch_count=prefetch_count) await queue.consume(callback) logger.info(f"👂 Consuming from {queue_name}") except Exception as e: logger.error(f"❌ Failed to consume from {queue_name}: {e}") raise async def health_check(self) -> bool: """Проверка здоровья RabbitMQ""" try: if self.connection and not self.connection.is_closed: return True return False except Exception as e: logger.error(f"RabbitMQ health check failed: {e}") return False # ============================================ # ВСПОМОГАТЕЛЬНЫЕ МЕТОДЫ ДЛЯ ЗАДАЧ # ============================================ async def publish_ocr_task(self, claim_id: str, file_id: str, file_path: str): """Отправка задачи на OCR обработку""" await self.publish( self.QUEUE_OCR_PROCESSING, { "task_type": "ocr_processing", "claim_id": claim_id, "file_id": file_id, "file_path": file_path }, priority=8 ) async def publish_ai_extraction_task(self, claim_id: str, file_id: str, ocr_text: str): """Отправка задачи на AI извлечение данных""" await self.publish( self.QUEUE_AI_EXTRACTION, { "task_type": "ai_extraction", "claim_id": claim_id, "file_id": file_id, "ocr_text": ocr_text }, priority=7 ) async def publish_flight_check_task(self, claim_id: str, flight_number: str, flight_date: str): """Отправка задачи на проверку рейса""" await self.publish( self.QUEUE_FLIGHT_CHECK, { "task_type": "flight_check", "claim_id": claim_id, "flight_number": flight_number, "flight_date": flight_date }, priority=6 ) async def publish_crm_integration_task(self, claim_id: str, form_data: Dict[str, Any]): """Отправка задачи на интеграцию с CRM""" await self.publish( self.QUEUE_CRM_INTEGRATION, { "task_type": "crm_integration", "claim_id": claim_id, "form_data": form_data }, priority=9 # Высокий приоритет ) async def publish_notification_task(self, claim_id: str, notification_type: str, data: Dict[str, Any]): """Отправка задачи на отправку уведомления""" await self.publish( self.QUEUE_NOTIFICATIONS, { "task_type": "notification", "claim_id": claim_id, "notification_type": notification_type, "data": data }, priority=5 ) # Глобальный экземпляр rabbitmq_service = RabbitMQService()