第1章:Dagster基础概念和安装

学习目标

  • 理解Dagster的基本概念
  • 掌握Dagster的安装方法
  • 创建第一个Dagster应用
  • 了解Dagster的核心组件

目录

  1. 什么是Dagster
  2. Dagster vs 其他编排工具
  3. 安装和配置
  4. 核心概念
  5. 第一个Dagster应用
  6. 运行和监控
  7. 练习和扩展

什么是Dagster

Dagster是一个现代的数据编排平台,专门为构建、运行和监控数据管道而设计。它的核心特点包括:

主要特性

  • 声明式管道定义:使用Python代码定义数据管道
  • 类型安全:强大的类型系统确保数据质量
  • 依赖管理:自动管理任务之间的依赖关系
  • 可观测性:内置的监控和日志系统
  • 测试友好:易于编写单元测试和集成测试

适用场景

  • ETL/ELT数据管道
  • 机器学习工作流
  • 数据质量检查
  • 数据仓库维护
  • 实时数据处理

Dagster vs 其他编排工具

与Airflow比较

特性 Dagster Airflow
编程范式 声明式Python 基于DAG的Python
类型系统 强大,内置 较弱
测试支持 优秀 一般
学习曲线 中等 较陡峭
部署复杂度 较低 较高

与Prefect比较

特性 Dagster Prefect
核心概念 Assets为中心 Flows为中心
调度系统 内置强大 灵活但复杂
监控界面 优秀 优秀
社区生态 快速增长 成熟稳定

安装和配置

系统要求

  • Python 3.8+
  • pip 20.0+
  • 4GB+ RAM(推荐)

安装步骤

  1. 创建虚拟环境(推荐)
python -m venv dagster-env
source dagster-env/bin/activate  # Linux/Mac
# 或
dagster-env\Scripts\activate  # Windows
  1. 安装Dagster核心包
pip install dagster dagster-webserver
  1. 验证安装
python -c "import dagster; print(f'Dagster版本: {dagster.__version__}')"
  1. 安装可选依赖
# 数据处理
pip install pandas numpy

# 数据库连接
pip install sqlalchemy psycopg2-binary

# 开发工具
pip install pytest black isort

配置开发环境

创建.env文件:

# 开发环境配置
DAGSTER_HOME=./.dagster
LOG_LEVEL=INFO

# 数据库连接(可选)
DATABASE_URL=postgresql://user:password@localhost:5432/dagster_db

创建workspace.yaml文件:

load_from:
  - python_module: dagster_tutorial.chapter_01_basics

核心概念

1. Op(操作)

Op是Dagster中的基本计算单元,类似于函数。每个op:

  • 执行特定的任务
  • 可以有输入和输出
  • 支持配置参数
  • 可以记录日志

2. Job(作业)

Job是由多个op组成的执行单元:

  • 定义op之间的依赖关系
  • 可以被调度执行
  • 支持配置和参数化

3. Asset(资产)

Asset是数据管道中的产出物:

  • 可以是数据文件、数据库表等
  • 有版本和依赖关系
  • 支持数据沿袭追踪

4. Resource(资源)

Resource是外部依赖的抽象:

  • 数据库连接
  • API客户端
  • 文件系统访问
  • 配置管理

5. Schedule(调度)

Schedule定义作业的执行时间:

  • 定时执行
  • 支持cron表达式
  • 可以暂停/恢复

6. Sensor(传感器)

Sensor基于事件触发作业:

  • 文件变化
  • 数据库更新
  • API调用
  • 自定义事件

第一个Dagster应用

简单示例

创建hello_dagster.py

from dagster import op, job

@op
def hello_world(context):
    context.log.info("Hello, Dagster!")
    return "Hello from Dagster!"

@job
def hello_world_job():
    hello_world()

运行方式

  1. 命令行运行
dagster job execute -f hello_dagster.py -n hello_world_job
  1. Python脚本运行
from dagster import execute_job
from hello_dagster import hello_world_job

result = execute_job(hello_world_job)
print(f"执行结果: {result.success}")
  1. Dagster UI运行
dagster dev

然后访问 http://localhost:3000

带参数的示例

from dagster import op, job, Field, String

