""" OCR Worker - обработка файлов в фоне через RabbitMQ + Redis Pub/Sub """ import asyncio import json import logging from typing import Dict, Any from aio_pika import connect_robust, IncomingMessage from app.config import settings from app.services.ocr_service import ocr_service from app.services.redis_service import redis_service logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class OCRWorker: """Worker для обработки OCR задач в фоне""" def __init__(self): self.connection = None self.channel = None self.queue_name = "erv_ocr_processing" async def connect(self): """Подключение к RabbitMQ""" self.connection = await connect_robust(settings.rabbitmq_url) self.channel = await self.connection.channel() await self.channel.set_qos(prefetch_count=1) # По одной задаче self.queue = await self.channel.declare_queue( self.queue_name, durable=True ) logger.info(f"✅ Worker connected to RabbitMQ: {self.queue_name}") async def publish_event(self, task_id: str, event: Dict[str, Any]): """ Публикация события в Redis для real-time обновлений Args: task_id: ID задачи event: Данные события """ channel = f"ocr_events:{task_id}" event_json = json.dumps(event, ensure_ascii=False) try: await redis_service.publish(channel, event_json) logger.info(f"📢 Event published to {channel}: {event['status']}") except Exception as e: logger.error(f"❌ Failed to publish event: {e}") async def process_task(self, message: IncomingMessage): """ Обработка задачи OCR Args: message: Сообщение из RabbitMQ """ async with message.process(): try: # Парсим задачу task = json.loads(message.body.decode()) task_id = task["task_id"] file_content = bytes.fromhex(task["file_content_hex"]) filename = task["filename"] logger.info(f"🔄 Processing task {task_id}: {filename}") # Событие: начало обработки await self.publish_event(task_id, { "status": "processing", "message": "Начата обработка файла", "filename": filename }) # Шаг 1: OCR обработка await self.publish_event(task_id, { "status": "ocr_started", "message": "Запущено распознавание текста" }) result = await ocr_service.process_document(file_content, filename) # Событие: OCR завершён await self.publish_event(task_id, { "status": "ocr_completed", "message": f"Распознано {len(result['ocr_text'])} символов", "chars": len(result['ocr_text']) }) # Шаг 2: AI анализ (если есть текст) if result['ocr_text']: await self.publish_event(task_id, { "status": "ai_started", "message": "Запущен AI анализ документа" }) # Событие: всё готово await self.publish_event(task_id, { "status": "completed", "message": "Обработка завершена", "result": { "document_type": result["document_type"], "is_valid": result["is_valid"], "confidence": result["confidence"], "extracted_data": result["extracted_data"], "ocr_text_length": len(result["ocr_text"]) } }) # Сохраняем результат в Redis (TTL 1 час) cache_key = f"ocr_result:{task_id}" await redis_service.set_json(cache_key, result, ttl=3600) logger.info(f"✅ Task {task_id} completed successfully") except Exception as e: logger.error(f"❌ Task processing error: {e}") # Событие: ошибка await self.publish_event(task_id, { "status": "error", "message": f"Ошибка обработки: {str(e)}" }) async def start(self): """Запуск worker""" await self.connect() logger.info(f"🚀 OCR Worker started, waiting for tasks...") # Слушаем очередь await self.queue.consume(self.process_task) # Держим worker живым try: await asyncio.Future() except KeyboardInterrupt: logger.info("👋 Worker stopped") async def main(): """Точка входа""" worker = OCRWorker() await worker.start() if __name__ == "__main__": asyncio.run(main())