问题背景

在混合开发应用中,数据库事务和并发控制是确保数据一致性和应用稳定性的关键。当多个线程或进程同时访问数据库时,如果没有适当的并发控制机制,会导致数据竞争、脏读、幻读等问题。在混合开发中,Flutter端和原生端可能同时访问同一个数据库,这进一步增加了并发控制的复杂性。

问题1:事务处理不当导致数据不一致

问题描述

在某些业务场景中,需要执行多个数据库操作,这些操作要么全部成功,要么全部失败。例如,转账操作需要从一个账户扣款,同时向另一个账户加款。如果只执行了扣款但没有执行加款,就会导致数据不一致。如果没有使用事务来保证这些操作的原子性,就容易出现这种问题。

根本原因

事务处理不当通常是由于以下原因:没有使用事务、事务范围过大或过小、没有正确处理事务异常等。在混合开发中,如果Flutter端和原生端分别处理事务,可能导致两端的数据不一致。

解决方案

事务管理示例:

// 定义事务操作结果
class TransactionResult<T> {
  final bool success;
  final T? data;
  final String? error;
  final DateTime timestamp;
  
  TransactionResult({
    required this.success,
    this.data,
    this.error,
    required this.timestamp,
  });
}

// 事务管理器
class TransactionManager {
  static final TransactionManager _instance = TransactionManager._internal();
  
  factory TransactionManager() {
    return _instance;
  }
  
  TransactionManager._internal();
  
  // 执行转账事务
  Future<TransactionResult<Map<String, dynamic>>> transferMoney(
    Database db,
    int fromAccountId,
    int toAccountId,
    double amount,
  ) async {
    try {
      final result = await db.transaction((txn) async {
        // 检查源账户余额
        final fromAccount = await txn.query(
          'accounts',
          where: 'id = ?',
          whereArgs: [fromAccountId],
        );
        
        if (fromAccount.isEmpty) {
          throw Exception('Source account not found');
        }
        
        final fromBalance = (fromAccount[0]['balance'] as num).toDouble();
        if (fromBalance < amount) {
          throw Exception('Insufficient balance');
        }
        
        // 从源账户扣款
        await txn.update(
          'accounts',
          {'balance': fromBalance - amount},
          where: 'id = ?',
          whereArgs: [fromAccountId],
        );
        
        // 向目标账户加款
        final toAccount = await txn.query(
          'accounts',
          where: 'id = ?',
          whereArgs: [toAccountId],
        );
        
        if (toAccount.isEmpty) {
          throw Exception('Target account not found');
        }
        
        final toBalance = (toAccount[0]['balance'] as num).toDouble();
        await txn.update(
          'accounts',
          {'balance': toBalance + amount},
          where: 'id = ?',
          whereArgs: [toAccountId],
        );
        
        // 记录交易日志
        await txn.insert('transactions', {
          'from_account_id': fromAccountId,
          'to_account_id': toAccountId,
          'amount': amount,
          'timestamp': DateTime.now().millisecondsSinceEpoch,
          'status': 'completed',
        });
        
        return {
          'fromAccountId': fromAccountId,
          'toAccountId': toAccountId,
          'amount': amount,
          'status': 'success',
        };
      });
      
      return TransactionResult(
        success: true,
        data: result,
        timestamp: DateTime.now(),
      );
    } catch (e) {
      print('Transaction failed: $e');
      return TransactionResult(
        success: false,
        error: e.toString(),
        timestamp: DateTime.now(),
      );
    }
  }
  
  // 批量插入事务
  Future<TransactionResult<int>> batchInsert(
    Database db,
    String table,
    List<Map<String, dynamic>> rows,
  ) async {
    try {
      final count = await db.transaction((txn) async {
        int insertedCount = 0;
        for (final row in rows) {
          await txn.insert(table, row);
          insertedCount++;
        }
        return insertedCount;
      });
      
      return TransactionResult(
        success: true,
        data: count,
        timestamp: DateTime.now(),
      );
    } catch (e) {
      return TransactionResult(
        success: false,
        error: e.toString(),
        timestamp: DateTime.now(),
      );
    }
  }
}