@op(
    config_schema={
        "name": Field(String, description="要问候的名字", default_value="World"),
        "repeat": Field(int, description="重复次数", default_value=1),
    }
)
def greet_person(context):
    name = context.op_config["name"]
    repeat = context.op_config["repeat"]
    message = f"Hello, {name}! " * repeat
    context.log.info(f"Greeting: {message}")
    return message.strip()

@job
def greeting_job():
    greet_person()

运行带配置的job:

dagster job execute -f hello_dagster.py -n greeting_job --config '{"ops": {"greet_person": {"config": {"name": "Alice", "repeat": 3}}}}'

运行和监控

启动开发服务器

dagster dev

访问Web界面

  • 打开浏览器访问 http://localhost:3000
  • 查看作业状态
  • 监控执行日志
  • 查看数据沿袭

常用命令

  1. 列出所有作业
dagster job list
  1. 执行特定作业
dagster job execute -f your_file.py -n your_job_name
  1. 查看作业详情
dagster job describe -f your_file.py -n your_job_name
  1. 清理缓存
dagster instance migrate

监控指标

  • 作业执行时间
  • 资源使用情况
  • 错误率统计
  • 数据质量指标

练习和扩展

基础练习

  1. 修改hello_worldop,使其接受一个名字参数
  2. 创建两个op,一个生成随机数,另一个计算平均值
  3. 添加op配置,控制随机数的范围和数量

中级练习

  1. 创建一个处理CSV文件的ETL管道
  2. 添加数据验证和错误处理
  3. 实现op之间的数据传递

高级练习

  1. 集成外部API获取数据
  2. 实现数据质量检查规则
  3. 添加监控和告警功能

扩展阅读

故障排除

常见问题

  1. 导入错误

    • 确保Dagster已正确安装
    • 检查Python路径
    • 验证虚拟环境
  2. 作业执行失败

    • 检查op配置
    • 查看详细日志
    • 验证依赖关系
  3. Web界面无法访问

    • 检查端口占用
    • 验证防火墙设置
    • 查看服务日志

调试技巧

  • 使用context.log记录关键信息
  • 添加详细的错误处理
  • 逐步测试每个op
  • 使用Dagster的测试工具

下一步

完成本章学习后,您应该:

  • 理解Dagster的基本概念
  • 能够安装和配置Dagster
  • 创建简单的数据管道
  • 使用Dagster UI监控作业

继续学习第2章:Ops和Jobs基础,深入了解Dagster的核心组件。

basic_examples.py

"""
"""1章:Dagster基础示例代码 - 详细注释版

================================================================================
第一章:Dagster核心概念入门
================================================================================

本章节将帮助你理解Dagster的最核心概念:
1. Op(操作):Dagster中的最小计算单元,类似于其他框架中的"任务""函数"
2. Job(作业):由多个Op组成的有向无环图(DAG),定义了Op之间的执行顺序
3. Graph(图):Op的组合结构,可以被多个Job复用
4. In/Out(输入/输出):定义Op的输入输出规范
5. Field/Config(字段/配置):定义Op的可配置参数

在开始之前,让我们理解几个关键概念:

【什么是Dagster?】
Dagster是一个现代化的数据编排平台,用于构建、管理和监控数据管线。
你可以把它想象成一个"数据管道的指挥官",它会按照你定义的顺序,
一个接一个地执行数据处理任务,并且能够:
- 自动处理任务之间的依赖关系
- 提供重试机制(某个任务失败时自动重试)
- 支持定时执行(像cron一样)
- 监控任务执行状态
- 支持数据回填(重新处理历史数据)

【Dagster的核心组件】
- Op(操作):执行具体的数据处理逻辑
- Job(作业):定义Op的执行顺序和依赖关系
- Asset(资产):代表数据产物,可以跟踪数据血缘
- Resource(资源):外部依赖(如数据库连接、API客户端等)
- Schedule(调度器):定时触发Job执行
- Sensor(传感器):响应外部事件触发Job执行

