#电商App爬虫综合实战项目指南
重要前置说明:本项目仅供技术学习交流,请严格遵守Robots协议、电商平台规则及相关法律法规,合理控制请求频率,绝不非法获取个人敏感信息或进行商业获利。
在本实战中,我们将构建一套轻量但覆盖全链路的电商App爬虫系统——解决SSL Pinning拦截、逆向签名摸不清、多设备调度混乱、IP被封停、重复爬取资源浪费这些实际开发痛点。
#核心技术架构概览
采用轻量分层异步架构,单设备可运行,也可扩展至多设备分布式部署:
flowchart LR
A[控制终端] -->|提交任务| B[Redis调度中心]
B -->|优先级下发| C1[爬虫工作节点1]
B -->|优先级下发| C2[爬虫工作节点2]
C1 --> D[代理池系统]
C2 --> D
C1 -->|解析数据| E[数据处理管道]
C2 --> E
E -->|持久化| F[MySQL存储]

#核心模块逐模块实现
#1. 反反爬虫基础:SSL Pinning一键绕过
App爬虫第一步大多是拿到明文数据包,OkHttp3 SSL Pinning是90%以上中小电商/非巨头的选择,用Frida Hook+Python封装就能搞定:
import frida
import sys
class SSLPinningBypass:
    """Disable OkHttp3/OkHttp4 certificate pinning in an Android app via Frida.

    Typical use::

        SSLPinningBypass().attach_and_bypass("com.example.shop")

    After the agent is loaded, traffic can be inspected with an intercepting
    proxy such as Charles or Fiddler.
    """

    # Frida agent source. Everything runs inside Java.perform because the
    # Java bridge is only usable from a thread attached to the VM.
    _BYPASS_JS = """
Java.perform(function () {
    console.log("[√] Frida SSL Pinning通用Hook启动...");

    var noop = function () { return; };

    // 1. Neutralise every CertificatePinner.check variant that exists in the
    //    loaded OkHttp version. Each overload is hooked independently so a
    //    missing one does not prevent the others from being patched.
    try {
        var CertPinner = Java.use("okhttp3.CertificatePinner");
        var hooked = 0;
        try {
            CertPinner.check.overload('java.lang.String', 'java.util.List').implementation = noop;
            hooked++;
        } catch (e) {}
        try {
            // Java varargs are arrays at the JVM level.
            CertPinner.check.overload('java.lang.String', '[Ljava.security.cert.Certificate;').implementation = noop;
            hooked++;
        } catch (e) {}
        try {
            // OkHttp 4.x performs the real check in the Kotlin-internal check$okhttp.
            CertPinner['check$okhttp'].implementation = noop;
            hooked++;
        } catch (e) {}
        if (hooked > 0) {
            console.log("[√] OkHttp3/4 CertificatePinner.check 已全部禁用");
        } else {
            console.log("[×] 未找到OkHttp3/4 CertificatePinner");
        }
    } catch (e) {
        console.log("[×] 未找到OkHttp3/4 CertificatePinner");
    }

    // 2. Fallback: make every SSLContext trust all certificates by swapping
    //    in an empty TrustManager whenever SSLContext.init is called.
    try {
        var X509TrustManager = Java.use("javax.net.ssl.X509TrustManager");
        var SSLContext = Java.use("javax.net.ssl.SSLContext");
        var EmptyTrustManager = Java.registerClass({
            name: "com.frida.EmptyTrustManager",
            implements: [X509TrustManager],
            methods: {
                checkClientTrusted: function () {},
                checkServerTrusted: function () {},
                getAcceptedIssuers: function () { return []; }
            }
        });
        console.log("[√] 已注册空TrustManager");
        var sslInit = SSLContext.init.overload(
            '[Ljavax.net.ssl.KeyManager;',
            '[Ljavax.net.ssl.TrustManager;',
            'java.security.SecureRandom'
        );
        sslInit.implementation = function (keyManagers, trustManagers, secureRandom) {
            sslInit.call(this, keyManagers, [EmptyTrustManager.$new()], secureRandom);
        };
        console.log("[√] SSLContext.init 已替换为空TrustManager");
    } catch (e) {
        console.log("[×] 兜底TrustManager注册失败: " + e);
    }
});
"""

    def __init__(self, device_serial=None):
        """Connect to the target device.

        Args:
            device_serial: Frida device id to connect to. When omitted, the
                first USB device is used (3 second discovery timeout).

        Exits the process with status 1 if no device can be reached.
        """
        try:
            if device_serial:
                self.device = frida.get_device(device_serial)
            else:
                self.device = frida.get_usb_device(timeout=3)
        except Exception as e:
            print(f"✗ 连接设备失败: {e}")
            sys.exit(1)
        self.session = None
        self.script = None

    def attach_and_bypass(self, pkg_name: str):
        """Attach to ``pkg_name`` and load the pinning-bypass agent.

        A running process is attached to directly. If the process is not
        running it is spawned suspended, the agent is loaded, and only then
        is the process resumed, so the hooks are in place before the app
        creates its HTTP client.
        """
        spawned_pid = None
        try:
            self.session = self.device.attach(pkg_name)
            print(f"✓ 已附加到 {pkg_name}")
        except frida.ProcessNotFoundError:
            print(f"✓ {pkg_name} 未启动,正在启动并提前注入绕过脚本...")
            spawned_pid = self.device.spawn([pkg_name])
            self.session = self.device.attach(spawned_pid)

        try:
            self._load_bypass_script()
        finally:
            # Never leave a spawned app frozen, even if the agent failed to load.
            if spawned_pid is not None:
                self.device.resume(spawned_pid)

    def _load_bypass_script(self):
        """Create the Frida script from ``_BYPASS_JS`` and load it into the session."""
        self.script = self.session.create_script(self._BYPASS_JS)
        self.script.load()
        print("[√] 绕过脚本加载完成,请使用Charles/Fiddler抓包!")

