在这里插入图片描述

Stream转换与数据处理

一、Stream转换操作符体系

Stream转换操作符是Dart Stream API的核心组成部分,它们提供了强大的数据流处理能力,让开发者能够以声明式的方式对异步数据流进行转换、过滤、组合等操作。与传统的命令式编程不同,Stream操作符采用函数式编程范式,通过链式调用将多个操作组合在一起,形成清晰的数据处理管道。这种设计不仅提高了代码的可读性和可维护性,还使得异步数据流的处理变得简洁优雅。

Stream操作符主要分为几大类:转换操作符、过滤操作符、集合操作符、错误处理操作符、工具操作符等。转换操作符(如map、expand、scan)用于改变流中每个元素的值或结构;过滤操作符(如where、take、skip)用于控制流中元素的通过;集合操作符(如fold、reduce、toList)用于将流聚合为单个值或集合;错误处理操作符(如handleError、catchError)用于处理流中的错误;工具操作符(如debounce、throttle、distinct)用于控制流的特性和行为。

Stream操作符的一个重要特性是惰性求值。操作符本身不会立即处理数据,只有在Stream被订阅时才会开始工作。这意味着可以构建复杂的处理管道而不会产生任何计算开销,直到真正需要数据时才开始处理。另一个重要特性是不可变性,每个操作符都返回一个新的Stream,原始的Stream不会被修改,这保证了操作的幂等性和可预测性。

原始Stream

转换操作符

过滤操作符

聚合操作符

工具操作符

最终Stream

map

expand

scan

where

take

skip

fold

reduce

toList

debounce

throttle

distinct

二、核心转换操作符详解

2.1 map - 元素转换

map是最基础也是最常用的转换操作符,它对流中的每个元素应用一个转换函数,返回包含转换后元素的新Stream。map是1对1的转换,输出Stream的元素数量与输入Stream相同。map转换函数可以改变元素的类型,这使得类型转换成为可能,比如将Stream转换为Stream。

map的转换函数可以是同步的,也可以是异步的。异步的map使用mapAsync或asyncMap方法,它们接受一个返回Future的转换函数,可以处理包含异步逻辑的转换场景。需要注意的是,异步的map会保持输入元素的顺序,即使不同元素的转换完成时间不同。

// 基础map使用
Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5]);

// 将数字翻倍
Stream<int> doubled = numbers.map((n) => n * 2);
// 输出: 2, 4, 6, 8, 10

// 类型转换:数字转字符串
Stream<String> strings = numbers.map((n) => '数字: $n');
// 输出: "数字: 1", "数字: 2", ...

// 复杂转换:对象转JSON
Stream<String> usersToJson(List<User> users) {
  return Stream.fromIterable(users).map((user) {
    return jsonEncode({
      'id': user.id,
      'name': user.name,
      'email': user.email,
    });
  });
}

2.2 where - 条件过滤

where(也叫filter)操作符用于过滤流中的元素,只保留满足条件的元素。where接受一个断言函数,该函数返回布尔值,true表示保留该元素,false表示丢弃该元素。where是数据过滤的核心操作符,在许多场景中都非常有用,比如过滤无效数据、筛选特定范围的值、排除异常值等。

where的断言函数应该是纯函数,不依赖于外部状态,并且没有副作用。这样可以保证where操作的可预测性和可测试性。如果where条件复杂,可以考虑将其提取为命名函数,提高代码的可读性。

Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// 过滤偶数
Stream<int> oddNumbers = numbers.where((n) => n % 2 != 0);
// 输出: 1, 3, 5, 7, 9

// 过滤范围
Stream<int> inRange = numbers.where((n) => n >= 3 && n <= 8);
// 输出: 3, 4, 5, 6, 7, 8

// 复杂条件:过滤有效数据
Stream<User> filterValidUsers(Stream<User> users) {
  return users.where((user) {
    return user.email != null &&
           user.email!.isNotEmpty &&
           user.age >= 18 &&
           user.age <= 65;
  });
}

