泰兴市文章资讯

Python版SQLite数据库从入门到综合运用案例

2026-04-22 20:02:01 浏览次数:1
详细信息
Python SQLite 数据库从入门到综合运用案例

一、SQLite基础介绍

SQLite是一个轻量级的嵌入式关系型数据库,无需独立的服务器进程,数据库就是一个文件。Python内置了SQLite支持,通过sqlite3模块即可使用。

二、基本操作

1. 连接数据库和创建表

import sqlite3
import os

# 连接数据库(如果不存在则创建)
def create_database():
    # 连接数据库
    conn = sqlite3.connect('example.db')

    # 创建游标对象
    cursor = conn.cursor()

    # 创建用户表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT NOT NULL UNIQUE,
        email TEXT NOT NULL,
        age INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''')

    # 创建订单表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS orders (
        order_id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        product_name TEXT NOT NULL,
        amount REAL NOT NULL,
        order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
    )
    ''')

    # 创建索引以提高查询性能
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_username ON users(username)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_id ON orders(user_id)')

    # 提交更改
    conn.commit()

    print("数据库和表创建成功!")

    return conn

# 使用上下文管理器连接数据库
def connect_with_context():
    with sqlite3.connect('example.db') as conn:
        conn.execute("PRAGMA foreign_keys = ON")  # 启用外键约束
        return conn

2. 插入数据

def insert_data(conn):
    cursor = conn.cursor()

    # 插入单条数据
    cursor.execute(
        "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
        ('alice', 'alice@example.com', 25)
    )

    # 插入多条数据
    users_data = [
        ('bob', 'bob@example.com', 30),
        ('charlie', 'charlie@example.com', 35),
        ('diana', 'diana@example.com', 28)
    ]

    cursor.executemany(
        "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
        users_data
    )

    # 获取最后插入的ID
    last_id = cursor.lastrowid
    print(f"最后插入的用户ID: {last_id}")

    # 插入订单数据
    orders_data = [
        (1, '笔记本电脑', 899.99),
        (1, '鼠标', 25.50),
        (2, '键盘', 75.00),
        (3, '显示器', 299.99),
        (4, '耳机', 89.99)
    ]

    cursor.executemany(
        "INSERT INTO orders (user_id, product_name, amount) VALUES (?, ?, ?)",
        orders_data
    )

    conn.commit()
    print("数据插入成功!")

3. 查询数据

def query_data(conn):
    cursor = conn.cursor()

    # 查询所有用户
    print("=== 所有用户 ===")
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()

    for user in users:
        print(f"ID: {user[0]}, 用户名: {user[1]}, 邮箱: {user[2]}, 年龄: {user[3]}, 注册时间: {user[4]}")

    # 查询特定用户
    print("\n=== 查询年龄大于28的用户 ===")
    cursor.execute(
        "SELECT username, email, age FROM users WHERE age > ?",
        (28,)
    )

    for row in cursor.fetchall():
        print(f"用户名: {row[0]}, 邮箱: {row[1]}, 年龄: {row[2]}")

    # 使用字典游标(Python 3.8+)
    print("\n=== 使用字典游标查询 ===")
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()

    cursor.execute("SELECT * FROM users WHERE username LIKE ?", ('%a%',))
    rows = cursor.fetchall()

    for row in rows:
        print(f"ID: {row['id']}, 用户名: {row['username']}")

4. 更新和删除数据

def update_and_delete(conn):
    cursor = conn.cursor()

    # 更新数据
    cursor.execute(
        "UPDATE users SET age = ? WHERE username = ?",
        (26, 'alice')
    )
    print(f"更新了 {cursor.rowcount} 条记录")

    # 删除数据
    cursor.execute("DELETE FROM users WHERE username = ?", ('charlie',))
    print(f"删除了 {cursor.rowcount} 条记录")

    conn.commit()

5. 事务处理

