电商App商品滑动抓取项目

本文提供一套无Root、轻量易部署的Android电商App滑动采集方案:基于谷歌UI测试工具的Python封装uiautomator2,比Appium配置更快、触发风控概率更低;用适配器模式快速适配多平台;内置SQLite持久化和pandas/matplotlib的基础分析,核心代码可直接运行。

⚠️ 法律/合规声明:本项目仅用于个人技术学习/研究,严禁批量爬取未授权平台数据,务必遵守各平台的《用户协议》《隐私政策》及相关法律法规,合理控制采集频率、单次/总采集量!


1. 核心架构:三模块滑动采集引擎

我们将功能拆分为「配置管理」「SQLite轻存储」「UI交互与提取」三个低耦合模块,适合快速迭代。

为什么优先选uiautomator2?

| 方案对比 | API采集 | Appium | uiautomator2 |
| --- | --- | --- | --- |
| 配置复杂度 | 中(需逆向/抓合法token) | 高(需Node+Server) | 低(一行pip安装+init) |
| 风控触发概率 | 高(接口有严格加密/校验) | 中(自动化特征明显) | 低(模拟原生点击滑动) |
| 通用性 | 弱(平台API全换则重写) | 中(兼容多系统但慢) | 中(仅Android但通用UI) |
| 滑动响应速度 | 快(无UI渲染) | 慢(跨进程调用) | 快(直接驱动Android系统UI) |

精简版核心代码

删除了冗余字段(如满减券、用户评价等),适配绝大多数电商App的基础商品列表场景。

# core_scraper.py
import uiautomator2 as u2
import time
import random
import json
import sqlite3
import re
import logging
from dataclasses import dataclass
from typing import Optional, Dict, List
from datetime import datetime

# ---------------------------
# 1. 日志与配置
# ---------------------------
# Root logging: every record goes to a UTF-8 log file and to the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler("scraper.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by the classes in this script.
logger = logging.getLogger(__name__)

@dataclass
class ScrapingConfig:
    """Settings for one scraping session.

    All fields have defaults, so ``ScrapingConfig()`` is usable as-is.
    """

    # Android package name of the shopping app to launch.
    app_package: str = "com.taobao.taobao"
    # Search keywords, one per category. ``None`` or an empty list is
    # replaced by the built-in defaults in ``__post_init__``.
    category_keywords: Optional[List[str]] = None
    # Stop a category once this many products have been collected.
    max_products_per_category: int = 20
    # Base pause (seconds) after each swipe; the scraper adds +/-0.5 s jitter.
    scroll_interval: float = 2.8
    # Give up on a category after this many consecutive screens without
    # any new product.
    max_retry_no_new: int = 5

    def __post_init__(self):
        """Fill in the default keyword list when none was supplied."""
        if not self.category_keywords:
            # A fresh list per instance, so instances never share state.
            self.category_keywords = ["平价手机壳", "入门机械键盘"]

# ---------------------------
# 2. SQLite本地存储
# ---------------------------
class EcommerceDB:
    """Small SQLite store for scraped products and per-category crawl logs.

    Every operation opens its own short-lived connection and closes it
    explicitly. ``with sqlite3.connect(...)`` on its own only commits or
    rolls back the transaction; it does not close the connection.
    """

    # Product table: ``temp_id`` is UNIQUE so INSERT OR IGNORE can dedupe.
    _CREATE_PRODUCTS_SQL = '''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            temp_id TEXT UNIQUE,
            title TEXT,
            price REAL,
            sales_count INTEGER,
            shop_name TEXT,
            category TEXT,
            crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    '''
    # One row per scraped category run.
    _CREATE_LOGS_SQL = '''
        CREATE TABLE IF NOT EXISTS crawl_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            category TEXT,
            products_crawled INTEGER,
            duration_seconds INTEGER,
            started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    '''
    _INSERT_PRODUCT_SQL = '''
        INSERT OR IGNORE INTO products
        (temp_id, title, price, sales_count, shop_name, category)
        VALUES (?, ?, ?, ?, ?, ?)
    '''
    _INSERT_LOG_SQL = '''
        INSERT INTO crawl_logs
        (category, products_crawled, duration_seconds)
        VALUES (?, ?, ?)
    '''

    def __init__(self, path: str = "ecommerce.db"):
        """Remember the database file path and create the tables if missing."""
        self.path = path
        self._init_tables()

    def _execute(self, sql: str, params: tuple = ()) -> None:
        """Run one statement in its own transaction and close the connection."""
        conn = sqlite3.connect(self.path)
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute(sql, params)
        finally:
            conn.close()

    def _init_tables(self):
        """Create the ``products`` and ``crawl_logs`` tables when absent."""
        self._execute(self._CREATE_PRODUCTS_SQL)
        self._execute(self._CREATE_LOGS_SQL)
        logger.info("✅ 本地数据库初始化完成")

    def save_product(self, p: Dict):
        """Insert one product; a row with an existing ``temp_id`` is ignored.

        ``p`` must provide the keys ``temp_id``, ``title``, ``price``,
        ``sales_count``, ``shop_name`` and ``category``. The title is cut to
        120 characters and the shop name to 60. Failures are logged as
        warnings and never raised, so one bad record cannot stop a crawl.
        """
        try:
            self._execute(self._INSERT_PRODUCT_SQL, (
                p['temp_id'],
                (p['title'] or "")[:120],      # tolerate None before slicing
                p['price'],
                p['sales_count'],
                (p['shop_name'] or "")[:60],   # tolerate None before slicing
                p['category'],
            ))
        except Exception as e:
            logger.warning(f"⚠️ 保存商品失败: {e}")

    def save_session_log(self, log: Dict):
        """Record one category run.

        ``log`` must provide the keys ``category``, ``count`` and
        ``duration`` (seconds). Errors propagate to the caller.
        """
        self._execute(
            self._INSERT_LOG_SQL,
            (log['category'], log['count'], log['duration']),
        )

