一个采集任务从业务系统触发到数据落库,中间穿过调度中心、执行节点、浏览器、Python插件、数据库五个环节。
任何一个环节慢了,整体就慢了。但到底是哪个环节慢,在没有调用链之前,只能靠猜。

我们在多节点分布式那篇文章里提到过 trace_id,它让我们能把一个任务在多个节点上的日志串起来。但这个 trace_id 最初只是一个字符串,跟着日志走,查问题的时候在 Kibana 里手动搜索。它能帮我们找到“这个任务经历了哪些环节”,但回答不了更细粒度的问题——

“订单采集从第3页到第15页之间,为什么突然慢了?”
“消息回复流程里,规则引擎决策和模板渲染各自花了多少时间?”
“浏览器页面加载耗时、CDP指令耗时、Python插件处理耗时,哪个是瓶颈?”

这些问题的答案,决定了优化该往哪里使力。我们不能凭感觉说“浏览器太慢了”就去优化浏览器,也不能凭经验说“数据库写入是瓶颈”就去改批量策略。我们要数据——精确到每一步、每一毫秒的数据。

这篇文章讲我们怎么从 trace_id 升级到完整的分布式调用链体系,把自动化流程的执行过程变成一张可度量、可观测的时间地图。


在这里插入图片描述

一、从 trace_id 到 Span 树

单纯的 trace_id 只能告诉你“这些日志来自同一个任务”,但它不描述调用关系,也不描述耗时结构。

我们引入了Span的概念——一个Span代表一次有明确起止时间的操作,Span之间有父子关系,形成一棵树。这是分布式追踪领域的标准做法,我们把它搬到了RPA执行链路上。

一个典型的订单采集任务的Span树长这样:

TaskExecution (2.3s)
├── Scheduler.Dispatch (45ms)
│   ├── Router.Lookup (2ms)
│   └── Queue.Push (12ms)
├── Node.FetchTask (80ms)
├── Flow.Execution (2100ms)
│   ├── Step.LoginCheck (320ms)
│   │   ├── Browser.Navigate (280ms)
│   │   └── CDP.WaitSelector (35ms)
│   ├── Step.PaginatedCollect (1650ms)
│   │   ├── Page.Turn (420ms) x N
│   │   │   ├── Browser.WaitLoad (380ms)
│   │   │   └── CDP.ClickElement (35ms)
│   │   └── Data.Extract (180ms)
│   ├── Step.DataTransform (85ms)
│   └── Step.BatchWrite (210ms)
│       └── MySQL.Insert (195ms)
└── Scheduler.Callback (28ms)

有了这棵树,性能瓶颈一目了然。Browser.WaitLoad 是耗时大户,MySQL.Insert 在批量写入时也占了不少时间。如果某次执行慢了,对比一下Span树就知道慢在哪个分支上。


二、Span的数据模型与存储

每个Span在Python插件里被创建、计时、上报。数据模型参考了OpenTelemetry的规范,但做了极简化适配。

import time
import uuid

class Span:
    def __init__(self, name: str, parent=None, trace_id=None):
        self.span_id = str(uuid.uuid4())[:8]
        self.parent_id = parent.span_id if parent else None
        self.trace_id = trace_id or parent.trace_id if parent else str(uuid.uuid4())[:16]
        self.name = name
        self.start_time = time.time()
        self.end_time = None
        self.attributes = {}
        self.status = "running"

    def set_attribute(self, key, value):
        self.attributes[key] = value

    def finish(self, status="ok"):
        self.end_time = time.time()
        self.status = status

    def to_dict(self):
        return {
            "span_id": self.span_id,
            "parent_id": self.parent_id,
            "trace_id": self.trace_id,
            "name": self.name,
            "start_time": self.start_time,
            "duration_ms": int((self.end_time - self.start_time) * 1000) if self.end_time else None,
            "attributes": self.attributes,
            "status": self.status
        }

Span在流程执行过程中被创建和关闭。影刀流程的每个关键步骤——导航、翻页、数据提取、写入——都包裹在Span里。Python插件的入口函数自动创建根Span,调用子模块时传递父Span。

def execute_order_collect(task):
    root_span = Span("TaskExecution", trace_id=task.trace_id)
    try:
        with Span("Step.LoginCheck", parent=root_span) as span:
            check_login(span)
        with Span("Step.PaginatedCollect", parent=root_span) as span:
            paginated_collect(task, span)
        with Span("Step.BatchWrite", parent=root_span) as span:
            batch_write(task, span)
        root_span.finish("ok")
    except Exception as e:
        root_span.finish("error")
        root_span.set_attribute("error", str(e))
    finally:
        span_reporter.report(root_span)

