Files
crm.clientright.ru/pdf_processor/app/main.py
Fedor 01c4fe80b5 chore: snapshot current working tree changes
Save all currently accumulated repository changes as a backup snapshot for Gitea so no local work is lost.
2026-03-26 14:19:01 +03:00

776 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
PDF Processor Microservice
Converts images and documents to PDF, merges them.
Supports async processing, monitoring, and comprehensive API documentation.
"""
import asyncio
import json
import logging
import os
import subprocess
import tempfile
import time
import uuid
from datetime import datetime
from enum import Enum
from typing import Optional, Any, Dict

from fastapi import FastAPI, HTTPException, Depends, Header, Request, BackgroundTasks
from fastapi.responses import JSONResponse, PlainTextResponse
from fastapi.openapi.utils import get_openapi
from pydantic import BaseModel, Field
import redis.asyncio as redis
# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("pdf_processor")

# API key from the environment.
# NOTE(review): hard-coded fallback secrets (API key and Redis password below)
# are used silently when the env vars are unset — confirm every deployment
# overrides them; these defaults are now in version control.
API_KEY = os.getenv("PDF_PROCESSOR_API_KEY", "pdf-processor-secret-key-2025")

# Redis settings
REDIS_HOST = os.getenv("REDIS_HOST", "crm.clientright.ru")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "CRM_Redis_Pass_2025_Secure!")
REDIS_DB = int(os.getenv("REDIS_DB", 0))
REDIS_PREFIX = "pdf_processor:"  # key namespace for task records

# Global Redis client, created lazily by get_redis()
redis_client: Optional[redis.Redis] = None

# Metrics (in-memory; could be moved to Redis for production)
metrics = {
    "total_requests": 0,
    "successful_requests": 0,
    "failed_requests": 0,
    "async_tasks_created": 0,
    "async_tasks_completed": 0,
    "total_processing_time": 0.0,
    "errors_by_type": {}
}
# FastAPI application. The Russian markdown description below is rendered
# verbatim in /docs and /redoc (runtime text), so it is kept as-is.
app = FastAPI(
    title="PDF Processor Microservice",
    description="""