def transaction_example(conn):
    try:
        # 开始事务
        conn.execute("BEGIN TRANSACTION")

        cursor = conn.cursor()

        # 一系列操作
        cursor.execute(
            "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
            ('eva', 'eva@example.com', 32)
        )

        user_id = cursor.lastrowid

        cursor.execute(
            "INSERT INTO orders (user_id, product_name, amount) VALUES (?, ?, ?)",
            (user_id, '平板电脑', 499.99)
        )

        # 模拟错误(如果发生则回滚)
        # raise ValueError("模拟错误")

        # 提交事务
        conn.commit()
        print("事务提交成功!")

    except Exception as e:
        # 回滚事务
        conn.rollback()
        print(f"事务回滚,原因: {e}")

三、高级特性

1. 自定义行工厂和类型适配器

import sqlite3
from datetime import datetime

# 自定义类型适配器
def adapt_datetime(dt):
    return dt.isoformat()

def convert_datetime(text):
    return datetime.fromisoformat(text.decode())

# 注册适配器
sqlite3.register_adapter(datetime, adapt_datetime)
sqlite3.register_converter("timestamp", convert_datetime)

# 自定义行工厂
def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

def advanced_features():
    # 连接时指定自定义类型
    conn = sqlite3.connect(
        'advanced.db',
        detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
    )

    conn.row_factory = dict_factory
    cursor = conn.cursor()

    # 创建包含时间戳的表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS logs (
        id INTEGER PRIMARY KEY,
        message TEXT,
        created_at TIMESTAMP
    )
    ''')

    # 插入带时间戳的数据
    now = datetime.now()
    cursor.execute(
        "INSERT INTO logs (message, created_at) VALUES (?, ?)",
        ("测试日志", now)
    )

    conn.commit()

    # 查询并自动转换时间戳
    cursor.execute("SELECT * FROM logs")
    for row in cursor.fetchall():
        print(f"日志: {row['message']}, 时间: {row['created_at']}, 类型: {type(row['created_at'])}")

    conn.close()

2. 使用WITH语句管理连接

class DatabaseManager:
    def __init__(self, db_name):
        self.db_name = db_name

    def __enter__(self):
        self.conn = sqlite3.connect(self.db_name)
        self.conn.row_factory = sqlite3.Row
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.conn.commit()
        else:
            self.conn.rollback()
        self.conn.close()
        return False  # 不抑制异常

# 使用示例
def use_database_manager():
    with DatabaseManager('managed.db') as conn:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY,
                name TEXT,
                price REAL
            )
        ''')

        products = [
            ('产品A', 19.99),
            ('产品B', 29.99),
            ('产品C', 39.99)
        ]

        cursor.executemany(
            "INSERT INTO products (name, price) VALUES (?, ?)",
            products
        )

        cursor.execute("SELECT * FROM products")
        for row in cursor.fetchall():
            print(dict(row))

四、综合运用案例:学生管理系统

import sqlite3
from datetime import datetime
import csv
import json