// 多条件组合
Stream<int> complexFilter = numbers.where((n) {
  return n % 2 == 0 &&      // 偶数
         n > 2 &&            // 大于2
         n < 10;           // 小于10
});
// 输出: 4, 6, 8

2.3 expand - 展开元素

expand操作符用于将流中的每个元素展开为多个元素,从而改变流的元素数量。与map的1对1转换不同,expand可以实现1对多的转换,这是它最强大的特性。expand的转换函数返回一个Iterable,expand会将这个Iterable中的所有元素依次发送到输出流中。这使得expand非常适合用于展开列表、矩阵、树结构等包含嵌套数据的场景。

expand的一个典型应用是展开List<List>为Stream。比如有一个包含多个列表的Stream,使用expand可以将其扁平化为包含所有元素的Stream。另一个应用是对字符串进行字符级处理,将每个字符串展开为其字符序列。

// 展开列表
Stream<List<int>> listOfLists = Stream.fromIterable([
  [1, 2, 3],
  [4, 5, 6],
  [7, 8, 9],
]);

// 扁平化为单个数字流
Stream<int> flattened = listOfLists.expand((list) => list);
// 输出: 1, 2, 3, 4, 5, 6, 7, 8, 9

// 字符串展开为字符
Stream<String> words = Stream.fromIterable(['hello', 'world']);
Stream<String> characters = words.expand((word) => word.split(''));
// 输出: h, e, l, l,, o, w, o, r, l, d

// 展开对象属性
Stream<User> users = Stream.fromIterable([...]);
Stream<String> emails = users.expand((user) {
  // 将用户的多个联系方式展开
  final contacts = <String>[];
  if (user.email != null) contacts.add(user.email!);
  if (user.phone != null) contacts.add(user.phone!);
  return contacts;
});

// 矩阵展开
Stream<List<List<int>>> matrix = Stream.fromIterable([
  [[1, 2], [3, 4]],
  [[5, 6], [7, 8]],
]);

Stream<int> matrixFlattened = matrix.expand((row) {
  return row.expand((cell) => cell);
});
// 输出: 1, 2, 3, 4, 5, 6, 7, 8

2.4 take/takeWhile - 数量限制

take和takeWhile是控制流输出数量的操作符。take接受一个整数n,只保留流的前n个元素,之后立即关闭流。takeWhile接受一个断言函数,持续保留元素直到断言返回false,然后关闭流。这两个操作符在需要限制数据处理量或提前终止流的场景中非常有用。

take的一个常见用途是获取数据的前几项,比如分页的第一页、排行榜的前N名、最近的消息等。takeWhile则更适合用于处理有序数据流,比如读取配置文件直到遇到特定标记、处理日志直到遇到错误等。

Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// 只取前3个
Stream<int> firstThree = numbers.take(3);
// 输出: 1, 2, 3

// takeWhile:取到遇到大于5的数之前
Stream<int> takeUntil5 = numbers.takeWhile((n) => n <= 5);
// 输出: 1, 2, 3, 4, 5

// 实际应用:分页
Stream<T> paginate(Stream<T> source, int page, int pageSize) {
  return source.skip(page * pageSize).take(pageSize);
}

// 使用示例
Stream<int> page1 = paginate(numbers, 0, 3); // 1, 2, 3
Stream<int> page2 = paginate(numbers, 1, 3); // 4, 5, 6

// takeWhile实际应用:读取配置直到注释
Stream<String> configLines = Stream.fromIterable([
  'host=localhost',
  'port=5432',
  '# 以下是高级配置',
  'advanced=true',
]);

Stream<String> basicConfig = configLines.takeWhile((line) {
  return !line.startsWith('#');
});
// 输出: "host=localhost", "port=5432"

2.5 skip/skipWhile - 跳过元素

