鸿蒙PC实战_鸿蒙与Flutter混合开发:数据库事务与并发控制
问题背景
在混合开发应用中,数据库事务和并发控制是确保数据一致性和应用稳定性的关键。当多个线程或进程同时访问数据库时,如果没有适当的并发控制机制,会导致数据竞争、脏读、幻读等问题。在混合开发中,Flutter端和原生端可能同时访问同一个数据库,这进一步增加了并发控制的复杂性。
问题1:事务处理不当导致数据不一致
问题描述
在某些业务场景中,需要执行多个数据库操作,这些操作要么全部成功,要么全部失败。例如,转账操作需要从一个账户扣款,同时向另一个账户加款。如果只执行了扣款但没有执行加款,就会导致数据不一致。如果没有使用事务来保证这些操作的原子性,就容易出现这种问题。
根本原因
事务处理不当通常是由于以下原因:没有使用事务、事务范围过大或过小、没有正确处理事务异常等。在混合开发中,如果Flutter端和原生端分别处理事务,可能导致两端的数据不一致。
解决方案
事务管理示例:
// 定义事务操作结果
class TransactionResult<T> {
final bool success;
final T? data;
final String? error;
final DateTime timestamp;
TransactionResult({
required this.success,
this.data,
this.error,
required this.timestamp,
});
}
// 事务管理器
class TransactionManager {
static final TransactionManager _instance = TransactionManager._internal();
factory TransactionManager() {
return _instance;
}
TransactionManager._internal();
// 执行转账事务
Future<TransactionResult<Map<String, dynamic>>> transferMoney(
Database db,
int fromAccountId,
int toAccountId,
double amount,
) async {
try {
final result = await db.transaction((txn) async {
// 检查源账户余额
final fromAccount = await txn.query(
'accounts',
where: 'id = ?',
whereArgs: [fromAccountId],
);
if (fromAccount.isEmpty) {
throw Exception('Source account not found');
}
final fromBalance = (fromAccount[0]['balance'] as num).toDouble();
if (fromBalance < amount) {
throw Exception('Insufficient balance');
}
// 从源账户扣款
await txn.update(
'accounts',
{'balance': fromBalance - amount},
where: 'id = ?',
whereArgs: [fromAccountId],
);
// 向目标账户加款
final toAccount = await txn.query(
'accounts',
where: 'id = ?',
whereArgs: [toAccountId],
);
if (toAccount.isEmpty) {
throw Exception('Target account not found');
}
final toBalance = (toAccount[0]['balance'] as num).toDouble();
await txn.update(
'accounts',
{'balance': toBalance + amount},
where: 'id = ?',
whereArgs: [toAccountId],
);
// 记录交易日志
await txn.insert('transactions', {
'from_account_id': fromAccountId,
'to_account_id': toAccountId,
'amount': amount,
'timestamp': DateTime.now().millisecondsSinceEpoch,
'status': 'completed',
});
return {
'fromAccountId': fromAccountId,
'toAccountId': toAccountId,
'amount': amount,
'status': 'success',
};
});
return TransactionResult(
success: true,
data: result,
timestamp: DateTime.now(),
);
} catch (e) {
print('Transaction failed: $e');
return TransactionResult(
success: false,
error: e.toString(),
timestamp: DateTime.now(),
);
}
}
// 批量插入事务
Future<TransactionResult<int>> batchInsert(
Database db,
String table,
List<Map<String, dynamic>> rows,
) async {
try {
final count = await db.transaction((txn) async {
int insertedCount = 0;
for (final row in rows) {
await txn.insert(table, row);
insertedCount++;
}
return insertedCount;
});
return TransactionResult(
success: true,
data: count,
timestamp: DateTime.now(),
);
} catch (e) {
return TransactionResult(
success: false,
error: e.toString(),
timestamp: DateTime.now(),
);
}
}
}
// 使用示例
class TransactionExample extends StatefulWidget {
State<TransactionExample> createState() => _TransactionExampleState();
}
class _TransactionExampleState extends State<TransactionExample> {
late Database _database;
final _transactionManager = TransactionManager();
String _status = 'Ready';
void initState() {
super.initState();
_initializeDatabase();
}
Future<void> _initializeDatabase() async {
final databasesPath = await getDatabasesPath();
final path = join(databasesPath, 'transaction_db.db');
_database = await openDatabase(path, version: 1, onCreate: (db, version) async {
await db.execute('''
CREATE TABLE accounts (
id INTEGER PRIMARY KEY,
name TEXT,
balance REAL
)
''');
await db.execute('''
CREATE TABLE transactions (
id INTEGER PRIMARY KEY,
from_account_id INTEGER,
to_account_id INTEGER,
amount REAL,
timestamp INTEGER,
status TEXT
)
''');
});
}
Future<void> _performTransfer() async {
setState(() => _status = 'Transferring...');
final result = await _transactionManager.transferMoney(
_database,
1,
2,
100.0,
);
setState(() {
_status = result.success ? 'Transfer successful' : 'Transfer failed: ${result.error}';
});
}
void dispose() {
_database.close();
super.dispose();
}
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Transaction Management')),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Text(_status),
SizedBox(height: 16),
ElevatedButton(
onPressed: _performTransfer,
child: Text('Transfer Money'),
),
],
),
),
);
}
}
这段代码实现了一个事务管理器,用于处理复杂的多步骤数据库操作。转账操作在一个事务中执行,确保要么全部成功,要么全部失败。如果任何步骤失败,整个事务会被回滚,保证数据一致性。
原生端事务处理示例:
// 事务管理器
export class TransactionManager {
// 执行转账事务
async transferMoney(
db: any,
fromAccountId: number,
toAccountId: number,
amount: number
): Promise<any> {
return await db.transaction(async (txn: any) => {
// 检查源账户
const fromAccount = await txn.query(
'SELECT * FROM accounts WHERE id = ?',
[fromAccountId]
);
if (!fromAccount || fromAccount.length === 0) {
throw new Error('Source account not found');
}
const fromBalance = fromAccount[0].balance;
if (fromBalance < amount) {
throw new Error('Insufficient balance');
}
// 扣款
await txn.execute(
'UPDATE accounts SET balance = balance - ? WHERE id = ?',
[amount, fromAccountId]
);
// 加款
await txn.execute(
'UPDATE accounts SET balance = balance + ? WHERE id = ?',
[amount, toAccountId]
);
// 记录交易
await txn.insert('transactions', {
from_account_id: fromAccountId,
to_account_id: toAccountId,
amount: amount,
timestamp: Date.now(),
status: 'completed',
});
return { status: 'success' };
});
}
}
在原生端,我们使用相同的事务模式来确保数据一致性。
最佳实践
- 原子性操作:将相关的操作放在一个事务中,确保原子性。
- 异常处理:在事务中添加适当的异常处理,确保失败时能够正确回滚。
- 事务范围:保持事务范围尽可能小,避免长时间持有锁。
问题2:并发访问导致数据竞争
问题描述
当多个线程或进程同时访问同一个数据库时,如果没有适当的并发控制,会导致数据竞争。例如,两个线程同时读取和修改同一条记录,可能导致其中一个线程的修改被另一个线程覆盖。
根本原因
SQLite等嵌入式数据库通常使用文件级锁,不支持行级锁。当多个线程同时访问时,容易导致锁竞争。在混合开发中,Flutter端和原生端可能同时访问数据库,加剧了这个问题。
解决方案
并发控制示例:
// 并发控制管理器
class ConcurrencyControlManager {
static final ConcurrencyControlManager _instance = ConcurrencyControlManager._internal();
factory ConcurrencyControlManager() {
return _instance;
}
ConcurrencyControlManager._internal();
final Map<String, Mutex> _locks = {};
// 获取行级锁
Future<void> acquireRowLock(String table, int rowId) async {
final lockKey = '$table:$rowId';
if (!_locks.containsKey(lockKey)) {
_locks[lockKey] = Mutex();
}
await _locks[lockKey]!.lock();
}
// 释放行级锁
void releaseRowLock(String table, int rowId) {
final lockKey = '$table:$rowId';
_locks[lockKey]?.unlock();
}
// 执行并发安全的更新
Future<bool> updateWithLock(
Database db,
String table,
int rowId,
Map<String, dynamic> values,
) async {
await acquireRowLock(table, rowId);
try {
final result = await db.update(
table,
values,
where: 'id = ?',
whereArgs: [rowId],
);
return result > 0;
} finally {
releaseRowLock(table, rowId);
}
}
}
// 简单的互斥锁实现
class Mutex {
bool _locked = false;
final List<Completer<void>> _waitQueue = [];
Future<void> lock() async {
if (!_locked) {
_locked = true;
return;
}
final completer = Completer<void>();
_waitQueue.add(completer);
await completer.future;
}
void unlock() {
if (_waitQueue.isNotEmpty) {
final completer = _waitQueue.removeAt(0);
completer.complete();
} else {
_locked = false;
}
}
}
// 使用示例
class ConcurrencyExample extends StatefulWidget {
State<ConcurrencyExample> createState() => _ConcurrencyExampleState();
}
class _ConcurrencyExampleState extends State<ConcurrencyExample> {
late Database _database;
final _concurrencyManager = ConcurrencyControlManager();
String _status = 'Ready';
Future<void> _testConcurrentUpdate() async {
setState(() => _status = 'Testing concurrent updates...');
try {
// 创建多个并发更新任务
final futures = <Future>[];
for (int i = 0; i < 5; i++) {
futures.add(
_concurrencyManager.updateWithLock(
_database,
'accounts',
1,
{'balance': 100.0 + i},
),
);
}
await Future.wait(futures);
setState(() => _status = 'Concurrent updates completed successfully');
} catch (e) {
setState(() => _status = 'Error: $e');
}
}
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Concurrency Control')),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Text(_status),
SizedBox(height: 16),
ElevatedButton(
onPressed: _testConcurrentUpdate,
child: Text('Test Concurrent Update'),
),
],
),
),
);
}
}
这段代码实现了一个并发控制管理器,使用互斥锁来保护对数据库行的访问。当多个线程同时尝试更新同一行时,只有一个线程能够获得锁,其他线程需要等待。
原生端并发控制示例:
// 并发控制管理器
export class ConcurrencyControlManager {
private locks: Map<string, boolean> = new Map();
private waitQueues: Map<string, Array<() => void>> = new Map();
// 获取行级锁
async acquireRowLock(table: string, rowId: number): Promise<void> {
const lockKey = `${table}:${rowId}`;
if (!this.locks.has(lockKey)) {
this.locks.set(lockKey, false);
}
if (!this.locks.get(lockKey)) {
this.locks.set(lockKey, true);
return;
}
// 等待锁释放
return new Promise((resolve) => {
if (!this.waitQueues.has(lockKey)) {
this.waitQueues.set(lockKey, []);
}
this.waitQueues.get(lockKey)!.push(resolve);
});
}
// 释放行级锁
releaseRowLock(table: string, rowId: number): void {
const lockKey = `${table}:${rowId}`;
const waitQueue = this.waitQueues.get(lockKey);
if (waitQueue && waitQueue.length > 0) {
const resolve = waitQueue.shift();
resolve?.();
} else {
this.locks.set(lockKey, false);
}
}
// 执行并发安全的更新
async updateWithLock(
db: any,
table: string,
rowId: number,
values: any
): Promise<boolean> {
await this.acquireRowLock(table, rowId);
try {
const result = await db.execute(
`UPDATE ${table} SET ${Object.keys(values).map(k => `${k} = ?`).join(', ')} WHERE id = ?`,
[...Object.values(values), rowId]
);
return result > 0;
} finally {
this.releaseRowLock(table, rowId);
}
}
}
在原生端,我们实现了相同的并发控制机制。
最佳实践
- 使用锁:使用互斥锁或其他同步机制保护共享资源。
- 最小化临界区:尽量减少持有锁的时间。
- 避免死锁:在使用多个锁时,要注意避免死锁。
问题3:跨平台事务同步困难
问题描述
在混合开发中,Flutter端和原生端可能各自执行事务。如果这些事务涉及相同的数据,就需要确保它们之间的一致性。例如,Flutter端执行了一个转账事务,原生端也需要同步这个事务的结果。
根本原因
两个框架各自管理自己的数据库事务,它们之间没有自动的同步机制。当一端执行事务时,另一端可能不知道这个事务的结果。
解决方案
跨平台事务同步示例:
// 定义事务日志
class TransactionLog {
final int id;
final String transactionId;
final String operation;
final Map<String, dynamic> data;
final String status; // 'pending', 'completed', 'failed'
final DateTime timestamp;
TransactionLog({
required this.id,
required this.transactionId,
required this.operation,
required this.data,
required this.status,
required this.timestamp,
});
Map<String, dynamic> toMap() {
return {
'transaction_id': transactionId,
'operation': operation,
'data': jsonEncode(data),
'status': status,
'timestamp': timestamp.millisecondsSinceEpoch,
};
}
}
// 跨平台事务同步管理器
class CrossPlatformTransactionManager {
static final CrossPlatformTransactionManager _instance =
CrossPlatformTransactionManager._internal();
factory CrossPlatformTransactionManager() {
return _instance;
}
CrossPlatformTransactionManager._internal();
// 执行事务并同步到原生端
Future<bool> executeAndSync(
Database db,
String transactionId,
String operation,
Map<String, dynamic> data,
Future<void> Function(Transaction) transactionFn,
) async {
try {
// 记录事务为pending
await db.insert('transaction_logs', {
'transaction_id': transactionId,
'operation': operation,
'data': jsonEncode(data),
'status': 'pending',
'timestamp': DateTime.now().millisecondsSinceEpoch,
});
// 执行事务
await db.transaction((txn) async {
await transactionFn(txn);
});
// 更新事务状态为completed
await db.update(
'transaction_logs',
{'status': 'completed'},
where: 'transaction_id = ?',
whereArgs: [transactionId],
);
// 同步到原生端
await _syncToNative(transactionId, operation, data);
return true;
} catch (e) {
// 更新事务状态为failed
await db.update(
'transaction_logs',
{'status': 'failed'},
where: 'transaction_id = ?',
whereArgs: [transactionId],
);
print('Transaction failed: $e');
return false;
}
}
// 同步到原生端
Future<void> _syncToNative(
String transactionId,
String operation,
Map<String, dynamic> data,
) async {
try {
await PlatformChannelManager.methodChannel.invokeMethod(
'syncTransaction',
{
'transactionId': transactionId,
'operation': operation,
'data': data,
},
);
} on PlatformException catch (e) {
print('Error syncing transaction: ${e.message}');
}
}
// 处理来自原生端的事务同步
Future<void> handleNativeTransaction(
Database db,
Map<dynamic, dynamic> transactionData,
) async {
final transactionId = transactionData['transactionId'] as String;
final operation = transactionData['operation'] as String;
final data = Map<String, dynamic>.from(transactionData['data'] as Map);
// 检查事务是否已存在
final existing = await db.query(
'transaction_logs',
where: 'transaction_id = ?',
whereArgs: [transactionId],
);
if (existing.isNotEmpty) {
print('Transaction already exists: $transactionId');
return;
}
// 记录事务
await db.insert('transaction_logs', {
'transaction_id': transactionId,
'operation': operation,
'data': jsonEncode(data),
'status': 'completed',
'timestamp': DateTime.now().millisecondsSinceEpoch,
});
}
}
// 使用示例
class CrossPlatformTransactionExample extends StatefulWidget {
State<CrossPlatformTransactionExample> createState() =>
_CrossPlatformTransactionExampleState();
}
class _CrossPlatformTransactionExampleState extends State<CrossPlatformTransactionExample> {
late Database _database;
final _transactionManager = CrossPlatformTransactionManager();
String _status = 'Ready';
Future<void> _performCrossplatformTransaction() async {
setState(() => _status = 'Performing transaction...');
final transactionId = 'txn_${DateTime.now().millisecondsSinceEpoch}';
final success = await _transactionManager.executeAndSync(
_database,
transactionId,
'transfer',
{'from': 1, 'to': 2, 'amount': 100},
(txn) async {
// 执行事务逻辑
await txn.update(
'accounts',
{'balance': 900},
where: 'id = ?',
whereArgs: [1],
);
},
);
setState(() {
_status = success ? 'Transaction completed' : 'Transaction failed';
});
}
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Cross-Platform Transaction')),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Text(_status),
SizedBox(height: 16),
ElevatedButton(
onPressed: _performCrossplatformTransaction,
child: Text('Perform Transaction'),
),
],
),
),
);
}
}
这段代码实现了一个跨平台事务同步管理器。当Flutter端执行事务时,我们记录事务日志,然后通过平台通道同步到原生端。这样可以确保两端的事务保持一致。
最佳实践
- 事务日志:记录所有事务的执行情况,便于追踪和恢复。
- 同步机制:建立清晰的同步机制,确保两端的事务一致。
- 重试机制:对于失败的同步,实现重试机制确保最终一致性。
总结
数据库事务与并发控制是混合开发中的关键问题。通过正确的事务处理、并发控制和跨平台同步机制,可以确保数据的一致性和应用的稳定性。在实际开发中,建议在项目初期就建立完善的事务管理框架,避免后期的数据一致性问题。
欢迎加入开源鸿蒙PC社区:https://harmonypc.csdn.net/
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)