================================================================================
"""

# ================================================================================
# 第一部分:导入必要的模块
# ================================================================================

"""
在Python中,我们使用import语句来导入其他模块中定义的函数、类或变量。
这里我们从dagster库导入各种组件。
"""
from dagster import Definitions
from dagster import op, job, graph, In, Out, Field, String, Int
"""
导入说明:
- op: 装饰器,用于将一个Python函数转换为Dagster的Op(操作单元)
- job: 装饰器,用于将一个或多个Op组合成Job(作业)
- graph: 装饰器,用于将Op组合成Graph(图),可以复用
- In: 类,用于定义Op的输入规范(输入什么类型的数据)
- Out: 类,用于定义Op的输出规范(输出什么类型的数据)
- Field: 类,用于定义Op的配置字段(可调整的参数)
- String: 类型注解,表示字符串类型
- Int: 类型注解,表示整数类型

这些是Dagster最基础的构建块,让我们逐一了解:
"""

from dagster import get_dagster_logger
"""
get_dagster_logger: 函数,用于获取Dagster的日志记录器

为什么需要日志?
- 在Dagster中,Op可能运行在分布式环境中的不同进程或机器上
- 日志是追踪Op执行过程的重要手段
- 通过context.log.info()等方法,你可以记录Op执行时的关键信息

使用示例:
    logger = get_dagster_logger()
    logger.info("这是一条信息日志")
    logger.warning("这是一条警告日志")
    logger.error("这是一条错误日志")
"""

import time
"""
Python标准库,用于处理时间相关的操作
- time.time(): 返回当前时间戳(从197011日开始的秒数)
- time.sleep(): 让程序暂停执行指定的秒数
"""

from typing import List, Dict, Any, Tuple
"""
typing模块:Python的类型注解系统

为什么要用类型注解?
1. 让代码更清晰,别人一看就知道函数期望什么类型的输入输出
2. IDE可以提供更好的代码补全和错误检查
3. Dagster使用类型注解来验证数据管线中的数据类型

- List[int]: 整数列表,例如 [1, 2, 3, 4, 5]
- Dict[str, Any]: 键为字符串,值为任意类型的字典,例如 {"name": "张三", "age": 25}
- Tuple[int, float]: 元组,包含整数和浮点数,例如 (10, 3.14)
- Any: 任意类型
"""

import random
"""
Python标准库,用于生成随机数
- random.randint(a, b): 生成[a, b]区间内的随机整数
- random.choice(sequence): 从序列中随机选择一个元素
"""


# ================================================================================
# 第二部分:理解@op装饰器
# ================================================================================

"""
【@op装饰器详解】

@op是Dagster中最核心的装饰器之一。它将一个普通的Python函数转换为一个"Op"。

装饰器是什么?
装饰器是Python的一种语法糖,形式是 @装饰器名,放在函数定义的上方。
@op装饰器会"包装"你的函数,添加Dagster需要的基础设施代码。

@op可以接收以下常用参数:
1. config_schema: 定义这个Op可以接受哪些配置参数
2. ins: 定义这个Op需要哪些输入
3. out: 定义这个Op的输出

什么是config_schema?
config_schema就像是函数的"参数规范",告诉Dagster:
- 这个Op需要哪些配置项
- 每个配置项是什么类型(String、Int、Bool等)
- 是否有默认值
- 有什么描述(方便他人理解)

示例:
    @op(config_schema={"name": Field(String, default_value="World")})
    def greet(context):
        name = context.op_config["name"]  # 获取配置值
        return f"Hello, {name}!"

什么是ins(输入)?
ins是"inputs"的缩写,定义了Op需要接收哪些数据。
每个输入需要指定:
- 输入的名称
- 输入的类型(使用In类包装)
- 可选的描述

示例:
    @op(ins={"numbers": In(List[int], description="数字列表")})
    def sum_numbers(numbers):
        return sum(numbers)

什么是out(输出)?
out是"outputs"的缩写,定义了Op会输出什么数据。
每个输出需要指定:
- 输出的名称
- 输出的类型(使用Out类包装)
- 可选的描述

示例:
    @op(out={"result": Out(int, description="求和结果")})
    def sum_numbers():
        return 42

context参数是什么?
每个被@op装饰的函数都会自动接收一个context参数(通常是第一个参数)。
context是OpExecutionContext类型的对象,提供了:
- context.op_config: 获取Op的配置参数
- context.log: 获取日志记录器
- context.resources: 访问定义的资源
- context.retry_context: 获取重试相关信息
- 等等...