skip和skipWhile是take/takeWhile的相反操作,用于跳过流中的元素。skip接受一个整数n,跳过前n个元素,之后正常输出。skipWhile接受一个断言函数,持续跳过元素直到断言返回false,然后正常输出剩余的元素。这两个操作符在需要忽略部分数据或处理分页数据时非常有用。

skip的一个典型应用是实现分页功能,跳过前面页的数据,只返回当前页的数据。另一个应用是跳过标题行或元数据行,只处理实际的数据行。skipWhile则适合用于处理有序数据,比如跳过已处理的记录、跳过特定值之前的数据等。

Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// 跳过前5个
Stream<int> afterFive = numbers.skip(5);
// 输出: 6, 7, 8, 9, 10

// skipWhile:跳过小于5的数
Stream<int> fromFive = numbers.skipWhile((n) => n < 5);
// 输出: 5, 6, 7, 8, 9, 10

// 实际应用:分页第二页
Stream<T> getPage(Stream<T> source, int page, int pageSize) {
  final startIndex = page * pageSize;
  return source.skip(startIndex).take(pageSize);
}

// 使用示例
Stream<int> page0 = getPage(numbers, 0, 3); // 1, 2, 3
Stream<int> page1 = getPage(numbers, 1, 3); // 4, 5, 6
Stream<int> page2 = getPage(numbers, 2, 3); // 7, 8, 9

// skipWhile实际应用:跳过标题和空行
Stream<String> csvLines = Stream.fromIterable([
  'ID,Name,Email',
  '',
  '1,张三,zhangsan@example.com',
  '2,李四,lisi@example.com',
]);

Stream<String> dataLines = csvLines.skipWhile((line) {
  return line.isEmpty || line.startsWith('#') || line.startsWith(',');
});
// 输出: "1,张三,zhangsan@example.com", "2,李四,lisi@example.com"

三、聚合操作符详解

3.1 fold - 累积计算

fold是强大的聚合操作符,它使用一个初始值和累积函数,将流中的所有元素累积为单个值。fold接受两个参数:初始值和累积函数,累积函数接受当前累积值和当前元素,返回新的累积值。fold在需要对流进行求和、计算平均值、构建复杂对象等聚合操作时非常有用。

fold的一个特点是它总是返回一个值,即使流是空的也会返回初始值。这使得fold的行为更加可预测,不需要处理空流的情况。fold的累积函数可以是同步的,也可以是异步的,异步版本使用foldAsync或asyncFold方法。

Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5]);

// 求和
Future<int> sum = numbers.fold(0, (acc, n) => acc + n);
// 结果: 15

// 求积
Future<int> product = numbers.fold(1, (acc, n) => acc * n);
// 结果: 120

// 构建Map
Stream<User> users = Stream.fromIterable([...]);
Future<Map<String, User>> userMap = users.fold(
  {},
  (map, user) {
    map[user.id] = user;
    return map;
  },
);

// 复杂聚合:计算统计信息
Future<Statistics> calculateStats(Stream<int> numbers) {
  return numbers.fold(
    Statistics(),
    (stats, n) {
      stats.count++;
      stats.sum += n;
      stats.min = stats.min == null ? n : min(stats.min!, n);
      stats.max = stats.max == null ? n : max(stats.max!, n);
      return stats;
    },
  );
}

class Statistics {
  int count = 0;
  int sum = 0;
  int? min;
  int? max;
  double get average => count > 0 ? sum / count : 0;
}

3.2 reduce - 归约计算

reduce与fold类似,也用于聚合流中的所有元素,但reduce不使用初始值,而是使用流的第一个元素作为初始累积值。这意味着reduce要求流至少包含一个元素,如果流是空的,reduce会抛出StateError异常。reduce在某些场景下比fold更简洁,比如求最大值、最小值、连接字符串等操作。

reduce的一个重要特性是它可以直接使用流的元素类型作为累积值的类型,而fold需要显式指定初始值的类型。这使得reduce的类型推导更加自然。但需要注意的是,如果流可能为空,应该使用fold而不是reduce,或者在调用reduce之前检查流是否为空。