#2. 轻量稳定的爬取层:uiautomator2自动化
如果逆向分析请求签名太耗时间(或者平台经常更新),UI自动化+抓包辅助补全数据是折中但稳定的方案:
import uiautomator2 as u2
import time
import random
class ProductCrawler:
    """Single-device crawler that scrapes product titles and prices from the UI."""

    # Upper bound on list containers inspected per screen.
    _MAX_ITEMS_PER_PAGE = 10

    def __init__(self, device_serial=None, wait_base=2, wait_range=3):
        """Connect to a device and configure the randomized pause between actions.

        Args:
            device_serial: Device to connect to; the default device when omitted.
            wait_base: Minimum pause in seconds between UI actions.
            wait_range: Width in seconds of the random interval added on top
                of ``wait_base``.
        """
        self.d = u2.connect(device_serial) if device_serial else u2.connect()
        self.wait_base = wait_base
        self.wait_range = wait_range

    def _human_wait(self):
        """Sleep for a random time in ``[wait_base, wait_base + wait_range]`` seconds."""
        time.sleep(random.uniform(self.wait_base, self.wait_base + self.wait_range))

    @staticmethod
    def _text_of(node) -> str:
        """Return the text of a UI node, or ``""`` when the node is absent."""
        if not node.exists:
            return ""
        return node.get_text() or ""

    def _extract_list_page(self) -> list[dict]:
        """Collect products from the currently visible list.

        Containers are located by resource id (``product``/``item`` in either
        order); if none match, the ``ViewGroup`` children of the scrollable
        view are used instead. At most ``_MAX_ITEMS_PER_PAGE`` containers are
        inspected, and only those exposing both a title and a price are kept.

        Returns:
            A list of ``{"title", "price", "extracted_at"}`` dicts; empty when
            extraction fails (the error is printed, not raised).
        """
        products = []
        try:
            containers = self.d(resourceIdMatches=r".*product.*item|.*item.*product")
            if not containers.exists:
                containers = self.d(scrollable=True).child(className="android.view.ViewGroup")
            # Selectors only support integer indexing, so walk them by
            # position instead of slicing.
            limit = min(containers.count, self._MAX_ITEMS_PER_PAGE)
            for index in range(limit):
                item = containers[index]
                # Look each child up once: every query is a device round-trip.
                title = self._text_of(item.child(resourceIdMatches=r".*title|.*name"))
                price = self._text_of(item.child(resourceIdMatches=r".*price"))
                if title and price:
                    products.append({
                        "title": title.strip(),
                        "price": price.strip(),
                        "extracted_at": time.strftime("%Y-%m-%d %H:%M:%S"),
                    })
        except Exception as e:
            print(f"[×] 提取列表页失败: {e}")
        return products

    def crawl_category(self, category_name: str, max_scrolls=5) -> list[dict]:
        """Open a category by its visible label and scrape it while scrolling.

        Args:
            category_name: Text of the category entry to tap.
            max_scrolls: Number of extract-then-swipe rounds to perform.

        Returns:
            Products de-duplicated by title, in first-seen order; an empty
            list when the category entry cannot be tapped.
        """
        try:
            self.d(text=category_name).click(timeout=10)
            self._human_wait()
        except Exception as e:
            print(f"[×] 找不到分类 {category_name}: {e}")
            return []

        all_products = []
        seen_titles = set()
        for _ in range(max_scrolls):
            for product in self._extract_list_page():
                if product["title"] not in seen_titles:
                    seen_titles.add(product["title"])
                    all_products.append(product)
            # NOTE(review): absolute coordinates presumably target a
            # 1080x1920 screen — confirm for other resolutions.
            self.d.swipe(500, 1800, 500, 500, duration=0.3)
            self._human_wait()
        print(f"[√] 分类 {category_name} 共爬取 {len(all_products)} 个不重复商品")
        return all_products

