Python实现轻量级数据库引擎(续)——新增“事务功能”

liftword4个月前 (03-05)技术文章27

喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。

前期基础教程:

「Python3.11.0」手把手教你安装最新版Python运行环境

讲讲Python环境使用Pip命令快速下载各类库的方法

Python启航:30天编程速成之旅(第2天)-IDE安装

【Python教程】JupyterLab 开发环境安装


接上期教程:

Python实现轻量级数据库引擎——用200行代码复刻SQLite3核心功能

上期教程带着大家实现了简单的数据库基本功能,今天继续增加新的功能:事务处理功能,确保数据操作的原子性

一、事务功能介绍

事务用于将多个数据库操作作为单个逻辑单元执行,保证:

  • 原子性:所有操作成功或全部回滚
  • 一致性:保持数据完整性
  • 隔离性:事务中间状态不可见
  • 持久性:提交后修改永久保存

二、代码修改步骤

1. 添加事务状态管理

import json
import copy  # 新增导入

class SimpleDatabase:
    def __init__(self, db_file):
        self.db_file = db_file
        self.in_transaction = False  # 事务状态标志
        self.checkpoint = None       # 数据快照
        # ...其他原有初始化代码...

2. 新增事务控制方法

    def begin_transaction(self):
        """开启事务"""
        if self.in_transaction:
            print("事务已在进行中")
            return
        self.checkpoint = copy.deepcopy(self.data)  # 创建数据快照
        self.in_transaction = True

    def commit(self):
        """提交事务"""
        if not self.in_transaction:
            print("没有活跃事务可提交")
            return
        try:
            self.save()  # 持久化数据
        finally:
            self.in_transaction = False
            self.checkpoint = None

    def rollback(self):
        """回滚事务"""
        if not self.in_transaction:
            print("没有活跃事务可回滚")
            return
        self.data = copy.deepcopy(self.checkpoint)  # 恢复数据
        self.in_transaction = False
        self.checkpoint = None

3. 修改原有保存逻辑

在所有数据修改方法中增加事务状态判断:

# 以insert方法为例
def insert(self, table_name, values):
    if table_name in self.data:
        table = self.data[table_name]
        if len(values) == len(table["columns"]):
            table["rows"].append(values)
            if not self.in_transaction:  # 非事务中立即保存
                self.save()
        # ...其余原有代码...

三、使用示例

# 初始化数据库
db = SimpleDatabase('test_db.json')
db.create_table('accounts', ['id', 'balance'])

# 正常事务流程
try:
    db.begin_transaction()
    db.insert('accounts', [1, 1000])
    db.insert('accounts', [2, 2000])
    db.commit()
except:
    db.rollback()

# 异常回滚流程
try:
    db.begin_transaction()
    db.update('accounts', [None, 1500], (0, 1))  # 账户1余额更新
    raise Exception("模拟故障")
    db.commit()
except:
    db.rollback()  # 所有修改撤销

四、完整修改后代码

import json
import copy