注意:context参数名可以是任意名称,但约定俗成用context。
"""


# ================================================================================
# 示例1:带参数的Op
# ================================================================================

"""
【第一个示例:带配置的问候Op】

这个示例展示了一个最基本的Op,它:
1. 接受配置参数(名字和重复次数)
2. 根据配置生成问候语
3. 使用日志记录执行过程
4. 返回问候语

重点学习:
- 如何定义config_schema
- 如何在Op中使用context.op_config获取配置
- 如何使用context.log记录日志
"""

@op(
    config_schema={
        # config_schema是一个字典,键是配置项名称,值是Field对象
        # Field用于定义配置项的详细规范

        # 第一个配置项:name(名字)
        # - String表示这个配置项是字符串类型
        # - description是描述,方便他人理解这个配置项的用途
        # - default_value是默认值,如果用户不提供,就使用这个值
        "name": Field(String, description="要问候的名字", default_value="World"),

        # 第二个配置项:repeat(重复次数)
        # - Int表示这个配置项是整数类型
        # - 用户可以指定问候语重复几次
        "repeat": Field(Int, description="重复次数", default_value=1),
    }
)
def greet_person(context) -> str:
    """
    这是一个带配置的问候Op。

    参数:
        context: Op执行上下文,由Dagster自动传入
                包含了配置、日志、资源等信息

    返回:
        str: 生成的问候语,例如 "Hello, World! Hello, World! Hello, World! "
    """
    # context.op_config是字典类型,存储用户提供的配置
    # 我们从中获取name和repeat的值
    name = context.op_config["name"]
    repeat = context.op_config["repeat"]

    # 生成问候语:字符串乘法相当于重复连接
    # "Hello, {}! ".format(name) * repeat 等于重复repeat次
    message = f"Hello, {name}! " * repeat

    # 使用context.log记录日志
    # log.info()用于记录一般信息
    context.log.info(f"Greeting: {message}")

    # strip()去除字符串两端的空白字符
    return message.strip()


# ================================================================================
# 示例2:带输入输出的Op
# ================================================================================

"""
【第二个示例:计算统计信息】

这个示例展示了一个接受输入、返回多个输出的Op:
1. 接受一个整数列表作为输入
2. 计算总和和平均值作为输出
3. 使用Tuple(元组)返回多个值

重点学习:
- 如何定义ins(输入)
- 如何定义out(多个输出)
- Tuple类型注解的使用
"""

# In和Out类用于定义Op的输入输出规范
# ins参数:是一个字典,键是输入名称,值是In对象
# out参数:是一个字典,键是输出名称,值是Out对象
@op(
    ins={
        # "numbers"是输入的名称,可以是任意字符串
        # In(List[int])表示输入是一个整数列表
        # description参数提供对输入的描述
        "numbers": In(List[int], description="输入的数字列表")
    },
    out={
        # 第一个输出:sum(总和)
        "sum": Out(int, description="数字总和"),

        # 第二个输出:avg(平均值)
        "avg": Out(float, description="数字平均值")
    },
)
def calculate_stats(numbers: List[int]) -> Tuple[int, float]:
    """
    计算数字列表的统计信息。

    参数:
        numbers: 输入的数字列表,例如 [1, 2, 3, 4, 5]

    返回:
        Tuple[int, float]: 包含总和和平均值的元组
                          例如 (15, 3.0) 表示总和15,平均值3.0
    """
    # 获取Dagster日志记录器
    logger = get_dagster_logger()

    # 边界情况处理:如果列表为空
    if not numbers:
        logger.warning("输入列表为空")
        return 0, 0.0

    # 计算总和:sum()是Python内置函数,计算列表元素之和
    total = sum(numbers)

    # 计算平均值:总和除以元素个数
    # 注意:浮点数除法用/,整数除法用//
    average = total / len(numbers)

    # 记录计算结果
    logger.info(f"计算统计: 总和={total}, 平均值={average:.2f}")
    # :.2f表示格式化浮点数,保留2位小数

    # 返回元组:Python允许直接return a, b,会自动包装成(a, b)
    return total, average


# ================================================================================
# 示例3:数据处理Op
# ================================================================================

"""
【第三个示例:数据处理】

