Python 操作 MySQL 数据库完整教程

1. 前言

在现代 Web 开发和数据爬取中,关系型数据库仍然扮演着重要角色。本教程将介绍如何使用 Python 操作 MySQL 数据库,涵盖从基础连接到高级操作的完整流程。

2. 环境准备

2.1 安装 MySQL 数据库

推荐使用 Docker 快速部署 MySQL:

docker run --name mysql -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 -d mysql:8.0

2.2 安装 Python 连接库

目前主流的选择是:

  • PyMySQL:纯 Python 实现
  • mysql-connector-python:MySQL 官方驱动
  • SQLAlchemy:ORM 工具
pip install pymysql cryptography  # cryptography 用于加密连接

3. 连接数据库

3.1 基本连接

import pymysql

# 创建连接
db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='spiders',
    port=3306,
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor  # 返回字典形式结果
)

try:
    with db.cursor() as cursor:
        # 执行SQL查询
        cursor.execute("SELECT VERSION()")
        result = cursor.fetchone()
        print(f"Database version: {result['VERSION()']}")
finally:
    db.close()

3.2 使用连接池(推荐)

对于生产环境,建议使用连接池:

from dbutils.pooled_db import PooledDB

pool = PooledDB(
    creator=pymysql,
    maxconnections=10,
    mincached=2,
    host='localhost',
    user='root',
    password='123456',
    database='spiders',
    charset='utf8mb4'
)

def get_connection():
    return pool.connection()

4. 数据库操作

4.1 创建表

def create_table():
    sql = """
    CREATE TABLE IF NOT EXISTS students (
        id VARCHAR(255) NOT NULL,
        name VARCHAR(255) NOT NULL,
        age INT NOT NULL,
        gender VARCHAR(10),
        PRIMARY KEY (id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
    """
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql)
        conn.commit()

4.2 插入数据

4.2.1 基本插入

def insert_data_simple():
    sql = "INSERT INTO students(id, name, age) VALUES(%s, %s, %s)"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, ('20120001', 'Bob', 20))
        conn.commit()

4.2.2 批量插入

def insert_batch():
    sql = "INSERT INTO students(id, name, age) VALUES(%s, %s, %s)"
    data = [
        ('20120002', 'Alice', 21),
        ('20120003', 'Mike', 22),
        ('20120004', 'Jane', 20)
    ]
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.executemany(sql, data)
        conn.commit()

4.3 更新数据

4.3.1 基本更新

def update_data():
    sql = "UPDATE students SET age=%s WHERE name=%s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (22, 'Bob'))
        conn.commit()

4.3.2 存在则更新,不存在则插入

def upsert_data():
    sql = """
    INSERT INTO students(id, name, age, gender) 
    VALUES(%s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE 
    name=VALUES(name), age=VALUES(age), gender=VALUES(gender)
    """
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, ('20120001', 'Bob', 23, 'male'))
        conn.commit()

4.4 删除数据

def delete_data():
    sql = "DELETE FROM students WHERE age < %s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (20))
            print(f"Deleted {cursor.rowcount} rows")
        conn.commit()

4.5 查询数据

4.5.1 基本查询

def query_data():
    sql = "SELECT * FROM students WHERE age >= %s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (20))
            results = cursor.fetchall()
            for row in results:
                print(row)

4.5.2 分页查询

def query_pagination(page=1, page_size=10):
    offset = (page - 1) * page_size
    sql = "SELECT * FROM students LIMIT %s OFFSET %s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (page_size, offset))
            results = cursor.fetchall()
            for row in results:
                print(row)

4.5.3 流式查询(大数据量)

def query_large_data():
    sql = "SELECT * FROM students"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql)
            while True:
                row = cursor.fetchone()
                if not row:
                    break
                print(row)

5. 事务处理

def transfer_money(from_id, to_id, amount):
    try:
        with get_connection() as conn:
            with conn.cursor() as cursor:
                # 检查账户余额
                cursor.execute("SELECT balance FROM accounts WHERE id=%s", (from_id,))
                from_balance = cursor.fetchone()['balance']
                if from_balance < amount:
                    raise ValueError("Insufficient balance")
                
                # 扣款
                cursor.execute(
                    "UPDATE accounts SET balance=balance-%s WHERE id=%s",
                    (amount, from_id)
                )
                
                # 存款
                cursor.execute(
                    "UPDATE accounts SET balance=balance+%s WHERE id=%s",
                    (amount, to_id)
                )
                
            conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"Transaction failed: {e}")

6. 使用 ORM (SQLAlchemy)

对于复杂应用,推荐使用 ORM:

from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Student(Base):
    __tablename__ = 'students'
    
    id = Column(String(255), primary_key=True)
    name = Column(String(255))
    age = Column(Integer)
    gender = Column(String(10))

# 创建引擎
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/spiders')

# 创建表
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 添加数据
new_student = Student(id='20120005', name='Tom', age=24, gender='male')
session.add(new_student)
session.commit()

# 查询数据
students = session.query(Student).filter(Student.age >= 20).all()
for student in students:
    print(student.name, student.age)

session.close()

7. 最佳实践

  1. 使用连接池:避免频繁创建和销毁连接
  2. 参数化查询:防止 SQL 注入
  3. 事务管理:确保数据一致性
  4. 错误处理:捕获并处理数据库异常
  5. 资源释放:使用 with 语句或确保关闭连接
  6. 索引优化:对常用查询字段建立索引
  7. 批量操作:大数据量时使用批量插入/更新

8. 常见问题

8.1 连接超时

db = pymysql.connect(
    ...,
    connect_timeout=10,
    read_timeout=30,
    write_timeout=30
)

8.2 字符集问题

使用 utf8mb4 而不是 utf8,以支持完整的 Unicode 字符(如 emoji)

8.3 时区问题

db = pymysql.connect(
    ...,
    init_command='SET time_zone="+00:00"'
)

9. 总结

本教程详细介绍了 Python 操作 MySQL 数据库的各个方面,从基础连接到高级操作,再到 ORM 使用和最佳实践。掌握这些知识后,你将能够:

  • 安全高效地连接 MySQL 数据库
  • 执行 CRUD 操作
  • 处理事务和错误
  • 优化数据库性能
  • 构建健壮的数据库应用