class SimpleDatabase:
    def __init__(self, db_file):
        self.db_file = db_file
        self.in_transaction = False
        self.checkpoint = None
        try:
            with open(self.db_file, 'r') as f:
                self.data = json.load(f)
        except FileNotFoundError:
            self.data = {}

    def save(self):
        with open(self.db_file, 'w') as f:
            json.dump(self.data, f, indent=4)

    def begin_transaction(self):
        if self.in_transaction:
            print("Transaction already in progress.")
            return
        self.checkpoint = copy.deepcopy(self.data)
        self.in_transaction = True

    def commit(self):
        if not self.in_transaction:
            print("No active transaction to commit.")
            return
        try:
            self.save()
        finally:
            self.in_transaction = False
            self.checkpoint = None

    def rollback(self):
        if not self.in_transaction:
            print("No active transaction to rollback.")
            return
        self.data = copy.deepcopy(self.checkpoint)
        self.in_transaction = False
        self.checkpoint = None

    # 原有方法(create_table/insert/select/update/delete)保持结构不变
    # 仅在save()调用处添加事务状态判断,例如:
    def insert(self, table_name, values):
        if table_name in self.data:
            table = self.data[table_name]
            if len(values) == len(table["columns"]):
                table["rows"].append(values)
                if not self.in_transaction:
                    self.save()
            else:
                print("Number of values does not match number of columns.")
        else:
            print(f"Table {table_name} does not exist.")

    def create_table(self, table_name, columns):
        """
        创建一个新的表
        :param table_name: 表名
        :param columns: 列名列表
        """
        if table_name not in self.data:
            self.data[table_name] = {
                "columns": columns,
                "rows": []
            }
            if not self.in_transaction:
                self.save()
        else:
            print(f"Table {table_name} already exists.")

    def select(self, table_name, where=None):
        """
        从指定表中查询数据
        :param table_name: 表名
        :param where: 查询条件,格式为 (column_index, value)
        :return: 查询结果列表
        """
        if table_name in self.data:
            table = self.data[table_name]
            if where is None:
                return table["rows"]
            else:
                column_index, value = where
                return [row for row in table["rows"] if row[column_index] == value]
        else:
            print(f"Table {table_name} does not exist.")
            return []

    def update(self, table_name, set_values, where):
        """
        更新指定表中的数据
        :param table_name: 表名
        :param set_values: 要更新的值列表
        :param where: 更新条件,格式为 (column_index, value)
        """
        if table_name in self.data:
            table = self.data[table_name]
            column_index, value = where
            for row in table["rows"]:
                if row[column_index] == value:
                    for i, val in enumerate(set_values):
                        if val is not None:
                            row[i] = val
            if not self.in_transaction:
                self.save()
        else:
            print(f"Table {table_name} does not exist.")

    def delete(self, table_name, where):
        """
        从指定表中删除数据
        :param table_name: 表名
        :param where: 删除条件,格式为 (column_index, value)
        """
        if table_name in self.data:
            table = self.data[table_name]
            column_index, value = where
            table["rows"] = [row for row in table["rows"] if row[column_index] != value]
            if not self.in_transaction:
                self.save()
        else:
            print(f"Table {table_name} does not exist.")


# 使用示例
if __name__ == "__main__":
    # 初始化数据库
    db = SimpleDatabase('test_db.json')
    db.create_table('accounts', ['id', 'balance'])

    # 正常事务流程
    try:
        db.begin_transaction()
        db.insert('accounts', [1, 1000])
        db.insert('accounts', [2, 2000])
        db.commit()
    except:
        db.rollback()

    # 异常回滚流程
    try:
        db.begin_transaction()
        db.update('accounts', [None, 1500], (0, 1))  # 账户1余额更新
        raise Exception("模拟故障")
        db.commit()
    except:
        db.rollback()  # 所有修改撤销

增加事务回滚,账户1未发生改变:

可以将raise Exception("模拟故障")这行代码注释掉,再运行。

运行结果,账户1已经更新至1500:


五、注意事项

  1. 嵌套事务:当前实现不支持嵌套事务,需在前一个事务提交/回滚后再开启新事务
  2. 数据持久化:提交前修改仅存在于内存,系统崩溃可能导致数据丢失
  3. 性能影响:长时间事务会占用更多内存(保持数据快照)
  4. 错误处理:建议使用try-exatch块包裹事务操作

喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。

相关文章

python散装笔记——83: 解析命令行参数

大多数命令行工具依赖于程序执行时传递给程序的参数。这些程序不提示输入,而是期望设置数据或特定的标志(变成布尔值)。这使得用户和其他程序都能在 Python 文件启动时通过数据运行它。本节将解释和演示...

开门见山:Python的第一个程序

1、简要说明发自心底的热爱,并用你的注意力填满1000个小时就能练成任何你所需要的技能当我看到这句话的时候,震惊我好长时间。所以在学习接下来的内容之前有必要做个强调。自学是门手艺,没有自学能力的人没有...

通过Power BI Desktop调用Python的正确姿势 (基于Anaconda)

前言最近一直在尝试通过各种方法生成Calendar Table,如通过Python Pandas生成日历表,和通过PostgreSQL生成日历表等。Power BI与Python集成本来这篇文章的目的...

Python新晋界面库pywebio,不会这个技巧不可能用好它

界面的制作一直是 Python 的痛!使用 Python 制作桌面端界面是非常痛苦的过程(又难学又难看)。不过,Python 已经出现了几个基于web前端的库,他们的基本机制大同小异,如果对 界面操作...

如何让一个 Python 文件运行另一个 Python 文件

在 Python 编程中,经常需要从另一个 Python 文件中执行另一个 Python 文件。这可能是为了模块化、可重用性。在本文中,我们将探讨实现此任务的不同方法,每种方法都有其优势和用例。使用...