class StudentManagementSystem:
    def __init__(self, db_name='students.db'):
        self.db_name = db_name
        self.init_database()

    def init_database(self):
        """初始化数据库和表"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()

            # 创建学生表
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS students (
                student_id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                gender TEXT CHECK(gender IN ('男', '女')),
                birth_date TEXT,
                major TEXT,
                enrollment_date TEXT,
                gpa REAL DEFAULT 0.0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            ''')

            # 创建课程表
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS courses (
                course_id INTEGER PRIMARY KEY AUTOINCREMENT,
                course_name TEXT NOT NULL,
                credit INTEGER DEFAULT 1,
                teacher TEXT,
                semester TEXT
            )
            ''')

            # 创建成绩表(关联学生和课程)
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS grades (
                grade_id INTEGER PRIMARY KEY AUTOINCREMENT,
                student_id INTEGER,
                course_id INTEGER,
                score REAL CHECK(score >= 0 AND score <= 100),
                grade_point REAL,
                grade_date TEXT,
                FOREIGN KEY (student_id) REFERENCES students(student_id) ON DELETE CASCADE,
                FOREIGN KEY (course_id) REFERENCES courses(course_id) ON DELETE CASCADE,
                UNIQUE(student_id, course_id)
            )
            ''')

            # 创建索引
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_student_name ON students(name)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_student_major ON students(major)')

            conn.commit()

    def add_student(self, name, gender, birth_date, major, enrollment_date):
        """添加学生"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()
            cursor.execute('''
            INSERT INTO students (name, gender, birth_date, major, enrollment_date)
            VALUES (?, ?, ?, ?, ?)
            ''', (name, gender, birth_date, major, enrollment_date))
            conn.commit()
            print(f"学生 {name} 添加成功!ID: {cursor.lastrowid}")

    def add_course(self, course_name, credit, teacher, semester):
        """添加课程"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()
            cursor.execute('''
            INSERT INTO courses (course_name, credit, teacher, semester)
            VALUES (?, ?, ?, ?)
            ''', (course_name, credit, teacher, semester))
            conn.commit()
            print(f"课程 {course_name} 添加成功!")

    def add_grade(self, student_id, course_id, score):
        """添加成绩并计算绩点"""
        # 计算绩点(标准4.0制)
        if score >= 90:
            grade_point = 4.0
        elif score >= 85:
            grade_point = 3.7
        elif score >= 82:
            grade_point = 3.3
        elif score >= 78:
            grade_point = 3.0
        elif score >= 75:
            grade_point = 2.7
        elif score >= 72:
            grade_point = 2.3
        elif score >= 68:
            grade_point = 2.0
        elif score >= 64:
            grade_point = 1.5
        elif score >= 60:
            grade_point = 1.0
        else:
            grade_point = 0.0

        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()
            cursor.execute('''
            INSERT INTO grades (student_id, course_id, score, grade_point, grade_date)
            VALUES (?, ?, ?, ?, date('now'))
            ''', (student_id, course_id, score, grade_point))

            # 更新学生平均绩点
            self._update_student_gpa(conn, student_id)

            conn.commit()
            print(f"成绩添加成功!绩点: {grade_point}")

    def _update_student_gpa(self, conn, student_id):
        """更新学生平均绩点"""
        cursor = conn.cursor()
        cursor.execute('''
        SELECT AVG(g.grade_point)
        FROM grades g
        JOIN courses c ON g.course_id = c.course_id
        WHERE g.student_id = ?
        ''', (student_id,))

        result = cursor.fetchone()
        gpa = result[0] if result[0] is not None else 0.0

        cursor.execute('''
        UPDATE students SET gpa = ? WHERE student_id = ?
        ''', (round(gpa, 2), student_id))

    def query_students(self, major=None, min_gpa=None):
        """查询学生信息"""
        with sqlite3.connect(self.db_name) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            query = "SELECT * FROM students WHERE 1=1"
            params = []

            if major:
                query += " AND major = ?"
                params.append(major)

            if min_gpa is not None:
                query += " AND gpa >= ?"
                params.append(min_gpa)

            query += " ORDER BY gpa DESC"
            cursor.execute(query, params)

            students = []
            for row in cursor.fetchall():
                students.append(dict(row))

            return students

    def get_student_report(self, student_id):
        """获取学生成绩报告"""
        with sqlite3.connect(self.db_name) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # 获取学生基本信息
            cursor.execute('''
            SELECT s.*, 
                   COUNT(g.grade_id) as course_count,
                   AVG(g.score) as avg_score
            FROM students s
            LEFT JOIN grades g ON s.student_id = g.student_id
            WHERE s.student_id = ?
            GROUP BY s.student_id
            ''', (student_id,))

            student_info = cursor.fetchone()

            if not student_info:
                return None

            # 获取详细成绩
            cursor.execute('''
            SELECT c.course_name, c.credit, g.score, g.grade_point
            FROM grades g
            JOIN courses c ON g.course_id = c.course_id
            WHERE g.student_id = ?
            ORDER BY c.course_name
            ''', (student_id,))

            grades = [dict(row) for row in cursor.fetchall()]

            return {
                'student_info': dict(student_info),
                'grades': grades
            }

    def export_to_csv(self, filename='students_export.csv'):
        """导出学生数据到CSV"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()
            cursor.execute('''
            SELECT student_id, name, gender, birth_date, major, enrollment_date, gpa
            FROM students
            ORDER BY student_id
            ''')

            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                # 写入标题行
                writer.writerow(['学号', '姓名', '性别', '出生日期', '专业', '入学日期', '平均绩点'])
                # 写入数据
                writer.writerows(cursor.fetchall())

            print(f"数据已导出到 {filename}")

    def import_from_csv(self, filename):
        """从CSV导入学生数据"""
        with sqlite3.connect(self.db_name) as conn:
            cursor = conn.cursor()

            with open(filename, 'r', newline='', encoding='utf-8') as f:
                reader = csv.reader(f)
                next(reader)  # 跳过标题行

                for row in reader:
                    try:
                        cursor.execute('''
                        INSERT INTO students (name, gender, birth_date, major, enrollment_date, gpa)
                        VALUES (?, ?, ?, ?, ?, ?)
                        ''', (row[1], row[2], row[3], row[4], row[5], float(row[6]) if row[6] else 0.0))
                    except Exception as e:
                        print(f"导入失败: {row}, 错误: {e}")

            conn.commit()
            print(f"从 {filename} 导入数据完成")

    def backup_database(self, backup_name=None):
        """备份数据库"""
        if backup_name is None:
            backup_name = f"{self.db_name}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        with sqlite3.connect(self.db_name) as source:
            with sqlite3.connect(backup_name) as target:
                source.backup(target)

        print(f"数据库已备份到 {backup_name}")
        return backup_name

