真实世界研究的数据治理最容易卡在三个地方:多源数据口径不一、缺失值无法解释、字段映射反复返工。本文只讨论技术架构和工程流程示例,不提供诊断、治疗、分诊或用药建议;文中的质控阈值和升级规则均为示例,真实项目应由医疗专业人员和机构规范确认。

问题背景:AI分析前,数据底座经常先掉链子

在真实世界研究项目里,数据通常来自多个业务系统、登记表、随访表和人工整理文件。开发侧拿到的不是“可直接建模的数据集”,而是一批字段名不一致、编码体系不统一、日期粒度不同、缺失原因不明确的宽表或半结构化文件。

我在做这类链路时,会把“AI提效”拆成两层:第一层是让数据整理、校验、映射尽量自动化;第二层才是让分析脚本、特征生成和报告草稿提速。前者不稳定,后者只会把错误放大。

一个更稳的流程可以画成这样:

多源数据
  -> 标准字段映射
  -> Pandas 清洗
  -> DuckDB 汇总与落盘
  -> Great Expectations 质控
  -> Airflow 编排
  -> 分析数据集 ADS

技术目标:把清洗、质控、分析准备放到一条链路里

这条链路的目标不是替代研究设计,也不是自动判断医学结论,而是把数据工程环节做成可复跑、可追踪、可审计。

核心约束建议写清楚:

  • 字段映射必须版本化,不能只存在于开发者电脑里的 Excel。
  • 每次生成分析数据集时,要记录输入文件、规则版本和运行时间。
  • 缺失、重复、异常编码不能只在日志里提示,要形成质控结果。
  • 示例阈值只用于工程演示,真实阈值需按项目方案和机构规则确认。
  • AI可以辅助字段匹配、异常解释和代码生成,但最终规则要人工确认。

方案设计:Pandas 负责清洗,DuckDB 负责中间层

Pandas 适合做逐字段清洗和规则化转换,DuckDB 适合在本地或任务容器中做 SQL 汇总、Join 和 Parquet 落盘。两者结合,比把所有逻辑都塞进 Pandas 更容易排查性能和口径问题。

建议目录结构如下:

rwr-pipeline/
  dags/
    rwr_etl_dag.py
  configs/
    field_mapping.yaml
    quality_rules.yaml
  data/
    raw/
    staging/
    mart/
  src/
    extract_clean.py
    validate.py
    build_ads.py

字段映射文件可以先保持简单:

patient_id:
  source: ["pid", "patient_no", "subject_id"]
  target: "patient_id"
visit_date:
  source: ["visit_dt", "date", "followup_date"]
  target: "visit_date"
status_code:
  source: ["status", "outcome_code"]
  target: "status_code"

AI在这里适合做“候选映射推荐”,例如根据字段名、样例值、历史映射表给出建议。但不要让模型直接改生产规则,推荐结果应进入人工审核流程。

核心实现:从多源 CSV 生成分析准备表

下面示例用 Pandas 合并多源数据,用 DuckDB 做去重和汇总。代码变量都围绕真实世界研究数据整理,不涉及业务诊断逻辑。

import pandas as pd
import duckdb
from pathlib import Path

RAW_DIR = Path("data/raw")
STAGING_DIR = Path("data/staging")
MART_DIR = Path("data/mart")

FIELD_MAP = {
    "pid": "patient_id",
    "patient_no": "patient_id",
    "subject_id": "patient_id",
    "visit_dt": "visit_date",
    "date": "visit_date",
    "followup_date": "visit_date",
    "status": "status_code",
    "outcome_code": "status_code",
}

def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    renamed = {}
    for col in df.columns:
        key = col.strip().lower()
        renamed[col] = FIELD_MAP.get(key, key)
    return df.rename(columns=renamed)

def clean_one_file(path: Path) -> pd.DataFrame:
    df = pd.read_csv(path, dtype=str)
    df = normalize_columns(df)

    required = ["patient_id", "visit_date"]
    for col in required:
        if col not in df.columns:
            raise ValueError(f"{path.name} 缺少必要字段: {col}")

    df["patient_id"] = df["patient_id"].str.strip()
    df["visit_date"] = pd.to_datetime(df["visit_date"], errors="coerce")
    df["source_file"] = path.name

    if "status_code" in df.columns:
        df["status_code"] = df["status_code"].str.strip().str.upper()

    return df

def build_staging():
    frames = [clean_one_file(p) for p in RAW_DIR.glob("*.csv")]
    merged = pd.concat(frames, ignore_index=True)
    STAGING_DIR.mkdir(parents=True, exist_ok=True)
    merged.to_parquet(STAGING_DIR / "rwr_staging.parquet", index=False)

