电商App爬虫综合实战项目指南

重要前置说明:本项目仅供技术学习交流,请严格遵守Robots协议、电商平台规则及相关法律法规,合理控制请求频率,绝不非法获取个人敏感信息或进行商业获利。

在本实战中,我们将构建一套轻量但覆盖全链路的电商App爬虫系统——解决SSL Pinning拦截、逆向签名摸不清、多设备调度混乱、IP被封停、重复爬取资源浪费这些实际开发痛点。


核心技术架构概览

采用轻量分层异步架构,单设备可运行,也可扩展至多设备分布式部署:

flowchart LR
    A[控制终端] -->|提交任务| B[Redis调度中心]
    B -->|优先级下发| C1[爬虫工作节点1]
    B -->|优先级下发| C2[爬虫工作节点2]
    C1 --> D[代理池系统]
    C2 --> D
    C1 -->|解析数据| E[数据处理管道]
    C2 --> E
    E -->|持久化| F[MySQL存储]

核心模块逐模块实现

1. 反反爬虫基础:SSL Pinning一键绕过

App爬虫第一步大多是拿到明文数据包,OkHttp3 SSL Pinning是90%以上中小电商/非巨头的选择,用Frida Hook+Python封装就能搞定:

import frida
import sys

class SSLPinningBypass:
    """Attach Frida to an app and load an OkHttp3/OkHttp4 SSL-pinning bypass script.

    Intended for inspecting the traffic of an app on a device you control.
    The hooks target the unobfuscated ``okhttp3.CertificatePinner`` class and
    ``javax.net.ssl.SSLContext``; apps that rename or replace those classes
    are not covered.
    """

    def __init__(self, device_serial=None):
        """Resolve the Frida device.

        Args:
            device_serial: Device id passed to ``frida.get_device``. When
                falsy, the first USB device is used (3 second timeout).

        Note:
            On connection failure the error is printed and the process exits
            with status 1.
        """
        try:
            self.device = frida.get_device(device_serial) if device_serial else frida.get_usb_device(timeout=3)
        except Exception as e:
            print(f"✗ 连接设备失败: {e}")
            sys.exit(1)
        self.session = None
        self.script = None

    def attach_and_bypass(self, pkg_name: str):
        """Attach to ``pkg_name`` (spawning it if needed) and load the bypass script.

        Args:
            pkg_name: Target passed to ``device.attach``; if no such process
                exists it is passed to ``device.spawn`` instead.
        """
        spawned_pid = None
        try:
            # Prefer a process that is already running.
            self.session = self.device.attach(pkg_name)
            print(f"✓ 已附加到 {pkg_name}")
        except frida.ProcessNotFoundError:
            print(f"✓ {pkg_name} 未启动,正在启动并提前注入绕过脚本...")
            # spawn() leaves the new process suspended until resume().
            spawned_pid = self.device.spawn([pkg_name])
            self.session = self.device.attach(spawned_pid)

        self._load_bypass_script()

        # Resume only after the hooks are installed; resuming first would let
        # the app build its HTTP client before the script is in place.
        if spawned_pid is not None:
            self.device.resume(spawned_pid)

    def _load_bypass_script(self):
        """Create and load the Frida script on ``self.session``.

        The script disables every ``CertificatePinner.check`` overload (plus
        OkHttp4's internal ``check$okhttp`` when present) and makes every
        ``SSLContext.init`` call use a trust manager that accepts any chain.
        """
        bypass_js = """
        console.log("[√] Frida SSL Pinning通用Hook启动...");

        // Java.use / Java.registerClass are only valid inside Java.perform,
        // which runs the callback on a thread attached to the VM.
        Java.perform(function () {
            // 1. OkHttp3/4 CertificatePinner: turn every check() overload into a no-op.
            try {
                var CertPinner = Java.use("okhttp3.CertificatePinner");
                CertPinner.check.overloads.forEach(function (overload) {
                    overload.implementation = function () { return; };
                });
                try {
                    // OkHttp4 routes pinning through this internal method.
                    CertPinner["check$okhttp"].overloads.forEach(function (overload) {
                        overload.implementation = function () { return; };
                    });
                } catch (ignored) {
                    // OkHttp3 has no check$okhttp; nothing to do.
                }
                console.log("[√] OkHttp3/4 CertificatePinner.check 已全部禁用");
            } catch (e) {
                console.log("[×] 未找到OkHttp3/4 CertificatePinner");
            }

            // 2. Fallback: register a permissive TrustManager and install it
            //    on every SSLContext.init call.
            try {
                var X509TrustManager = Java.use("javax.net.ssl.X509TrustManager");
                var SSLContext = Java.use("javax.net.ssl.SSLContext");
                var EmptyTrustManager = Java.registerClass({
                    name: "com.frida.EmptyTrustManager",
                    implements: [X509TrustManager],
                    methods: {
                        checkClientTrusted: function (chain, authType) {},
                        checkServerTrusted: function (chain, authType) {},
                        getAcceptedIssuers: function () { return []; }
                    }
                });
                console.log("[√] 已注册空TrustManager");

                var sslInit = SSLContext.init.overload(
                    "[Ljavax.net.ssl.KeyManager;",
                    "[Ljavax.net.ssl.TrustManager;",
                    "java.security.SecureRandom"
                );
                sslInit.implementation = function (keyManagers, trustManagers, secureRandom) {
                    sslInit.call(this, keyManagers, [EmptyTrustManager.$new()], secureRandom);
                };
            } catch (e) {
                console.log("[×] 兜底TrustManager注册失败: " + e);
            }
        });
        """

        self.script = self.session.create_script(bypass_js)
        self.script.load()
        print("[√] 绕过脚本加载完成,请使用Charles/Fiddler抓包!")

