#!/usr/bin/env python3 """ ЭТАП 1: Создание hotel_website_processed для Питера через Browserless Scrape HTML → Browserless → cleaned_text → hotel_website_processed """ import psycopg2 from psycopg2.extras import RealDictCursor from urllib.parse import unquote import requests import json import time import logging from datetime import datetime import threading from concurrent.futures import ThreadPoolExecutor, as_completed import queue # Настройка логирования logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(f'spb_processed_creation_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Конфигурация БД DB_CONFIG = { 'host': '147.45.189.234', 'port': 5432, 'database': 'default_db', 'user': 'gen_user', 'password': unquote('2~~9_%5EkVsU%3F2%5CS') } # Browserless API BROWSERLESS_URL = "http://147.45.146.17:3000/function?token=9ahhnpjkchxtcho9" # Многопоточность MAX_WORKERS = 5 # Количество потоков для Browserless class SpbProcessor: def __init__(self): self.conn = None self.cur = None self.connect_db() def connect_db(self): """Подключение к БД""" try: self.conn = psycopg2.connect(**DB_CONFIG, cursor_factory=RealDictCursor) self.conn.autocommit = True self.cur = self.conn.cursor() logger.info("✅ Подключение к БД установлено") except Exception as e: logger.error(f"❌ Ошибка подключения к БД: {e}") raise def create_processed_table(self): """Создание таблицы hotel_website_processed если не существует""" try: self.cur.execute(""" CREATE TABLE IF NOT EXISTS hotel_website_processed ( id SERIAL PRIMARY KEY, hotel_id UUID NOT NULL, url TEXT, cleaned_text TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (hotel_id) REFERENCES hotel_main(id) ); """) # Создаем индекс self.cur.execute(""" CREATE INDEX IF NOT EXISTS idx_hotel_website_processed_hotel_id ON hotel_website_processed(hotel_id); """) logger.info("✅ Таблица hotel_website_processed готова") except Exception as e: logger.error(f"❌ Ошибка создания таблицы: {e}") raise def clean_html_with_browserless(self, html: str, max_retries: int = 3) -> str: """Очистка HTML через Browserless Scrape API с retry логикой""" # JavaScript функция для извлечения текста scrape_function = """ export default async function ({ page, context }) { const html = context.html; // Устанавливаем HTML в страницу await page.setContent(html); // Извлекаем весь текст const text = await page.evaluate(() => { // Удаляем script и style элементы const scripts = document.querySelectorAll('script, style'); scripts.forEach(el => el.remove()); // Получаем весь текст return document.body.innerText || document.body.textContent || ''; }); return { text: text, length: text.length }; } """ payload = { "code": scrape_function, "context": {"html": html} } for attempt in range(max_retries): try: response = requests.post(BROWSERLESS_URL, json=payload, timeout=30) response.raise_for_status() result = response.json() if result and 'text' in result: return result['text'] return "" except Exception as e: if attempt < max_retries - 1: wait_time = 2 ** attempt # Экспоненциальная задержка logger.warning(f"⚠️ Попытка {attempt + 1}/{max_retries} не удалась, ждём {wait_time}с: {e}") time.sleep(wait_time) else: logger.error(f"❌ Все попытки исчерпаны для Browserless API: {e}") return "" return "" def process_page(self, page_data: dict) -> dict: """Обработка одной страницы (для многопоточности)""" page_id = page_data['id'] url = page_data['url'] html = page_data['html'] hotel_id = page_data['hotel_id'] try: # Очищаем HTML через Browserless cleaned_text = self.clean_html_with_browserless(html) if cleaned_text and len(cleaned_text.strip()) > 50: return { 'success': True, 'page_id': page_id, 'hotel_id': hotel_id, 'url': url, 'cleaned_text': cleaned_text, 'length': len(cleaned_text) } else: return { 'success': False, 'page_id': page_id, 'hotel_id': hotel_id, 'error': 'Слишком короткий текст' } except Exception as e: return { 'success': False, 'page_id': page_id, 'hotel_id': hotel_id, 'error': str(e) } def process_hotel_pages(self, hotel_id: str) -> int: """Обработка всех страниц одного отеля (многопоточно)""" try: # Получаем HTML страницы отеля self.cur.execute(""" SELECT id, url, html FROM hotel_website_raw WHERE hotel_id = %s AND html IS NOT NULL ORDER BY id """, (hotel_id,)) pages = self.cur.fetchall() if not pages: logger.warning(f"⚠️ Нет HTML для отеля {hotel_id}") return 0 logger.info(f"📄 Найдено {len(pages)} страниц для отеля") # Подготавливаем данные для многопоточности page_data_list = [] for page in pages: page_data_list.append({ 'id': page['id'], 'url': page['url'], 'html': page['html'], 'hotel_id': hotel_id }) processed_count = 0 # Многопоточная обработка страниц with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: # Отправляем задачи future_to_page = { executor.submit(self.process_page, page_data): page_data for page_data in page_data_list } # Обрабатываем результаты for future in as_completed(future_to_page): result = future.result() if result['success']: # Сохраняем в hotel_website_processed self.cur.execute(""" INSERT INTO hotel_website_processed (hotel_id, url, cleaned_text) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING """, (result['hotel_id'], result['url'], result['cleaned_text'])) processed_count += 1 logger.info(f" ✅ Страница {result['page_id']}: {result['length']} символов") else: logger.warning(f" ⚠️ Страница {result['page_id']}: {result['error']}") logger.info(f"✅ Отель обработан: {processed_count}/{len(pages)} страниц") return processed_count except Exception as e: logger.error(f"❌ Ошибка обработки отеля {hotel_id}: {e}") return 0 def get_spb_hotels(self): """Получение списка отелей Питера для обработки""" try: self.cur.execute(""" SELECT DISTINCT h.id, h.full_name FROM hotel_main h INNER JOIN hotel_website_raw hwr ON h.id = hwr.hotel_id WHERE h.region_name = 'г. Санкт-Петербург' AND hwr.html IS NOT NULL AND NOT EXISTS ( SELECT 1 FROM hotel_website_processed hwp WHERE hwp.hotel_id = h.id ) ORDER BY h.id """) hotels = self.cur.fetchall() logger.info(f"📊 Найдено {len(hotels)} отелей для обработки") return hotels except Exception as e: logger.error(f"❌ Ошибка получения списка отелей: {e}") return [] def get_stats(self): """Получение статистики""" try: # Всего отелей с HTML self.cur.execute(""" SELECT COUNT(DISTINCT h.id) FROM hotel_main h INNER JOIN hotel_website_raw hwr ON h.id = hwr.hotel_id WHERE h.region_name = 'г. Санкт-Петербург' AND hwr.html IS NOT NULL """) total_hotels = self.cur.fetchone()[0] # Обработанных отелей self.cur.execute(""" SELECT COUNT(DISTINCT hotel_id) FROM hotel_website_processed hwp WHERE EXISTS ( SELECT 1 FROM hotel_main h WHERE h.id = hwp.hotel_id AND h.region_name = 'г. Санкт-Петербург' ) """) processed_hotels = self.cur.fetchone()[0] # Всего страниц обработано self.cur.execute(""" SELECT COUNT(*) FROM hotel_website_processed hwp WHERE EXISTS ( SELECT 1 FROM hotel_main h WHERE h.id = hwp.hotel_id AND h.region_name = 'г. Санкт-Петербург' ) """) processed_pages = self.cur.fetchone()[0] return { 'total_hotels': total_hotels, 'processed_hotels': processed_hotels, 'processed_pages': processed_pages } except Exception as e: logger.error(f"❌ Ошибка получения статистики: {e}") return {} def close(self): """Закрытие соединения""" if self.cur: self.cur.close() if self.conn: self.conn.close() def main(): """Основная функция""" logger.info("🚀 ЭТАП 1: Создание hotel_website_processed для Питера") logger.info("🌐 Используем Browserless Scrape для лучшего качества") logger.info(f"⚡ Многопоточность: {MAX_WORKERS} потоков") processor = SpbProcessor() try: # Создаем таблицу processor.create_processed_table() # Получаем статистику stats = processor.get_stats() logger.info(f"📊 Статистика:") logger.info(f" Всего отелей: {stats.get('total_hotels', 0)}") logger.info(f" Обработано отелей: {stats.get('processed_hotels', 0)}") logger.info(f" Обработано страниц: {stats.get('processed_pages', 0)}") # Получаем список отелей для обработки hotels = processor.get_spb_hotels() if not hotels: logger.info("✅ Все отели уже обработаны!") return logger.info(f"🔄 Начинаем обработку {len(hotels)} отелей...") total_processed = 0 start_time = time.time() for i, hotel in enumerate(hotels, 1): hotel_id = hotel['id'] hotel_name = hotel['full_name'] logger.info(f"🏨 [{i}/{len(hotels)}] {hotel_name[:50]}...") processed = processor.process_hotel_pages(hotel_id) total_processed += processed # Обновляем статистику каждые 10 отелей if i % 10 == 0: stats = processor.get_stats() elapsed = time.time() - start_time rate = i / elapsed * 3600 # отелей в час logger.info(f"📈 Прогресс: {i}/{len(hotels)} отелей") logger.info(f"⏱️ Скорость: {rate:.1f} отелей/час") logger.info(f"📊 Обработано страниц: {stats.get('processed_pages', 0)}") # Финальная статистика elapsed = time.time() - start_time stats = processor.get_stats() logger.info("=" * 60) logger.info("✅ ЭТАП 1 ЗАВЕРШЁН!") logger.info(f" Время: {elapsed/3600:.1f} часов") logger.info(f" Обработано отелей: {stats.get('processed_hotels', 0)}") logger.info(f" Обработано страниц: {stats.get('processed_pages', 0)}") logger.info("=" * 60) except Exception as e: logger.error(f"❌ Критическая ошибка: {e}") finally: processor.close() if __name__ == "__main__": main()