Stream<int> numbers = Stream.fromIterable([3, 7, 2, 9, 5]);

// 求最大值
Future<int> max = numbers.reduce((a, b) => a > b ? a : b);
// 结果: 9

// 求最小值
Future<int> min = numbers.reduce((a, b) => a < b ? a : b);
// 结果: 2

// 连接字符串
Stream<String> words = Stream.fromIterable(['Hello', ' ', 'World', '!']);
Future<String> sentence = words.reduce((a, b) => a + b);
// 结果: "Hello World!"

// 找出符合条件的元素
Future<int> findLargestEven(Stream<int> numbers) {
  return numbers.reduce((a, b) {
    final isAEven = a % 2 == 0;
    final isBEven = b % 2 == 0;
    if (isAEven && !isBEven) return a;
    if (!isAEven && isBEven) return b;
    return a > b ? a : b;
  });
}

3.3 toList/toSet - 集合转换

toList和toSet是将流转换为集合的操作符。toList将流中的所有元素收集到一个List中,toSet将流中的所有元素收集到一个Set中(自动去重)。这两个操作符会等待流完全结束,然后返回完整的集合。由于需要等待所有元素,toList和toSet通常只用于有限长度的流,对于无限流或可能产生大量元素的流,应该谨慎使用。

toList返回的List保持流中元素的原始顺序,而toSet返回的Set不保证顺序,因为它使用哈希存储。toList适合需要保持顺序、可能包含重复元素的场景,toSet适合需要快速查找、自动去重的场景。

Stream<int> numbers = Stream.fromIterable([3, 1, 4, 1, 5, 9, 2, 6]);

// 转换为List
Future<List<int>> list = numbers.toList();
// 结果: [3, 1, 4, 1, 5, 9, 2, 6]

// 转换为Set(自动去重)
Future<Set<int>> set = numbers.toSet();
// 结果: {3, 1, 4, 5, 9, 2, 6}

// 实际应用:批量查询数据库
Future<List<User>> batchFetchUsers(Stream<String> userIds) async {
  final ids = await userIds.toList();
  // 使用ids批量查询数据库
  return await database.fetchUsers(ids);
}

// 实际应用:获取唯一的标签
Future<List<String>> getUniqueTags(Stream<Article> articles) async {
  final tags = <String>[];
  await for (final article in articles) {
    tags.addAll(article.tags);
  }
  return tags.toSet().toList();
}

四、工具操作符详解

4.1 debounce - 防抖处理

debounce操作符用于抑制短时间内连续的事件,只在事件流暂停一段时间后才输出最后一个事件。debounce的典型应用场景包括:搜索输入框(用户停止输入后再执行搜索)、窗口大小调整(调整完成后再重新计算)、按钮点击(防止重复提交)等。debounce可以显著减少不必要的计算和UI更新,提高应用性能。

debounce接受一个Duration参数,表示等待的时间长度。每当新事件到达时,debounce会重置计时器,只有当距离上一个事件经过了指定的等待时间,才会输出该事件。这意味着如果事件连续不断到达,debounce可能永远不会输出任何事件,直到事件流停止一段时间。

// 模拟用户输入
final textEditingController = StreamController<String>();

// 应用debounce
Stream<String> debouncedInput = textEditingController.stream
  .debounceTime(Duration(milliseconds: 300));

// 监听防抖后的输入
debouncedInput.listen((text) {
  // 只在用户停止输入300ms后执行搜索
  performSearch(text);
});

// 实际应用:窗口大小调整
Stream<void> resizeEvents = Window.onResize;
Stream<void> debouncedResize = resizeEvents
  .debounceTime(Duration(milliseconds: 200));

debouncedResize.listen((_) {
  // 调整完成200ms后才重新计算布局
  recalculateLayout();
});