这个示例展示了对字典数据的处理:
1. 接受一个字典作为输入
2. 复制并增强输入数据(添加新字段)
3. 如果数据包含数值列表,计算汇总统计

重点学习:
- Dict[str, Any]类型的使用
- 字典的copy()方法
- isinstance()类型检查
"""

@op(
    ins={
        "data": In(Dict[str, Any], description="输入数据")
        # Dict[str, Any]表示输入是一个字典
        # 键必须是字符串,值可以是任意类型
    },
    out={
        "processed_data": Out(Dict[str, Any], description="处理后的数据")
    },
)
def process_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    处理输入数据,添加元数据和统计信息。

    参数:
        data: 输入字典,例如 {"name": "test", "values": [1, 2, 3]}

    返回:
        Dict[str, Any]: 处理后的字典,包含原始数据和新增字段
    """
    logger = get_dagster_logger()

    # copy()方法创建字典的浅拷贝
    # 这样我们可以在不修改原始数据的情况下添加新字段
    processed = data.copy()

    # 添加处理时间戳
    # time.time()返回当前时间的时间戳(1970年至今的秒数)
    processed["processed_at"] = time.time()
    processed["processed_by"] = "dagster_tutorial"

    # 如果数据中包含"values"字段,且是数值列表,计算统计
    if "values" in processed:
        values = processed["values"]

        # isinstance()检查values是否是列表,且所有元素都是数值类型
        # all()检查序列中所有元素是否都满足条件
        if isinstance(values, list) and all(isinstance(v, (int, float)) for v in values):
            # 使用sum()计算总和
            processed["sum"] = sum(values)
            # len()获取列表长度
            processed["count"] = len(values)
            # 计算平均值
            processed["average"] = sum(values) / len(values) if values else 0

    logger.info(f"数据处理完成: {len(processed)} 个字段")
    return processed


# ================================================================================
# 示例4:模拟长时间运行的Op
# ================================================================================

"""
【第四个示例:模拟长时间运行的任务】

这个示例展示了一个模拟耗时操作的Op:
1. 接受duration和should_fail作为配置
2. 模拟执行进度
3. 可以配置为失败(用于测试重试机制)

重点学习:
- 模拟长时间运行的模式
- 进度日志的记录方式
- 条件抛出异常
"""

@op(
    config_schema={
        # duration:模拟任务持续时间(秒)
        "duration": Field(float, description="模拟运行时间(秒)", default_value=1.0),
        # should_fail:是否模拟失败
        "should_fail": Field(bool, description="是否模拟失败", default_value=False),
    }
)
def simulate_long_task(context) -> str:
    """
    模拟一个长时间运行的任务,可选择失败。

    参数:
        context: Op执行上下文

    返回:
        str: 任务完成消息
    """
    # 从配置中获取参数
    duration = context.op_config["duration"]
    should_fail = context.op_config["should_fail"]

    context.log.info(f"开始模拟任务,预计时长: {duration}秒")

    # 模拟工作进度:将总时长分成10个阶段
    for i in range(10):
        # time.sleep()暂停执行指定的秒数
        time.sleep(duration / 10)

        # 计算当前进度百分比
        progress = (i + 1) * 10
        context.log.info(f"任务进度: {progress}%")

    # 如果配置要求失败,则抛出异常
    if should_fail:
        # raise语句用于抛出异常
        # 异常会中断程序执行,并传递给Dagster的重试机制
        raise Exception("模拟任务失败!")

    # 格式化字符串:f-string中:表示格式化
    result = f"任务完成,耗时{duration}秒"
    context.log.info(result)
    return result


# ================================================================================
# 示例5:生成数据的Op
# ================================================================================

"""
【第五个示例:生成测试数据】

这个示例展示了几种不同的数据生成Op:
1. generate_numbers:生成固定数字列表
2. create_example_data:创建示例字典
3. generate_data:生成随机或顺序测试数据

重点学习:
- 返回Python内置数据结构
- 字典的构建方式
- random模块的使用
"""

@op
def generate_numbers(context) -> List[int]:
    """
    生成一个固定的数字列表。

    返回:
        List[int]: [1, 2, 3, 4, 5]
    """
    context.log.info("生成数字列表: [1, 2, 3, 4, 5]")
    return [1, 2, 3, 4, 5]