上下文管理器让Span的开启和关闭变得自动化,即使在异常退出时也能保证Span被正确关闭和上报。


三、跨进程的Trace传递

RPA执行链路最大的挑战是跨进程边界。调度中心是一个进程,执行节点的调度Worker是另一个进程,影刀客户端是一个进程,浏览器是CDP连接的另一端。trace_id必须在这些边界之间显式传递。

调度中心到执行节点:trace_id在任务消息体中携带,Worker消费任务时从消息体中提取,作为整条执行链路的trace_id。

# 调度中心
task_message = {
    "task_id": task.task_id,
    "trace_id": task.trace_id,
    "flow_name": task.flow_name,
    "params": task.params
}
redis.lpush(f"queue:node:{target_node}", json.dumps(task_message))

# 执行节点Worker
task = json.loads(redis.brpop(queue_key))
trace_id = task["trace_id"]
root_span = Span("TaskExecution", trace_id=trace_id)

执行节点到影刀客户端:trace_id通过命令行参数或临时文件传递给影刀流程。影刀流程启动时,Python插件从参数中读取trace_id,所有后续Span都挂在这个trace_id下。

Python插件到浏览器CDP:CDP操作本身不会自动携带trace_id。我们在每次CDP调用前手动创建一个子Span,记录操作类型和耗时。

def cdp_with_trace(debug_port, method, params, parent_span):
    with Span(f"CDP.{method}", parent=parent_span) as span:
        span.set_attribute("cdp_method", method)
        result = send_cdp(debug_port, method, params)
        span.finish("ok" if result else "error")
        return result

跨进程传递的关键原则是:trace_id总是在边界处显式注入和提取,不依赖任何隐式的线程局部存储。


四、Span的上报与存储

Span数据需要被集中存储和查询。我们选择Elasticsearch作为Span存储,因为它天然支持时间序列和嵌套文档,适合存储Span树。

上报采用异步批量写入。流程执行过程中产生的Span不实时上报(会拖慢流程),而是收集在内存缓冲区,任务结束或缓冲区满时一次性批量写入ES。

import threading
from queue import Queue

class SpanReporter:
    def __init__(self, es_client, batch_size=50, flush_interval=5):
        self.es = es_client
        self.buffer = Queue()
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._flush_timer = None

    def report(self, span):
        self.buffer.put(span.to_dict())
        if self.buffer.qsize() >= self.batch_size:
            self._flush()

    def _flush(self):
        batch = []
        while not self.buffer.empty() and len(batch) < self.batch_size:
            batch.append(self.buffer.get())
        if batch:
            self.es.bulk_index("spans", batch)

每个Span在ES里是一条独立的文档,通过trace_id可以聚合出完整的Span树。Kibana里按trace_id分组,就能看到一个任务的完整调用链。


五、调用链的可视化

日志里的Span树是文本,排查问题时需要直观的可视化。我们基于Span数据做了一个简单的调用链看板

看板展示单次任务执行的火焰图式时间线。横轴是时间,每个Span是一个色块,色块宽度与Span耗时成正比。颜色按Span类型编码——浏览器操作蓝色、CDP操作绿色、Python处理橙色、数据库写入紫色。

鼠标悬停在色块上,显示Span名称、耗时、属性。点击Span可以展开查看子Span。

火焰图让性能瓶颈一眼可见。有一次我们发现某次采集任务耗时异常长,打开火焰图,发现 Browser.WaitLoad 的色块在某几页突然变成了一条长龙,宽度是正常页面的5倍。进一步查属性,发现那几页的URL里带了一个特殊的查询参数,导致平台返回了包含大量历史数据的页面。问题定位从“可能是什么原因”变成了“就是这几页加载慢,查这几页的URL参数”。


六、基于Span的聚合分析与性能基

单条调用链帮我们查具体问题,聚合分析帮我们发现系统性问题。

每天晚上,一个离线分析脚本扫描当天所有的Span数据,按流程类型、步骤名称、操作类型做聚合,计算P50、P95、P99耗时。

