【亲测可用】Dagster数据编排框架实战教程-第1章:Dagster基础概念和安装
·
第1章:Dagster基础概念和安装
学习目标
- 理解Dagster的基本概念
- 掌握Dagster的安装方法
- 创建第一个Dagster应用
- 了解Dagster的核心组件
目录
什么是Dagster
Dagster是一个现代的数据编排平台,专门为构建、运行和监控数据管道而设计。它的核心特点包括:
主要特性
- 声明式管道定义:使用Python代码定义数据管道
- 类型安全:强大的类型系统确保数据质量
- 依赖管理:自动管理任务之间的依赖关系
- 可观测性:内置的监控和日志系统
- 测试友好:易于编写单元测试和集成测试
适用场景
- ETL/ELT数据管道
- 机器学习工作流
- 数据质量检查
- 数据仓库维护
- 实时数据处理
Dagster vs 其他编排工具
与Airflow比较
| 特性 | Dagster | Airflow |
|---|---|---|
| 编程范式 | 声明式Python | 基于DAG的Python |
| 类型系统 | 强大,内置 | 较弱 |
| 测试支持 | 优秀 | 一般 |
| 学习曲线 | 中等 | 较陡峭 |
| 部署复杂度 | 较低 | 较高 |
与Prefect比较
| 特性 | Dagster | Prefect |
|---|---|---|
| 核心概念 | Assets为中心 | Flows为中心 |
| 调度系统 | 内置强大 | 灵活但复杂 |
| 监控界面 | 优秀 | 优秀 |
| 社区生态 | 快速增长 | 成熟稳定 |
安装和配置
系统要求
- Python 3.8+
- pip 20.0+
- 4GB+ RAM(推荐)
安装步骤
- 创建虚拟环境(推荐)
python -m venv dagster-env
source dagster-env/bin/activate # Linux/Mac
# 或
dagster-env\Scripts\activate # Windows
- 安装Dagster核心包
pip install dagster dagster-webserver
- 验证安装
python -c "import dagster; print(f'Dagster版本: {dagster.__version__}')"
- 安装可选依赖
# 数据处理
pip install pandas numpy
# 数据库连接
pip install sqlalchemy psycopg2-binary
# 开发工具
pip install pytest black isort
配置开发环境
创建.env文件:
# 开发环境配置
DAGSTER_HOME=./.dagster
LOG_LEVEL=INFO
# 数据库连接(可选)
DATABASE_URL=postgresql://user:password@localhost:5432/dagster_db
创建workspace.yaml文件:
load_from:
- python_module: dagster_tutorial.chapter_01_basics
核心概念
1. Op(操作)
Op是Dagster中的基本计算单元,类似于函数。每个op:
- 执行特定的任务
- 可以有输入和输出
- 支持配置参数
- 可以记录日志
2. Job(作业)
Job是由多个op组成的执行单元:
- 定义op之间的依赖关系
- 可以被调度执行
- 支持配置和参数化
3. Asset(资产)
Asset是数据管道中的产出物:
- 可以是数据文件、数据库表等
- 有版本和依赖关系
- 支持数据沿袭追踪
4. Resource(资源)
Resource是外部依赖的抽象:
- 数据库连接
- API客户端
- 文件系统访问
- 配置管理
5. Schedule(调度)
Schedule定义作业的执行时间:
- 定时执行
- 支持cron表达式
- 可以暂停/恢复
6. Sensor(传感器)
Sensor基于事件触发作业:
- 文件变化
- 数据库更新
- API调用
- 自定义事件
第一个Dagster应用
简单示例
创建hello_dagster.py:
from dagster import op, job
@op
def hello_world(context):
context.log.info("Hello, Dagster!")
return "Hello from Dagster!"
@job
def hello_world_job():
hello_world()
运行方式
- 命令行运行
dagster job execute -f hello_dagster.py -n hello_world_job
- Python脚本运行
from dagster import execute_job
from hello_dagster import hello_world_job
result = execute_job(hello_world_job)
print(f"执行结果: {result.success}")
- Dagster UI运行
dagster dev
然后访问 http://localhost:3000
带参数的示例
from dagster import op, job, Field, String
@op(
config_schema={
"name": Field(String, description="要问候的名字", default_value="World"),
"repeat": Field(int, description="重复次数", default_value=1),
}
)
def greet_person(context):
name = context.op_config["name"]
repeat = context.op_config["repeat"]
message = f"Hello, {name}! " * repeat
context.log.info(f"Greeting: {message}")
return message.strip()
@job
def greeting_job():
greet_person()
运行带配置的job:
dagster job execute -f hello_dagster.py -n greeting_job --config '{"ops": {"greet_person": {"config": {"name": "Alice", "repeat": 3}}}}'
运行和监控
启动开发服务器
dagster dev
访问Web界面
- 打开浏览器访问 http://localhost:3000
- 查看作业状态
- 监控执行日志
- 查看数据沿袭
常用命令
- 列出所有作业
dagster job list
- 执行特定作业
dagster job execute -f your_file.py -n your_job_name
- 查看作业详情
dagster job describe -f your_file.py -n your_job_name
- 清理缓存
dagster instance migrate
监控指标
- 作业执行时间
- 资源使用情况
- 错误率统计
- 数据质量指标
练习和扩展
基础练习
- 修改
hello_worldop,使其接受一个名字参数 - 创建两个op,一个生成随机数,另一个计算平均值
- 添加op配置,控制随机数的范围和数量
中级练习
- 创建一个处理CSV文件的ETL管道
- 添加数据验证和错误处理
- 实现op之间的数据传递
高级练习
- 集成外部API获取数据
- 实现数据质量检查规则
- 添加监控和告警功能
扩展阅读
故障排除
常见问题
-
导入错误
- 确保Dagster已正确安装
- 检查Python路径
- 验证虚拟环境
-
作业执行失败
- 检查op配置
- 查看详细日志
- 验证依赖关系
-
Web界面无法访问
- 检查端口占用
- 验证防火墙设置
- 查看服务日志
调试技巧
- 使用
context.log记录关键信息 - 添加详细的错误处理
- 逐步测试每个op
- 使用Dagster的测试工具
下一步
完成本章学习后,您应该:
- 理解Dagster的基本概念
- 能够安装和配置Dagster
- 创建简单的数据管道
- 使用Dagster UI监控作业
继续学习第2章:Ops和Jobs基础,深入了解Dagster的核心组件。
basic_examples.py
"""
"""
第1章:Dagster基础示例代码 - 详细注释版
================================================================================
第一章:Dagster核心概念入门
================================================================================
本章节将帮助你理解Dagster的最核心概念:
1. Op(操作):Dagster中的最小计算单元,类似于其他框架中的"任务"或"函数"
2. Job(作业):由多个Op组成的有向无环图(DAG),定义了Op之间的执行顺序
3. Graph(图):Op的组合结构,可以被多个Job复用
4. In/Out(输入/输出):定义Op的输入输出规范
5. Field/Config(字段/配置):定义Op的可配置参数
在开始之前,让我们理解几个关键概念:
【什么是Dagster?】
Dagster是一个现代化的数据编排平台,用于构建、管理和监控数据管线。
你可以把它想象成一个"数据管道的指挥官",它会按照你定义的顺序,
一个接一个地执行数据处理任务,并且能够:
- 自动处理任务之间的依赖关系
- 提供重试机制(某个任务失败时自动重试)
- 支持定时执行(像cron一样)
- 监控任务执行状态
- 支持数据回填(重新处理历史数据)
【Dagster的核心组件】
- Op(操作):执行具体的数据处理逻辑
- Job(作业):定义Op的执行顺序和依赖关系
- Asset(资产):代表数据产物,可以跟踪数据血缘
- Resource(资源):外部依赖(如数据库连接、API客户端等)
- Schedule(调度器):定时触发Job执行
- Sensor(传感器):响应外部事件触发Job执行
================================================================================
"""
# ================================================================================
# 第一部分:导入必要的模块
# ================================================================================
"""
在Python中,我们使用import语句来导入其他模块中定义的函数、类或变量。
这里我们从dagster库导入各种组件。
"""
from dagster import Definitions
from dagster import op, job, graph, In, Out, Field, String, Int
"""
导入说明:
- op: 装饰器,用于将一个Python函数转换为Dagster的Op(操作单元)
- job: 装饰器,用于将一个或多个Op组合成Job(作业)
- graph: 装饰器,用于将Op组合成Graph(图),可以复用
- In: 类,用于定义Op的输入规范(输入什么类型的数据)
- Out: 类,用于定义Op的输出规范(输出什么类型的数据)
- Field: 类,用于定义Op的配置字段(可调整的参数)
- String: 类型注解,表示字符串类型
- Int: 类型注解,表示整数类型
这些是Dagster最基础的构建块,让我们逐一了解:
"""
from dagster import get_dagster_logger
"""
get_dagster_logger: 函数,用于获取Dagster的日志记录器
为什么需要日志?
- 在Dagster中,Op可能运行在分布式环境中的不同进程或机器上
- 日志是追踪Op执行过程的重要手段
- 通过context.log.info()等方法,你可以记录Op执行时的关键信息
使用示例:
logger = get_dagster_logger()
logger.info("这是一条信息日志")
logger.warning("这是一条警告日志")
logger.error("这是一条错误日志")
"""
import time
"""
Python标准库,用于处理时间相关的操作
- time.time(): 返回当前时间戳(从1970年1月1日开始的秒数)
- time.sleep(): 让程序暂停执行指定的秒数
"""
from typing import List, Dict, Any, Tuple
"""
typing模块:Python的类型注解系统
为什么要用类型注解?
1. 让代码更清晰,别人一看就知道函数期望什么类型的输入输出
2. IDE可以提供更好的代码补全和错误检查
3. Dagster使用类型注解来验证数据管线中的数据类型
- List[int]: 整数列表,例如 [1, 2, 3, 4, 5]
- Dict[str, Any]: 键为字符串,值为任意类型的字典,例如 {"name": "张三", "age": 25}
- Tuple[int, float]: 元组,包含整数和浮点数,例如 (10, 3.14)
- Any: 任意类型
"""
import random
"""
Python标准库,用于生成随机数
- random.randint(a, b): 生成[a, b]区间内的随机整数
- random.choice(sequence): 从序列中随机选择一个元素
"""
# ================================================================================
# 第二部分:理解@op装饰器
# ================================================================================
"""
【@op装饰器详解】
@op是Dagster中最核心的装饰器之一。它将一个普通的Python函数转换为一个"Op"。
装饰器是什么?
装饰器是Python的一种语法糖,形式是 @装饰器名,放在函数定义的上方。
@op装饰器会"包装"你的函数,添加Dagster需要的基础设施代码。
@op可以接收以下常用参数:
1. config_schema: 定义这个Op可以接受哪些配置参数
2. ins: 定义这个Op需要哪些输入
3. out: 定义这个Op的输出
什么是config_schema?
config_schema就像是函数的"参数规范",告诉Dagster:
- 这个Op需要哪些配置项
- 每个配置项是什么类型(String、Int、Bool等)
- 是否有默认值
- 有什么描述(方便他人理解)
示例:
@op(config_schema={"name": Field(String, default_value="World")})
def greet(context):
name = context.op_config["name"] # 获取配置值
return f"Hello, {name}!"
什么是ins(输入)?
ins是"inputs"的缩写,定义了Op需要接收哪些数据。
每个输入需要指定:
- 输入的名称
- 输入的类型(使用In类包装)
- 可选的描述
示例:
@op(ins={"numbers": In(List[int], description="数字列表")})
def sum_numbers(numbers):
return sum(numbers)
什么是out(输出)?
out是"outputs"的缩写,定义了Op会输出什么数据。
每个输出需要指定:
- 输出的名称
- 输出的类型(使用Out类包装)
- 可选的描述
示例:
@op(out={"result": Out(int, description="求和结果")})
def sum_numbers():
return 42
context参数是什么?
每个被@op装饰的函数都会自动接收一个context参数(通常是第一个参数)。
context是OpExecutionContext类型的对象,提供了:
- context.op_config: 获取Op的配置参数
- context.log: 获取日志记录器
- context.resources: 访问定义的资源
- context.retry_context: 获取重试相关信息
- 等等...
注意:context参数名可以是任意名称,但约定俗成用context。
"""
# ================================================================================
# 示例1:带参数的Op
# ================================================================================
"""
【第一个示例:带配置的问候Op】
这个示例展示了一个最基本的Op,它:
1. 接受配置参数(名字和重复次数)
2. 根据配置生成问候语
3. 使用日志记录执行过程
4. 返回问候语
重点学习:
- 如何定义config_schema
- 如何在Op中使用context.op_config获取配置
- 如何使用context.log记录日志
"""
@op(
config_schema={
# config_schema是一个字典,键是配置项名称,值是Field对象
# Field用于定义配置项的详细规范
# 第一个配置项:name(名字)
# - String表示这个配置项是字符串类型
# - description是描述,方便他人理解这个配置项的用途
# - default_value是默认值,如果用户不提供,就使用这个值
"name": Field(String, description="要问候的名字", default_value="World"),
# 第二个配置项:repeat(重复次数)
# - Int表示这个配置项是整数类型
# - 用户可以指定问候语重复几次
"repeat": Field(Int, description="重复次数", default_value=1),
}
)
def greet_person(context) -> str:
"""
这是一个带配置的问候Op。
参数:
context: Op执行上下文,由Dagster自动传入
包含了配置、日志、资源等信息
返回:
str: 生成的问候语,例如 "Hello, World! Hello, World! Hello, World! "
"""
# context.op_config是字典类型,存储用户提供的配置
# 我们从中获取name和repeat的值
name = context.op_config["name"]
repeat = context.op_config["repeat"]
# 生成问候语:字符串乘法相当于重复连接
# "Hello, {}! ".format(name) * repeat 等于重复repeat次
message = f"Hello, {name}! " * repeat
# 使用context.log记录日志
# log.info()用于记录一般信息
context.log.info(f"Greeting: {message}")
# strip()去除字符串两端的空白字符
return message.strip()
# ================================================================================
# 示例2:带输入输出的Op
# ================================================================================
"""
【第二个示例:计算统计信息】
这个示例展示了一个接受输入、返回多个输出的Op:
1. 接受一个整数列表作为输入
2. 计算总和和平均值作为输出
3. 使用Tuple(元组)返回多个值
重点学习:
- 如何定义ins(输入)
- 如何定义out(多个输出)
- Tuple类型注解的使用
"""
# In和Out类用于定义Op的输入输出规范
# ins参数:是一个字典,键是输入名称,值是In对象
# out参数:是一个字典,键是输出名称,值是Out对象
@op(
ins={
# "numbers"是输入的名称,可以是任意字符串
# In(List[int])表示输入是一个整数列表
# description参数提供对输入的描述
"numbers": In(List[int], description="输入的数字列表")
},
out={
# 第一个输出:sum(总和)
"sum": Out(int, description="数字总和"),
# 第二个输出:avg(平均值)
"avg": Out(float, description="数字平均值")
},
)
def calculate_stats(numbers: List[int]) -> Tuple[int, float]:
"""
计算数字列表的统计信息。
参数:
numbers: 输入的数字列表,例如 [1, 2, 3, 4, 5]
返回:
Tuple[int, float]: 包含总和和平均值的元组
例如 (15, 3.0) 表示总和15,平均值3.0
"""
# 获取Dagster日志记录器
logger = get_dagster_logger()
# 边界情况处理:如果列表为空
if not numbers:
logger.warning("输入列表为空")
return 0, 0.0
# 计算总和:sum()是Python内置函数,计算列表元素之和
total = sum(numbers)
# 计算平均值:总和除以元素个数
# 注意:浮点数除法用/,整数除法用//
average = total / len(numbers)
# 记录计算结果
logger.info(f"计算统计: 总和={total}, 平均值={average:.2f}")
# :.2f表示格式化浮点数,保留2位小数
# 返回元组:Python允许直接return a, b,会自动包装成(a, b)
return total, average
# ================================================================================
# 示例3:数据处理Op
# ================================================================================
"""
【第三个示例:数据处理】
这个示例展示了对字典数据的处理:
1. 接受一个字典作为输入
2. 复制并增强输入数据(添加新字段)
3. 如果数据包含数值列表,计算汇总统计
重点学习:
- Dict[str, Any]类型的使用
- 字典的copy()方法
- isinstance()类型检查
"""
@op(
ins={
"data": In(Dict[str, Any], description="输入数据")
# Dict[str, Any]表示输入是一个字典
# 键必须是字符串,值可以是任意类型
},
out={
"processed_data": Out(Dict[str, Any], description="处理后的数据")
},
)
def process_data(data: Dict[str, Any]) -> Dict[str, Any]:
"""
处理输入数据,添加元数据和统计信息。
参数:
data: 输入字典,例如 {"name": "test", "values": [1, 2, 3]}
返回:
Dict[str, Any]: 处理后的字典,包含原始数据和新增字段
"""
logger = get_dagster_logger()
# copy()方法创建字典的浅拷贝
# 这样我们可以在不修改原始数据的情况下添加新字段
processed = data.copy()
# 添加处理时间戳
# time.time()返回当前时间的时间戳(1970年至今的秒数)
processed["processed_at"] = time.time()
processed["processed_by"] = "dagster_tutorial"
# 如果数据中包含"values"字段,且是数值列表,计算统计
if "values" in processed:
values = processed["values"]
# isinstance()检查values是否是列表,且所有元素都是数值类型
# all()检查序列中所有元素是否都满足条件
if isinstance(values, list) and all(isinstance(v, (int, float)) for v in values):
# 使用sum()计算总和
processed["sum"] = sum(values)
# len()获取列表长度
processed["count"] = len(values)
# 计算平均值
processed["average"] = sum(values) / len(values) if values else 0
logger.info(f"数据处理完成: {len(processed)} 个字段")
return processed
# ================================================================================
# 示例4:模拟长时间运行的Op
# ================================================================================
"""
【第四个示例:模拟长时间运行的任务】
这个示例展示了一个模拟耗时操作的Op:
1. 接受duration和should_fail作为配置
2. 模拟执行进度
3. 可以配置为失败(用于测试重试机制)
重点学习:
- 模拟长时间运行的模式
- 进度日志的记录方式
- 条件抛出异常
"""
@op(
config_schema={
# duration:模拟任务持续时间(秒)
"duration": Field(float, description="模拟运行时间(秒)", default_value=1.0),
# should_fail:是否模拟失败
"should_fail": Field(bool, description="是否模拟失败", default_value=False),
}
)
def simulate_long_task(context) -> str:
"""
模拟一个长时间运行的任务,可选择失败。
参数:
context: Op执行上下文
返回:
str: 任务完成消息
"""
# 从配置中获取参数
duration = context.op_config["duration"]
should_fail = context.op_config["should_fail"]
context.log.info(f"开始模拟任务,预计时长: {duration}秒")
# 模拟工作进度:将总时长分成10个阶段
for i in range(10):
# time.sleep()暂停执行指定的秒数
time.sleep(duration / 10)
# 计算当前进度百分比
progress = (i + 1) * 10
context.log.info(f"任务进度: {progress}%")
# 如果配置要求失败,则抛出异常
if should_fail:
# raise语句用于抛出异常
# 异常会中断程序执行,并传递给Dagster的重试机制
raise Exception("模拟任务失败!")
# 格式化字符串:f-string中:表示格式化
result = f"任务完成,耗时{duration}秒"
context.log.info(result)
return result
# ================================================================================
# 示例5:生成数据的Op
# ================================================================================
"""
【第五个示例:生成测试数据】
这个示例展示了几种不同的数据生成Op:
1. generate_numbers:生成固定数字列表
2. create_example_data:创建示例字典
3. generate_data:生成随机或顺序测试数据
重点学习:
- 返回Python内置数据结构
- 字典的构建方式
- random模块的使用
"""
@op
def generate_numbers(context) -> List[int]:
"""
生成一个固定的数字列表。
返回:
List[int]: [1, 2, 3, 4, 5]
"""
context.log.info("生成数字列表: [1, 2, 3, 4, 5]")
return [1, 2, 3, 4, 5]
@op
def create_example_data(context) -> Dict[str, Any]:
"""
创建包含数值列表的示例数据。
返回:
Dict[str, Any]: {"values": [1, 2, 3, 4, 5], "name": "example"}
"""
context.log.info("创建示例数据")
return {"values": [1, 2, 3, 4, 5], "name": "example"}
@op
def generate_data(context) -> List[Dict[str, Any]]:
"""
生成测试数据集,支持随机或顺序模式。
返回:
List[Dict[str, Any]]: 生成的测试数据列表
"""
# 固定生成10条数据
count = 10
# 数据类型:可选"random"(随机)或"sequential"(顺序)
data_type = "random"
# 初始化空列表来存储生成的数据
data = []
# for循环:range(count)生成0到count-1的序列
for i in range(count):
if data_type == "random":
# random.randint(1, 100)生成1到100之间的随机整数
# random.choice(["A", "B", "C", "D"])从列表中随机选一个
item = {
"id": i + 1, # id从1开始
"value": random.randint(1, 100), # 随机值
"category": random.choice(["A", "B", "C", "D"]), # 随机分类
"timestamp": time.time() + i, # 时间戳
}
elif data_type == "sequential":
# chr(65)返回ASCII字符'A',chr(65+i%4)返回A、B、C、D循环
item = {
"id": i + 1,
"value": i * 10, # 顺序值:0, 10, 20, 30...
"category": chr(65 + (i % 4)), # A, B, C, D循环
"timestamp": time.time() + i,
}
else:
# 默认情况
item = {
"id": i + 1,
"value": 0,
"category": "unknown",
"timestamp": time.time(),
}
# 将每条数据添加到列表
data.append(item)
context.log.info(f"生成了 {len(data)} 条{data_type}类型的数据")
return data
# ================================================================================
# 示例6:验证数据
# ================================================================================
"""
【第六个示例:数据验证Op】
这个示例展示了对数据集的验证:
1. 接受一个字典列表作为输入
2. 根据规则验证每条数据
3. 返回有效数据和无效数据计数
重点学习:
- 复杂的数据验证逻辑
- 返回多个输出
- 布尔类型和计数
"""
@op(
ins={
"data": In(List[Dict[str, Any]], description="要验证的数据")
# 这是一个字典列表,每个字典代表一条数据记录
},
out={
"valid_data": Out(List[Dict[str, Any]], description="有效数据"),
"invalid_count": Out(int, description="无效数据数量")
},
)
def validate_data(data: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], int]:
"""
验证数据列表,筛选出有效数据。
验证规则:
1. 必须有"id"字段,且为整数
2. 必须有"value"字段,且为数值类型
3. 必须有"category"字段,且为字符串
参数:
data: 待验证的数据列表
返回:
Tuple[List[Dict[str, Any]], int]: (有效数据列表, 无效数据数量)
"""
logger = get_dagster_logger()
# 初始化结果容器
valid_data = [] # 存储验证通过的数据
invalid_count = 0 # 计数器,统计失败数量
# 遍历每条数据进行验证
for item in data:
# 默认假设数据有效
is_valid = True
# 验证规则1:检查"id"字段
# in运算符检查键是否存在于字典中
# isinstance(x, int)检查x是否是整数类型
if "id" not in item or not isinstance(item["id"], int):
is_valid = False
# 验证规则2:检查"value"字段(可以是int或float)
if "value" not in item or not isinstance(item["value"], (int, float)):
is_valid = False
# 验证规则3:检查"category"字段
if "category" not in item or not isinstance(item["category"], str):
is_valid = False
# 根据验证结果分类存储
if is_valid:
valid_data.append(item) # 添加到有效列表
else:
invalid_count += 1 # 计数加1
logger.info(f"数据验证完成: 有效={len(valid_data)}, 无效={invalid_count}")
return valid_data, invalid_count
# ================================================================================
# 示例7:生成汇总报告
# ================================================================================
"""
【第七个示例:多输入单输出Op】
这个示例展示了一个接收多个输入的Op:
1. 接收7个不同类型的输入
2. 生成格式化的汇总报告
重点学习:
- 多个输入的定义方式
- 字符串的格式化方法
- 生成文本报告
字符串格式化方法说明:
1. f-string(推荐):f"Hello, {name}!"
2. .format():"Hello, {}!".format(name)
3. %操作符(老式):"Hello, %s!" % name
"""
@op(
ins={
# 字符串输入:问候语
"greeting": In(str, description="问候语"),
# 整数输入:统计总和
"stats_sum": In(int, description="统计总和"),
# 浮点数输入:统计平均值
"stats_avg": In(float, description="统计平均值"),
# 字典输入:处理后的数据
"processed_data": In(Dict[str, Any], description="处理后的数据"),
# 字符串输入:任务执行结果
"task_result": In(str, description="任务结果"),
# 列表输入:有效数据
"valid_data": In(List[Dict[str, Any]], description="有效数据"),
# 整数输入:无效数据计数
"invalid_count": In(int, description="无效数据数量"),
},
out={
"report": Out(str, description="汇总报告")
},
)
def generate_report(
greeting: str,
stats_sum: int,
stats_avg: float,
processed_data: Dict[str, Any],
task_result: str,
valid_data: List[Dict[str, Any]],
invalid_count: int,
) -> str:
"""
汇总所有处理结果,生成格式化的报告。
参数:
greeting: 问候语字符串
stats_sum: 数字总和
stats_avg: 数字平均值
processed_data: 处理后的数据字典
task_result: 任务执行结果描述
valid_data: 有效数据列表
invalid_count: 无效数据条数
返回:
str: 格式化的文本报告
"""
logger = get_dagster_logger()
# 构建报告内容
# "=" * 50 创建50个等号组成的字符串,用作分隔线
report_lines = [
"=" * 50,
"DAGSTER教程 - 第1章示例汇总报告",
"=" * 50,
f"1. 问候: {greeting}",
f"2. 统计: 总和={stats_sum}, 平均值={stats_avg:.2f}",
# len(processed_data)获取字典的键数量
f"3. 数据处理: 包含{len(processed_data)}个字段",
f"4. 任务结果: {task_result}",
# 统计有效数据条数
f"5. 数据验证: 有效数据{len(valid_data)}条, 无效数据{invalid_count}条",
# valid_data[:2]是切片操作,获取前2条数据
f"6. 有效数据样例: {valid_data[:2] if valid_data else '无'}",
# time.strftime()将时间戳转换为格式化字符串
f"7. 报告生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}",
"=" * 50,
]
# "\n".join()用换行符连接列表中的所有字符串
report = "\n".join(report_lines)
logger.info("报告生成完成")
return report
# ================================================================================
# 第三部分:理解@job装饰器
# ================================================================================
"""
【@job装饰器详解】
@job装饰器将一个或多个Op组合成一个"Job"(作业)。
Job是什么?
- Job定义了Op的执行顺序和依赖关系
- 在Dagster中,Job是一个有向无环图(DAG)
- Op之间的边表示数据依赖:Op B依赖Op A的输出
如何在Job中组合Op?
1. 直接调用Op函数(不带参数)
2. Op的返回值可以传递给其他Op
3. Dagster会自动解析依赖关系
Op别名(alias)有什么用?
- 使用.op_name.alias("新名称")()可以创建Op的副本
- 这样同一个Op可以在Job中使用多次
- 类似于"复制粘贴"一个Op
重要约束:
1. Job组合中不能直接传递字面值(如数字、字符串)
2. 必须通过Op生成这些值
3. 如果需要常量,使用专门的constant Op
"""
# ================================================================================
# 综合示例Job
# ================================================================================
@job
def comprehensive_example_job():
"""
综合示例Job,展示多个Op的复杂依赖关系。
这个Job展示了Dagster的各种Op组合模式:
1. 顺序依赖:A -> B 表示B等待A完成后执行
2. 并行执行:A、B、C 之间无依赖,可以并行执行
3. 多输出:Op可以有多个输出
4. 多输入:Op可以依赖多个其他Op的输出
"""
# 生成数据:返回一个字典列表
data = generate_data()
# 以下Op可以并行执行,因为它们之间没有依赖关系
# 调用带配置的Op时不传参数,使用默认配置
greeting = greet_person()
# 返回固定数字列表
numbers = generate_numbers()
# 依赖numbers的输出
# calculate_stats返回两个值,使用元组解构接收
stats_sum, stats_avg = calculate_stats(numbers)
# 返回示例字典
example_data = create_example_data()
# 依赖example_data的输出
processed = process_data(example_data)
# 使用默认配置运行长时间任务
task_result = simulate_long_task()
# 依赖data的输出
valid_data, invalid_count = validate_data(data)
# 最终Op依赖多个输入
# 注意:这里传入的顺序要与Op定义的参数顺序一致
report = generate_report(
greeting, # -> greeting: str
stats_sum, # -> stats_sum: int
stats_avg, # -> stats_avg: float
processed, # -> processed_data: Dict
task_result, # -> task_result: str
valid_data, # -> valid_data: List
invalid_count, # -> invalid_count: int
)
# ================================================================================
# 简单线性Job
# ================================================================================
@job
def simple_linear_job():
"""
简单的线性Job,演示基本的Op链式调用。
执行流程:
generate_data.alias("data_generator")() -> validate_data.alias("data_validator")(data)
alias()创建Op的命名副本,这样同一个Op类型可以在Job中使用多次。
"""
# generate_data别名,使用别名后是一个独立的Op实例
data = generate_data.alias("data_generator")()
# validate_data别名,依赖data的输出
return validate_data.alias("data_validator")(data)
# ================================================================================
# 可配置Job
# ================================================================================
@job
def configurable_job():
"""
可配置的Job,展示如何使用别名创建带不同配置的Op实例。
"""
# 别名后可以有不同的行为(目前共享相同配置)
greeting = greet_person.alias("custom_greeting")()
long_task = simulate_long_task.alias("custom_task")()
# 返回多个值
return greeting, long_task
# ================================================================================
# 导出定义
# ================================================================================
"""
__all__是一个特殊变量,定义了这个模块被"from module import *"时
哪些符号会被导出。这是一种明确公开API的方式。
但在实际使用Dagster时,通常不需要关心__all__,
因为我们直接通过文件名导入:
from dagster_tutorial.chapter_01_basics.basic_examples import greet_person
"""
__all__ = [
"greet_person",
"calculate_stats",
"process_data",
"simulate_long_task",
"generate_data",
"validate_data",
"generate_report",
"comprehensive_example_job",
"simple_linear_job",
"configurable_job",
]
# ================================================================================
# 运行说明
# ================================================================================
"""
【如何运行本文件中的示例】
方法1:使用Dagster命令执行Job
------------------------------------------
# 在项目根目录下运行
dagster job execute -f dagster_tutorial/chapter_01_basics/basic_examples.py -j comprehensive_example_job
参数说明:
- -f: 指定包含Job定义的文件
- -j: 指定要执行的Job名称
方法2:使用Python直接执行(如果文件有if __name__ == "__main__"块)
------------------------------------------
python dagster_tutorial/chapter_01_basics/basic_examples.py
方法3:在Dagster UI中查看和执行
------------------------------------------
# 启动Dagster Web服务器
dagster dev -f dagster_tutorial/chapter_01_basics/basic_examples.py
然后在浏览器中打开 http://localhost:3000 查看和执行Job
【配置Op执行】
如果你想修改Op的配置,可以在执行时提供配置文件:
# config.yaml
ops:
greet_person:
config:
name: "Dagster Student"
repeat: 3
# 执行时指定配置
dagster job execute -f ... -c config.yaml -j comprehensive_example_job
"""
defs = Definitions(
jobs=[
comprehensive_example_job,
configurable_job,
simple_linear_job,
]
)
# dagster dev -f basic_examples.py
"""
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)