#3. 任务调度+IP管理核心模块
#3.1 轻量Redis优先级任务队列
import redis
import json
import time
from enum import IntEnum
class TaskPriority(IntEnum):
LOW = 1
NORMAL = 2
HIGH = 3
class RedisTaskQueue:
"""Redis List+Set实现的优先级+去重任务队列"""
def __init__(self, r_host="localhost", r_port=6379, r_db=0):
self.r = redis.StrictRedis(host=r_host, port=r_port, db=r_db, decode_responses=True)
self.prefix = "app_crawler:"
self.task_key = f"{self.prefix}tasks:"
self.priority_queue = {
TaskPriority.HIGH: f"{self.prefix}queue:high",
TaskPriority.NORMAL: f"{self.prefix}queue:normal",
TaskPriority.LOW: f"{self.prefix}queue:low"
}
self.seen_set = f"{self.prefix}seen_urls"
def add_task(self, task_id: str, task_data: dict, priority=TaskPriority.NORMAL, unique_key=None) -> bool:
"""添加任务,可选unique_key去重"""
if unique_key and self.r.sismember(self.seen_set, unique_key):
return False
# 存储任务详情(7天过期)
self.r.setex(f"{self.task_key}{task_id}", 7*24*3600, json.dumps(task_data))
# 加入优先级队列
self.r.lpush(self.priority_queue[priority], task_id)
if unique_key:
self.r.sadd(self.seen_set, unique_key)
return True
def get_task(self, timeout=5) -> tuple[str, dict] | None:
"""按优先级获取任务,超时5秒"""
for q in self.priority_queue.values():
task_id = self.r.rpoplpush(q, f"{q}:processing", timeout=timeout)
if task_id:
task_data = json.loads(self.r.get(f"{self.task_key}{task_id}"))
return task_id, task_data
return None#3.2 极简本地代理池(基于HTTPbin验证)
import requests
import random
import time
class SimpleProxyPool:
"""基于预定义代理列表+HTTPbin验证的本地代理池"""
def __init__(self, proxy_list: list[str], check_url="http://httpbin.org/ip", timeout=3):
"""proxy_list格式: ['http://user:pass@ip:port', 'socks5://ip:port']"""
self.raw_list = proxy_list
self.check_url = check_url
self.timeout = timeout
self.alive_proxies = []
self._init_proxies()
def _check_one(self, proxy: str) -> bool:
try:
resp = requests.get(self.check_url, proxies={"http": proxy, "https": proxy}, timeout=self.timeout)
return resp.status_code == 200
except:
return False
def _init_proxies(self):
for p in self.raw_list:
if self._check_one(p):
self.alive_proxies.append(p)
print(f"[√] 代理 {p.split('@')[-1]} 验证通过")
print(f"[√] 代理池初始化完成,共 {len(self.alive_proxies)} 个可用代理")
def get_random(self) -> str | None:
if not self.alive_proxies:
return None
return random.choice(self.alive_proxies)#项目落地注意事项
- 法律合规永远第一:不要爬取用户隐私、不要过度消耗平台资源、不要用于商业转售
- 性能与稳定性平衡:单设备UI自动化并发不要太高(建议≤2个线程控制),代理要定期刷新
- 去重机制优化:除了Redis Set,商品标题/价格组合的MD5也可以辅助去重
- 日志与监控:推荐用Python内置logging模块记录操作,用Redis缓存简单的设备状态/爬取进度
(全文约2800字)