# 使用示例
def demo_student_system():
    # 创建系统实例
    sms = StudentManagementSystem('university.db')

    # 添加测试数据
    print("=== 添加学生 ===")
    students = [
        ('张三', '男', '2000-05-15', '计算机科学', '2020-09-01'),
        ('李四', '女', '2001-03-22', '软件工程', '2020-09-01'),
        ('王五', '男', '1999-11-30', '计算机科学', '2019-09-01'),
        ('赵六', '女', '2000-08-14', '人工智能', '2020-09-01')
    ]

    for student in students:
        sms.add_student(*student)

    print("\n=== 添加课程 ===")
    courses = [
        ('数据结构', 4, '王教授', '2023-秋季'),
        ('数据库原理', 3, '李教授', '2023-秋季'),
        ('算法设计', 4, '张教授', '2023-春季'),
        ('机器学习', 3, '陈教授', '2023-春季')
    ]

    for course in courses:
        sms.add_course(*course)

    print("\n=== 添加成绩 ===")
    # 假设学生ID和课程ID从1开始
    grades = [
        (1, 1, 85),  # 张三 数据结构 85分
        (1, 2, 92),  # 张三 数据库原理 92分
        (2, 1, 78),  # 李四 数据结构 78分
        (2, 3, 88),  # 李四 算法设计 88分
        (3, 2, 95),  # 王五 数据库原理 95分
        (4, 4, 91),  # 赵六 机器学习 91分
    ]

    for grade in grades:
        sms.add_grade(*grade)

    print("\n=== 查询计算机科学专业的学生 ===")
    cs_students = sms.query_students(major='计算机科学')
    for student in cs_students:
        print(f"{student['name']} - {student['major']} - GPA: {student['gpa']}")

    print("\n=== 获取学生成绩报告 ===")
    report = sms.get_student_report(1)  # 获取张三的成绩报告
    if report:
        print(f"学生: {report['student_info']['name']}")
        print(f"专业: {report['student_info']['major']}")
        print(f"平均绩点: {report['student_info']['gpa']}")
        print("课程成绩:")
        for grade in report['grades']:
            print(f"  {grade['course_name']}: {grade['score']}分, 绩点: {grade['grade_point']}")

    print("\n=== 导出数据到CSV ===")
    sms.export_to_csv('students.csv')

    print("\n=== 备份数据库 ===")
    sms.backup_database()

    return sms

if __name__ == "__main__":
    # 运行演示
    sms = demo_student_system()

    # 额外功能:查询绩点高于3.0的学生
    print("\n=== 绩点高于3.0的学生 ===")
    high_gpa_students = sms.query_students(min_gpa=3.0)
    for student in high_gpa_students:
        print(f"{student['name']}: {student['gpa']}")

五、最佳实践和性能优化

import sqlite3
import time
from contextlib import contextmanager

