Python实现轻量级数据库引擎(续)——新增“事务功能”

liftword1周前 (03-05)技术文章2

喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。

前期基础教程:

「Python3.11.0」手把手教你安装最新版Python运行环境

讲讲Python环境使用Pip命令快速下载各类库的方法

Python启航:30天编程速成之旅(第2天)-IDE安装

【Python教程】JupyterLab 开发环境安装


接上期教程:

Python实现轻量级数据库引擎——用200行代码复刻SQLite3核心功能

上期教程带着大家实现了简单的数据库基本功能,今天继续增加新的功能:事务处理功能,确保数据操作的原子性

一、事务功能介绍

事务用于将多个数据库操作作为单个逻辑单元执行,保证:

  • 原子性:所有操作成功或全部回滚
  • 一致性:保持数据完整性
  • 隔离性:事务中间状态不可见
  • 持久性:提交后修改永久保存

二、代码修改步骤

1. 添加事务状态管理

Bash
import json
import copy  # 新增导入

class SimpleDatabase:
    def __init__(self, db_file):
        self.db_file = db_file
        self.in_transaction = False  # 事务状态标志
        self.checkpoint = None       # 数据快照
        # ...其他原有初始化代码...

2. 新增事务控制方法

Bash
    def begin_transaction(self):
        """开启事务"""
        if self.in_transaction:
            print("事务已在进行中")
            return
        self.checkpoint = copy.deepcopy(self.data)  # 创建数据快照
        self.in_transaction = True

    def commit(self):
        """提交事务"""
        if not self.in_transaction:
            print("没有活跃事务可提交")
            return
        try:
            self.save()  # 持久化数据
        finally:
            self.in_transaction = False
            self.checkpoint = None

    def rollback(self):
        """回滚事务"""
        if not self.in_transaction:
            print("没有活跃事务可回滚")
            return
        self.data = copy.deepcopy(self.checkpoint)  # 恢复数据
        self.in_transaction = False
        self.checkpoint = None

3. 修改原有保存逻辑

在所有数据修改方法中增加事务状态判断:

Bash
# 以insert方法为例
def insert(self, table_name, values):
    if table_name in self.data:
        table = self.data[table_name]
        if len(values) == len(table["columns"]):
            table["rows"].append(values)
            if not self.in_transaction:  # 非事务中立即保存
                self.save()
        # ...其余原有代码...

三、使用示例

Bash
# 初始化数据库
db = SimpleDatabase('test_db.json')
db.create_table('accounts', ['id', 'balance'])

# 正常事务流程
try:
    db.begin_transaction()
    db.insert('accounts', [1, 1000])
    db.insert('accounts', [2, 2000])
    db.commit()
except:
    db.rollback()

# 异常回滚流程
try:
    db.begin_transaction()
    db.update('accounts', [None, 1500], (0, 1))  # 账户1余额更新
    raise Exception("模拟故障")
    db.commit()
except:
    db.rollback()  # 所有修改撤销

四、完整修改后代码

Bash
import json
import copy