@op
def create_example_data(context) -> Dict[str, Any]:
    """
    创建包含数值列表的示例数据。

    返回:
        Dict[str, Any]: {"values": [1, 2, 3, 4, 5], "name": "example"}
    """
    context.log.info("创建示例数据")
    return {"values": [1, 2, 3, 4, 5], "name": "example"}


@op
def generate_data(context) -> List[Dict[str, Any]]:
    """
    生成测试数据集,支持随机或顺序模式。

    返回:
        List[Dict[str, Any]]: 生成的测试数据列表
    """
    # 固定生成10条数据
    count = 10

    # 数据类型:可选"random"(随机)或"sequential"(顺序)
    data_type = "random"

    # 初始化空列表来存储生成的数据
    data = []

    # for循环:range(count)生成0到count-1的序列
    for i in range(count):
        if data_type == "random":
            # random.randint(1, 100)生成1到100之间的随机整数
            # random.choice(["A", "B", "C", "D"])从列表中随机选一个
            item = {
                "id": i + 1,  # id从1开始
                "value": random.randint(1, 100),  # 随机值
                "category": random.choice(["A", "B", "C", "D"]),  # 随机分类
                "timestamp": time.time() + i,  # 时间戳
            }
        elif data_type == "sequential":
            # chr(65)返回ASCII字符'A',chr(65+i%4)返回A、B、C、D循环
            item = {
                "id": i + 1,
                "value": i * 10,  # 顺序值:0, 10, 20, 30...
                "category": chr(65 + (i % 4)),  # A, B, C, D循环
                "timestamp": time.time() + i,
            }
        else:
            # 默认情况
            item = {
                "id": i + 1,
                "value": 0,
                "category": "unknown",
                "timestamp": time.time(),
            }

        # 将每条数据添加到列表
        data.append(item)

    context.log.info(f"生成了 {len(data)} 条{data_type}类型的数据")
    return data


# ================================================================================
# 示例6:验证数据
# ================================================================================

"""
【第六个示例:数据验证Op】

这个示例展示了对数据集的验证:
1. 接受一个字典列表作为输入
2. 根据规则验证每条数据
3. 返回有效数据和无效数据计数

重点学习:
- 复杂的数据验证逻辑
- 返回多个输出
- 布尔类型和计数
"""

@op(
    ins={
        "data": In(List[Dict[str, Any]], description="要验证的数据")
        # 这是一个字典列表,每个字典代表一条数据记录
    },
    out={
        "valid_data": Out(List[Dict[str, Any]], description="有效数据"),
        "invalid_count": Out(int, description="无效数据数量")
    },
)
def validate_data(data: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], int]:
    """
    验证数据列表,筛选出有效数据。

    验证规则:
    1. 必须有"id"字段,且为整数
    2. 必须有"value"字段,且为数值类型
    3. 必须有"category"字段,且为字符串

    参数:
        data: 待验证的数据列表

    返回:
        Tuple[List[Dict[str, Any]], int]: (有效数据列表, 无效数据数量)
    """
    logger = get_dagster_logger()

    # 初始化结果容器
    valid_data = []  # 存储验证通过的数据
    invalid_count = 0  # 计数器,统计失败数量

    # 遍历每条数据进行验证
    for item in data:
        # 默认假设数据有效
        is_valid = True

        # 验证规则1:检查"id"字段
        # in运算符检查键是否存在于字典中
        # isinstance(x, int)检查x是否是整数类型
        if "id" not in item or not isinstance(item["id"], int):
            is_valid = False

        # 验证规则2:检查"value"字段(可以是int或float)
        if "value" not in item or not isinstance(item["value"], (int, float)):
            is_valid = False

        # 验证规则3:检查"category"字段
        if "category" not in item or not isinstance(item["category"], str):
            is_valid = False

        # 根据验证结果分类存储
        if is_valid:
            valid_data.append(item)  # 添加到有效列表
        else:
            invalid_count += 1  # 计数加1

    logger.info(f"数据验证完成: 有效={len(valid_data)}, 无效={invalid_count}")
    return valid_data, invalid_count


# ================================================================================
# 示例7:生成汇总报告
# ================================================================================