// 实际应用:滚动停止后加载更多
Stream<ScrollEvent> scrollEvents = ScrollController.stream;
Stream<ScrollEvent> debouncedScroll = scrollEvents
  .debounceTime(Duration(milliseconds: 500));

debouncedScroll.listen((event) {
  // 滚动停止500ms后判断是否需要加载更多
  if (event.isNearBottom) {
    loadMoreItems();
  }
});

4.2 throttle - 节流控制

throttle操作符用于限制事件的输出频率,确保在指定的时间间隔内最多只输出一个事件。与debounce不同,throttle不会延迟第一个事件,而是立即输出第一个事件,然后在指定的时间间隔内忽略后续事件。throttle的典型应用场景包括:滚动事件(限制渲染频率)、鼠标移动事件(限制处理频率)、传感器数据(限制采样频率)等。

throttle有两种常见实现:固定节流和拖尾节流。固定节流在每个时间周期的开始输出一个事件,拖尾节流在每个时间周期的结束输出一个事件。Flutter的StreamController默认使用固定节流,但开发者可以根据需要实现不同的节流策略。

// 模拟滚动事件
final scrollController = StreamController<ScrollEvent>();

// 应用节流,每100ms最多输出一个事件
Stream<ScrollEvent> throttledScroll = scrollController.stream
  .throttleTime(Duration(milliseconds: 100));

// 监听节流后的滚动事件
throttledScroll.listen((event) {
  // 限制渲染频率到最多10fps
  updateUI(event);
});

// 实际应用:限制传感器采样
Stream<AccelerometerData> sensorData = Accelerometer.stream;
Stream<AccelerometerData> sampledData = sensorData
  .throttleTime(Duration(milliseconds: 50)); // 20Hz采样

sampledData.listen((data) {
  // 限制采样频率到20Hz
  updateDataDisplay(data);
});

// 实际应用:鼠标移动
Stream<MouseEvent> mouseMoves = Mouse.onMove;
Stream<MouseEvent> throttledMoves = mouseMoves
  .throttleTime(Duration(milliseconds: 16)); // 60fps

throttledMoves.listen((event) {
  // 限制处理频率到60fps
  updateCursorPosition(event.position);
});

4.3 distinct - 去重处理

distinct操作符用于过滤流中的重复元素,只输出与之前所有元素不同的元素。distinct在处理可能包含重复数据的数据流时非常有用,比如去重、状态变化通知、表单提交等。distinct接受一个可选的比较函数,如果没有提供比较函数,则使用==运算符进行默认比较。

distinct的一个重要特性是它记住流中所有之前输出过的元素,这意味着随着流的长度增加,distinct的内存使用也会增加。对于可能产生大量不同元素的流,应该谨慎使用distinct。如果只需要比较相邻元素,可以考虑使用distinctUnique或其他优化策略。

Stream<int> numbers = Stream.fromIterable([1, 2, 2, 3, 3, 3, 4, 5]);

// 去重
Stream<int> distinctNumbers = numbers.distinct();
// 输出: 1, 2, 3, 4, 5

// 自定义比较函数
Stream<User> distinctUsers = users.distinct((a, b) => a.id == b.id);

// 实际应用:状态变化通知
class StateManager {
  final _stateController = StreamController<AppState>();
  
  Stream<AppState> get stateChanges => _stateController.stream.distinct();
  
  void updateState(AppState newState) {
    _stateController.add(newState);
    // 只有状态真正变化时才会通知监听者
  }
}

// 实际应用:表单去重提交
Stream<void> submitRequests = SubmitButton.clicks;
Stream<void> distinctSubmits = submitRequests.distinct();

distinctSubmits.listen((_) {
  // 只有在按钮状态真正改变时才提交
  performSubmit();
});

五、错误处理操作符

5.1 handleError - 错误捕获

