Apache Airflow 系列教程 | 第1课:Apache Airflow 概述与架构全景
导读(Introduction)
欢迎来到 Apache Airflow 源码深度解析系列的第一课。
Apache Airflow 是当今最流行的开源工作流编排平台之一,被全球数千家企业用于构建、调度和监控数据管道。从 Airbnb 内部工具到 Apache 顶级项目,Airflow 已经成为数据工程领域的事实标准。
本课程系列的独特之处在于:我们不仅教你"如何使用" Airflow,更带你深入源码层面理解"它是如何工作的"。通过剖析真实的源代码结构,你将获得对 Airflow 内部机制的深刻理解,这将帮助你更好地调试问题、优化性能,甚至贡献代码到开源社区。
本课作为系列的开篇,将为你建立一个关于 Airflow 的全景认知——从核心概念到系统架构,从代码组织到依赖关系。这是后续所有深入学习的基础。
学习目标(Learning Objectives)
完成本课学习后,你将能够:
- 理解 Airflow 的定位与核心价值——明确它在数据工程生态中解决什么问题
- 掌握核心概念体系——DAG、Task、Operator、Scheduler、Executor 等核心术语
- 理解架构演进——从 Airflow 2.x 单体架构到 3.x 分包架构的设计思想
- 熟悉项目代码结构——了解
airflow-core、task-sdk、providers等顶层模块的职责划分 - 分析依赖关系——通过
pyproject.toml理解各包之间的关联和技术栈选择 - 搭建开发环境——了解本地开发环境配置和 Breeze 开发工具
正文内容(Main Content)
1. Airflow 是什么:工作流编排平台的定位与核心价值
1.1 问题背景
在现代数据驱动的企业中,每天有大量的数据处理任务需要按照特定的依赖关系和时间规则执行:
- 凌晨从业务数据库抽取数据到数据仓库
- 数据清洗和转换任务必须等上游抽取完成
- 机器学习模型每天重新训练
- 报表在每个工作日早上 8 点前生成
这些任务构成了复杂的工作流(Workflow),手动管理这些任务的执行顺序、失败重试、资源分配等问题非常困难。
1.2 Airflow 的解决方案
Apache Airflow 将工作流定义为代码(Workflow as Code),使用 Python 语言描述任务及其依赖关系。这带来了几个关键优势:
| 特性 | 说明 |
|---|---|
| 代码即工作流 | 用 Python 定义 DAG,享受版本控制、代码审查、测试等软件工程最佳实践 |
| 可视化监控 | Web UI 提供 DAG 运行状态、任务日志、甘特图等全方位监控 |
| 可扩展性 | 通过 Operator/Hook/Sensor 机制轻松集成任何外部系统 |
| 弹性调度 | 支持 Cron 表达式、时间间隔、数据资产触发等多种调度方式 |
| 故障恢复 | 自动重试、告警通知、手动触发回填 |
1.3 Airflow 不是什么
理解 Airflow 的边界同样重要:
- 不是数据流引擎:不像 Spark Streaming 或 Flink 处理实时数据流
- 不是数据处理框架:不直接处理大数据,而是调度其他工具(如 Spark)来处理
- 不适合超低延迟场景:调度粒度通常在秒到分钟级别
2. 核心概念速览
在深入源码之前,我们需要建立一套统一的术语体系。以下是 Airflow 中最核心的概念:
2.1 DAG(Directed Acyclic Graph,有向无环图)
DAG 是 Airflow 中最基本的组织单元,代表一个完整的工作流。"有向"意味着任务之间有明确的执行顺序;"无环"意味着不存在循环依赖。
from airflow.sdk import dag, task
@dag(schedule="0 2 * * *", start_date=pendulum.datetime(2024, 1, 1))
def my_etl_pipeline():
"""每天凌晨2点执行的 ETL 管道"""
...
2.2 Task(任务)
Task 是 DAG 中的一个执行单元,代表一个具体的工作步骤。每个 Task 都是某个 Operator 的实例。
2.3 Operator(算子)
Operator 定义了 Task 要"做什么"。Airflow 提供丰富的内置 Operator:
PythonOperator:执行 Python 函数BashOperator:执行 Shell 命令KubernetesPodOperator:在 K8s Pod 中运行任务
2.4 Scheduler(调度器)
Scheduler 是 Airflow 的"大脑",负责:
- 解析 DAG 文件
- 根据调度规则创建 DagRun
- 决定哪些 Task 可以执行
- 将 Task 提交给 Executor
2.5 Executor(执行器)
Executor 负责实际执行 Task 的运行环境。不同的 Executor 适用于不同的部署场景:
LocalExecutor:在本地进程中执行CeleryExecutor:通过 Celery 分布式执行KubernetesExecutor:每个 Task 启动一个 K8s Pod
2.6 概念之间的关系
DAG (工作流定义)
└── Task (任务节点)
└── Operator (执行逻辑)
Scheduler (调度决策) → Executor (执行引擎) → Worker (实际运行)
↓
TaskInstance (任务运行实例)
↓
DagRun (DAG 运行实例)
3. Airflow 的架构演进:从单体到分包
3.1 Airflow 2.x 时代:单体架构
在 Airflow 2.x 中,整个项目是一个巨大的单体包 apache-airflow,所有代码都在一个 airflow/ 目录下。这种架构的问题逐渐暴露:
- 安装体积庞大:即使只编写 DAG,也需要安装完整的调度器代码
- 依赖冲突频繁:核心框架依赖与用户 DAG 依赖容易冲突
- 耦合度高:Provider 更新需要等待核心框架发版
3.2 Airflow 3.x 时代:分包架构
Airflow 3.x 进行了重大架构重组,将单体包拆分为多个独立的子项目:
airflow-main/ # 项目根目录
├── airflow-core/ # 核心调度引擎(服务端)
│ └── src/airflow/ # 包名: apache-airflow-core
├── task-sdk/ # 任务定义 SDK(客户端)
│ └── src/airflow/sdk/ # 包名: apache-airflow-task-sdk
├── providers/ # 70+ 外部系统集成插件
│ ├── amazon/ # 包名: apache-airflow-providers-amazon
│ ├── google/ # 包名: apache-airflow-providers-google
│ ├── standard/ # 包名: apache-airflow-providers-standard
│ └── ...
├── airflow-ctl/ # 命令行管理工具
│ └── src/airflowctl/ # 包名: apache-airflow-ctl
├── shared/ # 共享基础模块
│ ├── configuration/
│ ├── serialization/
│ ├── observability/
│ └── ...
└── docs/ # 文档
这种分包设计的核心思想是关注点分离:
| 子项目 | 职责 | 使用者 |
|---|---|---|
airflow-core |
调度引擎、API 服务、数据库管理 | 平台运维人员 |
task-sdk |
DAG 定义接口、Operator 基类 | DAG 开发者 |
providers |
外部系统集成(AWS/GCP/K8s等) | 按需安装 |
shared |
跨包共享代码(序列化、时区等) | 内部使用 |
4. 项目顶层结构深度分析
让我们逐一剖析每个顶层模块的内部结构和职责。
4.1 airflow-core:核心调度引擎
airflow-core 是整个系统的"心脏",包含调度器、API 服务、数据库模型等核心功能。
源码路径:airflow-core/src/airflow/
airflow-core/src/airflow/
├── __init__.py # 版本定义: __version__ = "3.3.0"
├── __main__.py # CLI 入口点: airflow 命令
├── configuration.py # 配置解析引擎
├── settings.py # 全局设置(DB连接、时区等)
├── models/ # SQLAlchemy 数据模型(40+ 个模型文件)
│ ├── dag.py # DagModel - DAG 的数据库表示
│ ├── dagrun.py # DagRun - DAG 运行实例
│ ├── taskinstance.py # TaskInstance - 2463行核心逻辑
│ ├── connection.py # 外部系统连接信息
│ ├── variable.py # 运行时变量
│ ├── pool.py # 资源池
│ ├── xcom.py # 任务间数据交换
│ ├── asset.py # 数据资产模型
│ └── ...
├── jobs/ # 后台服务进程
│ ├── scheduler_job_runner.py # 调度器主循环(3317行)
│ ├── triggerer_job_runner.py # 异步触发器进程
│ └── dag_processor_job_runner.py # DAG 解析进程
├── executors/ # 执行器体系
│ ├── base_executor.py # 执行器抽象基类
│ ├── local_executor.py # 本地多进程执行器
│ └── executor_loader.py # 动态加载机制
├── dag_processing/ # DAG 文件解析引擎
│ ├── manager.py # 解析管理器
│ ├── processor.py # 单文件处理器
│ └── bundles/ # DAG Bundle 机制
├── api_fastapi/ # REST API 服务(基于 FastAPI)
│ ├── core_api/ # 管理 API(面向用户)
│ └── execution_api/ # 执行 API(面向 Worker)
├── serialization/ # 序列化/反序列化系统
├── timetables/ # 时间调度策略
├── triggers/ # Deferrable 异步触发器
├── security/ # 权限与认证
├── ui/ # React 前端(TypeScript + Vite)
└── cli/ # CLI 命令实现
从 airflow-core/pyproject.toml 可以看到其核心定位:
[project]
name = "apache-airflow-core"
description = "Core packages for Apache Airflow, schedule and API server"
version = "3.3.0"
关键观察:airflow-core 的定位非常明确——“调度(schedule)和 API 服务(API server)”。它不包含 DAG 编写接口,那是 task-sdk 的职责。
4.2 task-sdk:任务定义 SDK
task-sdk 是 DAG 开发者最常接触的包,提供了编写 DAG 和 Task 的所有接口。
源码路径:task-sdk/src/airflow/sdk/
task-sdk/src/airflow/sdk/
├── __init__.py # 公共 API 导出(100+ 符号)
├── definitions/ # DAG/Task/Operator 定义
│ ├── dag.py # DAG 类和 @dag 装饰器
│ ├── decorators/ # @task、@setup、@teardown
│ ├── taskgroup.py # TaskGroup
│ ├── asset/ # 数据资产定义
│ ├── timetables/ # 时间调度策略
│ ├── mappedoperator.py # 动态任务映射
│ └── context.py # 任务运行上下文
├── bases/ # 基类定义
│ ├── operator.py # BaseOperator
│ ├── sensor.py # BaseSensorOperator
│ └── hook.py # BaseHook
├── api/ # 与执行API交互的客户端
│ └── datamodels/ # Pydantic 数据模型
├── execution_time/ # 任务运行时
├── io/ # Object Storage 抽象
└── serde/ # 序列化框架
从 task-sdk/pyproject.toml 中可以看到:
[project]
name = "apache-airflow-task-sdk"
description = "Python Task SDK for Apache Airflow DAG Authors"
dependencies = [
"apache-airflow-core<3.4.0,>=3.3.0", # 依赖 core 包
...
]
SDK 的公共 API 设计非常值得学习。查看 task-sdk/src/airflow/sdk/__init__.py,它使用了 延迟导入(Lazy Import)模式:
# task-sdk/src/airflow/sdk/__init__.py
__all__ = [
"DAG", "dag", "task", "TaskGroup",
"BaseOperator", "BaseSensorOperator", "BaseHook",
"Asset", "Connection", "Variable",
"XComArg", "Context",
... # 共 100+ 个公共符号
]
__lazy_imports: dict[str, str] = {
"DAG": ".definitions.dag",
"dag": ".definitions.dag",
"task": ".definitions.decorators",
"BaseOperator": ".bases.operator",
...
}
def __getattr__(name: str):
if module_path := __lazy_imports.get(name):
import importlib
mod = importlib.import_module(module_path, __name__)
val = getattr(mod, name)
globals()[name] = val # 缓存,下次直接获取
return val
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
这种设计的好处是:import airflow.sdk 几乎不花时间,只有在实际使用某个类时才会导入对应的模块。对于包含大量模块的 SDK 来说,这大大加快了 DAG 文件的解析速度。
4.3 providers:外部系统集成生态
providers/ 目录下包含 70+ 个独立的 Provider 包,每个对应一个外部系统的集成:
providers/
├── standard/ # 标准 Operator/Sensor(Python、Bash 等)
├── amazon/ # AWS 服务集成(S3、EMR、Redshift 等)
├── google/ # GCP 服务集成(BigQuery、GCS、Dataflow 等)
├── microsoft/ # Azure 服务集成
├── cncf/kubernetes/ # Kubernetes 集成
├── celery/ # Celery 分布式执行器
├── postgres/ # PostgreSQL
├── mysql/ # MySQL
├── redis/ # Redis
├── docker/ # Docker
├── snowflake/ # Snowflake 数据仓库
├── databricks/ # Databricks
└── ... # 还有 60+ 个
每个 Provider 遵循标准的目录结构:
providers/standard/
├── provider.yaml # Provider 元数据声明
├── pyproject.toml # 包配置和依赖
├── src/airflow/providers/standard/
│ ├── operators/ # Operator 实现
│ │ ├── python.py # PythonOperator
│ │ ├── bash.py # BashOperator
│ │ └── branch.py # BranchOperator
│ ├── sensors/ # Sensor 实现
│ │ ├── external_task.py
│ │ └── filesystem.py
│ ├── hooks/ # Hook 实现
│ └── triggers/ # Trigger 实现
└── tests/ # 测试
4.4 shared:跨包共享模块
shared/ 目录包含了 airflow-core 和 task-sdk 共同需要的基础代码:
shared/
├── configuration/ # 配置解析共享逻辑
├── serialization/ # 序列化/反序列化共享逻辑
├── observability/ # 可观测性(metrics/tracing)
├── timezones/ # 时区处理
├── logging/ # 日志框架
├── state/ # 状态枚举定义
├── secrets_masker/ # 敏感信息脱敏
├── secrets_backend/ # 密钥存储后端
├── dagnode/ # DAG 节点抽象
├── listeners/ # 事件监听器
├── plugins_manager/ # 插件管理器
├── providers_discovery/ # Provider 发现机制
└── template_rendering/ # Jinja2 模板渲染
这些共享模块在构建时通过 hatch 的 force-include 机制被分别复制到 airflow-core 和 task-sdk 中。从 airflow-core/pyproject.toml 可以看到:
[tool.hatch.build.targets.sdist.force-include]
"../shared/configuration/src/airflow_shared/configuration" = "src/airflow/_shared/configuration"
"../shared/serialization/src/airflow_shared/serialization" = "src/airflow/_shared/serialization"
"../shared/observability/src/airflow_shared/observability" = "src/airflow/_shared/observability"
# ... 共 14 个共享模块
5. 依赖关系分析:从 pyproject.toml 看技术选型
5.1 包间依赖关系
通过分析各个 pyproject.toml,我们可以绘制出包之间的依赖图:
┌─────────────────┐
│ providers/* │
│ (按需安装) │
└────────┬────────┘
│ 依赖
▼
┌──────────────┐ ┌─────────────────┐
│ airflow-ctl │────▶│ task-sdk │
│ (管理工具) │ │ (DAG 编写接口) │
└──────────────┘ └────────┬────────┘
│ 依赖
▼
┌─────────────────┐
│ airflow-core │
│ (调度引擎) │
└────────┬────────┘
│ 依赖
▼
┌─────────────────┐
│ shared/* │
│ (共享基础库) │
└─────────────────┘
具体版本约束(来自 task-sdk/pyproject.toml):
# task-sdk 对 core 的依赖约束
"apache-airflow-core<3.4.0,>=3.3.0"
而 airflow-core 对 task-sdk 的依赖(来自 airflow-core/pyproject.toml):
# core 对 task-sdk 的依赖约束
"apache-airflow-task-sdk<1.4.0,>=1.3.0"
这形成了一个双向版本约束关系,确保两个包在兼容的版本范围内协同工作。
5.2 核心技术栈分析
从 airflow-core/pyproject.toml 的 dependencies 字段,我们可以解读 Airflow 的技术选型:
Web 框架层:
"fastapi[standard-no-fastapi-cloud-cli]>=0.129.0" # REST API 框架
"uvicorn>=0.37.0" # ASGI 服务器
"starlette>=0.45.0" # ASGI 底层框架
"pydantic>=2.11.0" # 数据验证(FastAPI 核心)
Airflow 3.x 从 Flask 迁移到了 FastAPI,这是一个重大的技术栈升级,带来了更好的性能和类型安全。
数据库层:
"sqlalchemy[asyncio]>=2.0.48" # ORM 框架(支持异步)
"alembic>=1.13.1, <2.0" # 数据库迁移
"aiosqlite>=0.20.0,<0.22.0" # SQLite 异步驱动
调度相关:
"croniter>=2.0.2" # Cron 表达式解析
"pendulum>=3.1.0" # 时间处理(比 datetime 更好用)
"python-dateutil>=2.7.0" # 日期工具
可观测性:
"opentelemetry-api>=1.27.0" # OpenTelemetry 追踪
"opentelemetry-exporter-otlp>=1.27.0" # OTLP 导出器
"structlog>=25.4.0" # 结构化日志
序列化与模板:
"jinja2>=3.1.5" # 模板引擎(用于 DAG 参数模板化)
"msgspec>=0.19.0" # 高性能序列化
"jsonschema>=4.19.1" # JSON Schema 验证
安全:
"cryptography>=44.0.3" # 加密库(Fernet 加密 Connection)
"pyjwt>=2.11.0" # JWT Token
"itsdangerous>=2.0" # 安全签名
5.3 预装 Provider
airflow-core 默认预装了一些基础 Provider:
# pre-installed providers
"apache-airflow-providers-common-compat>=1.7.4" # 兼容层
"apache-airflow-providers-common-io>=1.6.3" # IO 抽象
"apache-airflow-providers-common-sql>=1.28.1" # SQL 通用逻辑
"apache-airflow-providers-smtp>=2.3.1" # 邮件通知
"apache-airflow-providers-standard>=1.9.0" # 标准 Operator
这意味着安装 airflow-core 后,你立即可以使用 PythonOperator、BashOperator 等基础 Operator,无需额外安装。
6. 系统运行架构图
理解了代码结构后,让我们来看 Airflow 在运行时的各组件关系:
┌──────────────────────────────────────────────────────────────────┐
│ Airflow 运行时架构 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ 解析 DAG 文件 ┌─────────────────────┐ │
│ │ DAG 文件 │ ──────────────────▶ │ DAG Processor │ │
│ │ (Python) │ │ (dag_processing/) │ │
│ └─────────────┘ └──────────┬──────────┘ │
│ │ │
│ 序列化存入数据库 │
│ ▼ │
│ ┌─────────────┐ REST API ┌─────────────────────┐ │
│ │ Web UI │◀──────────────────▶ │ API Server │ │
│ │ (React) │ │ (api_fastapi/) │ │
│ └─────────────┘ └──────────┬──────────┘ │
│ │ │
│ 读取 DAG │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Scheduler │ │
│ │ (jobs/scheduler_ │ │
│ │ job_runner.py) │ │
│ └──────────┬──────────┘ │
│ │ │
│ 提交 Task 执行 │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Executor │ │
│ │ (executors/) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌────────────────────┼──────┐ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌──────────┐ ┌─────────┐ │
│ │ Local │ │ Celery │ │ K8s Pod │ │
│ │ Process │ │ Worker │ │ Worker │ │
│ └────────────┘ └──────────┘ └─────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Metadata Database │ │
│ │ (PostgreSQL / MySQL / SQLite) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
各组件的核心职责对应的源码位置:
| 组件 | 源码位置 | 核心文件 |
|---|---|---|
| DAG Processor | airflow-core/src/airflow/dag_processing/ |
manager.py, processor.py |
| Scheduler | airflow-core/src/airflow/jobs/ |
scheduler_job_runner.py(3317行) |
| Executor | airflow-core/src/airflow/executors/ |
base_executor.py, local_executor.py |
| API Server | airflow-core/src/airflow/api_fastapi/ |
app.py, core_api/, execution_api/ |
| Web UI | airflow-core/src/airflow/ui/ |
React + TypeScript + Vite |
| 数据模型 | airflow-core/src/airflow/models/ |
dag.py, dagrun.py, taskinstance.py |
| CLI | airflow-core/src/airflow/cli/ |
cli_parser.py, commands/ |
7. CLI 入口点:airflow 命令的启动过程
了解程序的入口点是理解系统的第一步。从 airflow-core/pyproject.toml 可以看到:
[project.scripts]
airflow = "airflow.__main__:main"
这意味着执行 airflow 命令时,会调用 airflow-core/src/airflow/__main__.py 中的 main() 函数:
# airflow-core/src/airflow/__main__.py
def main():
conf = configuration.conf
if conf.get("core", "security") == "kerberos":
os.environ["KRB5CCNAME"] = conf.get("kerberos", "ccache")
os.environ["KRB5_KTNAME"] = conf.get("kerberos", "keytab")
parser = cli_parser.get_parser()
argcomplete.autocomplete(parser)
args = parser.parse_args()
args.func(args)
启动流程简洁明了:
- 初始化配置
- 构建 CLI 参数解析器
- 解析命令行参数
- 调用对应的命令处理函数
例如 airflow scheduler 会调用 scheduler_command.py,airflow api-server 会调用 api_server_command.py。
8. 环境搭建:本地开发环境配置
8.1 系统要求
从 pyproject.toml 可以确认:
requires-python = ">=3.10,!=3.15"
Airflow 3.x 支持 Python 3.10 到 3.14。
8.2 使用 uv 进行依赖管理
项目使用 uv 作为包管理工具(从 pyproject.toml 中可以看到):
[tool.uv]
required-version = ">=0.11.8"
基本安装步骤:
# 1. 安装 uv(如果尚未安装)
pip install uv
# 2. 克隆项目
git clone https://github.com/apache/airflow.git
cd airflow
# 3. 使用 uv 安装依赖
uv sync
# 4. 启动独立模式(包含 Scheduler + API Server + 数据库)
uv run airflow standalone
8.3 Breeze 开发工具
Airflow 提供了名为 Breeze 的开发工具,它基于 Docker 提供了完整的开发环境:
# 进入 Breeze 开发环境
./breeze
# 在 Breeze 中运行测试
./breeze testing tests --test-type "Core"
# 构建文档
./breeze build-docs
Breeze 的详细使用说明在 dev/breeze/doc/README.rst 中。
8.4 项目构建系统
从 pyproject.toml 的 [build-system] 可以看到,项目使用 Hatchling 作为构建后端:
[build-system]
requires = ["hatchling==1.29.0", ...]
build-backend = "hatchling.build"
Hatchling 是现代 Python 项目的构建工具,相比 setuptools 更加简洁和高效。
9. 版本体系与发布策略
通过分析项目文件可以看到清晰的版本规划:
| 包名 | 当前版本 | 版本路径 |
|---|---|---|
| apache-airflow-core | 3.3.0 | airflow-core/src/airflow/__init__.py |
| apache-airflow-task-sdk | 1.3.0 | task-sdk/src/airflow/sdk/__init__.py |
| providers (各自独立版本) | 各异 | 各 Provider 的 pyproject.toml |
airflow-core 使用 3.x 主版本号,而 task-sdk 使用独立的 1.x 版本号。这意味着两者可以独立演进,只要遵循兼容性约束:
task-sdk 1.3.x兼容core 3.3.x- Provider 各自独立发版,不与 core 绑定
总结(Summary)
本课我们建立了对 Apache Airflow 的全景认知:
- 定位明确:Airflow 是一个工作流编排平台,用 Python 代码定义、调度和监控数据管道
- 核心概念:DAG → Task → Operator 构成工作流定义;Scheduler → Executor → Worker 构成执行引擎
- 架构演进:从 2.x 单体包到 3.x 的
airflow-core+task-sdk+providers三层分包架构 - 技术选型:FastAPI(Web)、SQLAlchemy(ORM)、Pydantic(数据验证)、OpenTelemetry(可观测性)
- 代码组织:清晰的模块划分和延迟导入设计,兼顾可维护性和启动性能
- 开发工具:使用 uv 管理依赖,Hatchling 构建,Breeze 提供开发环境
关键记忆点:
- DAG 开发者主要与
task-sdk(from airflow.sdk import dag, task)交互 - 平台运维人员需要理解
airflow-core的调度器、执行器和 API 服务 - Provider 按需安装,保持核心系统的轻量化
下一课简介(Next Lesson Preview)
在第2课"第一个 DAG — 从 Hello World 到 TaskFlow API"中,我们将:
- 深入分析
tutorial.py和tutorial_taskflow_api.py两个示例 DAG - 理解传统 Operator 模式与 TaskFlow API 装饰器模式的区别
- 追踪
from airflow.sdk import dag, task背后的完整导入链路 - 实际编写一个包含数据提取、转换、加载的 ETL 管道
- 深入
task-sdk/src/airflow/sdk/definitions/dag.py理解 DAG 类的内部实现
下一课将是你动手编写代码的开始,建议先确保开发环境已经搭建完成。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)