"""
【第七个示例:多输入单输出Op】

这个示例展示了一个接收多个输入的Op:
1. 接收7个不同类型的输入
2. 生成格式化的汇总报告

重点学习:
- 多个输入的定义方式
- 字符串的格式化方法
- 生成文本报告

字符串格式化方法说明:
1. f-string(推荐):f"Hello, {name}!"
2. .format()"Hello, {}!".format(name)
3. %操作符(老式):"Hello, %s!" % name
"""

@op(
    ins={
        # 字符串输入:问候语
        "greeting": In(str, description="问候语"),
        # 整数输入:统计总和
        "stats_sum": In(int, description="统计总和"),
        # 浮点数输入:统计平均值
        "stats_avg": In(float, description="统计平均值"),
        # 字典输入:处理后的数据
        "processed_data": In(Dict[str, Any], description="处理后的数据"),
        # 字符串输入:任务执行结果
        "task_result": In(str, description="任务结果"),
        # 列表输入:有效数据
        "valid_data": In(List[Dict[str, Any]], description="有效数据"),
        # 整数输入:无效数据计数
        "invalid_count": In(int, description="无效数据数量"),
    },
    out={
        "report": Out(str, description="汇总报告")
    },
)
def generate_report(
    greeting: str,
    stats_sum: int,
    stats_avg: float,
    processed_data: Dict[str, Any],
    task_result: str,
    valid_data: List[Dict[str, Any]],
    invalid_count: int,
) -> str:
    """
    汇总所有处理结果,生成格式化的报告。

    参数:
        greeting: 问候语字符串
        stats_sum: 数字总和
        stats_avg: 数字平均值
        processed_data: 处理后的数据字典
        task_result: 任务执行结果描述
        valid_data: 有效数据列表
        invalid_count: 无效数据条数

    返回:
        str: 格式化的文本报告
    """
    logger = get_dagster_logger()

    # 构建报告内容
    # "=" * 50 创建50个等号组成的字符串,用作分隔线
    report_lines = [
        "=" * 50,
        "DAGSTER教程 - 第1章示例汇总报告",
        "=" * 50,
        f"1. 问候: {greeting}",
        f"2. 统计: 总和={stats_sum}, 平均值={stats_avg:.2f}",
        # len(processed_data)获取字典的键数量
        f"3. 数据处理: 包含{len(processed_data)}个字段",
        f"4. 任务结果: {task_result}",
        # 统计有效数据条数
        f"5. 数据验证: 有效数据{len(valid_data)}条, 无效数据{invalid_count}条",
        # valid_data[:2]是切片操作,获取前2条数据
        f"6. 有效数据样例: {valid_data[:2] if valid_data else '无'}",
        # time.strftime()将时间戳转换为格式化字符串
        f"7. 报告生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}",
        "=" * 50,
    ]

    # "\n".join()用换行符连接列表中的所有字符串
    report = "\n".join(report_lines)
    logger.info("报告生成完成")
    return report


# ================================================================================
# 第三部分:理解@job装饰器
# ================================================================================

"""
【@job装饰器详解】

@job装饰器将一个或多个Op组合成一个"Job"(作业)。

Job是什么?
- Job定义了Op的执行顺序和依赖关系
- 在Dagster中,Job是一个有向无环图(DAG)
- Op之间的边表示数据依赖:Op B依赖Op A的输出

如何在Job中组合Op?
1. 直接调用Op函数(不带参数)
2. Op的返回值可以传递给其他Op
3. Dagster会自动解析依赖关系

Op别名(alias)有什么用?
- 使用.op_name.alias("新名称")()可以创建Op的副本
- 这样同一个Op可以在Job中使用多次
- 类似于"复制粘贴"一个Op

重要约束:
1. Job组合中不能直接传递字面值(如数字、字符串)
2. 必须通过Op生成这些值
3. 如果需要常量,使用专门的constant Op
"""


# ================================================================================
# 综合示例Job
# ================================================================================