2. 轻量稳定的爬取层:uiautomator2自动化

如果逆向分析请求签名太耗时间(或者平台经常更新签名算法),UI自动化+抓包辅助补全数据是折中但稳定的方案:

import uiautomator2 as u2
import time
import random

class ProductCrawler:
    """Crawl product lists on a single device through uiautomator2 UI automation.

    Titles and prices are read from the list items currently on screen;
    swipes and randomised pauses are used to page through a category.
    """

    # Upper bound on the number of list items read from one screen.
    _MAX_ITEMS_PER_PAGE = 10

    def __init__(self, device_serial=None, wait_base=2, wait_range=3):
        """Connect to the device and store the pause settings.

        Args:
            device_serial: Serial passed to ``u2.connect``; when falsy,
                ``u2.connect()`` is called without arguments.
            wait_base: Minimum pause, in seconds, between UI actions.
            wait_range: Width, in seconds, of the random window added on top
                of ``wait_base``.
        """
        self.d = u2.connect(device_serial) if device_serial else u2.connect()
        self.wait_base = wait_base
        self.wait_range = wait_range

    def _human_wait(self):
        """Sleep for a random time in ``[wait_base, wait_base + wait_range]`` seconds."""
        time.sleep(random.uniform(self.wait_base, self.wait_base + self.wait_range))

    @staticmethod
    def _text_or_empty(node) -> str:
        """Return the node's text, or ``""`` when the node is not on screen."""
        return node.get_text() if node.exists else ""

    def _extract_list_page(self) -> list[dict]:
        """Read title/price pairs from the product items currently visible.

        Items are located by resource-id patterns; if none match, the
        ``ViewGroup`` children of the scrollable container are used instead.
        Items missing a title or a price are skipped.

        Returns:
            A list of dicts with keys ``title``, ``price`` and
            ``extracted_at`` (local time, ``%Y-%m-%d %H:%M:%S``). Any error
            during extraction is printed and the items collected so far are
            returned.
        """
        products = []
        try:
            containers = self.d(resourceIdMatches=r".*product.*item|.*item.*product")
            if not containers.exists:
                containers = self.d(scrollable=True).child(className="android.view.ViewGroup")
            # Selector results support integer indexing only, so walk them by
            # index instead of slicing.
            visible = min(containers.count, self._MAX_ITEMS_PER_PAGE)
            for index in range(visible):
                item = containers[index]
                title = self._text_or_empty(item.child(resourceIdMatches=r".*title|.*name"))
                price = self._text_or_empty(item.child(resourceIdMatches=r".*price"))
                if title and price:
                    products.append({
                        "title": title.strip(),
                        "price": price.strip(),
                        "extracted_at": time.strftime("%Y-%m-%d %H:%M:%S"),
                    })
        except Exception as e:
            print(f"[×] 提取列表页失败: {e}")
        return products

    def crawl_category(self, category_name: str, max_scrolls=5) -> list[dict]:
        """Open a category by its visible text and collect products while scrolling.

        Args:
            category_name: Exact on-screen text of the category entry.
            max_scrolls: Number of extract-then-swipe rounds to perform.

        Returns:
            Products in first-seen order, de-duplicated by title. An empty
            list is returned when the category entry cannot be clicked.
        """
        try:
            self.d(text=category_name).click(timeout=10)
            self._human_wait()
        except Exception as e:
            print(f"[×] 找不到分类 {category_name}: {e}")
            return []

        all_products = []
        seen_titles = set()
        for _ in range(max_scrolls):
            for product in self._extract_list_page():
                if product["title"] not in seen_titles:
                    seen_titles.add(product["title"])
                    all_products.append(product)
            # Fixed pixel coordinates: assumes a portrait screen at least
            # ~1800 px tall — TODO confirm for the target devices.
            self.d.swipe(500, 1800, 500, 500, duration=0.3)
            self._human_wait()
        print(f"[√] 分类 {category_name} 共爬取 {len(all_products)} 个不重复商品")
        return all_products

