"""原子性事务完整实现 — a complete implementation of atomic transactions."""
import sqlite3
from contextlib import contextmanager
from typing import Any, Callable, Optional
from dataclasses import dataclass
from enum import Enum
import threading
import time
import uuid
class TransactionStatus(Enum):
    """Lifecycle states a transaction can be in."""

    # Created but not yet started
    PENDING = "pending"
    # Started and not yet finished
    RUNNING = "running"
    # Finished and committed
    COMMITTED = "committed"
    # Aborted and rolled back
    ROLLED_BACK = "rolled_back"
    # Aborted and the rollback itself did not succeed
    FAILED = "failed"
@dataclass
class AtomicTransaction:
    """Record describing one atomic transaction and its outcome."""

    # Unique identifier of the transaction
    transaction_id: str
    # Descriptions of the operations that make up the transaction
    operations: list
    # Current lifecycle state; a new record starts as PENDING
    status: TransactionStatus = TransactionStatus.PENDING
    # time.time() stamp taken when the transaction starts; None until then
    start_time: Optional[float] = None
    # time.time() stamp taken when the transaction ends; None until then
    end_time: Optional[float] = None
    # Error message when the transaction did not commit; None otherwise
    error: Optional[str] = None
    # Free-form result set by whoever runs the transaction
    result: Any = None