// 使用示例
class TransactionExample extends StatefulWidget {
  
  State<TransactionExample> createState() => _TransactionExampleState();
}

class _TransactionExampleState extends State<TransactionExample> {
  late Database _database;
  final _transactionManager = TransactionManager();
  String _status = 'Ready';
  
  
  void initState() {
    super.initState();
    _initializeDatabase();
  }
  
  Future<void> _initializeDatabase() async {
    final databasesPath = await getDatabasesPath();
    final path = join(databasesPath, 'transaction_db.db');
    
    _database = await openDatabase(path, version: 1, onCreate: (db, version) async {
      await db.execute('''
        CREATE TABLE accounts (
          id INTEGER PRIMARY KEY,
          name TEXT,
          balance REAL
        )
      ''');
      
      await db.execute('''
        CREATE TABLE transactions (
          id INTEGER PRIMARY KEY,
          from_account_id INTEGER,
          to_account_id INTEGER,
          amount REAL,
          timestamp INTEGER,
          status TEXT
        )
      ''');
    });
  }
  
  Future<void> _performTransfer() async {
    setState(() => _status = 'Transferring...');
    
    final result = await _transactionManager.transferMoney(
      _database,
      1,
      2,
      100.0,
    );
    
    setState(() {
      _status = result.success ? 'Transfer successful' : 'Transfer failed: ${result.error}';
    });
  }
  
  
  void dispose() {
    _database.close();
    super.dispose();
  }
  
  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Transaction Management')),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: [
            Text(_status),
            SizedBox(height: 16),
            ElevatedButton(
              onPressed: _performTransfer,
              child: Text('Transfer Money'),
            ),
          ],
        ),
      ),
    );
  }
}

这段代码实现了一个事务管理器,用于处理复杂的多步骤数据库操作。转账操作在一个事务中执行,确保要么全部成功,要么全部失败。如果任何步骤失败,整个事务会被回滚,保证数据一致性。

原生端事务处理示例:

// 事务管理器
export class TransactionManager {
  // 执行转账事务
  async transferMoney(
    db: any,
    fromAccountId: number,
    toAccountId: number,
    amount: number
  ): Promise<any> {
    return await db.transaction(async (txn: any) => {
      // 检查源账户
      const fromAccount = await txn.query(
        'SELECT * FROM accounts WHERE id = ?',
        [fromAccountId]
      );
      
      if (!fromAccount || fromAccount.length === 0) {
        throw new Error('Source account not found');
      }
      
      const fromBalance = fromAccount[0].balance;
      if (fromBalance < amount) {
        throw new Error('Insufficient balance');
      }
      
      // 扣款
      await txn.execute(
        'UPDATE accounts SET balance = balance - ? WHERE id = ?',
        [amount, fromAccountId]
      );
      
      // 加款
      await txn.execute(
        'UPDATE accounts SET balance = balance + ? WHERE id = ?',
        [amount, toAccountId]
      );
      
      // 记录交易
      await txn.insert('transactions', {
        from_account_id: fromAccountId,
        to_account_id: toAccountId,
        amount: amount,
        timestamp: Date.now(),
        status: 'completed',
      });
      
      return { status: 'success' };
    });
  }
}

在原生端,我们使用相同的事务模式来确保数据一致性。

最佳实践

  1. 原子性操作:将相关的操作放在一个事务中,确保原子性。
  2. 异常处理:在事务中添加适当的异常处理,确保失败时能够正确回滚。
  3. 事务范围:保持事务范围尽可能小,避免长时间持有锁。

问题2:并发访问导致数据竞争

问题描述