class OptimizedDatabase:
    def __init__(self, db_name):
        self.db_name = db_name

    @contextmanager
    def get_connection(self):
        """获取数据库连接的上下文管理器"""
        conn = sqlite3.connect(self.db_name)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def batch_insert(self, data, batch_size=1000):
        """批量插入数据优化"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # 开始事务
            cursor.execute("BEGIN TRANSACTION")

            try:
                for i in range(0, len(data), batch_size):
                    batch = data[i:i+batch_size]
                    cursor.executemany(
                        "INSERT INTO large_table (name, value) VALUES (?, ?)",
                        batch
                    )

                cursor.execute("COMMIT")
                print(f"批量插入了 {len(data)} 条记录")

            except Exception as e:
                cursor.execute("ROLLBACK")
                print(f"批量插入失败: {e}")

    def create_indexes(self):
        """创建合适的索引"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # 分析查询模式并创建索引
            indexes = [
                "CREATE INDEX IF NOT EXISTS idx_name ON large_table(name)",
                "CREATE INDEX IF NOT EXISTS idx_value ON large_table(value)",
                "CREATE INDEX IF NOT EXISTS idx_name_value ON large_table(name, value)"
            ]

            for index_sql in indexes:
                cursor.execute(index_sql)

    def vacuum_database(self):
        """压缩数据库文件,回收空间"""
        with self.get_connection() as conn:
            conn.execute("VACUUM")
            print("数据库压缩完成")

    def analyze_performance(self):
        """分析查询性能"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # 启用性能分析
            cursor.execute("PRAGMA journal_mode = WAL")  # Write-Ahead Logging
            cursor.execute("PRAGMA synchronous = NORMAL")
            cursor.execute("PRAGMA cache_size = -2000")  # 2MB缓存

            # 执行计划分析
            cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM large_table WHERE name = ?", ('test',))
            plan = cursor.fetchall()
            print("查询执行计划:")
            for row in plan:
                print(row)

    def use_window_functions(self):
        """使用窗口函数(SQLite 3.25+)"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # 创建示例数据
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS sales (
                id INTEGER PRIMARY KEY,
                salesperson TEXT,
                region TEXT,
                amount REAL,
                sale_date TEXT
            )
            ''')

            # 使用窗口函数计算排名
            cursor.execute('''
            SELECT 
                salesperson,
                region,
                amount,
                sale_date,
                RANK() OVER (PARTITION BY region ORDER BY amount DESC) as rank_in_region,
                SUM(amount) OVER (PARTITION BY region) as region_total
            FROM sales
            ORDER BY region, rank_in_region
            ''')

            results = cursor.fetchall()
            for row in results:
                print(dict(row))

def performance_comparison():
    """性能对比测试"""
    db = OptimizedDatabase('performance_test.db')

    # 准备测试数据
    test_data = [(f'name_{i}', i * 1.5) for i in range(10000)]

    # 测试批量插入性能
    print("测试批量插入性能...")

    # 单条插入
    start_time = time.time()
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS test_table (name TEXT, value REAL)")
        for item in test_data[:1000]:  # 只测试1000条
            cursor.execute("INSERT INTO test_table (name, value) VALUES (?, ?)", item)
    print(f"单条插入1000条记录耗时: {time.time() - start_time:.2f}秒")

    # 批量插入
    start_time = time.time()
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DROP TABLE IF EXISTS test_table_batch")
        cursor.execute("CREATE TABLE test_table_batch (name TEXT, value REAL)")
        cursor.executemany("INSERT INTO test_table_batch (name, value) VALUES (?, ?)", test_data[:1000])
    print(f"批量插入1000条记录耗时: {time.time() - start_time:.2f}秒")

# 运行性能测试
if __name__ == "__main__":
    print("=== 性能优化示例 ===")
    performance_comparison()

六、总结

SQLite在Python中的优势:

内置支持:无需安装额外依赖 轻量级:数据库为单个文件,易于部署 零配置:无需服务器配置 事务支持:ACID兼容 类型灵活:动态类型系统

最佳实践:

总是使用参数化查询防止SQL注入 使用事务处理批量操作 合理创建索引提高查询性能 使用上下文管理器管理连接 定期备份重要数据 使用WAL模式提高并发性能

适用场景:

这个全面的指南涵盖了从基础到高级的SQLite使用,以及一个完整的学生管理系统案例。你可以根据需要调整和扩展这些代码来满足具体需求。

相关推荐