class AtomicExecutor:
    """
    Atomic executor built on SQLite.

    Core features:
    1. Atomic transaction management
    2. Automatic rollback
    3. Transaction isolation
    4. Error recovery
    5. Transaction log

    Statements only belong to a transaction when they are executed on the
    connection returned by ``get_connection()`` for that transaction.
    Nesting ``atomic_transaction()`` blocks is not supported.
    """

    def __init__(self, db_path: str = ":memory:"):
        """
        Create the executor and make sure its tables exist.

        Args:
            db_path: SQLite database path. ``":memory:"`` selects a private
                in-memory database that lives as long as this executor.
        """
        self.db_path = db_path
        self.transactions: dict = {}
        self.lock = threading.Lock()
        # Connection of every transaction currently in flight, keyed by
        # transaction id, so the body of the ``with`` block can reach it.
        self._active_connections: dict = {}
        # Every sqlite3.connect(":memory:") opens a brand-new, empty database.
        # An in-memory executor therefore keeps ONE connection for its whole
        # life and serialises access to it with a re-entrant lock.
        self._memory_guard = threading.RLock()
        self._memory_conn: Optional[sqlite3.Connection] = None
        if db_path == ":memory:":
            self._memory_conn = sqlite3.connect(
                db_path, isolation_level=None, check_same_thread=False
            )
        self._init_database()

    @contextmanager
    def _connection(self):
        """
        Yield a connection to the executor's database and release it after.

        Connections are opened with ``isolation_level=None`` so that sqlite3
        never starts a transaction implicitly; transaction boundaries are
        issued explicitly, and stand-alone statements commit on their own.
        """
        if self._memory_conn is not None:
            # Shared connection: hold the guard for the whole usage and
            # leave the connection open for the next user.
            with self._memory_guard:
                yield self._memory_conn
            return
        conn = sqlite3.connect(self.db_path, isolation_level=None)
        try:
            yield conn
        finally:
            conn.close()

    def close(self) -> None:
        """Close the shared in-memory connection, if this executor has one."""
        if self._memory_conn is not None:
            with self._memory_guard:
                self._memory_conn.close()

    def _init_database(self):
        """Create the transaction-log table and the sample accounts table."""
        with self._connection() as conn:
            # Transaction log table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS transaction_log (
                    transaction_id TEXT PRIMARY KEY,
                    status TEXT,
                    start_time REAL,
                    end_time REAL,
                    error TEXT,
                    operations TEXT
                )
            ''')
            # Sample accounts table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS accounts (
                    account_id TEXT PRIMARY KEY,
                    balance REAL DEFAULT 0.0
                )
            ''')

    @contextmanager
    def atomic_transaction(self, operations: Optional[list] = None):
        """
        Context manager that runs its body inside one SQLite transaction.

        The transaction is committed when the body finishes normally and
        rolled back when the body raises; the exception is then re-raised.
        In both cases the outcome is written to ``transaction_log``.

        Usage:
            with executor.atomic_transaction() as tx:
                conn = executor.get_connection(tx)
                conn.execute(...)
                conn.execute(...)

        Args:
            operations: Optional list describing the operations.

        Yields:
            AtomicTransaction: The transaction record.
        """
        transaction_id = str(uuid.uuid4())
        transaction = AtomicTransaction(
            transaction_id=transaction_id,
            operations=operations or []
        )
        with self.lock:
            self.transactions[transaction_id] = transaction
        transaction.status = TransactionStatus.RUNNING
        transaction.start_time = time.time()
        try:
            with self._connection() as conn:
                # IMMEDIATE takes the write lock up front, so a competing
                # writer fails here rather than in the middle of the body.
                conn.execute("BEGIN IMMEDIATE")
                with self.lock:
                    self._active_connections[transaction_id] = conn
                try:
                    yield transaction
                    conn.execute("COMMIT")
                except BaseException as exc:
                    # BaseException: an interrupted body must not leave the
                    # transaction open on a connection that may be reused.
                    transaction.error = str(exc)
                    try:
                        conn.execute("ROLLBACK")
                    except sqlite3.Error as rollback_exc:
                        transaction.status = TransactionStatus.FAILED
                        transaction.error = (
                            f"Rollback failed: {rollback_exc} "
                            f"(original error: {exc})"
                        )
                    else:
                        transaction.status = TransactionStatus.ROLLED_BACK
                    raise
                else:
                    transaction.status = TransactionStatus.COMMITTED
                    transaction.result = "Transaction committed successfully"
                finally:
                    with self.lock:
                        self._active_connections.pop(transaction_id, None)
        except Exception as exc:
            # Still RUNNING here means connecting or BEGIN itself failed,
            # i.e. there was never a transaction to roll back.
            if transaction.status is TransactionStatus.RUNNING:
                transaction.status = TransactionStatus.FAILED
                transaction.error = str(exc)
            raise
        finally:
            transaction.end_time = time.time()
            self._log_transaction(transaction)

    def get_connection(self, transaction: AtomicTransaction) -> sqlite3.Connection:
        """
        Return the connection on which ``transaction`` is currently running.

        Args:
            transaction: Record yielded by ``atomic_transaction()``.

        Returns:
            The SQLite connection that holds the open transaction.

        Raises:
            RuntimeError: If the transaction is not currently in progress.
        """
        with self.lock:
            conn = self._active_connections.get(transaction.transaction_id)
        if conn is None:
            raise RuntimeError(
                f"Transaction {transaction.transaction_id} is not active"
            )
        return conn

    def _log_transaction(self, transaction: AtomicTransaction):
        """Insert or replace the ``transaction_log`` row for ``transaction``."""
        with self._connection() as conn:
            conn.execute('''
                INSERT OR REPLACE INTO transaction_log
                (transaction_id, status, start_time, end_time, error, operations)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                transaction.transaction_id,
                transaction.status.value,
                transaction.start_time,
                transaction.end_time,
                transaction.error,
                str(transaction.operations)
            ))

    def transfer_money(self, from_account: str, to_account: str, amount: float):
        """
        Atomic money-transfer example.

        Guarantee: either both accounts are updated or neither is.

        Args:
            from_account: Id of the account to debit.
            to_account: Id of the account to credit.
            amount: Amount to move; must be greater than zero.

        Returns:
            dict with the keys ``status``, ``from``, ``to`` and ``amount``.

        Raises:
            ValueError: If ``amount`` is not positive, either account does
                not exist, or the source balance is insufficient.
        """
        with self.atomic_transaction() as tx:
            # Written as "not >" so that NaN is rejected as well.
            if not amount > 0:
                raise ValueError(f"Transfer amount must be positive: {amount}")
            # Use the transaction's own connection so both updates commit
            # or roll back together.
            conn = self.get_connection(tx)
            # Check the balance
            row = conn.execute(
                "SELECT balance FROM accounts WHERE account_id = ?",
                (from_account,)
            ).fetchone()
            if row is None:
                raise ValueError(f"Account {from_account} not found")
            current_balance = row[0]
            if current_balance < amount:
                raise ValueError(f"Insufficient balance: {current_balance} < {amount}")
            # Debit
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE account_id = ?",
                (amount, from_account)
            )
            # Credit
            credited = conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE account_id = ?",
                (amount, to_account)
            )
            if credited.rowcount != 1:
                # Raising rolls the debit back instead of losing the money.
                raise ValueError(f"Account {to_account} not found")
            tx.operations = [
                f"Debit {amount} from {from_account}",
                f"Credit {amount} to {to_account}"
            ]
        return {
            "status": "success",
            "from": from_account,
            "to": to_account,
            "amount": amount
        }

    def get_transaction_status(self, transaction_id: str) -> Optional[dict]:
        """
        Return a summary of a transaction started by this executor.

        Args:
            transaction_id: Id of the transaction to look up.

        Returns:
            dict with id, status, timestamps, duration, error and operations,
            or None when the id is unknown. ``duration`` is None while either
            timestamp is missing.
        """
        transaction = self.transactions.get(transaction_id)
        if transaction is None:
            return None
        if transaction.start_time is not None and transaction.end_time is not None:
            duration = transaction.end_time - transaction.start_time
        else:
            duration = None
        return {
            "transaction_id": transaction.transaction_id,
            "status": transaction.status.value,
            "start_time": transaction.start_time,
            "end_time": transaction.end_time,
            "duration": duration,
            "error": transaction.error,
            "operations": transaction.operations
        }
# Usage example
if __name__ == "__main__":
    DEMO_DB = "atomic_demo.db"

    def _read_balances():
        """Return every (account_id, balance) row of the demo database."""
        conn = sqlite3.connect(DEMO_DB)
        try:
            return conn.execute(
                "SELECT account_id, balance FROM accounts"
            ).fetchall()
        finally:
            conn.close()

    # Create the executor
    executor = AtomicExecutor(db_path=DEMO_DB)

    # Seed the accounts; OR IGNORE keeps balances left over from earlier runs
    conn = sqlite3.connect(DEMO_DB)
    try:
        conn.execute("INSERT OR IGNORE INTO accounts (account_id, balance) VALUES ('A', 1000)")
        conn.execute("INSERT OR IGNORE INTO accounts (account_id, balance) VALUES ('B', 500)")
        conn.commit()
    finally:
        conn.close()

    print("=== 原子性转账示例 ===")
    # The database file persists between runs, so report the balances that
    # are actually stored rather than the seed values.
    initial_state = ", ".join(
        f"{account}={balance:g}" for account, balance in _read_balances()
    )
    print(f"初始状态:{initial_state}")

    try:
        # Only the transfer is guarded, so an unrelated error is never
        # reported as a rolled-back transfer.
        result = executor.transfer_money('A', 'B', 200)
    except Exception as e:
        print(f"\n✗ 转账失败:{e}")
        print(" 所有操作已回滚,数据保持一致")
    else:
        print(f"\n✓ 转账成功:{result}")
        # Show the balances after the transfer
        for account, balance in _read_balances():
            print(f" {account}: {balance}")
        print("\n关键观察:")
        print("1. 原子性保证:A 扣款和 B 入账要么都成功,要么都失败")
        print("2. 自动回滚:如果 B 入账失败,A 扣款会自动回滚")
        print("3. 事务日志:所有事务操作都被记录,可追溯")
        print("4. 隔离性:并发事务互不干扰")
        print("5. 持久性:提交后数据永久保存")

    # Show the status of every transaction run above
    print("\n=== 事务日志 ===")
    for tx_id in executor.transactions:
        status = executor.get_transaction_status(tx_id)
        duration = status['duration']
        # duration is None when a timestamp is missing; ":.4f" would raise
        duration_text = f"{duration:.4f}s" if duration is not None else "n/a"
        print(f"事务 {tx_id[:8]}...: {status['status']} ({duration_text})")