Python实现轻量级数据库引擎(续)——新增“事务功能”
喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。
前期基础教程:
「Python3.11.0」手把手教你安装最新版Python运行环境
讲讲Python环境使用Pip命令快速下载各类库的方法
Python启航:30天编程速成之旅(第2天)-IDE安装
【Python教程】JupyterLab 开发环境安装
接上期教程:
Python实现轻量级数据库引擎——用200行代码复刻SQLite3核心功能
上期教程带着大家实现了简单的数据库基本功能,今天继续增加新的功能:事务处理功能,确保数据操作的原子性。
一、事务功能介绍
事务用于将多个数据库操作作为单个逻辑单元执行,保证:
- 原子性:所有操作成功或全部回滚
- 一致性:保持数据完整性
- 隔离性:事务中间状态不可见
- 持久性:提交后修改永久保存
二、代码修改步骤
1. 添加事务状态管理
import json
import copy # 新增导入
class SimpleDatabase:
def __init__(self, db_file):
self.db_file = db_file
self.in_transaction = False # 事务状态标志
self.checkpoint = None # 数据快照
# ...其他原有初始化代码...
2. 新增事务控制方法
def begin_transaction(self):
"""开启事务"""
if self.in_transaction:
print("事务已在进行中")
return
self.checkpoint = copy.deepcopy(self.data) # 创建数据快照
self.in_transaction = True
def commit(self):
"""提交事务"""
if not self.in_transaction:
print("没有活跃事务可提交")
return
try:
self.save() # 持久化数据
finally:
self.in_transaction = False
self.checkpoint = None
def rollback(self):
"""回滚事务"""
if not self.in_transaction:
print("没有活跃事务可回滚")
return
self.data = copy.deepcopy(self.checkpoint) # 恢复数据
self.in_transaction = False
self.checkpoint = None
3. 修改原有保存逻辑
在所有数据修改方法中增加事务状态判断:
# 以insert方法为例
def insert(self, table_name, values):
if table_name in self.data:
table = self.data[table_name]
if len(values) == len(table["columns"]):
table["rows"].append(values)
if not self.in_transaction: # 非事务中立即保存
self.save()
# ...其余原有代码...
三、使用示例
# 初始化数据库
db = SimpleDatabase('test_db.json')
db.create_table('accounts', ['id', 'balance'])
# 正常事务流程
try:
db.begin_transaction()
db.insert('accounts', [1, 1000])
db.insert('accounts', [2, 2000])
db.commit()
except:
db.rollback()
# 异常回滚流程
try:
db.begin_transaction()
db.update('accounts', [None, 1500], (0, 1)) # 账户1余额更新
raise Exception("模拟故障")
db.commit()
except:
db.rollback() # 所有修改撤销
四、完整修改后代码
import json
import copy
class SimpleDatabase:
def __init__(self, db_file):
self.db_file = db_file
self.in_transaction = False
self.checkpoint = None
try:
with open(self.db_file, 'r') as f:
self.data = json.load(f)
except FileNotFoundError:
self.data = {}
def save(self):
with open(self.db_file, 'w') as f:
json.dump(self.data, f, indent=4)
def begin_transaction(self):
if self.in_transaction:
print("Transaction already in progress.")
return
self.checkpoint = copy.deepcopy(self.data)
self.in_transaction = True
def commit(self):
if not self.in_transaction:
print("No active transaction to commit.")
return
try:
self.save()
finally:
self.in_transaction = False
self.checkpoint = None
def rollback(self):
if not self.in_transaction:
print("No active transaction to rollback.")
return
self.data = copy.deepcopy(self.checkpoint)
self.in_transaction = False
self.checkpoint = None
# 原有方法(create_table/insert/select/update/delete)保持结构不变
# 仅在save()调用处添加事务状态判断,例如:
def insert(self, table_name, values):
if table_name in self.data:
table = self.data[table_name]
if len(values) == len(table["columns"]):
table["rows"].append(values)
if not self.in_transaction:
self.save()
else:
print("Number of values does not match number of columns.")
else:
print(f"Table {table_name} does not exist.")
def create_table(self, table_name, columns):
"""
创建一个新的表
:param table_name: 表名
:param columns: 列名列表
"""
if table_name not in self.data:
self.data[table_name] = {
"columns": columns,
"rows": []
}
if not self.in_transaction:
self.save()
else:
print(f"Table {table_name} already exists.")
def select(self, table_name, where=None):
"""
从指定表中查询数据
:param table_name: 表名
:param where: 查询条件,格式为 (column_index, value)
:return: 查询结果列表
"""
if table_name in self.data:
table = self.data[table_name]
if where is None:
return table["rows"]
else:
column_index, value = where
return [row for row in table["rows"] if row[column_index] == value]
else:
print(f"Table {table_name} does not exist.")
return []
def update(self, table_name, set_values, where):
"""
更新指定表中的数据
:param table_name: 表名
:param set_values: 要更新的值列表
:param where: 更新条件,格式为 (column_index, value)
"""
if table_name in self.data:
table = self.data[table_name]
column_index, value = where
for row in table["rows"]:
if row[column_index] == value:
for i, val in enumerate(set_values):
if val is not None:
row[i] = val
if not self.in_transaction:
self.save()
else:
print(f"Table {table_name} does not exist.")
def delete(self, table_name, where):
"""
从指定表中删除数据
:param table_name: 表名
:param where: 删除条件,格式为 (column_index, value)
"""
if table_name in self.data:
table = self.data[table_name]
column_index, value = where
table["rows"] = [row for row in table["rows"] if row[column_index] != value]
if not self.in_transaction:
self.save()
else:
print(f"Table {table_name} does not exist.")
# 使用示例
if __name__ == "__main__":
# 初始化数据库
db = SimpleDatabase('test_db.json')
db.create_table('accounts', ['id', 'balance'])
# 正常事务流程
try:
db.begin_transaction()
db.insert('accounts', [1, 1000])
db.insert('accounts', [2, 2000])
db.commit()
except:
db.rollback()
# 异常回滚流程
try:
db.begin_transaction()
db.update('accounts', [None, 1500], (0, 1)) # 账户1余额更新
raise Exception("模拟故障")
db.commit()
except:
db.rollback() # 所有修改撤销
增加事务回滚,账户1未发生改变:
可以将raise Exception("模拟故障")这行代码注释掉,再运行。
运行结果,账户1已经更新至1500:
五、注意事项
- 嵌套事务:当前实现不支持嵌套事务,需在前一个事务提交/回滚后再开启新事务
- 数据持久化:提交前修改仅存在于内存,系统崩溃可能导致数据丢失
- 性能影响:长时间事务会占用更多内存(保持数据快照)
- 错误处理:建议使用try-exatch块包裹事务操作
喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。