Микросервис для обработки PDF файлов.
## Возможности:
* **Конвертация** - Изображения и документы в PDF
* **Объединение** - Слияние нескольких PDF в один
* **Асинхронная обработка** - Для больших файлов
* **Мониторинг** - Метрики и статистика
## Аутентификация:
Все POST endpoints требуют заголовок `X-API-Key` с валидным API ключом.
""",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# ============================================
# Models
# ============================================
class TaskStatus(str, Enum):
    """Lifecycle states of an asynchronous processing task."""
    PENDING = "pending"        # created, waiting to be picked up
    PROCESSING = "processing"  # script is currently running
    COMPLETED = "completed"    # finished successfully
    FAILED = "failed"          # finished with an error
class ProcessRequest(BaseModel):
    """Request body shared by the sync (/process) and async (/process/async) endpoints."""
    # Arbitrary JSON payload forwarded to the processing script via stdin.
    data: Any = Field(..., description="JSON данные для скрипта обработки")
    # Script mode flag; "--pdf-merge" is the only mode shown in examples here.
    mode: str = Field(default="--pdf-merge", description="Режим обработки (--pdf-merge)")

    class Config:
        # Example rendered in the OpenAPI schema (/docs).
        json_schema_extra = {
            "example": {
                "data": {
                    "files": [
                        {
                            "file": {
                                "url": "https://example.com/file.pdf",
                                "file_name": "document.pdf"
                            }
                        }
                    ]
                },
                "mode": "--pdf-merge"
            }
        }
class ProcessResponse(BaseModel):
    """Response of the synchronous /process endpoint."""
    success: bool = Field(..., description="Успешность обработки")
    result: Optional[Any] = Field(None, description="Результат обработки (JSON)")
    error: Optional[str] = Field(None, description="Сообщение об ошибке")
    processing_time: float = Field(..., description="Время обработки в секундах")
    logs: Optional[str] = Field(None, description="Логи обработки")
class AsyncTaskResponse(BaseModel):
    """Response returned when an asynchronous task is created."""
    task_id: str = Field(..., description="ID задачи для проверки статуса")
    status: str = Field(..., description="Текущий статус задачи")
    message: str = Field(..., description="Сообщение")
    status_url: str = Field(..., description="URL для проверки статуса")
class TaskStatusResponse(BaseModel):
    """Status record of an asynchronous task, as stored in Redis."""
    task_id: str = Field(..., description="ID задачи")
    status: TaskStatus = Field(..., description="Статус задачи")
    created_at: str = Field(..., description="Время создания")
    updated_at: str = Field(..., description="Время последнего обновления")
    result: Optional[Any] = Field(None, description="Результат (если завершено)")
    error: Optional[str] = Field(None, description="Ошибка (если есть)")
    processing_time: Optional[float] = Field(None, description="Время обработки")
    logs: Optional[str] = Field(None, description="Логи обработки")
class HealthResponse(BaseModel):
    """Health check response."""
    status: str = Field(..., description="Статус сервиса")
    timestamp: str = Field(..., description="Временная метка")
    version: str = Field(..., description="Версия сервиса")
    redis_connected: bool = Field(..., description="Статус подключения к Redis")
class MetricsResponse(BaseModel):
    """Aggregated in-memory service metrics (see the `metrics` dict)."""
    total_requests: int = Field(..., description="Всего запросов")
    successful_requests: int = Field(..., description="Успешных запросов")
    failed_requests: int = Field(..., description="Неудачных запросов")
    async_tasks_created: int = Field(..., description="Создано асинхронных задач")
    async_tasks_completed: int = Field(..., description="Завершено асинхронных задач")
    average_processing_time: float = Field(..., description="Среднее время обработки (сек)")
    errors_by_type: Dict[str, int] = Field(..., description="Ошибки по типам")
# ============================================
# Redis Connection
# ============================================
async def get_redis() -> redis.Redis:
    """Return the shared Redis client, creating and pinging it on first use.

    Despite the annotation, returns None when the connection attempt fails;
    callers treat a falsy result as "Redis unavailable" and degrade gracefully.
    """
    global redis_client
    if redis_client is None:
        try:
            # NOTE(review): in redis-py's asyncio API, from_url() itself is
            # synchronous; the await here relies on the returned client being
            # awaitable (its initialize path). Confirm this works with the
            # pinned redis-py version — if not, every connect silently fails
            # into the except branch below.
            redis_client = await redis.from_url(
                f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}",
                encoding="utf-8",
                decode_responses=True
            )
            await redis_client.ping()
            logger.info(f"✅ Redis connected: {REDIS_HOST}:{REDIS_PORT}")
        except Exception as e:
            # Connection problems are logged, not raised: the service keeps
            # running without Redis (task persistence becomes best-effort).
            logger.error(f"❌ Redis connection error: {e}")
            redis_client = None
    return redis_client
async def close_redis():
    """Close and discard the shared Redis connection, if one exists."""
    global redis_client
    if redis_client is None:
        return
    await redis_client.close()
    redis_client = None
@app.on_event("startup")
async def startup_event():
"""Инициализация при старте"""
await get_redis()
logger.info("🚀 PDF Processor Microservice started")
@app.on_event("shutdown")
async def shutdown_event():
"""Очистка при остановке"""
await close_redis()
logger.info("👋 PDF Processor Microservice stopped")
# ============================================
# Auth
# ============================================
async def verify_api_key(x_api_key: str = Header(None, alias="X-API-Key")):
    """FastAPI dependency validating the X-API-Key header.

    Missing/empty header -> 401; wrong key -> 403; otherwise the key is
    returned unchanged.
    """
    if not x_api_key:
        raise HTTPException(status_code=401, detail="X-API-Key header required")
    if x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return x_api_key
# ============================================
# Helper Functions
# ============================================
async def save_task_status(task_id: str, status: TaskStatus, result: Any = None,
                           error: str = None, processing_time: float = None, logs: str = None):
    """Persist the current task state to Redis under a 1-hour TTL.

    Preserves the original created_at when a record for the task already
    exists, so repeated saves behave as updates. Failures are logged and
    swallowed: status persistence is best-effort and must never break the
    request path.
    """
    try:
        # Renamed from `redis` to avoid shadowing the redis module import.
        client = await get_redis()
        if not client:
            return
        now = datetime.utcnow().isoformat()
        task_data = {
            "task_id": task_id,
            "status": status.value,
            "created_at": now,
            "updated_at": now,
            # `is not None` keeps an empty-list result distinguishable from
            # "no result yet": the previous bare `if result` silently turned
            # a successful empty result ([]) into null.
            "result": json.dumps(result) if result is not None else None,
            "error": error,
            "processing_time": processing_time,
            "logs": logs
        }
        key = f"{REDIS_PREFIX}task:{task_id}"
        # Keep the original created_at if the task record already exists.
        existing = await client.get(key)
        if existing:
            existing_data = json.loads(existing)
            task_data["created_at"] = existing_data.get("created_at", task_data["created_at"])
        await client.setex(
            key,
            3600,  # TTL: 1 hour
            json.dumps(task_data, ensure_ascii=False)
        )
    except Exception as e:
        logger.error(f"Error saving task status: {e}")
async def get_task_status(task_id: str) -> Optional[Dict]:
    """Fetch a task record from Redis; return None when absent or on error."""
    try:
        client = await get_redis()
        if client is None:
            return None
        raw = await client.get(f"{REDIS_PREFIX}task:{task_id}")
        if raw:
            return json.loads(raw)
    except Exception as e:
        logger.error(f"Error getting task status: {e}")
    return None
def update_metrics(success: bool, processing_time: float, error_type: str = None):
    """Update the in-memory request counters after a request finishes.

    error_type (when given on a failure) is tallied in errors_by_type.
    """
    metrics["total_requests"] += 1
    metrics["total_processing_time"] += processing_time
    bucket = "successful_requests" if success else "failed_requests"
    metrics[bucket] += 1
    if not success and error_type:
        counts = metrics["errors_by_type"]
        counts[error_type] = counts.get(error_type, 0) + 1
async def _finish_task(task_id: str, status: TaskStatus, start_time: float,
                       logs: list, result: Any = None, error: str = None,
                       error_type: str = None) -> None:
    """Record the terminal state of an async task and update all counters.

    Computes elapsed time, persists the final status to Redis and bumps both
    the async-task counter and the shared request metrics. Extracted because
    this exact sequence was duplicated in all four exit paths.
    """
    processing_time = time.time() - start_time
    await save_task_status(
        task_id, status,
        result=result,
        error=error,
        processing_time=processing_time,
        logs="\n".join(logs)
    )
    metrics["async_tasks_completed"] += 1
    update_metrics(status is TaskStatus.COMPLETED, processing_time, error_type)


async def process_pdf_task(task_id: str, json_data: str, mode: str):
    """Background worker for /process/async.

    Runs the processing bash script with `json_data` on stdin and stores the
    outcome (result / error / logs) in Redis via save_task_status.
    """
    start_time = time.time()
    logs = []
    try:
        await save_task_status(task_id, TaskStatus.PROCESSING)
        logs.append(f"[{datetime.utcnow().isoformat()}] Task {task_id} started")

        script_path = "/app/scripts/process_pdfs.sh"
        if not os.path.exists(script_path):
            raise Exception(f"Script not found: {script_path}")
        logs.append(f"[{datetime.utcnow().isoformat()}] Executing script: {script_path}")
        logs.append(f"[{datetime.utcnow().isoformat()}] JSON data length: {len(json_data)} bytes")

        # JSON is passed via stdin instead of argv: avoids "Argument list too
        # long" for big payloads. subprocess.run blocks, so it is moved to a
        # worker thread — otherwise it would stall the whole event loop for
        # up to the 5-minute timeout.
        result = await asyncio.to_thread(
            subprocess.run,
            ["/bin/bash", script_path, mode],
            input=json_data,
            capture_output=True,
            text=True,
            timeout=300
        )

        if result.stderr:
            logs.extend(line for line in result.stderr.strip().split('\n') if line)
        logs.append(f"[{datetime.utcnow().isoformat()}] Script exit code: {result.returncode}")

        if result.returncode != 0:
            error_msg = result.stderr or "Unknown error"
            logs.append(f"[{datetime.utcnow().isoformat()}] Script error: {error_msg}")
            await _finish_task(task_id, TaskStatus.FAILED, start_time, logs,
                               error=error_msg, error_type="script_error")
            return

        stdout = result.stdout.strip()
        if not stdout:
            # Empty stdout counts as success with an empty result list.
            logs.append(f"[{datetime.utcnow().isoformat()}] Empty output from script")
            await _finish_task(task_id, TaskStatus.COMPLETED, start_time, logs, result=[])
            return

        try:
            parsed_result = json.loads(stdout)
            logs.append(f"[{datetime.utcnow().isoformat()}] Successfully parsed output")
        except json.JSONDecodeError as e:
            logs.append(f"[{datetime.utcnow().isoformat()}] JSON parse error: {e}")
            await _finish_task(task_id, TaskStatus.FAILED, start_time, logs,
                               error=f"Failed to parse script output: {e}",
                               error_type="json_parse_error")
            return

        logs.append(f"[{datetime.utcnow().isoformat()}] Processing completed in "
                    f"{time.time() - start_time:.2f}s")
        await _finish_task(task_id, TaskStatus.COMPLETED, start_time, logs,
                           result=parsed_result)
    except subprocess.TimeoutExpired:
        logs.append(f"[{datetime.utcnow().isoformat()}] Script timeout after "
                    f"{time.time() - start_time:.2f}s")
        await _finish_task(task_id, TaskStatus.FAILED, start_time, logs,
                           error="Script execution timed out (>5 minutes)",
                           error_type="timeout")
    except Exception as e:
        logs.append(f"[{datetime.utcnow().isoformat()}] Exception: {str(e)}")
        logger.exception(f"Error in async task {task_id}")
        await _finish_task(task_id, TaskStatus.FAILED, start_time, logs,
                           error=str(e), error_type="exception")
# ============================================
# Endpoints
# ============================================
@app.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check():
"""
Health check endpoint
Проверяет состояние сервиса и подключение к Redis.
"""
redis_connected = False
try:
redis = await get_redis()
if redis:
await redis.ping()
redis_connected = True
except:
pass
return HealthResponse(
status="healthy",
timestamp=datetime.utcnow().isoformat(),
version="1.0.0",
redis_connected=redis_connected
)
@app.get("/", tags=["Info"])
async def root():
"""Root endpoint с информацией о сервисе"""
return {
"service": "PDF Processor Microservice",
"version": "1.0.0",
"endpoints": {
"health": "/health",
"docs": "/docs",
"redoc": "/redoc",
"process": "/process (POST, requires X-API-Key)",
"process_async": "/process/async (POST, requires X-API-Key)",
"task_status": "/process/status/{task_id} (GET, requires X-API-Key)",
"metrics": "/metrics (GET, requires X-API-Key)"
}
}
@app.post("/process", response_model=ProcessResponse, dependencies=[Depends(verify_api_key)], tags=["Processing"])
async def process_pdfs(request: ProcessRequest):
"""
Синхронная обработка файлов - конвертация в PDF и объединение.
Принимает JSON с данными о файлах, вызывает bash скрипт,
возвращает результат сразу.
**Используйте для небольших файлов (< 10MB)**
**Пример запроса:**
```json
{
"data": {
"files": [{
"file": {
"url": "https://example.com/file.pdf",
"file_name": "document.pdf"
}
}]
},
"mode": "--pdf-merge"
}
```
"""
start_time = time.time()
logs = []
try:
json_data = json.dumps(request.data, ensure_ascii=False)
logs.append(f"[{datetime.utcnow().isoformat()}] Received request with mode: {request.mode}")
script_path = "/app/scripts/process_pdfs.sh"
if not os.path.exists(script_path):
raise HTTPException(status_code=500, detail=f"Script not found: {script_path}")
logs.append(f"[{datetime.utcnow().isoformat()}] Executing script: {script_path}")
logs.append(f"[{datetime.utcnow().isoformat()}] JSON data length: {len(json_data)} bytes")
# Передаём JSON через stdin вместо аргументов командной строки
# Это решает проблему "Argument list too long" для больших JSON
result = subprocess.run(
["/bin/bash", script_path, request.mode],
input=json_data,
capture_output=True,
text=True,
timeout=300
)
if result.stderr:
for line in result.stderr.strip().split('\n'):
if line:
logs.append(line)
logs.append(f"[{datetime.utcnow().isoformat()}] Script exit code: {result.returncode}")
if result.returncode != 0:
error_msg = result.stderr or "Unknown error"
logs.append(f"[{datetime.utcnow().isoformat()}] Script error: {error_msg}")
processing_time = time.time() - start_time
update_metrics(False, processing_time, "script_error")
return ProcessResponse(
success=False,
error=error_msg,
processing_time=processing_time,
logs="\n".join(logs)
)
stdout = result.stdout.strip()
if not stdout:
logs.append(f"[{datetime.utcnow().isoformat()}] Empty output from script")
processing_time = time.time() - start_time
update_metrics(True, processing_time)
return ProcessResponse(
success=True,
result=[],
processing_time=processing_time,
logs="\n".join(logs)
)
try:
parsed_result = json.loads(stdout)
logs.append(f"[{datetime.utcnow().isoformat()}] Successfully parsed output")
except json.JSONDecodeError as e:
logs.append(f"[{datetime.utcnow().isoformat()}] JSON parse error: {e}")
processing_time = time.time() - start_time
update_metrics(False, processing_time, "json_parse_error")
return ProcessResponse(
success=False,
error=f"Failed to parse script output: {e}",
processing_time=processing_time,
logs="\n".join(logs)
)
processing_time = time.time() - start_time
logs.append(f"[{datetime.utcnow().isoformat()}] Processing completed in {processing_time:.2f}s")
update_metrics(True, processing_time)
return ProcessResponse(
success=True,
result=parsed_result,
processing_time=processing_time,
logs="\n".join(logs)
)
except subprocess.TimeoutExpired:
processing_time = time.time() - start_time
logs.append(f"[{datetime.utcnow().isoformat()}] Script timeout after {processing_time:.2f}s")
update_metrics(False, processing_time, "timeout")
return ProcessResponse(
success=False,
error="Script execution timed out (>5 minutes)",
processing_time=processing_time,
logs="\n".join(logs)
)
except Exception as e:
processing_time = time.time() - start_time
logs.append(f"[{datetime.utcnow().isoformat()}] Exception: {str(e)}")
logger.exception("Error processing request")
update_metrics(False, processing_time, "exception")
return ProcessResponse(
success=False,
error=str(e),
processing_time=processing_time,
logs="\n".join(logs)
)
@app.post("/process/async", response_model=AsyncTaskResponse, dependencies=[Depends(verify_api_key)], tags=["Processing"])
async def process_pdfs_async(request: ProcessRequest, background_tasks: BackgroundTasks):
"""
Асинхронная обработка файлов (для больших запросов).
Возвращает `task_id`, результат можно получить позже через `/process/status/{task_id}`.
**Используйте для больших файлов (> 10MB) или множественных файлов**
**Пример запроса:**
```json
{
"data": {
"files": [{
"file": {
"url": "https://example.com/large-file.pdf",
"file_name": "large-document.pdf"
}
}]
},
"mode": "--pdf-merge"
}
```
**Пример ответа:**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "pending",
"message": "Task created successfully",
"status_url": "/process/status/550e8400-e29b-41d4-a716-446655440000"
}
```
"""
task_id = str(uuid.uuid4())
json_data = json.dumps(request.data, ensure_ascii=False)
# Сохраняем задачу как pending
await save_task_status(task_id, TaskStatus.PENDING)
# Запускаем фоновую задачу
background_tasks.add_task(process_pdf_task, task_id, json_data, request.mode)
metrics["async_tasks_created"] += 1
return AsyncTaskResponse(
task_id=task_id,
status=TaskStatus.PENDING.value,
message="Task created successfully",
status_url=f"/process/status/{task_id}"
)
@app.get("/process/status/{task_id}", response_model=TaskStatusResponse, dependencies=[Depends(verify_api_key)], tags=["Processing"])
async def get_task_status_endpoint(task_id: str):
"""
Получить статус асинхронной задачи.
**Статусы:**
- `pending` - Задача создана, ожидает обработки
- `processing` - Задача выполняется
- `completed` - Задача завершена успешно
- `failed` - Задача завершена с ошибкой
**Пример ответа (completed):**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "completed",
"created_at": "2025-12-29T12:00:00",
"updated_at": "2025-12-29T12:05:30",
"result": [...],
"processing_time": 330.5,
"logs": "..."
}
```
"""
task_data = await get_task_status(task_id)
if not task_data:
raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
return TaskStatusResponse(
task_id=task_data["task_id"],
status=TaskStatus(task_data["status"]),
created_at=task_data["created_at"],
updated_at=task_data["updated_at"],
result=json.loads(task_data["result"]) if task_data.get("result") else None,
error=task_data.get("error"),
processing_time=task_data.get("processing_time"),
logs=task_data.get("logs")
)
@app.get("/metrics", response_model=MetricsResponse, dependencies=[Depends(verify_api_key)], tags=["Monitoring"])
async def get_metrics():
"""
Получить метрики сервиса.
Возвращает статистику обработки:
- Общее количество запросов
- Успешные/неудачные запросы
- Асинхронные задачи
- Среднее время обработки
- Ошибки по типам
**Формат Prometheus:**
Используйте заголовок `Accept: text/plain` для получения метрик в формате Prometheus.
"""
avg_time = 0.0
if metrics["successful_requests"] + metrics["failed_requests"] > 0:
avg_time = metrics["total_processing_time"] / (metrics["successful_requests"] + metrics["failed_requests"])
return MetricsResponse(
total_requests=metrics["total_requests"],
successful_requests=metrics["successful_requests"],
failed_requests=metrics["failed_requests"],
async_tasks_created=metrics["async_tasks_created"],
async_tasks_completed=metrics["async_tasks_completed"],
average_processing_time=round(avg_time, 2),
errors_by_type=metrics["errors_by_type"]
)
@app.get("/metrics/prometheus", dependencies=[Depends(verify_api_key)], tags=["Monitoring"])
async def get_metrics_prometheus():
"""
Метрики в формате Prometheus.
Возвращает метрики в формате, совместимом с Prometheus.
"""
avg_time = 0.0
total_processed = metrics["successful_requests"] + metrics["failed_requests"]
if total_processed > 0:
avg_time = metrics["total_processing_time"] / total_processed
prom_metrics = f"""# HELP pdf_processor_total_requests Total number of requests
# TYPE pdf_processor_total_requests counter
pdf_processor_total_requests {metrics["total_requests"]}
# HELP pdf_processor_successful_requests Number of successful requests
# TYPE pdf_processor_successful_requests counter
pdf_processor_successful_requests {metrics["successful_requests"]}
# HELP pdf_processor_failed_requests Number of failed requests
# TYPE pdf_processor_failed_requests counter
pdf_processor_failed_requests {metrics["failed_requests"]}
# HELP pdf_processor_async_tasks_created Number of async tasks created
# TYPE pdf_processor_async_tasks_created counter
pdf_processor_async_tasks_created {metrics["async_tasks_created"]}
# HELP pdf_processor_async_tasks_completed Number of async tasks completed
# TYPE pdf_processor_async_tasks_completed counter
pdf_processor_async_tasks_completed {metrics["async_tasks_completed"]}
# HELP pdf_processor_average_processing_time Average processing time in seconds
# TYPE pdf_processor_average_processing_time gauge
pdf_processor_average_processing_time {avg_time:.2f}
"""
# Добавляем ошибки по типам
for error_type, count in metrics["errors_by_type"].items():
prom_metrics += f'\n# HELP pdf_processor_errors_by_type Number of errors by type\n'
prom_metrics += f'# TYPE pdf_processor_errors_by_type counter\n'
prom_metrics += f'pdf_processor_errors_by_type{{type="{error_type}"}} {count}\n'
return JSONResponse(content=prom_metrics, media_type="text/plain")
# ============================================
# Error handlers
# ============================================
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Last-resort handler: log the exception, count it, return a 500 JSON body."""
    logger.exception(f"Unhandled exception: {exc}")
    update_metrics(False, 0.0, "unhandled_exception")
    payload = {
        "success": False,
        "error": str(exc),
        "processing_time": 0,
        "logs": None,
    }
    return JSONResponse(status_code=500, content=payload)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8300)