AI做真实世界研究:数据整理、质控与分析效率怎么一起提
真实世界研究的数据治理最容易卡在三个地方:多源数据口径不一、缺失值无法解释、字段映射反复返工。本文只讨论技术架构和工程流程示例,不提供诊断、治疗、分诊或用药建议;文中的质控阈值和升级规则均为示例,真实项目应由医疗专业人员和机构规范确认。
问题背景:AI分析前,数据底座经常先掉链子
在真实世界研究项目里,数据通常来自多个业务系统、登记表、随访表和人工整理文件。开发侧拿到的不是“可直接建模的数据集”,而是一批字段名不一致、编码体系不统一、日期粒度不同、缺失原因不明确的宽表或半结构化文件。
我在做这类链路时,会把“AI提效”拆成两层:第一层是让数据整理、校验、映射尽量自动化;第二层才是让分析脚本、特征生成和报告草稿提速。前者不稳定,后者只会把错误放大。
一个更稳的流程可以画成这样:
多源数据
-> 标准字段映射
-> Pandas 清洗
-> DuckDB 汇总与落盘
-> Great Expectations 质控
-> Airflow 编排
-> 分析数据集 ADS
技术目标:把清洗、质控、分析准备放到一条链路里
这条链路的目标不是替代研究设计,也不是自动判断医学结论,而是把数据工程环节做成可复跑、可追踪、可审计。
核心约束建议写清楚:
- 字段映射必须版本化,不能只存在于开发者电脑里的 Excel。
- 每次生成分析数据集时,要记录输入文件、规则版本和运行时间。
- 缺失、重复、异常编码不能只在日志里提示,要形成质控结果。
- 示例阈值只用于工程演示,真实阈值需按项目方案和机构规则确认。
- AI可以辅助字段匹配、异常解释和代码生成,但最终规则要人工确认。
方案设计:Pandas 负责清洗,DuckDB 负责中间层
Pandas 适合做逐字段清洗和规则化转换,DuckDB 适合在本地或任务容器中做 SQL 汇总、Join 和 Parquet 落盘。两者结合,比把所有逻辑都塞进 Pandas 更容易排查性能和口径问题。
建议目录结构如下:
rwr-pipeline/
dags/
rwr_etl_dag.py
configs/
field_mapping.yaml
quality_rules.yaml
data/
raw/
staging/
mart/
src/
extract_clean.py
validate.py
build_ads.py
字段映射文件可以先保持简单:
patient_id:
source: ["pid", "patient_no", "subject_id"]
target: "patient_id"
visit_date:
source: ["visit_dt", "date", "followup_date"]
target: "visit_date"
status_code:
source: ["status", "outcome_code"]
target: "status_code"
AI在这里适合做“候选映射推荐”,例如根据字段名、样例值、历史映射表给出建议。但不要让模型直接改生产规则,推荐结果应进入人工审核流程。
核心实现:从多源 CSV 生成分析准备表
下面示例用 Pandas 合并多源数据,用 DuckDB 做去重和汇总。代码变量都围绕真实世界研究数据整理,不涉及业务诊断逻辑。
import pandas as pd
import duckdb
from pathlib import Path
RAW_DIR = Path("data/raw")
STAGING_DIR = Path("data/staging")
MART_DIR = Path("data/mart")
FIELD_MAP = {
"pid": "patient_id",
"patient_no": "patient_id",
"subject_id": "patient_id",
"visit_dt": "visit_date",
"date": "visit_date",
"followup_date": "visit_date",
"status": "status_code",
"outcome_code": "status_code",
}
def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
renamed = {}
for col in df.columns:
key = col.strip().lower()
renamed[col] = FIELD_MAP.get(key, key)
return df.rename(columns=renamed)
def clean_one_file(path: Path) -> pd.DataFrame:
df = pd.read_csv(path, dtype=str)
df = normalize_columns(df)
required = ["patient_id", "visit_date"]
for col in required:
if col not in df.columns:
raise ValueError(f"{path.name} 缺少必要字段: {col}")
df["patient_id"] = df["patient_id"].str.strip()
df["visit_date"] = pd.to_datetime(df["visit_date"], errors="coerce")
df["source_file"] = path.name
if "status_code" in df.columns:
df["status_code"] = df["status_code"].str.strip().str.upper()
return df
def build_staging():
frames = [clean_one_file(p) for p in RAW_DIR.glob("*.csv")]
merged = pd.concat(frames, ignore_index=True)
STAGING_DIR.mkdir(parents=True, exist_ok=True)
merged.to_parquet(STAGING_DIR / "rwr_staging.parquet", index=False)
def build_ads():
MART_DIR.mkdir(parents=True, exist_ok=True)
con = duckdb.connect()
con.execute("""
CREATE OR REPLACE TABLE staging AS
SELECT * FROM read_parquet('data/staging/rwr_staging.parquet')
""")
con.execute("""
COPY (
SELECT
patient_id,
MIN(visit_date) AS first_visit_date,
MAX(visit_date) AS last_visit_date,
COUNT(*) AS record_count,
COUNT(DISTINCT source_file) AS source_count
FROM staging
WHERE patient_id IS NOT NULL
AND visit_date IS NOT NULL
GROUP BY patient_id
)
TO 'data/mart/analysis_dataset.parquet'
(FORMAT PARQUET)
""")
if __name__ == "__main__":
build_staging()
build_ads()
print("ADS generated: data/mart/analysis_dataset.parquet")
这段代码没有追求复杂,而是先保证三件事:字段统一、日期可解析、分析准备表可复跑。后续再把映射配置外置、异常行输出、任务元数据记录补上。
质控规则:不要只看任务是否成功
ETL任务成功不代表数据可用。真实项目里最常见的问题是:数据能跑完,但关键字段缺失率突然升高,或者某个来源的编码规则变了。
Great Expectations 可以把质控规则沉淀成可执行资产。示例规则如下:
import great_expectations as gx
import pandas as pd
df = pd.read_parquet("data/staging/rwr_staging.parquet")
context = gx.get_context()
validator = context.sources.pandas_default.read_dataframe(df)
validator.expect_column_values_to_not_be_null("patient_id")
validator.expect_column_values_to_not_be_null(
"visit_date",
mostly=0.95
)
validator.expect_column_values_to_be_in_set(
"status_code",
["A", "B", "C", "UNKNOWN"],
mostly=0.98
)
result = validator.validate()
print("success:", result.success)
for item in result.results:
if not item.success:
print(item.expectation_config.expectation_type)
print(item.result)
这里的 0.95、0.98 只是工程示例,不代表任何行业标准。实际项目应把缺失率、枚举范围、异常升级规则写进研究数据管理计划,并由项目负责人确认。
Airflow编排:让返工点变得可定位
当数据源增加后,手动运行脚本很快会失控。Airflow 的价值不在于“显得平台化”,而在于把每个节点的输入、输出和失败原因拆开。
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id="rwr_data_quality_pipeline",
start_date=datetime(2026, 5, 26),
schedule=None,
catchup=False,
tags=["rwr", "data-quality"],
) as dag:
clean = BashOperator(
task_id="clean_and_stage",
bash_command="python src/extract_clean.py"
)
validate = BashOperator(
task_id="run_quality_check",
bash_command="python src/validate.py"
)
build_ads = BashOperator(
task_id="build_analysis_dataset",
bash_command="python src/build_ads.py"
)
clean >> validate >> build_ads
我的经验是,质控节点不要放到最后。清洗后先校验,校验通过再生成分析准备表;否则分析侧发现问题时,往往已经不知道是哪一批原始数据或哪次映射修改引入的。
优化点:效率来自少返工,而不只是跑得快
这类链路的性能优化,不应只盯 CPU 时间。更重要的是减少返工次数、减少人工核对范围、减少口径争议。
可以优先做四个优化:
- 原始数据落盘只读,清洗结果用 Parquet 保存,避免重复解析 CSV。
- 字段映射、枚举映射、缺失原因映射全部配置化,并记录版本号。
- 对失败质控输出明细样本,让数据管理员可以直接定位问题行。
- 对常见映射冲突使用 AI 生成候选解释,但所有规则变更必须走审核。
如果数据量达到千万级记录,DuckDB 通常比纯 Pandas Join 更稳,尤其适合本地批处理和容器化任务。若后续进入多团队协作,再考虑迁移到更完整的数据湖或仓库架构。
踩坑记录:几个容易被低估的问题
第一,字段名相同不代表语义相同。比如 status_code 在不同来源里可能表示采集状态、随访状态或记录状态,不能只按字段名自动合并。
第二,缺失值要区分“未采集”“不适用”“未知”“解析失败”。如果都写成空值,后续分析准备阶段会丢失大量上下文。
第三,质控报告要给到可执行结果。只输出“校验失败”没有意义,至少要包含失败字段、失败比例、样例记录和数据来源。
第四,AI生成的清洗代码需要测试集。可以维护一批脱敏样例数据,覆盖日期格式、编码大小写、重复记录和缺失原因等常见情况。
结论:先把数据底座做稳,再谈分析提效
AI可以帮助真实世界研究的数据团队更快完成字段识别、规则草拟和异常解释,但工程链路必须保持可复跑、可审计、可人工确认。Pandas、DuckDB、Airflow 和 Great Expectations 的组合,适合从轻量项目起步,逐步沉淀成稳定的数据治理流程。
下一步建议先选一个数据源做端到端闭环:字段映射、清洗落盘、质控报告、ADS生成全部跑通。数据底座稳定以后,再把AI用于规则推荐、质控摘要和分析脚本生成,效率提升才不会建立在不可靠的数据之上。
本文文献检索、文献挖掘以及文献翻译采用的是【超能文献| AI文献检索|AI文档翻译】。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)