# ---------------------------
# 3. UI交互与提取
# ---------------------------
class EcommerceScraper:
    """Drive a shopping app through uiautomator2 and store the listed products.

    For each configured keyword the scraper launches the app, searches,
    swipes through the result list and saves every product card it can
    parse to the local SQLite database.
    """

    # Resource IDs of the home-page search box, tried in order.
    _SEARCH_BOX_IDS = (
        "com.taobao.taobao:id/searchbar_hint_view",
        "com.jingdong.app.mall:id/search_widget_text",
        "com.xunmeng.pinduoduo:id/tv_search",
    )
    # Fallback match on the search box description/text.
    _SEARCH_HINT_PATTERN = r'^搜索.*$|^Search.*$'
    # Half-width yen (U+00A5) and full-width yen (U+FFE5) are both used by
    # Chinese apps; the other symbols only help to tell a price from a title.
    _CURRENCY_PREFIXES = ("\u00a5", "\uffe5", "$", "€")
    _PRICE_RE = re.compile(r'[\u00a5\uffe5]\s*(\d{1,6}\.?\d{0,2})')
    # Number, optional 万/千 unit, optional "+", then a sales keyword,
    # e.g. "300人付款" or "1.5万+人付款".
    _SALES_RE = re.compile(r'(\d+(?:\.\d+)?)(万|千)?\+?(?:人付款|销量|已拼)')
    _SALES_UNITS = {"万": 10000, "千": 1000}
    _SHOP_RE = re.compile(r'([^\n]{2,40}?(?:旗舰店|专卖店|专营店|自营|官方))')
    # Containers shorter than this many pixels are too small to be a card.
    _MIN_CONTAINER_HEIGHT = 80

    def __init__(self, config: "Optional[ScrapingConfig]" = None):
        """Store the configuration, open the database and connect the device.

        Args:
            config: Session settings; a default ``ScrapingConfig`` is used
                when omitted.
        """
        self.config = config or ScrapingConfig()
        self.db = EcommerceDB()
        self.d = None
        self._connect_device()

    def _connect_device(self):
        """Connect to the Android device via ``u2.connect()``.

        Raises:
            SystemExit: With exit code 1 when the connection fails.
        """
        try:
            self.d = u2.connect()
            logger.info(f"✅ 设备连接成功: 序列号 {self.d.serial}")
            self.d.app_start("com.github.uiautomator", stop=True)
            time.sleep(3)
        except Exception as e:
            logger.error(f"❌ 设备连接失败: {e}\n请检查USB调试、授权状态和驱动")
            # ``exit()`` is injected by the ``site`` module and may be
            # missing; raising SystemExit is the portable equivalent.
            raise SystemExit(1) from e

    def _find_search_box(self):
        """Return the first search-box selector that exists, else ``None``."""
        # Resource IDs are the most reliable, so they are tried first.
        for rid in self._SEARCH_BOX_IDS:
            candidate = self.d(resourceId=rid)
            if candidate.exists(timeout=2):
                return candidate
        # Otherwise fall back to the content description, then visible text.
        for criteria in (
            {"descriptionMatches": self._SEARCH_HINT_PATTERN},
            {"textMatches": self._SEARCH_HINT_PATTERN},
        ):
            candidate = self.d(**criteria)
            if candidate.exists:
                return candidate
        return None

    def launch_app_and_search(self, keyword: str) -> bool:
        """Restart the target app and run a search for ``keyword``.

        Returns:
            True when the search was submitted; False when no search box
            could be found or any step raised.
        """
        try:
            self.d.app_start(self.config.app_package, stop=True)
            logger.info("⏳ 等待应用完全启动...")
            time.sleep(7 + random.uniform(0, 2))

            search_box = self._find_search_box()
            if search_box is None or not search_box.click_exists(timeout=2):
                logger.error("❌ 未找到可用搜索框")
                return False

            time.sleep(1.2 + random.uniform(0, 1))
            self.d.clear_text()
            time.sleep(0.5)
            self.d.send_keys(keyword, clear=False)
            time.sleep(0.8)

            # Prefer the on-screen search button, else the keyboard search key.
            if not self.d(textMatches=r'^搜索$|^Search$').click_exists(timeout=2):
                self.d.press("search")
            time.sleep(5 + random.uniform(0, 2))
            logger.info(f"✅ 搜索成功: {keyword}")
            return True
        except Exception as e:
            logger.error(f"❌ 启动或搜索失败: {e}")
            return False

    def _simulate_scroll_down(self) -> bool:
        """Swipe up with randomised start/end points, then pause.

        Returns:
            True when the swipe was issued, False when it raised.
        """
        try:
            w, h = self.d.window_size()
            start_x = w // 2 + random.randint(-30, 30)
            start_y = int(h * 0.78 + random.randint(-20, 20))
            end_x = w // 2 + random.randint(-30, 30)
            end_y = int(h * 0.22 + random.randint(-20, 20))
            duration = 0.6 + random.uniform(0, 0.3)
            self.d.swipe(start_x, start_y, end_x, end_y, duration)
            # Clamp at zero: with scroll_interval < 0.5 the jitter could go
            # negative, and time.sleep() rejects negative values.
            pause = self.config.scroll_interval + random.uniform(-0.5, 0.5)
            time.sleep(max(0.0, pause))
            return True
        except Exception as e:
            logger.warning(f"⚠️ 模拟滑动失败: {e}")
            return False

    @classmethod
    def _parse_product_texts(cls, texts: List[str], category: str) -> Optional[Dict]:
        """Build a product record from the visible texts of one card.

        Args:
            texts: Non-empty, stripped TextView strings in screen order.
            category: Search keyword the card was found under.

        Returns:
            A dict with ``temp_id``, ``title``, ``price``, ``sales_count``,
            ``shop_name`` and ``category``; ``None`` when no non-zero price
            is present (placeholder or non-product container).
        """
        blob = "\n".join(texts)

        price_match = cls._PRICE_RE.search(blob)
        price = float(price_match.group(1)) if price_match else 0.0
        if price == 0.0:
            return None

        # Title: first long text that is not itself a price.
        title = next(
            (t for t in texts
             if len(t) > 8 and not t.startswith(cls._CURRENCY_PREFIXES)),
            "",
        )

        sales = 0
        sales_match = cls._SALES_RE.search(blob)
        if sales_match:
            multiplier = cls._SALES_UNITS.get(sales_match.group(2), 1)
            # round() avoids losing 1 to float error (e.g. 2.3 * 10000).
            sales = int(round(float(sales_match.group(1)) * multiplier))

        shop_match = cls._SHOP_RE.search(blob)
        shop = shop_match.group(1).strip() if shop_match else ""

        # Content-derived key: the same card always yields the same id, so
        # the UNIQUE constraint in the database can actually deduplicate.
        temp_id = f"{category}|{title}|{price}|{shop}"
        return {
            "temp_id": temp_id,
            "title": title,
            "price": price,
            "sales_count": sales,
            "shop_name": shop,
            "category": category,
        }

    def _extract_single_product(self, container, category: str) -> Optional[Dict]:
        """Read the TextViews under ``container`` and parse them as a product.

        Returns:
            The product dict, or ``None`` when the container holds no
            priced product or the UI lookup fails.
        """
        try:
            texts = []
            for tv in container.child(className="android.widget.TextView"):
                text = (tv.get_text() or "").strip()
                if text:
                    texts.append(text)
            return self._parse_product_texts(texts, category)
        except Exception as e:
            logger.debug(f"🔍 提取商品细节失败: {e}")
            return None

    @staticmethod
    def _bounds_tuple(bounds) -> tuple:
        """Normalise bounds to ``(left, top, right, bottom)``.

        Accepts either a 4-item sequence or a dict with those four keys,
        so the caller does not depend on which shape the library returns.
        """
        if isinstance(bounds, dict):
            return (bounds['left'], bounds['top'],
                    bounds['right'], bounds['bottom'])
        return tuple(bounds)

    def _collect_containers(self) -> list:
        """List the layout elements on screen that may be product cards."""
        factories = (
            lambda: self.d(className="android.widget.RelativeLayout"),
            lambda: self.d(className="android.widget.LinearLayout"),
            lambda: self.d(
                className="androidx.recyclerview.widget.RecyclerView"
            ).child(),
        )
        found = []
        for make in factories:
            try:
                # Selector results are iterated directly to get the
                # individual elements.
                found.extend(make())
            except Exception as e:
                logger.debug(f"🔄 处理UI容器失败: {e}")
        return found

    def scrape_single_category(self, category: str) -> int:
        """Search for ``category`` and collect products until a limit is hit.

        Stops after ``max_products_per_category`` products or after
        ``max_retry_no_new`` consecutive screens without a new product,
        then writes a session log row.

        Returns:
            Number of distinct products collected (0 if the search failed).
        """
        start_time = time.time()
        if not self.launch_app_and_search(category):
            return 0

        limit = self.config.max_products_per_category
        count = 0
        retry_no_new = 0
        # Products are deduplicated by content for the whole run; bounds
        # are only compared within one screen, because after a swipe a
        # different card can occupy exactly the same rectangle.
        seen_ids = set()

        while count < limit and retry_no_new < self.config.max_retry_no_new:
            new_found = False
            seen_bounds = set()

            for c in self._collect_containers():
                if count >= limit:
                    break
                try:
                    b_key = self._bounds_tuple(c.bounds())
                    height = b_key[3] - b_key[1]
                    if b_key in seen_bounds or height < self._MIN_CONTAINER_HEIGHT:
                        continue
                    seen_bounds.add(b_key)
                    product = self._extract_single_product(c, category)
                except Exception as e:
                    logger.debug(f"🔄 处理UI容器失败: {e}")
                    continue

                if not product or product["temp_id"] in seen_ids:
                    continue
                seen_ids.add(product["temp_id"])
                self.db.save_product(product)
                count += 1
                new_found = True
                logger.info(f"📦 已采集 {count}/{self.config.max_products_per_category}: {product['title'][:20]}...")

            if not new_found:
                retry_no_new += 1
                logger.warning(f"⚠️ 未发现新商品,剩余重试次数: {self.config.max_retry_no_new - retry_no_new}")
                time.sleep(1.5)
            else:
                retry_no_new = 0

            if count < limit:
                self._simulate_scroll_down()

        duration = int(time.time() - start_time)
        self.db.save_session_log({"category": category, "count": count, "duration": duration})
        logger.info(f"🏁 分类 {category} 采集结束: 共 {count} 件, 耗时 {duration} 秒")
        return count

    def run_full_session(self):
        """Scrape every configured keyword, resting between categories."""
        logger.info("🚀 开始多分类采集会话")
        total = 0
        last_index = len(self.config.category_keywords) - 1
        for i, cat in enumerate(self.config.category_keywords):
            total += self.scrape_single_category(cat)
            if i < last_index:
                # Random pause so consecutive searches are not back-to-back.
                rest_time = random.uniform(18, 35)
                logger.info(f"😴 休息 {rest_time:.1f} 秒,避免频繁操作...")
                time.sleep(rest_time)
        logger.info(f"🎉 会话结束: 总计采集 {total} 件商品")