handleError操作符用于处理流中发生的错误,它可以拦截错误并决定如何处理:忽略错误、转换错误为默认值、或重新抛出错误。handleError接受一个错误处理函数和一个可选的测试函数,测试函数用于判断哪些错误应该被处理。这使得开发者可以精确控制错误处理逻辑。

handleError的一个关键特性是它不会自动关闭流,除非错误处理函数决定关闭。这意味着流可以在遇到错误后继续产生事件,这对于容错性强的应用非常重要。但是需要注意,如果错误处理函数抛出新的异常,这个新异常会传播到下游。

Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5])
  .map((n) {
    if (n == 3) throw Exception('Invalid number: $n');
    return n * 2;
  });

// 捕获并处理错误
Stream<int> handledNumbers = numbers.handleError((error, stackTrace) {
  print('捕获错误: $error');
  // 错误已处理,流继续
  return 0; // 为错误返回默认值
});

// 使用测试函数:只处理特定类型的错误
Stream<int> selectivelyHandled = numbers.handleError(
  (error, stackTrace) {
    if (error is NetworkException) {
      // 网络错误,返回默认值
      return -1;
    }
    // 其他错误重新抛出
    throw error;
  },
  test: (error) => error is NetworkException,
);

// 实际应用:重试机制
Stream<T> withRetry<T>(
  Stream<T> source, {
  int maxRetries = 3,
  Duration delay = const Duration(seconds: 1),
}) {
  return source.handleError((error, stackTrace) async* {
    int attempts = 0;
    while (attempts < maxRetries) {
      try {
        // 重新执行源流
        final result = await source.last;
        yield result;
        return; // 成功则退出错误处理
      } catch (e) {
        attempts++;
        if (attempts >= maxRetries) {
          // 达到最大重试次数,抛出错误
          yield* Stream.error(error, stackTrace);
          return;
        }
        // 等待后重试
        await Future.delayed(delay * attempts);
      }
    }
  });
}

5.2 catchError - 错误转换

catchError操作符与handleError类似,也用于处理流中的错误,但它更适合用于将错误转换为正常值。catchError接受一个错误处理函数,该函数接收错误对象和堆栈跟踪,返回一个替代值。返回的值会被发送到输出流中,就好像它是一个正常的事件一样。

catchError的一个常见应用是将异常转换为错误码或特殊值,这样流的监听者可以统一处理所有值而不需要区分正常值和错误值。另一个应用是记录错误日志,同时保持流的连续性。

Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5])
  .map((n) {
    if (n == 3) throw FormatException('Invalid format');
    return n;
  });

// 将错误转换为错误码
Stream<int> withErrorCode = numbers.catchError((error, stackTrace) {
  print('发生错误: $error');
  return -1; // 错误码
});

// 监听时统一处理
withErrorCode.listen((value) {
  if (value == -1) {
    // 处理错误情况
    showError('数据格式错误');
  } else {
    // 处理正常值
    processValue(value);
  }
});

// 实际应用:API响应处理
Stream<ApiResponse> apiResponses = ApiClient.getResponses();
Stream<ApiResponse> safeResponses = apiResponses.catchError((error, stackTrace) {
  // 记录错误
  logger.error('API请求失败', error, stackTrace);
  
  // 返回错误响应
  return ApiResponse.error(
    code: _extractErrorCode(error),
    message: '请求失败',
  );
});

六、操作符组合模式

6.1 链式调用

Stream操作符最强大的特性之一是链式调用,多个操作符可以依次组合,形成清晰的数据处理管道。链式调用使得复杂的数据处理逻辑可以用简洁、声明式的方式表达,大大提高了代码的可读性和可维护性。每个操作符都返回一个新的Stream,这使得链式调用成为可能。

链式调用的关键在于理解每个操作符对流的变换效果,以及它们之间的相互影响。比如,take和skip的顺序会影响最终输出的元素;where和map的顺序会影响转换和过滤的效率。合理规划操作符的顺序,可以减少不必要的计算,提升整体性能。