当多个线程或进程同时访问同一个数据库时,如果没有适当的并发控制,会导致数据竞争。例如,两个线程同时读取和修改同一条记录,可能导致其中一个线程的修改被另一个线程覆盖。

根本原因

SQLite等嵌入式数据库通常使用文件级锁,不支持行级锁。当多个线程同时访问时,容易导致锁竞争。在混合开发中,Flutter端和原生端可能同时访问数据库,加剧了这个问题。

解决方案

并发控制示例:

// 并发控制管理器
class ConcurrencyControlManager {
  static final ConcurrencyControlManager _instance = ConcurrencyControlManager._internal();
  
  factory ConcurrencyControlManager() {
    return _instance;
  }
  
  ConcurrencyControlManager._internal();
  
  final Map<String, Mutex> _locks = {};
  
  // 获取行级锁
  Future<void> acquireRowLock(String table, int rowId) async {
    final lockKey = '$table:$rowId';
    
    if (!_locks.containsKey(lockKey)) {
      _locks[lockKey] = Mutex();
    }
    
    await _locks[lockKey]!.lock();
  }
  
  // 释放行级锁
  void releaseRowLock(String table, int rowId) {
    final lockKey = '$table:$rowId';
    _locks[lockKey]?.unlock();
  }
  
  // 执行并发安全的更新
  Future<bool> updateWithLock(
    Database db,
    String table,
    int rowId,
    Map<String, dynamic> values,
  ) async {
    await acquireRowLock(table, rowId);
    
    try {
      final result = await db.update(
        table,
        values,
        where: 'id = ?',
        whereArgs: [rowId],
      );
      
      return result > 0;
    } finally {
      releaseRowLock(table, rowId);
    }
  }
}

// 简单的互斥锁实现
class Mutex {
  bool _locked = false;
  final List<Completer<void>> _waitQueue = [];
  
  Future<void> lock() async {
    if (!_locked) {
      _locked = true;
      return;
    }
    
    final completer = Completer<void>();
    _waitQueue.add(completer);
    await completer.future;
  }
  
  void unlock() {
    if (_waitQueue.isNotEmpty) {
      final completer = _waitQueue.removeAt(0);
      completer.complete();
    } else {
      _locked = false;
    }
  }
}

// 使用示例
class ConcurrencyExample extends StatefulWidget {
  
  State<ConcurrencyExample> createState() => _ConcurrencyExampleState();
}

class _ConcurrencyExampleState extends State<ConcurrencyExample> {
  late Database _database;
  final _concurrencyManager = ConcurrencyControlManager();
  String _status = 'Ready';
  
  Future<void> _testConcurrentUpdate() async {
    setState(() => _status = 'Testing concurrent updates...');
    
    try {
      // 创建多个并发更新任务
      final futures = <Future>[];
      
      for (int i = 0; i < 5; i++) {
        futures.add(
          _concurrencyManager.updateWithLock(
            _database,
            'accounts',
            1,
            {'balance': 100.0 + i},
          ),
        );
      }
      
      await Future.wait(futures);
      
      setState(() => _status = 'Concurrent updates completed successfully');
    } catch (e) {
      setState(() => _status = 'Error: $e');
    }
  }
  
  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Concurrency Control')),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: [
            Text(_status),
            SizedBox(height: 16),
            ElevatedButton(
              onPressed: _testConcurrentUpdate,
              child: Text('Test Concurrent Update'),
            ),
          ],
        ),
      ),
    );
  }
}

这段代码实现了一个并发控制管理器,使用互斥锁来保护对数据库行的访问。当多个线程同时尝试更新同一行时,只有一个线程能够获得锁,其他线程需要等待。

原生端并发控制示例:

// 并发控制管理器
export class ConcurrencyControlManager {
  private locks: Map<string, boolean> = new Map();
  private waitQueues: Map<string, Array<() => void>> = new Map();
  