if __name__ == "__main__":
    # Script entry point: adjust the settings below, then run this file.
    custom_config = ScrapingConfig(
        max_products_per_category=12,
        category_keywords=["便携保温杯", "百元蓝牙耳机"],
    )
    scraper = EcommerceScraper(config=custom_config)
    scraper.run_full_session()

2. 数据增值:1分钟快速看数据

采集到的SQLite数据,用pandas和matplotlib做基础分析和可视化(已解决中文乱码问题)。

# quick_analytics.py
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict

# ---------------------------
# 全局配置(解决中文乱码)
# ---------------------------
# Fonts with CJK glyphs, listed in priority order so the same script works
# on every platform without editing: SimHei (Windows), Arial Unicode MS
# (macOS), WenQuanYi Micro Hei (Linux), then matplotlib's bundled
# DejaVu Sans as a last resort (it has no CJK glyphs).
plt.rcParams['font.sans-serif'] = [
    'SimHei',
    'Arial Unicode MS',
    'WenQuanYi Micro Hei',
    'DejaVu Sans',
]
# Draw the minus sign as an ASCII hyphen; CJK fonts often lack U+2212.
plt.rcParams['axes.unicode_minus'] = False

class QuickAnalytics:
    """Summary statistics and a price chart for the scraped products."""

    def __init__(self, db_path: str = "ecommerce.db"):
        """Remember the path of the SQLite database to read from."""
        self.db_path = db_path

    def _load_products(self) -> pd.DataFrame:
        """Load the whole ``products`` table into a DataFrame."""
        conn = sqlite3.connect(self.db_path)
        try:
            return pd.read_sql_query("SELECT * FROM products", conn)
        finally:
            # The sqlite3 context manager only manages the transaction, so
            # the connection is closed explicitly.
            conn.close()

    def get_basic_stats(self) -> Dict:
        """Return headline statistics as a JSON-serialisable dict.

        Returns:
            A dict with the product count, mean/max/min price, the category
            with the most products and the per-category counts; or
            ``{"msg": ...}`` when the table is empty.
        """
        df = self._load_products()
        if df.empty:
            return {"msg": "⚠️ 数据库中暂无商品数据"}

        prices = df['price']
        counts = df['category'].value_counts()
        # Values are converted to built-in float/int so json.dumps accepts
        # them regardless of the pandas version.
        return {
            "总采集商品数": len(df),
            "商品均价(元)": round(float(prices.mean()), 2),
            "商品最高单价(元)": round(float(prices.max()), 2),
            "商品最低单价(元)": round(float(prices.min()), 2),
            # value_counts() drops NULL categories, so it can be empty even
            # when the table is not.
            "采集商品最多的分类": counts.idxmax() if not counts.empty else None,
            "各分类采集数": {name: int(n) for name, n in counts.items()},
        }

    def plot_price_by_category(self):
        """Draw a per-category price box plot, save it as a PNG and show it.

        Does nothing when the table is empty.
        """
        df = self._load_products()
        if df.empty:
            return
        plt.figure(figsize=(10, 6))
        sns.boxplot(x='category', y='price', data=df, palette='pastel')
        plt.title('各分类商品价格分布(箱线图)')
        plt.xlabel('商品分类')
        plt.ylabel('价格(元)')
        plt.tight_layout()
        # Save before show(): the figure may be gone once the window closes.
        plt.savefig('price_by_category.png', dpi=300)
        plt.show()
        print("📈 各分类价格分布图已保存为 price_by_category.png")