3. 任务调度+IP管理核心模块

3.1 轻量Redis优先级任务队列

import redis
import json
import time
from enum import IntEnum

class TaskPriority(IntEnum):
    """Priority levels a crawl task can be enqueued with.

    The members are used as the keys of ``RedisTaskQueue.priority_queue``,
    which maps each level to its own Redis list.
    """
    LOW = 1
    NORMAL = 2
    HIGH = 3

class RedisTaskQueue:
    """Priority task queue with optional de-duplication, backed by Redis.

    Layout (all keys share the ``app_crawler:`` prefix):
      * ``tasks:<task_id>``: JSON payload of a task, expiring after 7 days.
      * ``queue:<level>``: one list of pending task ids per priority level.
      * ``queue:<level>:processing``: ids handed out by ``get_task`` and not
        yet acknowledged with ``ack_task``.
      * ``seen_urls``: set of ``unique_key`` values already accepted.
    """

    # Seconds to sleep between polling rounds while get_task waits for work.
    _POLL_INTERVAL = 0.1

    def __init__(self, r_host="localhost", r_port=6379, r_db=0):
        """Create the Redis client and derive the key names.

        Args:
            r_host: Redis host.
            r_port: Redis port.
            r_db: Redis database index.
        """
        self.r = redis.StrictRedis(host=r_host, port=r_port, db=r_db, decode_responses=True)
        self.prefix = "app_crawler:"
        self.task_key = f"{self.prefix}tasks:"
        self.priority_queue = {
            TaskPriority.HIGH: f"{self.prefix}queue:high",
            TaskPriority.NORMAL: f"{self.prefix}queue:normal",
            TaskPriority.LOW: f"{self.prefix}queue:low"
        }
        self.seen_set = f"{self.prefix}seen_urls"

    def add_task(self, task_id: str, task_data: dict, priority=TaskPriority.NORMAL, unique_key=None) -> bool:
        """Enqueue a task unless its ``unique_key`` was already accepted.

        Args:
            task_id: Identifier stored in the queue and used to look up the payload.
            task_data: JSON-serialisable payload.
            priority: Level whose list receives the task id.
            unique_key: Optional de-duplication key; when falsy no
                de-duplication is done.

        Returns:
            ``True`` if the task was enqueued, ``False`` if ``unique_key``
            had been seen before.

        Raises:
            redis.RedisError: If storing or enqueueing fails; in that case
                the ``unique_key`` reservation is rolled back.
        """
        # SADD reports whether the member is new, so the duplicate check and
        # the reservation happen in one atomic step.
        if unique_key and not self.r.sadd(self.seen_set, unique_key):
            return False

        pipe = self.r.pipeline()
        # Payload expires after 7 days.
        pipe.setex(f"{self.task_key}{task_id}", 7*24*3600, json.dumps(task_data))
        pipe.lpush(self.priority_queue[priority], task_id)
        try:
            pipe.execute()
        except redis.RedisError:
            if unique_key:
                self.r.srem(self.seen_set, unique_key)
            raise
        return True

    def _pop_valid(self, queue_key: str) -> tuple[str, dict] | None:
        """Move the oldest id of ``queue_key`` to its processing list and load its payload.

        Ids whose payload no longer exists (expired or deleted) are removed
        from the processing list and skipped.
        """
        processing_key = f"{queue_key}:processing"
        while True:
            task_id = self.r.rpoplpush(queue_key, processing_key)
            if not task_id:
                return None
            raw = self.r.get(f"{self.task_key}{task_id}")
            if raw is not None:
                return task_id, json.loads(raw)
            self.r.lrem(processing_key, 1, task_id)

    def get_task(self, timeout=5) -> tuple[str, dict] | None:
        """Fetch the next task, highest priority first.

        All queues are checked without blocking, from HIGH to LOW; when all
        are empty the check is repeated until ``timeout`` seconds have
        passed, so a newly added high-priority task is never stuck behind a
        blocking wait on a lower queue.

        Args:
            timeout: Maximum time in seconds to wait for a task; ``0`` checks
                each queue exactly once.

        Returns:
            ``(task_id, task_data)``, or ``None`` if nothing arrived in time.
        """
        deadline = time.monotonic() + timeout
        ordered_queues = [self.priority_queue[level] for level in sorted(self.priority_queue, reverse=True)]
        while True:
            for queue_key in ordered_queues:
                task = self._pop_valid(queue_key)
                if task is not None:
                    return task
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            time.sleep(min(self._POLL_INTERVAL, remaining))

    def ack_task(self, task_id: str) -> bool:
        """Remove a finished task id from the processing lists.

        Args:
            task_id: Id previously returned by ``get_task``.

        Returns:
            ``True`` if the id was found in a processing list.
        """
        removed = 0
        for queue_key in self.priority_queue.values():
            removed += self.r.lrem(f"{queue_key}:processing", 1, task_id)
        return removed > 0

