Faust:把 Kafka Streams 搬到 Python 里

Robinhood 开源的 Faust,是一个 Python 流处理库,斩获 6.8k Star。

正文顶部截图

README区域截图

Faust 做的事情很明确,把 Kafka Streams 那套能力搬到了 Python 里。Robinhood 用它搭建分布式系统和实时数据管道,每天处理数十亿事件。

纯 Python,不用 DSL

Faust 不需要专用 DSL,会 Python 就能上手。它基于 3.6+ 的 async/await 语法,同时支持 mypy 静态类型检查。

定义 agent 就是在消费一个 Kafka topic:

import faust

class Order(faust.Record):
    account_id: str
    amount: int

app = faust.App('myapp', broker='kafka://localhost')

@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        print(f'Order for {order.account_id}: {order.amount}')

Agent 是 async def 函数,除了处理流数据,还能同时做 HTTP 请求。Faust 也用 @app.timer 装饰器支持定时任务,方便做数据生产或周期性操作。

Record 定义了消息的序列化结构,底层用 JSON 编解码,开发者只管写 Python 类就行。

带状态的流处理

Faust 内置了分布式 K/V 存储(基于 RocksDB),用法跟字典一样:

counts = app.Table('click_counts', default=int)

@app.agent(click_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count

数据按 key 分区,同一 key 落到同一个 worker。状态通过 Kafka changelog topic 做预写日志,节点宕机后 standby 节点从 changelog 恢复状态并接管工作。

Table 支持窗口聚合,统计"过去一小时的点击数"这类场景开箱即用。支持 tumbling、hopping、sliding 三种窗口类型,过期数据自动清理。

规模与性能

单 core worker 实例每秒可处理数万事件。数据经过 Kafka topic 分区,天然支持水平扩展,加实例就能提升吞吐。

生态整合

Faust 可以和 NumPy、PyTorch、Django、Flask、SQLAlchemy 等库一起用。通过 eventlet 桥接,现有 Django/Flask 项目也能集成 Faust。

安装

pip install -U faust

生产环境推荐安装 RocksDB 支持:

pip install faust[rocksdb]

其他可选插件包括 redis(缓存)、datadog/statsd(监控)、uvloop(事件循环优化)。

现状

项目已 deprecated,官方停止维护。社区活跃分支在 faust-streaming。

Faust 的意义在于,它证明了 Python 也能做流处理。之前这类工作基本被 Java 生态垄断,Faust 把门槛降到了"会 Python 就行"的程度。对于已经重度使用 Python 的团队,这是一个值得关注的方向。

这类工作基本被 Java 生态垄断,Faust 把门槛降到了"会 Python 就行"的程度。对于已经重度使用 Python 的团队,这是一个值得关注的方向。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