class SimpleDatabase:
    def __init__(self, db_file):
        self.db_file = db_file
        self.in_transaction = False
        self.checkpoint = None
        try:
            with open(self.db_file, 'r') as f:
                self.data = json.load(f)
        except FileNotFoundError:
            self.data = {}

    def save(self):
        with open(self.db_file, 'w') as f:
            json.dump(self.data, f, indent=4)

    def begin_transaction(self):
        if self.in_transaction:
            print("Transaction already in progress.")
            return
        self.checkpoint = copy.deepcopy(self.data)
        self.in_transaction = True

    def commit(self):
        if not self.in_transaction:
            print("No active transaction to commit.")
            return
        try:
            self.save()
        finally:
            self.in_transaction = False
            self.checkpoint = None

    def rollback(self):
        if not self.in_transaction:
            print("No active transaction to rollback.")
            return
        self.data = copy.deepcopy(self.checkpoint)
        self.in_transaction = False
        self.checkpoint = None

    # 原有方法(create_table/insert/select/update/delete)保持结构不变
    # 仅在save()调用处添加事务状态判断,例如:
    def insert(self, table_name, values):
        if table_name in self.data:
            table = self.data[table_name]
            if len(values) == len(table["columns"]):
                table["rows"].append(values)
                if not self.in_transaction:
                    self.save()
            else:
                print("Number of values does not match number of columns.")
        else:
            print(f"Table {table_name} does not exist.")

    def create_table(self, table_name, columns):
        """
        创建一个新的表
        :param table_name: 表名
        :param columns: 列名列表
        """
        if table_name not in self.data:
            self.data[table_name] = {
                "columns": columns,
                "rows": []
            }
            if not self.in_transaction:
                self.save()
        else:
            print(f"Table {table_name} already exists.")

    def select(self, table_name, where=None):
        """
        从指定表中查询数据
        :param table_name: 表名
        :param where: 查询条件,格式为 (column_index, value)
        :return: 查询结果列表
        """
        if table_name in self.data:
            table = self.data[table_name]
            if where is None:
                return table["rows"]
            else:
                column_index, value = where
                return [row for row in table["rows"] if row[column_index] == value]
        else:
            print(f"Table {table_name} does not exist.")
            return []

    def update(self, table_name, set_values, where):
        """
        更新指定表中的数据
        :param table_name: 表名
        :param set_values: 要更新的值列表
        :param where: 更新条件,格式为 (column_index, value)
        """
        if table_name in self.data:
            table = self.data[table_name]
            column_index, value = where
            for row in table["rows"]:
                if row[column_index] == value:
                    for i, val in enumerate(set_values):
                        if val is not None:
                            row[i] = val
            if not self.in_transaction:
                self.save()
        else:
            print(f"Table {table_name} does not exist.")

    def delete(self, table_name, where):
        """
        从指定表中删除数据
        :param table_name: 表名
        :param where: 删除条件,格式为 (column_index, value)
        """
        if table_name in self.data:
            table = self.data[table_name]
            column_index, value = where
            table["rows"] = [row for row in table["rows"] if row[column_index] != value]
            if not self.in_transaction:
                self.save()
        else:
            print(f"Table {table_name} does not exist.")


# 使用示例
if __name__ == "__main__":
    # 初始化数据库
    db = SimpleDatabase('test_db.json')
    db.create_table('accounts', ['id', 'balance'])

    # 正常事务流程
    try:
        db.begin_transaction()
        db.insert('accounts', [1, 1000])
        db.insert('accounts', [2, 2000])
        db.commit()
    except:
        db.rollback()

    # 异常回滚流程
    try:
        db.begin_transaction()
        db.update('accounts', [None, 1500], (0, 1))  # 账户1余额更新
        raise Exception("模拟故障")
        db.commit()
    except:
        db.rollback()  # 所有修改撤销

增加事务回滚,账户1未发生改变:

可以将raise Exception("模拟故障")这行代码注释掉,再运行。

运行结果,账户1已经更新至1500:


五、注意事项

  1. 嵌套事务:当前实现不支持嵌套事务,需在前一个事务提交/回滚后再开启新事务
  2. 数据持久化:提交前修改仅存在于内存,系统崩溃可能导致数据丢失
  3. 性能影响:长时间事务会占用更多内存(保持数据快照)
  4. 错误处理:建议使用try-exatch块包裹事务操作

喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。

相关文章

Python 命令行工具 python 的常用参数执行命令

作为 Python 的初学者,最不缺见的就是命令行工具 python 的执行命令了,每每遇到就可能去查资料帮助,同样,自己也会不时的需要某些执行命令来完成自己的需求,鉴于此我对 python 工具的执...

python中执行shell命令的几个方法小结!很实用,帮助很大

最近有个需求就是页面上执行shell命令,第一想到的就是os.system,os.system('cat /proc/cpuinfo') 但是发现页面上打印的命令执行结果 0或者1,当然不满足需求了。...

Python 中的一些命令行命令

虽然 Python 通常用于构建具有图形用户界面 (GUI) 的应用程序,但它也支持命令行交互。命令行界面 (CLI) 是一种基于文本的方法,用于与计算机的操作系统进行交互并运行程序。从命令行运行 P...

入门必学25个python常用命令

以下是 Python 入门必学的 25 个常用命令(函数、语句等):基础输入输出与数据类型print():用于输出数据到控制台,例如print("Hello, World!")。input():获取用...

Python 基础教程 九之cron定时执行python脚本

前言在Linux或Unix系统中,你可以使用cron任务来定时执行Python脚本。cron是一个基于时间的作业调度器,允许你安排命令或脚本在系统上自动执行。安装cron大多数Linux发行版默认安装...

Python 基础教程十之设置cron作业在bash sh文件中执行python脚本

简介在上一节中,我们介绍了使用cron作业定时执行python脚本。这一节我们介绍使用bash sh文件执行批量python脚本。先回顾一下cron内容:设置cron作业是一种在Linux系统中定时执...