if __name__ == "__main__":
    # json is used for the report below but is not among this script's
    # top-level imports, so it is imported here to avoid a NameError.
    import json

    # Print the summary report, then draw and save the price chart.
    analytics = QuickAnalytics()
    stats = analytics.get_basic_stats()
    print("📊 快速统计报告:\n", json.dumps(stats, ensure_ascii=False, indent=4))
    analytics.plot_price_by_category()

3. 快速部署指南

环境准备

  1. 硬件软件:一台Windows/macOS/Linux电脑,一台已开启「USB调试」(开发者选项)的Android手机/模拟器
  2. Python环境:Python 3.8+(推荐3.9-3.11,兼容性更好)
  3. 依赖安装
    # 安装核心依赖
    pip install uiautomator2 pandas matplotlib seaborn
    # 首次运行需在手机上安装ATX辅助服务
    python -m uiautomator2 init

运行步骤

  1. 连接设备:用USB线连接手机,在手机上允许「USB调试授权」,运行adb devices(可选)确认设备已识别
  2. 修改配置:打开core_scraper.py,修改custom_config中的app_package、category_keywords、max_products_per_category
  3. 执行采集
    python core_scraper.py
  4. 查看分析:采集完成后运行
    python quick_analytics.py

4. 简单的反自动化小技巧(可选)

  1. 随机调整滑动参数:代码中已加入滑动起点终点偏移、间隔随机
  2. 偶尔模拟停顿/点错:在_simulate_scroll_down中偶尔加0.5-1.5秒的小停顿
  3. 修改ATX服务包名:可以逆向或重打包ATX服务,避免部分平台直接检测
  4. 控制总采集时长:单次连续采集不建议超过1小时,分时段进行