  // 获取行级锁
  async acquireRowLock(table: string, rowId: number): Promise<void> {
    const lockKey = `${table}:${rowId}`;
    
    if (!this.locks.has(lockKey)) {
      this.locks.set(lockKey, false);
    }
    
    if (!this.locks.get(lockKey)) {
      this.locks.set(lockKey, true);
      return;
    }
    
    // 等待锁释放
    return new Promise((resolve) => {
      if (!this.waitQueues.has(lockKey)) {
        this.waitQueues.set(lockKey, []);
      }
      this.waitQueues.get(lockKey)!.push(resolve);
    });
  }
  
  // 释放行级锁
  releaseRowLock(table: string, rowId: number): void {
    const lockKey = `${table}:${rowId}`;
    const waitQueue = this.waitQueues.get(lockKey);
    
    if (waitQueue && waitQueue.length > 0) {
      const resolve = waitQueue.shift();
      resolve?.();
    } else {
      this.locks.set(lockKey, false);
    }
  }
  
  // 执行并发安全的更新
  async updateWithLock(
    db: any,
    table: string,
    rowId: number,
    values: any
  ): Promise<boolean> {
    await this.acquireRowLock(table, rowId);
    
    try {
      const result = await db.execute(
        `UPDATE ${table} SET ${Object.keys(values).map(k => `${k} = ?`).join(', ')} WHERE id = ?`,
        [...Object.values(values), rowId]
      );
      
      return result > 0;
    } finally {
      this.releaseRowLock(table, rowId);
    }
  }
}

在原生端,我们实现了相同的并发控制机制。

最佳实践

  1. 使用锁:使用互斥锁或其他同步机制保护共享资源。
  2. 最小化临界区:尽量减少持有锁的时间。
  3. 避免死锁:在使用多个锁时,要注意避免死锁。

问题3:跨平台事务同步困难

问题描述

在混合开发中,Flutter端和原生端可能各自执行事务。如果这些事务涉及相同的数据,就需要确保它们之间的一致性。例如,Flutter端执行了一个转账事务,原生端也需要同步这个事务的结果。

根本原因

两个框架各自管理自己的数据库事务,它们之间没有自动的同步机制。当一端执行事务时,另一端可能不知道这个事务的结果。

解决方案

跨平台事务同步示例:

// 定义事务日志
class TransactionLog {
  final int id;
  final String transactionId;
  final String operation;
  final Map<String, dynamic> data;
  final String status; // 'pending', 'completed', 'failed'
  final DateTime timestamp;
  
  TransactionLog({
    required this.id,
    required this.transactionId,
    required this.operation,
    required this.data,
    required this.status,
    required this.timestamp,
  });
  
  Map<String, dynamic> toMap() {
    return {
      'transaction_id': transactionId,
      'operation': operation,
      'data': jsonEncode(data),
      'status': status,
      'timestamp': timestamp.millisecondsSinceEpoch,
    };
  }
}

// 跨平台事务同步管理器
class CrossPlatformTransactionManager {
  static final CrossPlatformTransactionManager _instance = 
      CrossPlatformTransactionManager._internal();
  
  factory CrossPlatformTransactionManager() {
    return _instance;
  }
  
  CrossPlatformTransactionManager._internal();
  
  // 执行事务并同步到原生端
  Future<bool> executeAndSync(
    Database db,
    String transactionId,
    String operation,
    Map<String, dynamic> data,
    Future<void> Function(Transaction) transactionFn,
  ) async {
    try {
      // 记录事务为pending
      await db.insert('transaction_logs', {
        'transaction_id': transactionId,
        'operation': operation,
        'data': jsonEncode(data),
        'status': 'pending',
        'timestamp': DateTime.now().millisecondsSinceEpoch,
      });
      
      // 执行事务
      await db.transaction((txn) async {
        await transactionFn(txn);
      });
      
      // 更新事务状态为completed
      await db.update(
        'transaction_logs',
        {'status': 'completed'},
        where: 'transaction_id = ?',
        whereArgs: [transactionId],
      );
      
      // 同步到原生端
      await _syncToNative(transactionId, operation, data);
      
      return true;
    } catch (e) {
      // 更新事务状态为failed
      await db.update(
        'transaction_logs',
        {'status': 'failed'},
        where: 'transaction_id = ?',
        whereArgs: [transactionId],
      );
      
      print('Transaction failed: $e');
      return false;
    }
  }
  
