Python3 爬虫实战教程:静态网站全站爬取

1. 前言

本教程将带领你完成一个完整的静态网站爬取项目,涵盖从数据抓取到存储的全流程。通过这个案例,你将学会:

  • 使用 requests 库抓取网页内容
  • 使用正则表达式解析 HTML
  • 实现分页爬取逻辑
  • 数据存储到 JSON 文件
  • 使用多进程加速爬取

2. 技术准备

在开始之前,请确保:

  1. Python 3.6+ 环境已安装
  2. 安装必要库:
    pip install requests

3. 目标网站分析

我们将爬取的目标网站是:https://ssr1.scrape.center/

3.1 网站结构

  • 列表页:显示电影列表,支持分页
  • 详情页:显示单个电影的详细信息

3.2 需要抓取的数据

  • 电影名称
  • 封面图片
  • 类别
  • 上映时间
  • 评分
  • 剧情简介

4. 爬虫实现

4.1 基础配置

import requests
import logging
import re
from urllib.parse import urljoin
import json
import os
from multiprocessing import Pool

# 日志配置
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

# 常量定义
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULTS_DIR = 'results'
os.makedirs(RESULTS_DIR, exist_ok=True)

4.2 通用爬取方法

def scrape_page(url):
    """
    通用页面爬取方法
    :param url: 目标URL
    :return: HTML内容
    """
    logging.info('scraping %s...', url)
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error('get invalid status code %s while scraping %s',
                     response.status_code, url)
    except requests.RequestException:
        logging.error('error occurred while scraping %s', url, exc_info=True)

4.3 列表页处理

def scrape_index(page):
    """
    爬取列表页
    :param page: 页码
    :return: 列表页HTML
    """
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)

def parse_index(html):
    """
    解析列表页,获取详情页URL
    :param html: 列表页HTML
    :return: 详情页URL生成器
    """
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info('get detail url %s', detail_url)
        yield detail_url

4.4 详情页处理

def scrape_detail(url):
    """
    爬取详情页
    :param url: 详情页URL
    :return: 详情页HTML
    """
    return scrape_page(url)

def parse_detail(html):
    """
    解析详情页
    :param html: 详情页HTML
    :return: 电影信息字典
    """
    # 封面
    cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
    cover = re.search(cover_pattern, html).group(1) if re.search(cover_pattern, html) else None
    
    # 名称
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    name = re.search(name_pattern, html).group(1) if re.search(name_pattern, html) else None
    
    # 类别
    category_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
    categories = re.findall(category_pattern, html) if re.findall(category_pattern, html) else []
    
    # 上映时间
    published_at_pattern = re.compile('(\d{4}-\d{2}-\d{2})\s*上映')
    published_at = re.search(published_at_pattern, html).group(1) if re.search(published_at_pattern, html) else None
    
    # 评分
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)
    score = float(re.search(score_pattern, html).group(1).strip()) if re.search(score_pattern, html) else None
    
    # 简介
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
    drama = re.search(drama_pattern, html).group(1).strip() if re.search(drama_pattern, html) else None
    
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'score': score,
        'drama': drama
    }

4.5 数据存储

def save_data(data):
    """
    保存数据到JSON文件
    :param data: 电影数据
    """
    name = data.get('name')
    data_path = f'{RESULTS_DIR}/{name}.json'
    json.dump(data, open(data_path, 'w', encoding='utf-8'),
              ensure_ascii=False, indent=2)
    logging.info('saved data to %s', data_path)

4.6 主流程

def main(page):
    """
    主处理函数
    :param page: 页码
    """
    # 爬取列表页
    index_html = scrape_index(page)
    # 解析列表页获取详情页URL
    detail_urls = parse_index(index_html)
    # 遍历详情页
    for detail_url in detail_urls:
        # 爬取详情页
        detail_html = scrape_detail(detail_url)
        # 解析详情页
        data = parse_detail(detail_html)
        # 保存数据
        save_data(data)

4.7 多进程优化