def build_ads():
    MART_DIR.mkdir(parents=True, exist_ok=True)
    con = duckdb.connect()
    con.execute("""
        CREATE OR REPLACE TABLE staging AS
        SELECT * FROM read_parquet('data/staging/rwr_staging.parquet')
    """)
    con.execute("""
        COPY (
            SELECT
                patient_id,
                MIN(visit_date) AS first_visit_date,
                MAX(visit_date) AS last_visit_date,
                COUNT(*) AS record_count,
                COUNT(DISTINCT source_file) AS source_count
            FROM staging
            WHERE patient_id IS NOT NULL
              AND visit_date IS NOT NULL
            GROUP BY patient_id
        )
        TO 'data/mart/analysis_dataset.parquet'
        (FORMAT PARQUET)
    """)

if __name__ == "__main__":
    build_staging()
    build_ads()
    print("ADS generated: data/mart/analysis_dataset.parquet")

这段代码没有追求复杂,而是先保证三件事:字段统一、日期可解析、分析准备表可复跑。后续再把映射配置外置、异常行输出、任务元数据记录补上。

质控规则:不要只看任务是否成功

ETL任务成功不代表数据可用。真实项目里最常见的问题是:数据能跑完,但关键字段缺失率突然升高,或者某个来源的编码规则变了。

Great Expectations 可以把质控规则沉淀成可执行资产。示例规则如下:

import great_expectations as gx
import pandas as pd

df = pd.read_parquet("data/staging/rwr_staging.parquet")

context = gx.get_context()
validator = context.sources.pandas_default.read_dataframe(df)

validator.expect_column_values_to_not_be_null("patient_id")

validator.expect_column_values_to_not_be_null(
    "visit_date",
    mostly=0.95
)

validator.expect_column_values_to_be_in_set(
    "status_code",
    ["A", "B", "C", "UNKNOWN"],
    mostly=0.98
)

result = validator.validate()
print("success:", result.success)

for item in result.results:
    if not item.success:
        print(item.expectation_config.expectation_type)
        print(item.result)

这里的 0.950.98 只是工程示例,不代表任何行业标准。实际项目应把缺失率、枚举范围、异常升级规则写进研究数据管理计划,并由项目负责人确认。

Airflow编排:让返工点变得可定位

当数据源增加后,手动运行脚本很快会失控。Airflow 的价值不在于“显得平台化”,而在于把每个节点的输入、输出和失败原因拆开。

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="rwr_data_quality_pipeline",
    start_date=datetime(2026, 5, 26),
    schedule=None,
    catchup=False,
    tags=["rwr", "data-quality"],
) as dag:

    clean = BashOperator(
        task_id="clean_and_stage",
        bash_command="python src/extract_clean.py"
    )

    validate = BashOperator(
        task_id="run_quality_check",
        bash_command="python src/validate.py"
    )

    build_ads = BashOperator(
        task_id="build_analysis_dataset",
        bash_command="python src/build_ads.py"
    )

    clean >> validate >> build_ads

我的经验是,质控节点不要放到最后。清洗后先校验,校验通过再生成分析准备表;否则分析侧发现问题时,往往已经不知道是哪一批原始数据或哪次映射修改引入的。

优化点:效率来自少返工,而不只是跑得快

这类链路的性能优化,不应只盯 CPU 时间。更重要的是减少返工次数、减少人工核对范围、减少口径争议。

可以优先做四个优化:

  • 原始数据落盘只读,清洗结果用 Parquet 保存,避免重复解析 CSV。
  • 字段映射、枚举映射、缺失原因映射全部配置化,并记录版本号。
  • 对失败质控输出明细样本,让数据管理员可以直接定位问题行。
  • 对常见映射冲突使用 AI 生成候选解释,但所有规则变更必须走审核。

如果数据量达到千万级记录,DuckDB 通常比纯 Pandas Join 更稳,尤其适合本地批处理和容器化任务。若后续进入多团队协作,再考虑迁移到更完整的数据湖或仓库架构。

踩坑记录:几个容易被低估的问题

第一,字段名相同不代表语义相同。比如 status_code 在不同来源里可能表示采集状态、随访状态或记录状态,不能只按字段名自动合并。

第二,缺失值要区分“未采集”“不适用”“未知”“解析失败”。如果都写成空值,后续分析准备阶段会丢失大量上下文。

第三,质控报告要给到可执行结果。只输出“校验失败”没有意义,至少要包含失败字段、失败比例、样例记录和数据来源。

第四,AI生成的清洗代码需要测试集。可以维护一批脱敏样例数据,覆盖日期格式、编码大小写、重复记录和缺失原因等常见情况。

结论:先把数据底座做稳,再谈分析提效

AI可以帮助真实世界研究的数据团队更快完成字段识别、规则草拟和异常解释,但工程链路必须保持可复跑、可审计、可人工确认。Pandas、DuckDB、Airflow 和 Great Expectations 的组合,适合从轻量项目起步,逐步沉淀成稳定的数据治理流程。

下一步建议先选一个数据源做端到端闭环:字段映射、清洗落盘、质控报告、ADS生成全部跑通。数据底座稳定以后,再把AI用于规则推荐、质控摘要和分析脚本生成,效率提升才不会建立在不可靠的数据之上。

本文文献检索、文献挖掘以及文献翻译采用的是【超能文献| AI文献检索|AI文档翻译】

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