  // 同步到原生端
  Future<void> _syncToNative(
    String transactionId,
    String operation,
    Map<String, dynamic> data,
  ) async {
    try {
      await PlatformChannelManager.methodChannel.invokeMethod(
        'syncTransaction',
        {
          'transactionId': transactionId,
          'operation': operation,
          'data': data,
        },
      );
    } on PlatformException catch (e) {
      print('Error syncing transaction: ${e.message}');
    }
  }
  
  // 处理来自原生端的事务同步
  Future<void> handleNativeTransaction(
    Database db,
    Map<dynamic, dynamic> transactionData,
  ) async {
    final transactionId = transactionData['transactionId'] as String;
    final operation = transactionData['operation'] as String;
    final data = Map<String, dynamic>.from(transactionData['data'] as Map);
    
    // 检查事务是否已存在
    final existing = await db.query(
      'transaction_logs',
      where: 'transaction_id = ?',
      whereArgs: [transactionId],
    );
    
    if (existing.isNotEmpty) {
      print('Transaction already exists: $transactionId');
      return;
    }
    
    // 记录事务
    await db.insert('transaction_logs', {
      'transaction_id': transactionId,
      'operation': operation,
      'data': jsonEncode(data),
      'status': 'completed',
      'timestamp': DateTime.now().millisecondsSinceEpoch,
    });
  }
}

// 使用示例
class CrossPlatformTransactionExample extends StatefulWidget {
  
  State<CrossPlatformTransactionExample> createState() => 
      _CrossPlatformTransactionExampleState();
}

class _CrossPlatformTransactionExampleState extends State<CrossPlatformTransactionExample> {
  late Database _database;
  final _transactionManager = CrossPlatformTransactionManager();
  String _status = 'Ready';
  
  Future<void> _performCrossplatformTransaction() async {
    setState(() => _status = 'Performing transaction...');
    
    final transactionId = 'txn_${DateTime.now().millisecondsSinceEpoch}';
    
    final success = await _transactionManager.executeAndSync(
      _database,
      transactionId,
      'transfer',
      {'from': 1, 'to': 2, 'amount': 100},
      (txn) async {
        // 执行事务逻辑
        await txn.update(
          'accounts',
          {'balance': 900},
          where: 'id = ?',
          whereArgs: [1],
        );
      },
    );
    
    setState(() {
      _status = success ? 'Transaction completed' : 'Transaction failed';
    });
  }
  
  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Cross-Platform Transaction')),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: [
            Text(_status),
            SizedBox(height: 16),
            ElevatedButton(
              onPressed: _performCrossplatformTransaction,
              child: Text('Perform Transaction'),
            ),
          ],
        ),
      ),
    );
  }
}

这段代码实现了一个跨平台事务同步管理器。当Flutter端执行事务时,我们记录事务日志,然后通过平台通道同步到原生端。这样可以确保两端的事务保持一致。

最佳实践

  1. 事务日志:记录所有事务的执行情况,便于追踪和恢复。
  2. 同步机制:建立清晰的同步机制,确保两端的事务一致。
  3. 重试机制:对于失败的同步,实现重试机制确保最终一致性。

总结

数据库事务与并发控制是混合开发中的关键问题。通过正确的事务处理、并发控制和跨平台同步机制,可以确保数据的一致性和应用的稳定性。在实际开发中,建议在项目初期就建立完善的事务管理框架,避免后期的数据一致性问题。

欢迎加入开源鸿蒙PC社区:https://harmonypc.csdn.net/

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