// 复杂数据处理管道
Stream<int> result = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
  // 过滤偶数
  .where((n) => n % 2 == 0)
  // 转换为字符串
  .map((n) => '数字: $n')
  // 只取前3个
  .take(3)
  // 去重(虽然这个例子不会重复)
  .distinct();

// 输出: ["数字: 2", "数字: 4", "数字: 6"]

// 实际应用:数据清洗管道
Stream<ProcessedData> cleanData(Stream<RawData> rawData) {
  return rawData
    // 去除空数据
    .where((data) => data != null && data.isValid)
    // 转换格式
    .map((data) => data.toProcessedFormat())
    // 过滤无效值
    .where((data) => data.value > 0)
    // 转换为字符串
    .map((data) => data.toJson())
    // 去重
    .distinct()
    // 只取最近100条
    .take(100);
}

// 实际应用:搜索结果处理
Stream<SearchResult> processSearch(Stream<SearchEvent> events) {
  return events
    // 防抖:用户停止输入300ms后才处理
    .debounceTime(Duration(milliseconds: 300))
    // 过滤空查询
    .where((event) => event.query.isNotEmpty && event.query.length > 2)
    // 转换为搜索请求
    .map((event) => SearchRequest(event.query))
    // 执行搜索
    .asyncMap((request) => ApiClient.search(request))
    // 过滤空结果
    .where((result) => result.items.isNotEmpty)
    // 去重(避免重复搜索)
    .distinct((a, b) => a.query == b.query)
    // 只取前10个结果
    .take(10);
}

6.2 并行组合

除了链式调用,Stream还提供了多种并行组合操作符,可以将多个流组合成一个流。主要的并行组合操作符包括:merge、zip、combineLatest、race等。这些操作符在需要处理多个数据源、协调多个异步操作的复杂场景中非常有用。

merge将多个流的输出合并到一个流中,不保证顺序,哪个流先有数据就先输出哪个。zip将多个流的元素一一对应地组合成元组,只有当所有流都有元素时才输出一个元组。combineLatest类似zip,但只要有任何一个流有新元素就输出一个元组。race只输出第一个完成的流的值,其他流会被忽略。

// merge示例
final stream1 = Stream.fromIterable([1, 2, 3]);
final stream2 = Stream.fromIterable([4, 5, 6]);
final merged = StreamGroup.merge([stream1, stream2]);
// 输出: 1, 4, 2, 5, 3, 6 (顺序可能不同)

// zip示例
final names = Stream.fromIterable(['张三', '李四', '王五']);
final ages = Stream.fromIterable([25, 30, 28]);
final zipped = StreamZip([names, ages]);
// 输出: (张三, 25), (李四, 30), (王五, 28)

// combineLatest示例
final usernameController = StreamController<String>();
final emailController = StreamController<String>();
final combined = Stream.combineLatest(
  usernameController.stream,
  emailController.stream,
  (username, email) => FormData(username: username, email: email),
);

// 实际应用:多条件搜索
Stream<SearchResult> multiConditionSearch({
  required String keyword,
  required String category,
  required String sort,
}) {
  final keywordStream = _searchByKeyword(keyword);
  final categoryStream = _searchByCategory(category);
  final sortStream = _searchBySort(sort);
  
  // 使用race获取最快的结果
  return Stream.race([
    keywordStream,
    categoryStream,
    sortStream,
  ]);
}

// 实际应用:表单验证
class FormValidator {
  final _usernameController = StreamController<String>();
  final _emailController = StreamController<String>();
  final _passwordController = StreamController<String>();
  
  Stream<bool> get isValid => Stream.combineLatest3(
    _usernameController.stream,
    _emailController.stream,
    _passwordController.stream,
    (username, email, password) {
      return _isValidUsername(username) &&
             _isValidEmail(email) &&
             _isValidPassword(password);
    },
  );
}

七、性能优化最佳实践

7.1 操作符选择与顺序优化