3.2 极简本地代理池(基于HTTPbin验证)

import requests
import random
import time

class SimpleProxyPool:
    """基于预定义代理列表+HTTPbin验证的本地代理池"""
    
    def __init__(self, proxy_list: list[str], check_url="http://httpbin.org/ip", timeout=3):
        """proxy_list格式: ['http://user:pass@ip:port', 'socks5://ip:port']"""
        self.raw_list = proxy_list
        self.check_url = check_url
        self.timeout = timeout
        self.alive_proxies = []
        self._init_proxies()
    
    def _check_one(self, proxy: str) -> bool:
        try:
            resp = requests.get(self.check_url, proxies={"http": proxy, "https": proxy}, timeout=self.timeout)
            return resp.status_code == 200
        except:
            return False
    
    def _init_proxies(self):
        for p in self.raw_list:
            if self._check_one(p):
                self.alive_proxies.append(p)
                print(f"[√] 代理 {p.split('@')[-1]} 验证通过")
        print(f"[√] 代理池初始化完成,共 {len(self.alive_proxies)} 个可用代理")
    
    def get_random(self) -> str | None:
        if not self.alive_proxies:
            return None
        return random.choice(self.alive_proxies)

项目落地注意事项

  1. 法律合规永远第一:不要爬取用户隐私、不要过度消耗平台资源、不要用于商业转售
  2. 性能与稳定性平衡:单设备UI自动化并发不要太高(建议≤2个线程控制),代理要定期刷新
  3. 去重机制优化:除了Redis Set,商品标题/价格组合的MD5也可以辅助去重
  4. 日志与监控:推荐用Python内置logging模块记录操作,用Redis缓存简单的设备状态/爬取进度

(全文约2800字)