if __name__ == '__main__':
    # 多进程爬取
    pool = Pool()
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(main, pages)
    pool.close()
    pool.join()

5. 代码优化建议

  1. 异常处理增强:增加更多异常处理逻辑,提高爬虫稳定性
  2. 请求头设置:添加合理的请求头,模拟浏览器访问
  3. 请求间隔:添加随机延迟,避免被封禁
  4. 代理支持:添加代理支持,应对IP限制
  5. 数据去重:添加去重逻辑,避免重复爬取
  6. 断点续爬:记录爬取进度,支持中断后继续

6. 总结

通过本教程,我们实现了一个完整的静态网站爬虫,主要功能包括:

  1. 列表页分页爬取
  2. 详情页信息提取
  3. 数据存储为JSON文件
  4. 多进程加速爬取

这个案例涵盖了爬虫开发的基本流程,可以作为更复杂爬虫项目的基础模板。

7. 完整代码

import json
from os import makedirs
from os.path import exists
import requests
import logging
import re
from urllib.parse import urljoin
import multiprocessing

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s: %(message)s')

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10

RESULTS_DIR = 'results'
if not exists(RESULTS_DIR):
    makedirs(RESULTS_DIR)


def scrape_page(url):
    logging.info('scraping %s...', url)
    try:
        # 增加 timeout 防止死等,导致进程卡死不生成文件
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            return response.text
        logging.error('get invalid status code %s while scraping %s',
                      response.status_code, url)
    except requests.RequestException:
        logging.error('error occurred while scraping %s', url, exc_info=True)


def scrape_index(page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)


def parse_index(html):
    if not html: return []  # 增加空值保护
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info('get detail url %s', detail_url)
        yield detail_url


def scrape_detail(url):
    return scrape_page(url)


def parse_detail(html):
    if not html: return None  # 如果没拿到详情页内容直接返回

    # 修复核心:使用 r'' 前缀解决 SyntaxWarning: invalid escape sequence '\d'
    cover_pattern = re.compile(r'class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
    name_pattern = re.compile(r'<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile(r'<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
    published_at_pattern = re.compile(r'(\d{4}-\d{2}-\d{2})\s?上映')  # 这里是你的报错点
    drama_pattern = re.compile(r'<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
    score_pattern = re.compile(r'<p.*?score.*?>(.*?)</p>', re.S)

    # 优化提取逻辑,确保即使部分字段缺失也能返回已有的数据
    name_match = re.search(name_pattern, html)
    name = name_match.group(1).strip() if name_match else "未命名电影"  # 确保有名字作为文件名

    cover_match = re.search(cover_pattern, html)
    cover = cover_match.group(1).strip() if cover_match else None

    categories = re.findall(categories_pattern, html)

    pub_match = re.search(published_at_pattern, html)
    published_at = pub_match.group(1) if pub_match else None

    drama_match = re.search(drama_pattern, html)
    drama = drama_match.group(1).strip() if drama_match else None

    score_match = re.search(score_pattern, html)
    score = float(score_match.group(1).strip()) if score_match else None

    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }


def save_data(data):
    if not data: return
    name = data.get('name')
    # 修复:清洗文件名,防止电影名包含非法字符导致保存失败
    safe_name = re.sub(r'[\\/:*?"<>|]', '_', name)
    data_path = f'{RESULTS_DIR}/{safe_name}.json'

    # 修复:使用 with open 确保文件正确关闭和写入
    try:
        with open(data_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        logging.error('Error saving file %s: %s', data_path, e)


def main(page):
    index_html = scrape_index(page)
    detail_urls = parse_index(index_html)
    for detail_url in detail_urls:
        detail_html = scrape_detail(detail_url)
        data = parse_detail(detail_html)
        if data:
            logging.info('get detail data %s', data['name'])
            save_data(data)
            logging.info('data saved successfully')


if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(main, pages)
    pool.close()
    pool.join()  # 建议加上 join 确保进程池任务全部完成后主程序再退出