#Python3 爬虫实战教程:静态网站全站爬取
#1. 前言
本教程将带领你完成一个完整的静态网站爬取项目,涵盖从数据抓取到存储的全流程。通过这个案例,你将学会:
- 使用 requests 库抓取网页内容
- 使用正则表达式解析 HTML
- 实现分页爬取逻辑
- 数据存储到 JSON 文件
- 使用多进程加速爬取
#2. 技术准备
在开始之前,请确保:
- Python 3.6+ 环境已安装
- 安装必要库:
pip install requests
#3. 目标网站分析
我们将爬取的目标网站是:https://ssr1.scrape.center/
#3.1 网站结构
- 列表页:显示电影列表,支持分页
- 详情页:显示单个电影的详细信息
#3.2 需要抓取的数据
- 电影名称
- 封面图片
- 类别
- 上映时间
- 评分
- 剧情简介
#4. 爬虫实现
#4.1 基础配置
import requests
import logging
import re
from urllib.parse import urljoin
import json
import os
from multiprocessing import Pool
# 日志配置
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
# 常量定义
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULTS_DIR = 'results'
os.makedirs(RESULTS_DIR, exist_ok=True)#4.2 通用爬取方法
def scrape_page(url):
"""
通用页面爬取方法
:param url: 目标URL
:return: HTML内容
"""
logging.info('scraping %s...', url)
try:
response = requests.get(url)
if response.status_code == 200:
return response.text
logging.error('get invalid status code %s while scraping %s',
response.status_code, url)
except requests.RequestException:
logging.error('error occurred while scraping %s', url, exc_info=True)#4.3 列表页处理
def scrape_index(page):
"""
爬取列表页
:param page: 页码
:return: 列表页HTML
"""
index_url = f'{BASE_URL}/page/{page}'
return scrape_page(index_url)
def parse_index(html):
"""
解析列表页,获取详情页URL
:param html: 列表页HTML
:return: 详情页URL生成器
"""
pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
items = re.findall(pattern, html)
if not items:
return []
for item in items:
detail_url = urljoin(BASE_URL, item)
logging.info('get detail url %s', detail_url)
yield detail_url#4.4 详情页处理
def scrape_detail(url):
"""
爬取详情页
:param url: 详情页URL
:return: 详情页HTML
"""
return scrape_page(url)
def parse_detail(html):
"""
解析详情页
:param html: 详情页HTML
:return: 电影信息字典
"""
# 封面
cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
cover = re.search(cover_pattern, html).group(1) if re.search(cover_pattern, html) else None
# 名称
name_pattern = re.compile('<h2.*?>(.*?)</h2>')
name = re.search(name_pattern, html).group(1) if re.search(name_pattern, html) else None
# 类别
category_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
categories = re.findall(category_pattern, html) if re.findall(category_pattern, html) else []
# 上映时间
published_at_pattern = re.compile('(\d{4}-\d{2}-\d{2})\s*上映')
published_at = re.search(published_at_pattern, html).group(1) if re.search(published_at_pattern, html) else None
# 评分
score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)
score = float(re.search(score_pattern, html).group(1).strip()) if re.search(score_pattern, html) else None
# 简介
drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
drama = re.search(drama_pattern, html).group(1).strip() if re.search(drama_pattern, html) else None
return {
'cover': cover,
'name': name,
'categories': categories,
'published_at': published_at,
'score': score,
'drama': drama
}#4.5 数据存储
def save_data(data):
"""
保存数据到JSON文件
:param data: 电影数据
"""
name = data.get('name')
data_path = f'{RESULTS_DIR}/{name}.json'
json.dump(data, open(data_path, 'w', encoding='utf-8'),
ensure_ascii=False, indent=2)
logging.info('saved data to %s', data_path)#4.6 主流程
def main(page):
"""
主处理函数
:param page: 页码
"""
# 爬取列表页
index_html = scrape_index(page)
# 解析列表页获取详情页URL
detail_urls = parse_index(index_html)
# 遍历详情页
for detail_url in detail_urls:
# 爬取详情页
detail_html = scrape_detail(detail_url)
# 解析详情页
data = parse_detail(detail_html)
# 保存数据
save_data(data)#4.7 多进程优化
if __name__ == '__main__':
# 多进程爬取
pool = Pool()
pages = range(1, TOTAL_PAGE + 1)
pool.map(main, pages)
pool.close()
pool.join()#5. 代码优化建议
- 异常处理增强:增加更多异常处理逻辑,提高爬虫稳定性
- 请求头设置:添加合理的请求头,模拟浏览器访问
- 请求间隔:添加随机延迟,避免被封禁
- 代理支持:添加代理支持,应对IP限制
- 数据去重:添加去重逻辑,避免重复爬取
- 断点续爬:记录爬取进度,支持中断后继续
#6. 总结
通过本教程,我们实现了一个完整的静态网站爬虫,主要功能包括:
- 列表页分页爬取
- 详情页信息提取
- 数据存储为JSON文件
- 多进程加速爬取
这个案例涵盖了爬虫开发的基本流程,可以作为更复杂爬虫项目的基础模板。
#7. 完整代码
import json
from os import makedirs
from os.path import exists
import requests
import logging
import re
from urllib.parse import urljoin
import multiprocessing
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULTS_DIR = 'results'
if not exists(RESULTS_DIR):
makedirs(RESULTS_DIR)
def scrape_page(url):
logging.info('scraping %s...', url)
try:
# 增加 timeout 防止死等,导致进程卡死不生成文件
response = requests.get(url, timeout=10)
if response.status_code == 200:
return response.text
logging.error('get invalid status code %s while scraping %s',
response.status_code, url)
except requests.RequestException:
logging.error('error occurred while scraping %s', url, exc_info=True)
def scrape_index(page):
index_url = f'{BASE_URL}/page/{page}'
return scrape_page(index_url)
def parse_index(html):
if not html: return [] # 增加空值保护
pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
items = re.findall(pattern, html)
if not items:
return []
for item in items:
detail_url = urljoin(BASE_URL, item)
logging.info('get detail url %s', detail_url)
yield detail_url
def scrape_detail(url):
return scrape_page(url)
def parse_detail(html):
if not html: return None # 如果没拿到详情页内容直接返回
# 修复核心:使用 r'' 前缀解决 SyntaxWarning: invalid escape sequence '\d'
cover_pattern = re.compile(r'class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
name_pattern = re.compile(r'<h2.*?>(.*?)</h2>')
categories_pattern = re.compile(r'<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
published_at_pattern = re.compile(r'(\d{4}-\d{2}-\d{2})\s?上映') # 这里是你的报错点
drama_pattern = re.compile(r'<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
score_pattern = re.compile(r'<p.*?score.*?>(.*?)</p>', re.S)
# 优化提取逻辑,确保即使部分字段缺失也能返回已有的数据
name_match = re.search(name_pattern, html)
name = name_match.group(1).strip() if name_match else "未命名电影" # 确保有名字作为文件名
cover_match = re.search(cover_pattern, html)
cover = cover_match.group(1).strip() if cover_match else None
categories = re.findall(categories_pattern, html)
pub_match = re.search(published_at_pattern, html)
published_at = pub_match.group(1) if pub_match else None
drama_match = re.search(drama_pattern, html)
drama = drama_match.group(1).strip() if drama_match else None
score_match = re.search(score_pattern, html)
score = float(score_match.group(1).strip()) if score_match else None
return {
'cover': cover,
'name': name,
'categories': categories,
'published_at': published_at,
'drama': drama,
'score': score
}
def save_data(data):
if not data: return
name = data.get('name')
# 修复:清洗文件名,防止电影名包含非法字符导致保存失败
safe_name = re.sub(r'[\\/:*?"<>|]', '_', name)
data_path = f'{RESULTS_DIR}/{safe_name}.json'
# 修复:使用 with open 确保文件正确关闭和写入
try:
with open(data_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logging.error('Error saving file %s: %s', data_path, e)
def main(page):
index_html = scrape_index(page)
detail_urls = parse_index(index_html)
for detail_url in detail_urls:
detail_html = scrape_detail(detail_url)
data = parse_detail(detail_html)
if data:
logging.info('get detail data %s', data['name'])
save_data(data)
logging.info('data saved successfully')
if __name__ == '__main__':
pool = multiprocessing.Pool()
pages = range(1, TOTAL_PAGE + 1)
pool.map(main, pages)
pool.close()
pool.join() # 建议加上 join 确保进程池任务全部完成后主程序再退出