def aggregate_spans(es_client, date):
    aggs = es_client.search(index="spans", body={
        "query": {"range": {"start_time": {"gte": f"{date}T00:00:00", "lt": f"{date}T23:59:59"}}},
        "aggs": {
            "by_span_name": {
                "terms": {"field": "name.keyword", "size": 100},
                "aggs": {
                    "p50": {"percentiles": {"field": "duration_ms", "percents": [50]}},
                    "p95": {"percentiles": {"field": "duration_ms", "percents": [95]}},
                    "p99": {"percentiles": {"field": "duration_ms", "percents": [99]}}
                }
            }
        }
    })
    return aggs

聚合结果和上一篇文章的时间预估模型打通。预估模型使用Span级别的耗时数据,做更精细的预估——不只是“采集流程大概多久”,而是“翻页操作平均耗时×预估页数 + 数据提取平均耗时×预估条数 + 批量写入平均耗时”。

当某个Span的聚合耗时出现持续漂移时,系统自动触发告警。比如过去3天 Browser.WaitLoad 的P50耗时从320ms涨到了580ms,很可能是平台页面资源变重或代理质量下降,需要关注。


七、调用链驱动的性能优化案例

Span数据在我们一次大优化中发挥了关键作用。

那一次,我们想优化TEMU订单采集流程的整体耗时。优化前的Span分析显示:

  • Browser.WaitLoad(页面加载等待)占总耗时的62%
  • CDP.Evaluate(执行JS提取数据)占18%
  • MySQL.BatchInsert(批量写入)占12%
  • 其他占8%

我们优先攻击占比最大的 Browser.WaitLoad。进一步分析发现,页面加载时间里,有大量时间花在等待非核心资源(广告脚本、统计埋点)上。我们做了两件事:通过CDP的Network.setBlockedURLs屏蔽非必要资源域名,并把图片加载策略改为按需加载。

优化后的Span数据:

  • Browser.WaitLoad 从62%降到41%
  • 总耗时下降了约25%

然后我们发现 CDP.Evaluate 变成了新的瓶颈。进一步分析发现,部分提取脚本对每条数据重复执行了相同的DOM查询。优化为“一次性提取当前页所有数据,再逐条解析”,CDP.Evaluate 耗时下降了40%。

这次优化,每一步的决策都基于Span数据,每一刀的成效都被Span数据量化验证。

没有度量的优化是玄学,有度量的优化是工程。


八、和DAG工作流的联动

Span体系和我们之前建的DAG编排引擎打通后,带来了一个额外的好处:工作流级别的耗时归因。

一个工作流跑完,不仅能看到整体耗时,还能看到每个子流程的耗时占比,以及子流程内部的耗时分布。这让我们能识别出工作流关键路径上最耗时的节点,并优先优化。

同时,工作流实例的Span树里记录了各节点之间的依赖等待时间——子流程B必须等子流程A完成后才能开始,但A完成了、B还没被调度,中间这段空转时间也被Span记录了。这让我们发现了调度延迟的优化空间。


九、Span体系的开销控制

全链路追踪会带来额外的性能开销。每个Span的创建、计时、属性记录、序列化、批量上报,都要消耗CPU和内存。

我们做了几项开销控制:

采样策略:不是每个任务都做细粒度Span记录。90%的任务只记录根Span和步骤Span(粗粒度),10%的任务做完整Span记录(细粒度到CDP操作)。采样比例通过配置中心动态调整,线上异常时可以临时调高。

属性精简:Span属性不记录大文本。页面URL截断到200字符,DOM片段不存入Span属性(另走快照通道)。保持单Span体积在1KB以内。

异步上报与背压:缓冲区满了不进反压业务线程,而是丢弃最旧的Span,并记录丢弃数量。宁可丢Span,不可阻塞流程。

实测下来,细粒度Span记录对流程整体耗时的增加不超过3%,在可接受范围内。


十、从可观测到可预测

调用链体系的终极目标,不是“出问题后能快速查到原因”,而是让问题在出现之前就被系统自己发现。

我们把Span数据和异常检测算法结合。当一个任务的某个Span耗时偏离了同类任务的历史基线超过3个标准差时,即使任务还没失败,系统也会标记这个任务为“异常慢”,并提前生成预警。

很多时候,这种异常慢是流程即将失败的先兆——页面正在经历间歇性超时,只是暂时还没触发超时阈值。提前预警让我们能在任务彻底失败之前介入,甚至提前保存Checkpoint,避免进度丢失。

最好的故障排查,是故障还没发生就把它扼杀在摇篮里。
调用链数据不只是用来复盘过去的,更是用来预警未来的。

作者:林焱
一个执着于让自动化系统的每一毫秒都透明可见的工程师

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