@job
def comprehensive_example_job():
    """
    综合示例Job,展示多个Op的复杂依赖关系。

    这个Job展示了Dagster的各种Op组合模式:

    1. 顺序依赖:A -> B 表示B等待A完成后执行
    2. 并行执行:A、B、C 之间无依赖,可以并行执行
    3. 多输出:Op可以有多个输出
    4. 多输入:Op可以依赖多个其他Op的输出
    """
    # 生成数据:返回一个字典列表
    data = generate_data()

    # 以下Op可以并行执行,因为它们之间没有依赖关系
    # 调用带配置的Op时不传参数,使用默认配置
    greeting = greet_person()

    # 返回固定数字列表
    numbers = generate_numbers()

    # 依赖numbers的输出
    # calculate_stats返回两个值,使用元组解构接收
    stats_sum, stats_avg = calculate_stats(numbers)

    # 返回示例字典
    example_data = create_example_data()

    # 依赖example_data的输出
    processed = process_data(example_data)

    # 使用默认配置运行长时间任务
    task_result = simulate_long_task()

    # 依赖data的输出
    valid_data, invalid_count = validate_data(data)

    # 最终Op依赖多个输入
    # 注意:这里传入的顺序要与Op定义的参数顺序一致
    report = generate_report(
        greeting,      # -> greeting: str
        stats_sum,      # -> stats_sum: int
        stats_avg,      # -> stats_avg: float
        processed,      # -> processed_data: Dict
        task_result,    # -> task_result: str
        valid_data,     # -> valid_data: List
        invalid_count,  # -> invalid_count: int
    )


# ================================================================================
# 简单线性Job
# ================================================================================

@job
def simple_linear_job():
    """
    简单的线性Job,演示基本的Op链式调用。

    执行流程:
    generate_data.alias("data_generator")() -> validate_data.alias("data_validator")(data)

    alias()创建Op的命名副本,这样同一个Op类型可以在Job中使用多次。
    """
    # generate_data别名,使用别名后是一个独立的Op实例
    data = generate_data.alias("data_generator")()

    # validate_data别名,依赖data的输出
    return validate_data.alias("data_validator")(data)


# ================================================================================
# 可配置Job
# ================================================================================

@job
def configurable_job():
    """
    可配置的Job,展示如何使用别名创建带不同配置的Op实例。
    """
    # 别名后可以有不同的行为(目前共享相同配置)
    greeting = greet_person.alias("custom_greeting")()
    long_task = simulate_long_task.alias("custom_task")()

    # 返回多个值
    return greeting, long_task


# ================================================================================
# 导出定义
# ================================================================================

"""
__all__是一个特殊变量,定义了这个模块被"from module import *"时
哪些符号会被导出。这是一种明确公开API的方式。

但在实际使用Dagster时,通常不需要关心__all__,
因为我们直接通过文件名导入:
    from dagster_tutorial.chapter_01_basics.basic_examples import greet_person
"""

__all__ = [
    "greet_person",
    "calculate_stats",
    "process_data",
    "simulate_long_task",
    "generate_data",
    "validate_data",
    "generate_report",
    "comprehensive_example_job",
    "simple_linear_job",
    "configurable_job",
]


# ================================================================================
# 运行说明
# ================================================================================

"""
【如何运行本文件中的示例】

方法1:使用Dagster命令执行Job
------------------------------------------
# 在项目根目录下运行
dagster job execute -f dagster_tutorial/chapter_01_basics/basic_examples.py -j comprehensive_example_job

参数说明:
- -f: 指定包含Job定义的文件
- -j: 指定要执行的Job名称

方法2:使用Python直接执行(如果文件有if __name__ == "__main__"块)
------------------------------------------
python dagster_tutorial/chapter_01_basics/basic_examples.py

方法3:在Dagster UI中查看和执行
------------------------------------------
# 启动Dagster Web服务器
dagster dev -f dagster_tutorial/chapter_01_basics/basic_examples.py

然后在浏览器中打开 http://localhost:3000 查看和执行Job


【配置Op执行】

如果你想修改Op的配置,可以在执行时提供配置文件:

# config.yaml
ops:
  greet_person:
    config:
      name: "Dagster Student"
      repeat: 3

# 执行时指定配置
dagster job execute -f ... -c config.yaml -j comprehensive_example_job
"""

defs = Definitions(
    jobs=[
        comprehensive_example_job,
        configurable_job,
        simple_linear_job,
    ]
)

# dagster dev -f basic_examples.py
"""

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