选择合适的操作符和合理的顺序可以显著提升Stream处理的性能。一些通用的优化原则包括:尽早过滤以减少后续操作的数据量、使用更高效的操作符、避免不必要的中间集合创建等。

过滤操作应该尽可能早地执行,因为它们可以显著减少后续操作需要处理的数据量。比如,如果需要过滤偶数然后平方,应该先过滤再平方,而不是先平方再过滤。对于复杂的过滤条件,应该考虑将多个简单的where操作合并为一个,减少遍历次数。

// ❌ 不好的顺序:先转换再过滤
Stream<int> bad = numbers
  .map((n) => n * n)  // 对所有元素计算平方
  .where((n) => n > 10);  // 然后过滤

// ✅ 好的顺序:先过滤再转换
Stream<int> good = numbers
  .where((n) => n > 3)  // 先过滤
  .map((n) => n * n);  // 只对需要的元素计算平方

// 合并多个where
Stream<int> filtered = numbers
  .where((n) => n > 0)
  .where((n) => n < 10)
  .where((n) => n % 2 == 0);

// 优化为单个where
Stream<int> optimized = numbers.where((n) {
  return n > 0 && n < 10 && n % 2 == 0;
});

7.2 内存管理

Stream处理过程中需要注意内存管理,避免因操作不当导致内存泄漏或内存占用过高。主要的内存管理策略包括:及时取消不再需要的订阅、使用有限长度的操作符(如take)、避免在操作符中创建大型临时对象、使用更高效的数据结构等。

及时取消订阅是防止内存泄漏的关键。当widget被销毁或某个功能不再需要时,应该调用StreamSubscription的cancel方法取消订阅。对于长期运行的Stream,可以考虑使用StreamController的onCancel回调来执行清理操作。

class _MyWidgetState extends State<MyWidget> {
  StreamSubscription? _subscription;
  
  
  void initState() {
    super.initState();
    _subscription = dataStream.listen((data) {
      setState(() => _currentData = data);
    });
  }
  
  
  void dispose() {
    // 重要:取消订阅
    _subscription?.cancel();
    super.dispose();
  }
}

// 使用take限制数据量
Stream<int> limited = infiniteStream.take(100);

// 避免在操作符中创建大型集合
Stream<int> good = stream.map((n) => n * 2);
Stream<int> bad = stream.map((n) => List.generate(1000, (i) => n * i).sum());

// 使用高效的数据结构
Stream<int> optimized = stream.where((n) => validNumbers.contains(n));

八、实际应用案例

8.1 实时数据处理管道

class RealTimeDataProcessor {
  final _rawDataStream = StreamController<SensorData>();
  final _processedStream = StreamController<ProcessedData>();
  
  RealTimeDataProcessor() {
    // 构建数据处理管道
    _rawDataStream.stream
      // 防抖:避免高频数据
      .debounceTime(Duration(milliseconds: 50))
      // 过滤无效数据
      .where((data) => data.isValid)
      // 平滑处理:滑动窗口平均
      .transform(_smoothTransformer)
      // 转换单位
      .map((data) => data.toMeters())
      // 去重:避免重复处理
      .distinct((a, b) => (a.value - b.value).abs() < 0.1))
      // 错误处理
      .handleError((error, stackTrace) {
        logger.error('数据处理错误', error, stackTrace);
      })
      // 输出到处理后的流
      .pipe(_processedStream.sink);
  }
  
  StreamTransformer<SensorData, double> _smoothTransformer() {
    final buffer = <double>[];
    const windowSize = 10;
    
    return StreamTransformer.fromHandlers(
      handleData: (data, sink) {
        buffer.add(data.value);
        if (buffer.length > windowSize) {
          buffer.removeAt(0);
        }
        final smoothed = buffer.reduce((a, b) => a + b) / buffer.length;
        sink.add(smoothed);
      },
    );
  }
  
  void dispose() {
    _rawDataStream.close();
    _processedStream.close();
  }
}

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