Files
hotels/scraper_safe.py
Фёдор 0cf3297290 Проект аудита отелей: основные скрипты и документация
- Краулеры: smart_crawler.py, regional_crawler.py
- Аудит: audit_orel_to_excel.py, audit_chukotka_to_excel.py
- РКН проверка: check_rkn_registry.py, recheck_unclear_rkn.py
- Отчёты: create_orel_horizontal_report.py
- Обработка: process_all_hotels_embeddings.py
- Документация: README.md, DB_SCHEMA_REFERENCE.md
2025-10-16 10:52:09 +03:00

306 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Безопасный парсер данных об отелях с tourism.fsa.gov.ru
Особенности:
- Rate limiting (5 req/sec)
- Checkpoint каждые 100 отелей
- Batch INSERT по 50 записей
- Возможность возобновления
"""
import requests
import psycopg2
from psycopg2.extras import execute_batch
import time
import logging
from datetime import datetime
from urllib.parse import unquote
from typing import Optional, Dict, List
import json
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'scraper_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Параметры подключения к БД
DB_CONFIG = {
'host': "147.45.189.234",
'port': 5432,
'database': "default_db",
'user': "gen_user",
'password': unquote("2~~9_%5EkVsU%3F2%5CS")
}
# Параметры парсинга
API_BASE_URL = "https://tourism.fsa.gov.ru/api/v1"
RATE_LIMIT_DELAY = 0.2 # 5 запросов в секунду
BATCH_SIZE = 50 # Записей в одном INSERT
CHECKPOINT_INTERVAL = 100 # Сохранять прогресс каждые N отелей
PAGE_SIZE = 100 # Отелей на страницу
class HotelScraper:
def __init__(self):
self.conn = None
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (compatible; HotelDataCollector/1.0)'
})
self.processed_count = 0
self.error_count = 0
self.start_time = None
def connect_db(self):
"""Подключение к базе данных"""
try:
self.conn = psycopg2.connect(**DB_CONFIG)
logger.info("✓ Подключено к базе данных")
except Exception as e:
logger.error(f"✗ Ошибка подключения к БД: {e}")
raise
def api_request(self, url: str, method='GET', **kwargs) -> Optional[Dict]:
"""Безопасный запрос к API с rate limiting"""
time.sleep(RATE_LIMIT_DELAY)
try:
response = self.session.request(method, url, timeout=30, **kwargs)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"API request failed: {url} - {e}")
return None
def get_hotels_list(self, page: int) -> Optional[List[Dict]]:
"""Получить список отелей с страницы"""
url = f"{API_BASE_URL}/resorts/hotels/showcase"
params = {'page': page, 'limit': PAGE_SIZE}
logger.info(f"Загружаю страницу {page}...")
data = self.api_request(url, params=params)
if data and 'data' in data:
return data['data']
return None
def get_hotel_details(self, hotel_id: str) -> Dict[str, Optional[Dict]]:
"""Получить детальную информацию об отеле"""
details = {
'main': None,
'additional_info': None,
'sanatorium': None,
'drawer': None
}
# Main info
url = f"{API_BASE_URL}/resorts/hotels/{hotel_id}/main"
details['main'] = self.api_request(url)
# Additional info
url = f"{API_BASE_URL}/resorts/common/{hotel_id}/additional-info"
details['additional_info'] = self.api_request(url)
# Sanatorium info (может не быть для обычных отелей)
url = f"{API_BASE_URL}/resorts/hotels/{hotel_id}/sanatoriumDrawer"
details['sanatorium'] = self.api_request(url)
# Drawer (услуги)
url = f"{API_BASE_URL}/resorts/hotels/{hotel_id}/drawer"
details['drawer'] = self.api_request(url)
return details
def save_hotel_batch(self, hotels_data: List[tuple]):
"""Сохранить батч отелей в базу"""
if not hotels_data:
return
cur = self.conn.cursor()
try:
# INSERT hotel_main
insert_sql = """
INSERT INTO hotel_main
(id, full_name, short_name, status_id, status_name,
category_id, category_name, region_id, region_name,
hotel_type_id, hotel_type_name, register_record,
register_record_date, owner_full_name, owner_ogrn, owner_inn,
phone, email, website_address, addresses, photo_ids,
has_seasonal, activation_datetime)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
full_name = EXCLUDED.full_name,
updated_at = CURRENT_TIMESTAMP;
"""
execute_batch(cur, insert_sql, hotels_data, page_size=BATCH_SIZE)
self.conn.commit()
logger.info(f"✓ Сохранено {len(hotels_data)} отелей")
except Exception as e:
self.conn.rollback()
logger.error(f"✗ Ошибка сохранения батча: {e}")
self.error_count += len(hotels_data)
finally:
cur.close()
def save_checkpoint(self, page: int, total_pages: int, status='in_progress'):
"""Сохранить контрольную точку"""
cur = self.conn.cursor()
try:
cur.execute("""
INSERT INTO hotel_parsing_progress
(page_number, total_pages, processed_count, status, started_at)
VALUES (%s, %s, %s, %s, %s)
""", (page, total_pages, self.processed_count, status, self.start_time))
self.conn.commit()
except Exception as e:
logger.error(f"Ошибка сохранения checkpoint: {e}")
finally:
cur.close()
def parse_showcase_data(self, hotel: Dict) -> tuple:
"""Распарсить данные из showcase"""
try:
addresses = json.dumps(hotel.get('addressList', []))
photo_ids = [photo for photo in [hotel.get('photoId')] if photo]
return (
hotel.get('id'),
hotel.get('fullName'),
None, # short_name не в showcase
hotel.get('status', {}).get('id'),
hotel.get('status', {}).get('name'),
hotel.get('category', {}).get('id'),
hotel.get('category', {}).get('name'),
hotel.get('region', {}).get('id'),
hotel.get('region', {}).get('name'),
hotel.get('hotelType', {}).get('id'),
hotel.get('hotelType', {}).get('name'),
hotel.get('registerRecord'),
hotel.get('registerRecordDate'),
hotel.get('ownerName'),
hotel.get('ownerOgrn'),
hotel.get('ownerInn'),
None, # phone не в showcase
None, # email не в showcase
None, # website не в showcase
addresses,
photo_ids,
None, # has_seasonal не в showcase
hotel.get('activationDateTime')
)
except Exception as e:
logger.error(f"Ошибка парсинга отеля {hotel.get('id')}: {e}")
return None
def run(self, start_page=0, max_pages=None):
"""Запустить парсинг"""
self.start_time = datetime.now()
self.connect_db()
logger.info("=" * 60)
logger.info("Запуск парсера отелей tourism.fsa.gov.ru")
logger.info(f"Начало: {self.start_time}")
logger.info("=" * 60)
page = start_page
batch = []
try:
while True:
# Проверка лимита страниц
if max_pages and page >= start_page + max_pages:
logger.info(f"Достигнут лимит страниц: {max_pages}")
break
# Получаем список отелей
hotels = self.get_hotels_list(page)
if not hotels:
logger.info("Больше нет данных или ошибка API")
break
# Обрабатываем каждый отель
for hotel in hotels:
hotel_data = self.parse_showcase_data(hotel)
if hotel_data:
batch.append(hotel_data)
self.processed_count += 1
# Сохраняем батч
if len(batch) >= BATCH_SIZE:
self.save_hotel_batch(batch)
batch = []
# Checkpoint
if self.processed_count % CHECKPOINT_INTERVAL == 0:
self.save_checkpoint(page, -1)
elapsed = (datetime.now() - self.start_time).total_seconds()
rate = self.processed_count / elapsed if elapsed > 0 else 0
logger.info(f"Progress: {self.processed_count} отелей, {rate:.1f} отелей/сек")
page += 1
# Если вернулось меньше PAGE_SIZE, значит это последняя страница
if len(hotels) < PAGE_SIZE:
logger.info("Достигнута последняя страница")
break
# Сохраняем остаток
if batch:
self.save_hotel_batch(batch)
# Финальный checkpoint
self.save_checkpoint(page, page, 'completed')
except KeyboardInterrupt:
logger.info("\n⚠ Парсинг прерван пользователем")
if batch:
logger.info("Сохраняю незавершенный батч...")
self.save_hotel_batch(batch)
self.save_checkpoint(page, -1, 'interrupted')
except Exception as e:
logger.error(f"✗ Критическая ошибка: {e}")
self.save_checkpoint(page, -1, 'failed')
finally:
if self.conn:
self.conn.close()
elapsed = (datetime.now() - self.start_time).total_seconds()
logger.info("=" * 60)
logger.info("Парсинг завершен")
logger.info(f"Обработано: {self.processed_count} отелей")
logger.info(f"Ошибок: {self.error_count}")
logger.info(f"Время работы: {elapsed/60:.1f} минут")
logger.info(f"Скорость: {self.processed_count/elapsed:.1f} отелей/сек")
logger.info("=" * 60)
if __name__ == "__main__":
import sys
# Параметры запуска
start_page = int(sys.argv[1]) if len(sys.argv) > 1 else 0
max_pages = int(sys.argv[2]) if len(sys.argv) > 2 else None
logger.info(f"Параметры: start_page={start_page}, max_pages={max_pages or 'все'}")
scraper = HotelScraper()
scraper.run(start_page=start_page, max_pages=max_pages)