Airflow DAG 调度配置:那些年我们一起踩过的坑
Airflow DAG 调度配置:那些年我们一起踩过的坑
那天,我正在为一个大型的数据管道项目配置 Airflow 的调度任务。一切看起来都很顺利,DAG 文件也早已调试完毕,满怀信心地将它们部署到生产环境。然而,几个小时后,报警短信铺天盖地而来,原本应该每小时执行一次的 ETL 任务,竟然一个小时内执行了 10 次!那一刻,我的心情跌到了谷底。为什么会出现这种状况?我赶紧打开 Airflow 的 UI 界面,查看调度日志,才发现是自己在调度配置上犯了一个低级错误。今天,我将结合这次经历,详细解析 Airflow DAG 的调度配置,帮助你避免类似的错误。
调度配置的基本概念
在 Airflow 中,DAG(Directed Acyclic Graph,有向无环图)是用来定义任务工作流的文件。每个 DAG 文件中,我们可以通过配置 schedule_interval 来设定任务的调度频率。然而,schedule_interval 并不是唯一影响调度的因素,start_date、end_date、catchup 等参数也会对任务的执行产生重要影响。
坑 1:schedule_interval 的坑
schedule_interval 是定义 DAG 调度频率的关键参数,支持使用 Cron 表达式、timedelta 对象或 None。当时,我的任务配置如下:
from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
# 设置 DAG
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
dag = DAG(
'my_etl_pipeline',
default_args=default_args,
schedule_interval=timedelta(hours=1),
description='A simple ETL pipeline',
)
# 定义任务
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
这个配置看起来没有任何问题,但问题就出在 days_ago(1) 上。days_ago(1) 表示从今天往前推一天的时间。这意味着,如果今天是 2023-10-01,那么 start_date 会被设置为 2023-09-30。然而,如果你在 2023-10-01 的 12:00 PM 部署这个 DAG,那么 start_date 会是 2023-09-30 的 12:00 PM。Airflow 会立即执行所有从 2023-09-30 12:00 PM 到 2023-10-01 12:00 PM 之间的任务,这就是为什么我的任务在一个小时内执行了 10 次。
为了避免这种情况,我们可以在 start_date 上添加一个具体的时间点,比如 datetime(2023, 10, 1, 1, 0),这样就可以确保任务从今天早上 1 点开始执行:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1, 1, 0), # 具体到某个时间点
}
dag = DAG(
'my_etl_pipeline',
default_args=default_args,
schedule_interval=timedelta(hours=1),
description='A simple ETL pipeline',
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
坑 2:catchup 的坑
catchup 参数用于控制 Airflow 是否执行过期的任务。默认情况下,catchup 是 True,这意味着如果你的 start_date 配置得很早,而 DAG 今天才部署,Airflow 会立即执行所有过期的任务。这在某些场景下是非常有用的,但也有时候会带来灾难性的后果。
为了避免过期任务的执行,我们可以将 catchup 设置为 False。这样,Airflow 只会执行从当前时间点开始的调度任务:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1, 1, 0),
}
dag = DAG(
'my_etl_pipeline',
default_args=default_args,
schedule_interval=timedelta(hours=1),
description='A simple ETL pipeline',
catchup=False, # 关闭补跑
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
坑 3:cron 表达式的坑
Cron 表达式是一种非常灵活的调度方式,但也是最容易出错的部分。我曾经在一个项目中配置了一个每 5 分钟执行一次的任务,结果任务一直没有执行。检查了 Cron 表达式后,才发现在转换为 UTC 时间时,时间计算出现了问题。
假设你希望每 5 分钟执行一次任务,可以使用 */5 * * * * 这样的 Cron 表达式。但如果你的工作区设置为 UTC+8,那么在 Airflow 中,这个表达式会被转换为 UTC 时间。因此,你需要确保 Cron 表达式的计算结果在 UTC 时间下也是正确的:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1, 1, 0),
}
dag = DAG(
'my_etl_pipeline',
default_args=default_args,
schedule_interval='*/5 * * * *', # 每 5 分钟执行一次
description='A simple ETL pipeline',
catchup=False,
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
如果你对 Cron 表达式不熟悉,可以使用在线工具 Hey Cron 来生成准确的表达式。只需输入中文描述,如“每 5 分钟”,工具会自动生成对应的 Cron 表达式,非常方便。
坑 4:max_active_runs 的坑
max_active_runs 参数用于限制同时运行的 DAG 实例数量。这个参数非常有用,可以防止在高并发场景下,DAG 实例数量过多导致资源耗尽。然而,如果你设置得太低,可能会导致任务积压,影响整个数据管道的效率。
在某个项目中,我将 max_active_runs 设置为 1,结果发现任务执行非常缓慢,因为每次只能有一个实例在运行,其他实例需要等待。解决这个问题的方法是根据实际需求,适当调整 max_active_runs 的值:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1, 1, 0),
}
dag = DAG(
'my_etl_pipeline',
default_args=default_args,
schedule_interval='*/5 * * * *',
description='A simple ETL pipeline',
catchup=False,
max_active_runs=3, # 允许同时运行 3 个实例
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
坑 5:depends_on_past 和 wait_for_downstream 的坑
depends_on_past 参数用于控制当前任务实例是否依赖于上一个实例的成功。如果设置为 True,当前实例只有在上一个实例成功后才会执行。这在某些场景下非常有用,但也有时候会导致任务长时间卡住。
wait_for_downstream 参数用于控制当前任务实例是否等待下游任务的上一个实例完成。如果设置为 True,当前实例会等待下游任务的上一个实例成功后才会开始执行。这两个参数可以单独或组合使用,但一定要谨慎,否则可能会导致任务调度出现意想不到的情况。
举个例子,假设你希望一个任务只有在前一天的同时间点成功后才会执行,可以这样配置:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1, 1, 0),
'depends_on_past': True, # 依赖于上一个实例
}
dag = DAG(
'my_etl_pipeline',
default_args=default_args,
schedule_interval='0 1 * * *', # 每天 1 点执行
description='A simple ETL pipeline',
catchup=False,
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
坑 6:retry 机制的坑
retries 和 retry_delay 参数用于控制任务失败后的重试机制。retries 表示最大重试次数,retry_delay 表示每次重试之间的间隔时间。如果你没有正确配置这些参数,可能会导致任务在失败时反复重试,浪费大量资源。
举个例子,假设你希望一个任务在失败后最多重试 3 次,每次重试间隔 5 分钟,可以这样配置:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1, 1, 0),
'retries': 3, # 最多重试 3 次
'retry_delay': timedelta(minutes=5), # 每次重试间隔 5 分钟
}
dag = DAG(
'my_etl_pipeline',
default_args=default_args,
schedule_interval='*/5 * * * *',
description='A simple ETL pipeline',
catchup=False,
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
坑 7:timezone 的坑
Airflow 的调度时间是基于 UTC 时间的。如果你的工作区使用的是其他时区,一定要注意时间转换的问题。否则,你可能会遇到任务调度时间和预期不符的情况。
假设你的工作区使用的是 UTC+8,而你希望任务每天早上 8 点执行,你需要这样配置:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
import pendulum
# 设置时区
local_tz = pendulum.timezone('Asia/Shanghai')
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1, 8, 0, tzinfo=local_tz), # 设置时区为 UTC+8
}
dag = DAG(
'my_etl_pipeline',
default_args=default_args,
schedule_interval='0 8 * * *', # 每天早上 8 点执行
description='A simple ETL pipeline',
catchup=False,
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
实用工具推荐:Hey Cron
在配置 Airflow DAG 时,Cron 表达式是最常用的调度方式之一。但手动编写 Cron 表达式有时会非常麻烦,特别是当你需要考虑复杂的调度需求时。推荐一个非常实用的在线工具 Hey Cron,它可以帮助你快速生成准确的 Cron 表达式。你只需要输入中文描述,如“每 5 分钟”、“每天早上 8 点”等,工具会自动生成对应的 Cron 表达式,非常方便。
此外,Hey Cron 还提供了其他一些非常实用的功能,如正则表达式生成器、中英互译、JSON 格式化、Base64 编码解码、时间戳转换和 JWT 解析。这些都是我在日常开发中经常用到的功能,强烈推荐大家试试。
结语
调度配置是 Airflow 中非常重要的一个环节,稍有不慎就可能引发一系列问题。通过上述几个具体的踩坑经历,希望大家能够对 Airflow 的调度配置有更深入的理解,避免类似的错误。如果你在配置过程中遇到任何问题,不妨利用 Hey Cron 这样的工具来辅助你,事半功倍。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)