DolphinScheduler 教程

(一)入门指南

简介

关于Dolphin

Apache DolphinScheduler是一个分布式易扩展的可视化DAG工作流任务调度开源系统。解决数据研发ETL 错综复杂的依赖关系,不能直观监控任务健康状态等问题。DolphinScheduler以DAG流式的方式将Task组装起来,可实时监控任务的运行状态,同时支持重试、从指定节点恢复失败、暂停及Kill任务等操作

简单易用

DAG监控界面,所有流程定义都是可视化,通过拖拽任务定制DAG,通过API方式与第三方系统对接, 一键部署

高可靠性

去中心化的多Master和多Worker, 自身支持HA功能, 采用任务队列来避免过载,不会造成机器卡死

丰富的使用场景

支持暂停恢复操作.支持多租户,更好的应对大数据的使用场景. 支持更多的任务类型,如 spark, hive, mr, python, sub_process, shell

高扩展性

支持自定义任务类型,调度器使用分布式调度,调度能力随集群线性增长,Master和Worker支持动态上下线

软硬件环境建议配置

DolphinScheduler 作为一款开源分布式工作流任务调度系统,可以很好地部署和运行在 Intel 架构服务器及主流虚拟化环境下,并支持主流的Linux操作系统环境

1. Linux 操作系统版本要求

操作系统版本
Red Hat Enterprise Linux7.0 及以上
CentOS7.0 及以上
Oracle Enterprise Linux7.0 及以上
Ubuntu LTS16.04 及以上

注意: 以上 Linux 操作系统可运行在物理服务器以及 VMware、KVM、XEN 主流虚拟化环境上

2. 服务器建议配置

DolphinScheduler 支持运行在 Intel x86-64 架构的 64 位通用硬件服务器平台。对生产环境的服务器硬件配置有以下建议:

生产环境
CPU内存硬盘类型网络实例数量
4核+8 GB+SAS千兆网卡1+

注意:

  • 以上建议配置为部署 DolphinScheduler 的最低配置,生产环境强烈推荐使用更高的配置
  • 硬盘大小配置建议 50GB+ ,系统盘和数据盘分开

3. 网络要求

DolphinScheduler正常运行提供如下的网络端口配置:

组件默认端口说明
MasterServer5678非通信端口,只需本机端口不冲突即可
WorkerServer1234非通信端口,只需本机端口不冲突即可
ApiApplicationServer12345提供后端通信端口

注意:

  • MasterServer 和 WorkerServer 不需要开启网络间通信,只需本机端口不冲突即可
  • 管理员可根据实际环境中 DolphinScheduler 组件部署方案,在网络侧和主机侧开放相关端口

4. 客户端 Web 浏览器要求

DolphinScheduler 推荐 Chrome 以及使用 Chromium 内核的较新版本浏览器访问前端可视化操作界面

名词解释

在对Apache DolphinScheduler了解之前,我们先来认识一下调度系统常用的名词

1.名词解释

DAG: 全称Directed Acyclic Graph,简称DAG。工作流中的Task任务以有向无环图的形式组装起来,从入度为零的节点进行拓扑遍历,直到无后继节点为止。举例如下图:

dag示例

dag示例

流程定义:通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG

流程实例:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,产生一个流程实例

任务实例:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态

任务类型:目前支持有SHELL、SQL、SUB_PROCESS(子流程)、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT(依赖)、,同时计划支持动态插件扩展,注意:其中子 SUB_PROCESS 也是一个单独的流程定义,是可以单独启动执行的

调度方式:系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、停止、恢复等待线程。 其中 恢复被容错的工作流恢复等待线程 两种命令类型是由调度内部控制使用,外部无法调用

定时调度:系统采用 quartz 分布式调度器,并同时支持cron表达式可视化的生成

依赖:系统不单单支持 DAG 简单的前驱和后继节点之间的依赖,同时还提供任务依赖节点,支持流程间的自定义任务依赖

优先级 :支持流程实例和任务实例的优先级,如果流程实例和任务实例的优先级不设置,则默认是先进先出

邮件告警:支持 SQL任务 查询结果邮件发送,流程实例运行结果邮件告警及容错告警通知

失败策略:对于并行运行的任务,如果有任务失败,提供两种失败策略处理方式,继续是指不管并行运行任务的状态,直到流程失败结束。结束是指一旦发现失败任务,则同时Kill掉正在运行的并行任务,流程失败结束

补数:补历史数据,支持区间并行和串行两种补数方式

2.模块介绍

  • dolphinscheduler-alert 告警模块,提供 AlertServer 服务。
  • dolphinscheduler-api web应用模块,提供 ApiServer 服务。
  • dolphinscheduler-common 通用的常量枚举、工具类、数据结构或者基类
  • dolphinscheduler-dao 提供数据库访问等操作。
  • dolphinscheduler-remote 基于 netty 的客户端、服务端
  • dolphinscheduler-server MasterServer 和 WorkerServer 服务
  • dolphinscheduler-service service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用
  • dolphinscheduler-ui 前端模块

快速上手

  • 喜欢看视频的伙伴可以参见手把手教你如何《快速上手 Apache DolphinScheduler 教程》 B站教程

  • 管理员用户登录

    地址:http://192.168.xx.xx:12345/dolphinscheduler 用户名密码:admin/dolphinscheduler123

img

  • 创建队列

img

  • 创建租户

img

  • 创建普通用户

img

  • 创建告警组

img

  • 创建Worker分组

img

  • 创建环境

img

  • 创建token令牌

img

  • 使用普通用户登录

点击右上角用户名“退出”,重新使用普通用户登录。

  • 项目管理->创建项目->点击项目名称

img

  • 点击工作流定义->创建工作流定义->上线工作流定义

img

  • 运行工作流定义->点击工作流实例->点击工作流实例名称->双击任务节点->查看任务执行日志

img

部署指南

Standalone极速体验版

Standalone 仅适用于 DolphinScheduler 的快速体验.

如果你是新手,想要体验 DolphinScheduler 的功能,推荐使用Standalone方式体检。如果你想体验更完整的功能,或者更大的任务量,推荐使用伪集群部署。如果你是在生产中使用,推荐使用集群部署或者kubernetes

*注意:* Standalone仅建议20个以下工作流使用,因为其采用 H2 Database, Zookeeper Testing Server,任务过多可能导致不稳定

前置准备工作

  • JDK:下载JDK (1.8+),并将 JAVA_HOME 配置到以及 PATH 变量中。如果你的环境中已存在,可以跳过这步。
  • 二进制包:在下载页面下载 DolphinScheduler 二进制包

启动 DolphinScheduler Standalone Server

解压并启动 DolphinScheduler

二进制压缩包中有 standalone 启动的脚本,解压后即可快速启动。切换到有sudo权限的用户,运行脚本

# 解压并运行 Standalone Server
tar -xvzf apache-dolphinscheduler-*-bin.tar.gz
cd apache-dolphinscheduler-*-bin
sh ./bin/dolphinscheduler-daemon.sh start standalone-server

登录 DolphinScheduler

浏览器访问地址 http://localhost:12345/dolphinscheduler 即可登录系统UI。默认的用户名和密码是 admin/dolphinscheduler123

启停服务

脚本 ./bin/dolphinscheduler-daemon.sh 除了可以快捷启动 standalone 外,还能停止服务运行,全部命令如下

# 启动 Standalone Server 服务
sh ./bin/dolphinscheduler-daemon.sh start standalone-server
# 停止 Standalone Server 服务
sh ./bin/dolphinscheduler-daemon.sh stop standalone-server

伪集群部署

伪集群部署目的是在单台机器部署 DolphinScheduler 服务,该模式下master、worker、api server、logger server都在同一台机器上

如果你是新手,想要体验 DolphinScheduler 的功能,推荐使用Standalone方式体检。如果你想体验更完整的功能,或者更大的任务量,推荐使用伪集群部署。如果你是在生产中使用,推荐使用集群部署或者kubernetes

前置准备工作

伪分布式部署 DolphinScheduler 需要有外部软件的支持

  • JDK:下载JDK (1.8+),并将 JAVA_HOME 配置到以及 PATH 变量中。如果你的环境中已存在,可以跳过这步。
  • 二进制包:在下载页面下载 DolphinScheduler 二进制包
  • 数据库:PostgreSQL (8.2.15+) 或者 MySQL (5.7+),两者任选其一即可,如 MySQL 则需要 JDBC Driver 8.0.16
  • 注册中心:ZooKeeper (3.4.6+),下载地址
  • 进程树分析
    • macOS安装pstree
    • Fedora/Red/Hat/CentOS/Ubuntu/Debian安装psmisc

*注意:* DolphinScheduler 本身不依赖 Hadoop、Hive、Spark,但如果你运行的任务需要依赖他们,就需要有对应的环境支持

准备 DolphinScheduler 启动环境

配置用户免密及权限

创建部署用户,并且一定要配置 sudo 免密。以创建 dolphinscheduler 用户为例

# 创建用户需使用 root 登录
useradd dolphinscheduler

# 添加密码
echo "dolphinscheduler" | passwd --stdin dolphinscheduler

# 配置 sudo 免密
sed -i '$adolphinscheduler  ALL=(ALL)  NOPASSWD: NOPASSWD: ALL' /etc/sudoers
sed -i 's/Defaults    requirett/#Defaults    requirett/g' /etc/sudoers

# 修改目录权限,使得部署用户对 dolphinscheduler-bin 目录有操作权限
chown -R dolphinscheduler:dolphinscheduler dolphinscheduler-bin

*注意:*

  • 因为任务执行服务是以 sudo -u {linux-user} 切换不同 linux 用户的方式来实现多租户运行作业,所以部署用户需要有 sudo 权限,而且是免密的。初学习者不理解的话,完全可以暂时忽略这一点
  • 如果发现 /etc/sudoers 文件中有 “Defaults requirett” 这行,也请注释掉

配置机器SSH免密登陆

由于安装的时候需要向不同机器发送资源,所以要求各台机器间能实现SSH免密登陆。配置免密登陆的步骤如下

su dolphinscheduler

ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys

*注意:* 配置完成后,可以通过运行命令 ssh localhost 判断是否成功,如果不需要输入密码就能ssh登陆则证明成功

启动zookeeper

进入 zookeeper 的安装目录,将 zoo_sample.cfg 配置文件复制到 conf/zoo.cfg,并将 conf/zoo.cfg 中 dataDir 中的值改成 dataDir=./tmp/zookeeper

# 启动 zookeeper
./bin/zkServer.sh start

初始化数据库

DolphinScheduler 元数据存储在关系型数据库中,目前支持 PostgreSQL 和 MySQL,如果使用 MySQL 则需要手动下载 mysql-connector-java 驱动 (5.1.47+) 并移动到 DolphinScheduler 的 lib目录下。下面以 MySQL 为例,说明如何初始化数据库

mysql -uroot -p

mysql> CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;

# 修改 {user} 和 {password} 为你希望的用户名和密码
mysql> GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'%' IDENTIFIED BY '{password}';
mysql> GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'localhost' IDENTIFIED BY '{password}';

mysql> flush privileges;

运行对应数据库的最新定义文件,位置在 dolphinscheduler/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_*.sql 。例如你是 MySQL ,运行 dolphinscheduler/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql ,是 PostgreSQL 则运行 dolphinscheduler/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql

*NOTICE:* 最新版本目前通过运行命令 sh script/create-dolphinscheduler.sh 初始化数据库的方式暂不可用,我们创建了一个issue-6597 去追踪并修复这个问题。

修改相关配置

完成了基础环境的准备后,在运行部署命令前,还需要根据环境修改配置文件。配置文件在路径在conf/config/install_config.conf下,一般部署只需要修改INSTALL MACHINE、DolphinScheduler ENV、Database、Registry Server部分即可完成部署,下面对必须修改参数进行说明

# ---------------------------------------------------------
# INSTALL MACHINE
# ---------------------------------------------------------
# 因为是在单节点上部署master、worker、API server,所以服务器的IP均为机器IP或者localhost
ips="localhost"
masters="localhost"
workers="localhost:default"
alertServer="localhost"
apiServers="localhost"

# DolphinScheduler安装路径,如果不存在会创建
installPath="~/dolphinscheduler"

# 部署用户,填写在 **配置用户免密及权限** 中创建的用户
deployUser="dolphinscheduler"

# ---------------------------------------------------------
# DolphinScheduler ENV
# ---------------------------------------------------------
# JAVA_HOME 的路径,是在 **前置准备工作** 安装的JDK中 JAVA_HOME 所在的位置
javaHome="/your/java/home/here"

# ---------------------------------------------------------
# Database
# ---------------------------------------------------------
# 数据库的类型,用户名,密码,IP,端口,元数据库db。其中dbtype目前支持 mysql 和 postgresql
dbtype="mysql"
dbhost="localhost:3306"
# 如果你不是以 dolphinscheduler/dolphinscheduler 作为用户名和密码的,需要进行修改
username="dolphinscheduler"
password="dolphinscheduler"
dbname="dolphinscheduler"

# ---------------------------------------------------------
# Registry Server
# ---------------------------------------------------------
# 注册中心地址,zookeeper服务的地址
registryServers="localhost:2181"

启动 DolphinScheduler

使用部署用户运行一下命令完成部署,部署后的运行日志将存放在 logs 文件夹内

sh install.sh

*注意:* 第一次部署的话,可能出现 5 次sh: bin/dolphinscheduler-daemon.sh: No such file or directory相关信息,次为非重要信息直接忽略即可

登录 DolphinScheduler

浏览器访问地址 http://localhost:12345/dolphinscheduler 即可登录系统UI。默认的用户名和密码是 admin/dolphinscheduler123

启停服务

# 一键停止集群所有服务
sh ./bin/stop-all.sh

# 一键开启集群所有服务
sh ./bin/start-all.sh

# 启停 Master
sh ./bin/dolphinscheduler-daemon.sh stop master-server
sh ./bin/dolphinscheduler-daemon.sh start master-server

# 启停 Worker
sh ./bin/dolphinscheduler-daemon.sh start worker-server
sh ./bin/dolphinscheduler-daemon.sh stop worker-server

# 启停 Api
sh ./bin/dolphinscheduler-daemon.sh start api-server
sh ./bin/dolphinscheduler-daemon.sh stop api-server

# 启停 Logger
sh ./bin/dolphinscheduler-daemon.sh start logger-server
sh ./bin/dolphinscheduler-daemon.sh stop logger-server

# 启停 Alert
sh ./bin/dolphinscheduler-daemon.sh start alert-server
sh ./bin/dolphinscheduler-daemon.sh stop alert-server

集群部署(Cluster)

集群部署目的是在多台机器部署 DolphinScheduler 服务,用于运行大量任务情况。

如果你是新手,想要体验 DolphinScheduler 的功能,推荐使用Standalone方式体检。如果你想体验更完整的功能,或者更大的任务量,推荐使用伪集群部署。如果你是在生产中使用,推荐使用集群部署或者kubernetes

部署步骤

集群部署(Cluster)使用的脚本和配置文件与伪集群部署中的配置一样,所以所需要的步骤也与伪集群部署大致一样。区别就是伪集群部署针对的是一台机器,而集群部署(Cluster)需要针对多台机器,且两者“修改相关配置”步骤区别较大

前置准备工作 && 准备 DolphinScheduler 启动环境

其中除了伪集群部署中的“前置准备工作”,“准备启动环境”除了“启动zookeeper”以及“初始化数据库”外,别的都需要在每台机器中进行配置

修改相关配置

这个是与伪集群部署差异较大的一步,因为部署脚本会通过 scp 的方式将安装需要的资源传输到各个机器上,所以这一步我们仅需要修改运行install.sh脚本的所在机器的配置即可。配置文件在路径在conf/config/install_config.conf下,此处我们仅需修改INSTALL MACHINEDolphinScheduler ENV、Database、Registry Server伪集群部署保持一致,下面对必须修改参数进行说明

# ---------------------------------------------------------
# INSTALL MACHINE
# ---------------------------------------------------------
# 需要配置master、worker、API server,所在服务器的IP均为机器IP或者localhost
# 如果是配置hostname的话,需要保证机器间可以通过hostname相互链接
# 如下图所示,部署 DolphinScheduler 机器的 hostname 为 ds1,ds2,ds3,ds4,ds5,其中 ds1,ds2 安装 master 服务,ds3,ds4,ds5安装 worker 服务,alert server安装在ds4中,api server 安装在ds5中
ips="ds1,ds2,ds3,ds4,ds5"
masters="ds1,ds2"
workers="ds3:default,ds4:default,ds5:default"
alertServer="ds4"
apiServers="ds5"

启动 DolphinScheduler && 登录 DolphinScheduler && 启停服务

与伪集群部署保持一致

功能介绍

首页

首页包含用户所有项目的任务状态统计、流程状态统计、工作流定义统计。

img

项目管理

创建项目

点击"项目管理"进入项目管理页面,点击“创建项目”按钮,输入项目名称,项目描述,点击“提交”,创建新的项目。

img

项目首页

在项目管理页面点击项目名称链接,进入项目首页,如下图所示,项目首页包含该项目的任务状态统计、流程状态统计、工作流定义统计。这几个指标的说明如下

  • 任务状态统计:在指定时间范围内,统计任务实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数
  • 流程状态统计:在指定时间范围内,统计工作流实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数
  • 工作流定义统计:统计用户创建的工作流定义及管理员授予该用户的工作流定义

img

任务类型

Shell节点

shell节点,在worker执行的时候,会生成一个临时shell脚本,使用租户同名的linux用户执行这个脚本。

  • 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。

  • 工具栏中拖动[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-04KjxJw7-1683205953746)(https://dolphinscheduler.apache.org/img/shell.png)]到画板中,如下图所示:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SAkHpS8X-1683205953749)(https://dolphinscheduler.apache.org/img/shell_dag.png)]

  • 节点名称:一个工作流定义中的节点名称是唯一的。

  • 运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。

  • 描述信息:描述该节点的功能。

  • 任务优先级:worker线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。

  • Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。

  • 失败重试次数:任务失败重新提交的次数,支持下拉和手填。

  • 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。

  • 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.

  • 脚本:用户开发的SHELL程序。

  • 资源:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。

  • 自定义参数:是SHELL局部的用户自定义参数,会替换脚本中以${变量}的内容。

子流程节点

  • 子流程节点,就是把外部的某个工作流定义当做一个任务节点去执行。

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

  • 节点名称:一个工作流定义中的节点名称是唯一的
  • 运行标志:标识这个节点是否能正常调度
  • 描述信息:描述该节点的功能
  • 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
  • 子节点:是选择子流程的工作流定义,右上角进入该子节点可以跳转到所选子流程的工作流定义

依赖节点

  • 依赖节点,就是依赖检查节点。比如A流程依赖昨天的B流程执行成功,依赖节点会去检查B流程在昨天是否有执行成功的实例。

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

依赖节点提供了逻辑判断功能,比如检查昨天的B流程是否成功,或者C流程是否执行成功。

img

例如,A流程为周报任务,B、C流程为天任务,A任务需要B、C任务在上周的每一天都执行成功,如图示:

img

假如,周报A同时还需要自身在上周二执行成功:

img

存储过程节点

  • 根据选择的数据源,执行存储过程。

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

  • 数据源:存储过程的数据源类型支持MySQL和POSTGRESQL两种,选择对应的数据源
  • 方法:是存储过程的方法名称
  • 自定义参数:存储过程的自定义参数类型支持IN、OUT两种,数据类型支持VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN九种数据类型

SQL节点

  • 拖动工具栏中的PNG任务节点到画板中

  • 非查询SQL功能:编辑非查询SQL任务信息,sql类型选择非查询,如下图所示:

    img

  • 查询SQL功能:编辑查询SQL任务信息,sql类型选择查询,选择表格或附件形式发送邮件到指定的收件人,如下图所示。

img

  • 数据源:选择对应的数据源
  • sql类型:支持查询和非查询两种,查询是select类型的查询,是有结果集返回的,可以指定邮件通知为表格、附件或表格附件三种模板。非查询是没有结果集返回的,是针对update、delete、insert三种类型的操作。
  • sql参数:输入参数格式为key1=value1;key2=value2…
  • sql语句:SQL语句
  • UDF函数:对于HIVE类型的数据源,可以引用资源中心中创建的UDF函数,其他类型的数据源暂不支持UDF函数。
  • 自定义参数:SQL任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换sql语句中${变量}。
  • 前置sql:前置sql在sql语句之前执行。
  • 后置sql:后置sql在sql语句之后执行。

SPARK节点

  • 通过SPARK节点,可以直接直接执行SPARK程序,对于spark节点,worker会使用spark-submit方式提交任务

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

  • 程序类型:支持JAVA、Scala和Python三种语言
  • 主函数的class:是Spark程序的入口Main Class的全路径
  • 主jar包:是Spark的jar包
  • 部署方式:支持yarn-cluster、yarn-client和local三种模式
  • Driver内核数:可以设置Driver内核数及内存数
  • Executor数量:可以设置Executor数量、Executor内存数和Executor内核数
  • 命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。
  • 其他参数:支持 --jars、–files、–archives、–conf格式
  • 资源:如果其他参数中引用了资源文件,需要在资源中选择指定
  • 自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Spark则没有主函数的class,其他都是一样

MapReduce(MR)节点

  • 使用MR节点,可以直接执行MR程序。对于mr节点,worker会使用hadoop jar方式提交任务

拖动工具栏中的PNG任务节点到画板中,如下图所示:

JAVA程序

img

  • 主函数的class:是MR程序的入口Main Class的全路径
  • 程序类型:选择JAVA语言
  • 主jar包:是MR的jar包
  • 命令行参数:是设置MR程序的输入参数,支持自定义参数变量的替换
  • 其他参数:支持 –D、-files、-libjars、-archives格式
  • 资源: 如果其他参数中引用了资源文件,需要在资源中选择指定
  • 自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容
Python程序

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p1oexZeW-1683205971444)(null)]

  • 程序类型:选择Python语言
  • 主jar包:是运行MR的Python jar包
  • 其他参数:支持 –D、-mapper、-reducer、-input -output格式,这里可以设置用户自定义参数的输入,比如:
  • -mapper “mapper.py 1” -file mapper.py -reducer reducer.py -file reducer.py –input /journey/words.txt -output /journey/out/mr/${currentTimeMillis}
  • 其中 -mapper 后的 mapper.py 1是两个参数,第一个参数是mapper.py,第二个参数是1
  • 资源: 如果其他参数中引用了资源文件,需要在资源中选择指定
  • 自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容

Python节点

  • 使用python节点,可以直接执行python脚本,对于python节点,worker会使用python **方式提交任务。

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

  • 脚本:用户开发的Python程序
  • 环境名称:执行Python程序的解释器路径,指定运行脚本的解释器。当你需要使用 Python 虚拟环境 时,可以通过创建不同的环境名称来实现。
  • 资源:是指脚本中需要调用的资源文件列表
  • 自定义参数:是Python局部的用户自定义参数,会替换脚本中以${变量}的内容
  • 注意:若引入资源目录树下的python文件,需添加 __init__.py 文件

Flink节点

  • 拖动工具栏中的img任务节点到画板中,如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D8Zut6Pc-1683205976158)(null)]

  • 程序类型:支持JAVA、Scala和Python三种语言
  • 主函数的class:是Flink程序的入口Main Class的全路径
  • 主jar包:是Flink的jar包
  • 部署方式:支持cluster、local三种模式
  • slot数量:可以设置slot数
  • taskManage数量:可以设置taskManage数
  • jobManager内存数:可以设置jobManager内存数
  • taskManager内存数:可以设置taskManager内存数
  • 命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。
  • 其他参数:支持 --jars、–files、–archives、–conf格式
  • 资源:如果其他参数中引用了资源文件,需要在资源中选择指定
  • 自定义参数:是Flink局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Flink则没有主函数的class,其他都是一样

http节点

  • 拖动工具栏中的img任务节点到画板中,如下图所示:

img

  • 节点名称:一个工作流定义中的节点名称是唯一的。
  • 运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
  • 描述信息:描述该节点的功能。
  • 任务优先级:worker线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
  • Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。
  • 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
  • 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
  • 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
  • 请求地址:http请求URL。
  • 请求类型:支持GET、POSt、HEAD、PUT、DELETE。
  • 请求参数:支持Parameter、Body、Headers。
  • 校验条件:支持默认响应码、自定义响应码、内容包含、内容不包含。
  • 校验内容:当校验条件选择自定义响应码、内容包含、内容不包含时,需填写校验内容。
  • 自定义参数:是http局部的用户自定义参数,会替换脚本中以${变量}的内容。

DATAX节点

  • 拖动工具栏中的img任务节点到画板中

    img

  • 自定义模板:打开自定义模板开关时,可以自定义datax节点的json配置文件内容(适用于控件配置不满足需求时)

  • 数据源:选择抽取数据的数据源

  • sql语句:目标库抽取数据的sql语句,节点执行时自动解析sql查询列名,映射为目标表同步列名,源表和目标表列名不一致时,可以通过列别名(as)转换

  • 目标库:选择数据同步的目标库

  • 目标表:数据同步的目标表名

  • 前置sql:前置sql在sql语句之前执行(目标库执行)。

  • 后置sql:后置sql在sql语句之后执行(目标库执行)。

  • json:datax同步的json配置文件

  • 自定义参数:SQL任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换sql语句中${变量}。

参数

内置参数

基础内置参数
变量名声明方式含义
system.biz.date${system.biz.date}日常调度实例定时的定时时间前一天,格式为 yyyyMMdd,补数据时,该日期 +1
system.biz.curdate${system.biz.curdate}日常调度实例定时的定时时间,格式为 yyyyMMdd,补数据时,该日期 +1
system.datetime${system.datetime}日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss,补数据时,该日期 +1
衍生内置参数
  • 支持代码中自定义变量名,声明方式:${变量名}。可以是引用 “系统参数”

  • 我们定义这种基准变量为 [ . . . ] 格式的, [...] 格式的, [...]格式的,[yyyyMMddHHmmss] 是可以任意分解组合的,比如:$[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd] 等

  • 也可以通过以下两种方式:

    1.使用add_months()函数,该函数用于加减月份, 第一个入口参数为[yyyyMMdd],表示返回时间的格式 第二个入口参数为月份偏移量,表示加减多少个月

    • 后 N 年:$[add_months(yyyyMMdd,12*N)]
    • 前 N 年:$[add_months(yyyyMMdd,-12*N)]
    • 后 N 月:$[add_months(yyyyMMdd,N)]
    • 前 N 月:$[add_months(yyyyMMdd,-N)]

    2.直接加减数字 在自定义格式后直接“+/-”数字

    • 后 N 周:$[yyyyMMdd+7*N]
    • 前 N 周:$[yyyyMMdd-7*N]
    • 后 N 天:$[yyyyMMdd+N]
    • 前 N 天:$[yyyyMMdd-N]
    • 后 N 小时:$[HHmmss+N/24]
    • 前 N 小时:$[HHmmss-N/24]
    • 后 N 分钟:$[HHmmss+N/24/60]
    • 前 N 分钟:$[HHmmss-N/24/60]

全局参数

作用域

在工作流定义页面配置的参数,作用于该工作流中全部的任务

使用方式

全局参数配置方式如下:在工作流定义页面,点击“设置全局”右边的加号,填写对应的变量名称和对应的值,保存即可

img

img

这里定义的global_bizdate参数可以被其它任一节点的局部参数引用,并设置global_bizdate的value为通过引用系统参数system.biz.date获得的值

本地参数

作用域

在任务定义页面配置的参数,默认作用域仅限该任务,如果配置了参数传递则可将该参数作用到下游任务中。

使用方式

本地参数配置方式如下:在任务定义页面,点击“自定义参数”右边的加号,填写对应的变量名称和对应的值,保存即可

img

img

如果想要在本地参数中调用系统内置参数,将内置参数对应的值填到value中,如上图中的${biz_date}以及${curdate}

参数的引用

DolphinScheduler 提供参数间相互引用的能力,包括:本地参数引用全局参数、上下游参数传递。因为有引用的存在,就涉及当参数名相同时,参数的优先级问题,详见参数优先级

本地任务引用全局参数

本地任务引用全局参数的前提是,你已经定义了全局参数,使用方式和本地参数中的使用方式类似,但是参数的值需要配置成全局参数中的key

parameter-call-global-in-local

如上图中的${biz_date}以及${curdate},就是本地参数引用全局参数的例子。观察上图的最后一行,local_param_bizdate通过 g l o b a l b i z d a t e 来引用全局参数,在 s h e l l 脚本中可以通过 {global_bizdate}来引用全局参数,在shell脚本中可以通过 globalbizdate来引用全局参数,在shell脚本中可以通过{local_param_bizdate}来引全局变量 global_bizdate的值,或通过JDBC直接将local_param_bizdate的值set进去。同理,local_param通过{local_param}引用上一节中定义的全局参数。biz_date、biz_curdate、system.datetime都是用户自定义的参数,通过{全局参数}进行赋值。

上游任务传递给下游任务

DolphinScheduler 允许在任务间进行参数传递,目前传递方向仅支持上游单向传递给下游。目前支持这个特性的任务类型有:

当定义上游节点时,如果有需要将该节点的结果传递给有依赖关系的下游节点,需要在【当前节点设置】的【自定义参数】设置一个方向是 OUT 的变量。目前我们主要针对 SQL 和 SHELL 节点做了可以向下传递参数的功能。

SQL

prop 为用户指定;方向选择为 OUT,只有当方向为 OUT 时才会被定义为变量输出;数据类型可以根据需要选择不同数据结构;value 部分不需要填写。

如果 SQL 节点的结果只有一行,一个或多个字段,prop 的名字需要和字段名称一致。数据类型可选择为除 LIST 以外的其他类型。变量会选择 SQL 查询结果中的列名中与该变量名称相同的列对应的值。

如果 SQL 节点的结果为多行,一个或多个字段,prop 的名字需要和字段名称一致。数据类型选择为LIST。获取到 SQL 查询结果后会将对应列转化为 LIST,并将该结果转化为 JSON 后作为对应变量的值。

我们再以上图中包含 SQL 节点的流程举例说明:

上图中节点【createParam1】的定义如下:

image-20210723104957031

节点【createParam2】的定义如下:

image-20210723105026924

您可以在【工作流实例】页面,找到对应的节点实例,便可以查看该变量的值。

节点实例【createParam1】如下:

image-20210723105131381

这里当然 “id” 的值会等于 12.

我们再来看节点实例【createParam2】的情况。

image-20210723105255850

这里只有 “id” 的值。尽管用户定义的 sql 查到的是 “id” 和 “database_name” 两个字段,但是由于只定义了一个为 out 的变量 “id”,所以只会设置一个变量。由于显示的原因,这里已经替您查好了该 list 的长度为 10。

Shell

prop 为用户指定;方向选择为 OUT,只有当方向为 OUT 时才会被定义为变量输出;数据类型可以根据需要选择不同数据结构;value 部分不需要填写。

用户需要传递参数,在定义 shell 脚本时,需要输出格式为 ${setValue(key=value)} 的语句,key 为对应参数的 prop,value 为该参数的值。

例如下图中, 通过 echo '${setValue(trans=hello trans)}', 将’trans’设置为"hello trans", 在下游任务中就可以使用trans这个变量了:

trans-shell

shell 节点定义时当日志检测到 ${setValue(key=value1)} 的格式时,会将 value1 赋值给 key,下游节点便可以直接使用变量 key 的值。同样,您可以在【工作流实例】页面,找到对应的节点实例,便可以查看该变量的值。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6pUPPoeR-1683205971021)(null)]

参数优先级

DolphinScheduler 中所涉及的参数值的定义可能来自三种类型:

  • 全局参数:在工作流保存页面定义时定义的变量
  • 上游任务传递的参数:上游任务传递过来的参数
  • 本地参数:节点的自有变量,用户在“自定义参数”定义的变量,并且用户可以在工作流定义时定义该部分变量的值

因为参数的值存在多个来源,当参数名相同时,就需要会存在参数优先级的问题。DolphinScheduler 参数的优先级从高到低为:全局参数 > 上游任务传递的参数 > 本地参数

在上游任务传递的参数的情况下,由于上游可能存在多个任务向下游传递参数。当上游传递的参数名称相同时:

  • 下游节点会优先使用值为非空的参数
  • 如果存在多个值为非空的参数,则按照上游任务的完成时间排序,选择完成时间最早的上游任务对应的参数

例子

下面例子向你展示如何使用任务参数传递的优先级问题

1:先以 shell 节点解释第一种情况

image-20210723102938239

节点 【useParam】可以使用到节点【createParam】中设置的变量。而节点 【useParam】与节点【noUseParam】中并没有依赖关系,所以并不会获取到节点【noUseParam】的变量。上图中只是以 shell 节点作为例子,其他类型节点具有相同的使用规则。

image-20210723103316896

其中节点【createParam】在使用变量时直接使用即可。另外该节点设置了 “key” 和 “key1” 两个变量,这里用户用定义了一个与上游节点传递的变量名相同的变量 key1,并且复制了值为 “12”,但是由于我们设置的优先级的关系,这里的值 “12” 会被抛弃,最终使用上游节点设置的变量值。

2:我们再以 sql 节点来解释另外一种情况

image-20210723103937052

节点【use_create】的定义如下:

image-20210723104411489

“status” 是当前节点设置的节点的自有变量。但是用户在保存时也同样设置了 “status” 变量,并且赋值为 -1。那在该 SQL 执行时,status 的值为优先级更高的 -1。抛弃了节点的自有变量的值。

这里的 “id” 是上游节点设置的变量,用户在节点【createParam1】、节点【createParam2】中设置了相同参数名 “id” 的参数。而节点【use_create】中使用了最先结束的【createParam1】的值。

数据源中心

数据源

数据源中心支持MySQL、POSTGRESQL、HIVE/IMPALA、SPARK、CLICKHOUSE、ORACLE、SQLSERVER等数据源。

  • 点击“数据源中心->创建数据源”,根据需求创建不同类型的数据源。
  • 点击“测试连接”,测试数据源是否可以连接成功。

MySQL数据源

  • 数据源:选择MYSQL
  • 数据源名称:输入数据源的名称
  • 描述:输入数据源的描述
  • IP主机名:输入连接MySQL的IP
  • 端口:输入连接MySQL的端口
  • 用户名:设置连接MySQL的用户名
  • 密码:设置连接MySQL的密码
  • 数据库名:输入连接MySQL的数据库名称
  • Jdbc连接参数:用于MySQL连接的参数设置,以JSON形式填写

img

POSTGRESQL数据源

  • 数据源:选择POSTGRESQL
  • 数据源名称:输入数据源的名称
  • 描述:输入数据源的描述
  • IP/主机名:输入连接POSTGRESQL的IP
  • 端口:输入连接POSTGRESQL的端口
  • 用户名:设置连接POSTGRESQL的用户名
  • 密码:设置连接POSTGRESQL的密码
  • 数据库名:输入连接POSTGRESQL的数据库名称
  • Jdbc连接参数:用于POSTGRESQL连接的参数设置,以JSON形式填写

img

HIVE数据源

使用HiveServer2

img

  • 数据源:选择HIVE
  • 数据源名称:输入数据源的名称
  • 描述:输入数据源的描述
  • IP/主机名:输入连接HIVE的IP
  • 端口:输入连接HIVE的端口
  • 用户名:设置连接HIVE的用户名
  • 密码:设置连接HIVE的密码
  • 数据库名:输入连接HIVE的数据库名称
  • Jdbc连接参数:用于HIVE连接的参数设置,以JSON形式填写
使用HiveServer2 HA Zookeeper

img

注意:如果开启了kerberos,则需要填写 Principal

img

Spark数据源

img

  • 数据源:选择Spark
  • 数据源名称:输入数据源的名称
  • 描述:输入数据源的描述
  • IP/主机名:输入连接Spark的IP
  • 端口:输入连接Spark的端口
  • 用户名:设置连接Spark的用户名
  • 密码:设置连接Spark的密码
  • 数据库名:输入连接Spark的数据库名称
  • Jdbc连接参数:用于Spark连接的参数设置,以JSON形式填写

注意:如果开启了kerberos,则需要填写 Principal

img

告警

如何创建告警插件以及告警组

在2.0.0版本中,用户需要创建告警实例,然后同告警组进行关联,一个告警组可以使用多个告警实例,我们会逐一进行进行告警通知。

首先需要进入到安全中心,选择告警组管理,然后点击左侧的告警实例管理,然后创建一个告警实例,然后选择对应的告警插件,填写相关告警参数。

然后选择告警组管理,创建告警组,选择相应的告警实例即可。

img img img img

企业微信

如果您需要使用到企业微信进行告警,请在安装完成后,修改 alert.properties 文件,然后重启 alert 服务即可。企业微信的配置样例如下

# 设置企业微信告警功能是否开启:开启为 true,否则为 false
enterprise.wechat.enable="true"

# 设置 corpid,每个企业都拥有唯一的 corpid,获取此信息可在管理后台 “我的企业” - “企业信息” 下查看 “企业 ID”(需要有管理员权限)
enterprise.wechat.corp.id="xxx"

# 设置 secret,secret 是企业应用里面用于保障数据安全的 “钥匙”,每一个应用都有一个独立的访问密钥
enterprise.wechat.secret="xxx"

# 设置 agentid,每个应用都有唯一的 agentid。在管理后台 -> “应用与小程序” -> “应用”,点进某个应用,即可看到 agentid
enterprise.wechat.agent.id="xxxx"

# 设置 userid,多个用逗号分隔。每个成员都有唯一的 userid,即所谓 “帐号”。在管理后台 -> “通讯录” -> 点进某个成员的详情页,可以看到
enterprise.wechat.users=zhangsan,lisi

# 获取 access_token 的地址,使用如下例子无需修改
enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpId}&corpsecret={secret}

# 发送应用消息地址,使用如下例子无需改动
enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}

# 发送消息格式,无需改动
enterprise.wechat.user.send.msg={\"touser\":\"{toUser}\",\"agentid\":\"{agentId}\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"{msg}\"}}

资源中心

如果需要用到资源上传功能,针对单机可以选择本地文件目录作为上传文件夹(此操作不需要部署 Hadoop)。当然也可以选择上传到 Hadoop or MinIO 集群上,此时则需要有Hadoop (2.6+) 或者 MinIO 等相关环境

*注意:*

  • 如果用到资源上传的功能,那么 安装部署中,部署用户需要有这部分的操作权限
  • 如果 Hadoop 集群的 NameNode 配置了 HA 的话,需要开启 HDFS 类型的资源上传,同时需要将 Hadoop 集群下的 core-site.xmlhdfs-site.xml 复制到 /opt/dolphinscheduler/conf,非 NameNode HA 跳过次步骤

hdfs资源配置

  • 上传资源文件和udf函数,所有上传的文件和资源都会被存储到hdfs上,所以需要以下配置项:
conf/common.properties  
    # Users who have permission to create directories under the HDFS root path
    hdfs.root.user=hdfs
    # data base dir, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
    resource.upload.path=/dolphinscheduler
    # resource storage type : HDFS,S3,NONE
    resource.storage.type=HDFS
    # whether kerberos starts
    hadoop.security.authentication.startup.state=false
    # java.security.krb5.conf path
    java.security.krb5.conf.path=/opt/krb5.conf
    # loginUserFromKeytab user
    login.user.keytab.username=hdfs-mycluster@ESZ.COM
    # loginUserFromKeytab path
    login.user.keytab.path=/opt/hdfs.headless.keytab    
    # if resource.storage.type is HDFS,and your Hadoop Cluster NameNode has HA enabled, you need to put core-site.xml and hdfs-site.xml in the installPath/conf directory. In this example, it is placed under /opt/soft/dolphinscheduler/conf, and configure the namenode cluster name; if the NameNode is not HA, modify it to a specific IP or host name.
    # if resource.storage.type is S3,write S3 address,HA,for example :s3a://dolphinscheduler,
    # Note,s3 be sure to create the root directory /dolphinscheduler
    fs.defaultFS=hdfs://mycluster:8020    
    #resourcemanager ha note this need ips , this empty if single
    yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx    
    # If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine
    yarn.application.status.address=http://xxxx:8088/ws/v1/cluster/apps/%s

文件管理

是对各种资源文件的管理,包括创建基本的txt/log/sh/conf/py/java等文件、上传jar包等各种类型文件,可进行编辑、重命名、下载、删除等操作。

img

  • 创建文件

    文件格式支持以下几种类型:txt、log、sh、conf、cfg、py、java、sql、xml、hql、properties

    img

  • 上传文件

    上传文件:点击"上传文件"按钮进行上传,将文件拖拽到上传区域,文件名会自动以上传的文件名称补全

    img

  • 文件查看

    对可查看的文件类型,点击文件名称,可查看文件详情

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rUFXelKK-1683205976507)(null)]

  • 下载文件

    点击文件列表的"下载"按钮下载文件或者在文件详情中点击右上角"下载"按钮下载文件

  • 文件重命名

img

  • 删除

文件列表->点击"删除"按钮,删除指定文件

UDF管理

资源管理

资源管理和文件管理功能类似,不同之处是资源管理是上传的UDF函数,文件管理上传的是用户程序,脚本及配置文件 操作功能:重命名、下载、删除。

  • 上传udf资源

    和上传文件相同。

函数管理

  • 创建udf函数

    点击“创建UDF函数”,输入udf函数参数,选择udf资源,点击“提交”,创建udf函数。 目前只支持HIVE的临时UDF函数

  • UDF函数名称:输入UDF函数时的名称

  • 包名类名:输入UDF函数的全路径

  • UDF资源:设置创建的UDF对应的资源文件

img

监控中心

服务管理

  • 服务管理主要是对系统中的各个服务的健康状况和基本信息的监控和显示

master监控

  • 主要是master的相关信息。

img

worker监控

  • 主要是worker的相关信息。

img

Zookeeper监控

  • 主要是zookpeeper中各个worker和master的相关配置信息。

img

DB监控

  • 主要是DB的健康状况

img

统计管理

img

  • 待执行命令数:统计t_ds_command表的数据
  • 执行失败的命令数:统计t_ds_error_command表的数据
  • 待运行任务数:统计Zookeeper中task_queue的数据
  • 待杀死任务数:统计Zookeeper中task_kill的数据

安全中心(权限系统)

  • 安全中心只有管理员账户才有权限操作,分别有队列管理、租户管理、用户管理、告警组管理、worker分组管理、令牌管理等功能,在用户管理模块可以对资源、数据源、项目等授权
  • 管理员登录,默认用户名密码:admin/dolphinscheduler123

创建队列

  • 队列是在执行spark、mapreduce等程序,需要用到“队列”参数时使用的。
  • 管理员进入安全中心->队列管理页面,点击“创建队列”按钮,创建队列。

img

添加租户

  • 租户对应的是Linux的用户,用于worker提交作业所使用的用户。如果linux没有这个用户,则会导致任务运行失败。你可以通过修改 worker.properties 配置文件中参数 worker.tenant.auto.create=true 实现当 linux 用户不存在时自动创建该用户。worker.tenant.auto.create=true 参数会要求 worker 可以免密运行 sudo 命令。
  • 租户编码:租户编码是Linux上的用户,唯一,不能重复
  • 管理员进入安全中心->租户管理页面,点击“创建租户”按钮,创建租户。

img

创建普通用户

  • 用户分为管理员用户普通用户
    • 管理员有授权和用户管理等权限,没有创建项目和工作流定义的操作的权限。
    • 普通用户可以创建项目和对工作流定义的创建,编辑,执行等操作。
    • 注意:如果该用户切换了租户,则该用户所在租户下所有资源将复制到切换的新租户下。
  • 进入安全中心->用户管理页面,点击“创建用户”按钮,创建用户。

img

编辑用户信息

  • 管理员进入安全中心->用户管理页面,点击"编辑"按钮,编辑用户信息。
  • 普通用户登录后,点击用户名下拉框中的用户信息,进入用户信息页面,点击"编辑"按钮,编辑用户信息。

修改用户密码

  • 管理员进入安全中心->用户管理页面,点击"编辑"按钮,编辑用户信息时,输入新密码修改用户密码。
  • 普通用户登录后,点击用户名下拉框中的用户信息,进入修改密码页面,输入密码并确认密码后点击"编辑"按钮,则修改密码成功。

创建告警组

  • 告警组是在启动时设置的参数,在流程结束以后会将流程的状态和其他信息以邮件形式发送给告警组。

  • 管理员进入安全中心->告警组管理页面,点击“创建告警组”按钮,创建告警组。

    img

令牌管理

由于后端接口有登录检查,令牌管理提供了一种可以通过调用接口的方式对系统进行各种操作。

  • 管理员进入安全中心->令牌管理页面,点击“创建令牌”按钮,选择失效时间与用户,点击"生成令牌"按钮,点击"提交"按钮,则选择用户的token创建成功。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MQj8OzXt-1683205975817)(null)]

  • 普通用户登录后,点击用户名下拉框中的用户信息,进入令牌管理页面,选择失效时间,点击"生成令牌"按钮,点击"提交"按钮,则该用户创建token成功。

  • 调用示例:

    /**
     * test token
     */
    public  void doPOSTParam()throws Exception{
        // create HttpClient
        CloseableHttpClient httpclient = HttpClients.createDefault();

        // create http post request
        HttpPost httpPost = new HttpPost("http://127.0.0.1:12345/escheduler/projects/create");
        httpPost.setHeader("token", "123");
        // set parameters
        List<NameValuePair> parameters = new ArrayList<NameValuePair>();
        parameters.add(new BasicNameValuePair("projectName", "qzw"));
        parameters.add(new BasicNameValuePair("desc", "qzw"));
        UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters);
        httpPost.setEntity(formEntity);
        CloseableHttpResponse response = null;
        try {
            // execute
            response = httpclient.execute(httpPost);
            // response status code 200
            if (response.getStatusLine().getStatusCode() == 200) {
                String content = EntityUtils.toString(response.getEntity(), "UTF-8");
                System.out.println(content);
            }
        } finally {
            if (response != null) {
                response.close();
            }
            httpclient.close();
        }
    }

授予权限

  • 授予权限包括项目权限,资源权限,数据源权限,UDF函数权限。

  • 管理员可以对普通用户进行非其创建的项目、资源、数据源和UDF函数进行授权。因为项目、资源、数据源和UDF函数授权方式都是一样的,所以以项目授权为例介绍。

  • 注意:对于用户自己创建的项目,该用户拥有所有的权限。则项目列表和已选项目列表中不会显示。

  • 管理员进入安全中心->用户管理页面,点击需授权用户的“授权”按钮,如下图所示:

    img

  • 选择项目,进行项目授权。

img

  • 资源、数据源、UDF函数授权同项目授权。

Worker分组

每个worker节点都会归属于自己的Worker分组,默认分组为default.

在任务执行时,可以将任务分配给指定worker分组,最终由该组中的worker节点执行该任务.

新增/更新 worker分组

  • 打开要设置分组的worker节点上的"conf/worker.properties"配置文件. 修改worker.groups参数.
  • worker.groups参数后面对应的为该worker节点对应的分组名称,默认为default.
  • 如果该worker节点对应多个分组,则以逗号隔开.
示例: 
worker.groups=default,test
  • 也可以在运行中修改worker所属的worker分组,如果修改成功,worker就会使用这个新建的分组,忽略worker.properties中的配置。修改步骤为"安全中心 -> worker分组管理 -> 点击 ‘新建worker分组’ -> 输入’组名称’ -> 选择已有worker -> 点击’提交’"

环境管理

  • 在线配置Worker运行环境,一个Worker可以指定多个环境,每个环境等价于dolphinscheduler_env.sh文件.
  • 默认环境为dolphinscheduler_env.sh文件.
  • 在任务执行时,可以将任务分配给指定worker分组,根据worker分组选择对应的环境,最终由该组中的worker节点执行环境后执行该任务.

创建/更新 环境

  • 环境配置等价于dolphinscheduler_env.sh文件内配置

    img

使用 环境

  • 在工作流定义中创建任务节点选择Worker分组和Worker分组对应的环境,任务执行时Worker会先执行环境在执行任务.

    img

API 调用

背景

一般都是通过页面来创建项目、流程等,但是与第三方系统集成就需要通过调用 API 来管理项目、流程

操作步骤

创建 token

  1. 登录调度系统,点击 “安全中心”,再点击左侧的 “令牌管理”,点击 “令牌管理” 创建令牌

img

  1. 选择 “失效时间” (Token有效期),选择 “用户” (以指定的用户执行接口操作),点击 “生成令牌” ,拷贝 Token 字符串,然后点击 “提交”

img

使用 Token

  1. 打开 API文档页面

    地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn

img

  1. 选一个测试的接口,本次测试选取的接口是:查询所有项目

    projects/query-project-list

  2. 打开 Postman,填写接口地址,并在 Headers 中填写 Token,发送请求后即可查看结果

    token:刚刚生成的Token
    

img

创建项目

这里以创建名为 “wudl-flink-test” 的项目为例

img

img

img

返回 msg 信息为 “success”,说明我们已经成功通过 API 的方式创建了项目。

如果您对创建项目的源码感兴趣,欢迎继续阅读下面内容

附:创建项目源码

img

img

Flink调用

调用 flink 操作步骤

创建队列

  1. 登录调度系统,点击 “安全中心”,再点击左侧的 “队列管理”,点击 “队列管理” 创建队列
  2. 填写队列名称和队列值,然后点击 “提交”

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QOXUFbby-1683205974909)(null)]

创建租户

1.租户对应的是 linux 用户, 用户 worker 提交作业所使用的的用户, 如果 linux 没有这个用户, worker 会在执行脚本的时候创建这个用户
2.租户和租户编码都是唯一不能重复,好比一个人有名字有身份证号。
3.创建完租户会在 hdfs 对应的目录上有相关的文件夹。

img

创建用户

img

创建 Token

  1. 登录调度系统,点击 “安全中心”,再点击左侧的 “令牌管理”,点击 “令牌管理” 创建令牌

img

  1. 选择 “失效时间” (Token有效期),选择 “用户” (以指定的用户执行接口操作),点击 “生成令牌” ,拷贝 Token 字符串,然后点击 “提交”

img

使用 Token

  1. 打开 API文档页面

    地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn

img

  1. 选一个测试的接口,本次测试选取的接口是:查询所有项目

    projects/query-project-list

  2. 打开 Postman,填写接口地址,并在 Headers 中填写 Token,发送请求后即可查看结果

    token: 刚刚生成的 Token
    

img

用户授权

img

用户登录

http://192.168.1.163:12345/dolphinscheduler/ui/#/monitor/servers/master

img

资源上传

img

创建工作流

img

img

img

img

查看执行结果

img

查看日志结果

img

(二)高级指南

系统架构设计

本章节介绍Apache DolphinScheduler调度系统架构

1.系统架构

1.1 系统架构图

系统架构图

系统架构图

1.2 启动流程活动图

Start process activity diagram

启动流程活动图

1.3 架构说明
  • MasterServer

    MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。 MasterServer基于netty提供监听服务。

    该服务内主要包含:
    • Distributed Quartz分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作
    • MasterSchedulerService是一个扫描线程,定时扫描数据库中的 command 表,生成工作流实例,根据不同的命令类型进行不同的业务操作
    • WorkflowExecuteThread主要是负责DAG任务切分、任务提交、各种不同命令类型的逻辑处理,处理任务状态和工作流状态事件
    • EventExecuteService处理master负责的工作流实例所有的状态变化事件,使用线程池处理工作流的状态事件
    • StateWheelExecuteThread处理依赖任务和超时任务的定时状态更新
  • WorkerServer

    WorkerServer也采用分布式无中心设计理念,支持自定义任务插件,主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。

    该服务包含:
    • WorkerManagerThread主要通过netty领取master发送过来的任务,并根据不同任务类型调用TaskExecuteThread对应执行器。
    • RetryReportTaskStatusThread主要通过netty向master汇报任务状态,如果汇报失败,会一直重试汇报
    • LoggerServer是一个日志服务,提供日志分片查看、刷新和下载等功能
  • Registry

    注册中心,使用插件化实现,默认支持Zookeeper, 系统中的MasterServer和WorkerServer节点通过注册中心来进行集群管理和容错。另外系统还基于注册中心进行事件监听和分布式锁。

  • Alert

    提供告警相关功能,仅支持单机服务。支持自定义告警插件。

  • API

    API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。

  • UI

    系统的前端页面,提供系统的各种可视化操作界面,详见系统使用手册部分。

1.4 架构设计思想
一、去中心化vs中心化
中心化思想

中心化的设计理念比较简单,分布式集群中的节点按照角色分工,大体上分为两种角色:

master-slave角色

  • Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。
  • Worker的角色主要负责任务的执行工作并维护和Master的心跳,以便Master可以分配任务给Slave。

中心化思想设计存在的问题:

  • 一旦Master出现了问题,则群龙无首,整个集群就会崩溃。为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。
  • 另外一个问题是如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,则一个DAG中所有的任务都只能在某一台机器上进行作业提交,则并行任务比较多的时候,Slave的压力可能会比较大。
去中心化

去中心化

  • 在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,全球互联网就是一个典型的去中心化的分布式系统,联网的任意节点设备down机,都只会影响很小范围的功能。
  • 去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的”管理者”,因此不存在单点故障问题。但由于不存在” 管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠性,则大大增加了上述功能的实现难度。
  • 实际上,真正去中心化的分布式系统并不多见。反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行"会议"来选举新的"管理者"去主持工作。最典型的案例就是ZooKeeper及Go语言实现的Etcd。
  • DolphinScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,使用分片机制,公平分配工作流在master上执行,并通过不同的发送策略将任务发送给worker执行具体的任务
二、Master执行流程
  1. DolphinScheduler使用分片算法将command取模,根据master的排序id分配,master将拿到的command转换成工作流实例,使用线程池处理工作流实例
  2. DolphinScheduler对工作流的处理流程:
  • 通过UI或者API调用,启动工作流,持久化一条command到数据库中
  • Master通过分片算法,扫描Command表,生成工作流实例ProcessInstance,同时删除Command数据
  • Master使用线程池运行WorkflowExecuteThread,执行工作流实例的流程,包括构建DAG,创建任务实例TaskInstance,将TaskInstance通过netty发送给worker
  • Worker收到任务以后,修改任务状态,并将执行信息返回Master
  • Master收到任务信息,持久化到数据库,并且将状态变化事件存入EventExecuteService事件队列
  • EventExecuteService根据事件队列调用WorkflowExecuteThread进行后续任务的提交和工作流状态的修改
三、容错设计

容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况

1. 宕机容错

服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:

DolphinScheduler容错设计

其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。

  • Master容错流程图:

Master容错流程图

ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。

  • Worker容错流程图:

Worker容错流程图

Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。

注意:由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。

2.任务失败重试

这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:

  • 任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次
  • 流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行从当前节点开始执行
  • 流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行

接下来说正题,我们将工作流中的任务节点分了两种类型。

  • 一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、Spark节点、依赖节点等。
  • 还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。

所有任务都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。

如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作

四、任务优先级设计

在早期调度设计中,如果没有优先级设计,采用公平调度设计的话,会遇到先行提交的任务可能会和后继提交的任务同时完成的情况,而不能做到设置流程或者任务的优先级,因此我们对此进行了重新设计,目前我们设计如下:

  • 按照

    不同流程实例优先级

    优先于

    同一个流程实例优先级

    优先于

    同一流程内任务优先级

    优先于

    同一流程内任务

    提交顺序依次从高到低进行任务处理。

    • 具体实现是根据任务实例的json解析优先级,然后把流程实例优先级_流程实例id_任务优先级_任务id信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务

      • 其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图

        流程优先级配置

      • 任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图

        任务优先级配置

五、Logback和netty实现日志访问
  • 由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。有两种方案:
  • 将日志放到ES搜索引擎上
  • 通过netty通信获取远程日志信息
  • 介于考虑到尽可能的DolphinScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息。

grpc远程访问

  • 我们使用自定义Logback的FileAppender和Filter功能,实现每个任务实例生成一个日志文件。
  • FileAppender主要实现如下:
/**
 * task log appender
 */
public class TaskLogAppender extends FileAppender<ILoggingEvent> {

    ...

   @Override
   protected void append(ILoggingEvent event) {

       if (currentlyActiveFile == null){
           currentlyActiveFile = getFile();
       }
       String activeFile = currentlyActiveFile;
       // thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
       String threadName = event.getThreadName();
       String[] threadNameArr = threadName.split("-");
       // logId = processDefineId_processInstanceId_taskInstanceId
       String logId = threadNameArr[1];
       ...
       super.subAppend(event);
   }
}

以/流程定义id/流程实例id/任务实例id.log的形式生成日志

  • 过滤匹配以TaskLogInfo开始的线程名称:
  • TaskLogFilter实现如下:
/**
*  task log filter
*/
public class TaskLogFilter extends Filter<ILoggingEvent> {

   @Override
   public FilterReply decide(ILoggingEvent event) {
       if (event.getThreadName().startsWith("TaskLogInfo-")){
           return FilterReply.ACCEPT;
       }
       return FilterReply.DENY;
   }
}

Dolphin Scheduler 2.0元数据文档

表概览

表名表信息
t_ds_access_token访问ds后端的token
t_ds_alert告警信息
t_ds_alertgroup告警组
t_ds_command执行命令
t_ds_datasource数据源
t_ds_error_command错误命令
t_ds_process_definition流程定义
t_ds_process_instance流程实例
t_ds_project项目
t_ds_queue队列
t_ds_relation_datasource_user用户关联数据源
t_ds_relation_process_instance子流程
t_ds_relation_project_user用户关联项目
t_ds_relation_resources_user用户关联资源
t_ds_relation_udfs_user用户关联UDF函数
t_ds_relation_user_alertgroup用户关联告警组
t_ds_resources资源文件
t_ds_schedules流程定时调度
t_ds_session用户登录的session
t_ds_task_instance任务实例
t_ds_tenant租户
t_ds_udfsUDF资源
t_ds_user用户
t_ds_versionds版本信息

用户 队列 数据源

image.png

  • 一个租户下可以有多个用户
  • t_ds_user中的queue字段存储的是队列表中的queue_name信息,t_ds_tenant下存的是queue_id,在流程定义执行过程中,用户队列优先级最高,用户队列为空则采用租户队列
  • t_ds_datasource表中的user_id字段表示创建该数据源的用户,t_ds_relation_datasource_user中的user_id表示,对数据源有权限的用户

项目 资源 告警

image.png

  • 一个用户可以有多个项目,用户项目授权通过t_ds_relation_project_user表完成project_id和user_id的关系绑定
  • t_ds_projcet表中的user_id表示创建该项目的用户,t_ds_relation_project_user表中的user_id表示对项目有权限的用户
  • t_ds_resources表中的user_id表示创建该资源的用户,t_ds_relation_resources_user中的user_id表示对资源有权限的用户
  • t_ds_udfs表中的user_id表示创建该UDF的用户,t_ds_relation_udfs_user表中的user_id表示对UDF有权限的用户

命令 流程 任务

image.png
image.png

  • 一个项目有多个流程定义,一个流程定义可以生成多个流程实例,一个流程实例可以生成多个任务实例
  • t_ds_schedulers表存放流程定义的定时调度信息
  • t_ds_relation_process_instance表存放的数据用于处理流程定义中含有子流程的情况,parent_process_instance_id表示含有子流程的主流程实例id,process_instance_id表示子流程实例的id,parent_task_instance_id表示子流程节点的任务实例id,流程实例表和任务实例表分别对应t_ds_process_instance表和t_ds_task_instance表

核心表Schema

t_ds_process_definition
字段类型注释
idint主键
namevarchar流程定义名称
versionint流程定义版本
release_statetinyint流程定义的发布状态:0 未上线 1已上线
project_idint项目id
user_idint流程定义所属用户id
process_definition_jsonlongtext流程定义json串
descriptiontext流程定义描述
global_paramstext全局参数
flagtinyint流程是否可用:0 不可用,1 可用
locationstext节点坐标信息
connectstext节点连线信息
receiverstext收件人
receivers_cctext抄送人
create_timedatetime创建时间
timeoutint超时时间
tenant_idint租户id
update_timedatetime更新时间
modify_byvarchar修改用户
resource_idsvarchar资源id集
t_ds_process_instance
字段类型注释
idint主键
namevarchar流程实例名称
process_definition_idint流程定义id
statetinyint流程实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
recoverytinyint流程实例容错标识:0 正常,1 需要被容错重启
start_timedatetime流程实例开始时间
end_timedatetime流程实例结束时间
run_timesint流程实例运行次数
hostvarchar流程实例所在的机器
command_typetinyint命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
command_paramtext命令的参数(json格式)
task_depend_typetinyint节点依赖类型:0 当前节点,1 向前执行,2 向后执行
max_try_timestinyint最大重试次数
failure_strategytinyint失败策略 0 失败后结束,1 失败后继续
warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_idint告警组id
schedule_timedatetime预期运行时间
command_start_timedatetime开始命令时间
global_paramstext全局参数(固化流程定义的参数)
process_instance_jsonlongtext流程实例json(copy的流程定义的json)
flagtinyint是否可用,1 可用,0不可用
update_timetimestamp更新时间
is_sub_processint是否是子工作流 1 是,0 不是
executor_idint命令执行用户
locationstext节点坐标信息
connectstext节点连线信息
history_cmdtext历史命令,记录所有对流程实例的操作
dependence_schedule_timestext依赖节点的预估时间
process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组
timeoutint超时时间
tenant_idint租户id
t_ds_task_instance
字段类型注释
idint主键
namevarchar任务名称
task_typevarchar任务类型
process_definition_idint流程定义id
process_instance_idint流程实例id
task_jsonlongtext任务节点json
statetinyint任务实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
submit_timedatetime任务提交时间
start_timedatetime任务开始时间
end_timedatetime任务结束时间
hostvarchar执行任务的机器
execute_pathvarchar任务执行路径
log_pathvarchar任务日志路径
alert_flagtinyint是否告警
retry_timesint重试次数
pidint进程pid
app_linkvarcharyarn app id
flagtinyint是否可用:0 不可用,1 可用
retry_intervalint重试间隔
max_retry_timesint最大重试次数
task_instance_priorityint任务实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组
t_ds_schedules
字段类型注释
idint主键
process_definition_idint流程定义id
start_timedatetime调度开始时间
end_timedatetime调度结束时间
crontabvarcharcrontab 表达式
failure_strategytinyint失败策略: 0 结束,1 继续
user_idint用户id
release_statetinyint状态:0 未上线,1 上线
warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_idint告警组id
process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组
create_timedatetime创建时间
update_timedatetime更新时间
t_ds_command
字段类型注释
idint主键
command_typetinyint命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
process_definition_idint流程定义id
command_paramtext命令的参数(json格式)
task_depend_typetinyint节点依赖类型:0 当前节点,1 向前执行,2 向后执行
failure_strategytinyint失败策略:0结束,1继续
warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_idint告警组
schedule_timedatetime预期运行时间
start_timedatetime开始时间
executor_idint执行用户id
dependencevarchar依赖字段
update_timedatetime更新时间
process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组

配置文件

前言

本文档为dolphinscheduler配置文件说明文档,针对版本为 dolphinscheduler-1.3.x 版本.

目录结构

目前dolphinscheduler 所有的配置文件都在 [conf ] 目录中. 为了更直观的了解[conf]目录所在的位置以及包含的配置文件,请查看下面dolphinscheduler安装目录的简化说明. 本文主要讲述dolphinscheduler的配置文件.其他部分先不做赘述.

[注:以下 dolphinscheduler 简称为DS.]

├─bin                               DS命令存放目录
│  ├─dolphinscheduler-daemon.sh         启动/关闭DS服务脚本
│  ├─start-all.sh                       根据配置文件启动所有DS服务
│  ├─stop-all.sh                        根据配置文件关闭所有DS服务
├─conf                              配置文件目录
│  ├─application-api.properties         api服务配置文件
│  ├─datasource.properties              数据库配置文件
│  ├─zookeeper.properties               zookeeper配置文件
│  ├─master.properties                  master服务配置文件
│  ├─worker.properties                  worker服务配置文件
│  ├─quartz.properties                  quartz服务配置文件
│  ├─common.properties                  公共服务[存储]配置文件
│  ├─alert.properties                   alert服务配置文件
│  ├─config                             环境变量配置文件夹
│      ├─install_config.conf                DS环境变量配置脚本[用于DS安装/启动]
│  ├─env                                运行脚本环境变量配置目录
│      ├─dolphinscheduler_env.sh            运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
│  ├─org                                mybatis mapper文件目录
│  ├─i18n                               i18n配置文件目录
│  ├─logback-api.xml                    api服务日志配置文件
│  ├─logback-master.xml                 master服务日志配置文件
│  ├─logback-worker.xml                 worker服务日志配置文件
│  ├─logback-alert.xml                  alert服务日志配置文件
├─sql                               DS的元数据创建升级sql文件
│  ├─create                             创建SQL脚本目录
│  ├─upgrade                            升级SQL脚本目录
│  ├─dolphinscheduler_postgre.sql       postgre数据库初始化脚本
│  ├─dolphinscheduler_mysql.sql         mysql数据库初始化脚本
│  ├─soft_version                       当前DS版本标识文件
├─script                            DS服务部署,数据库创建/升级脚本目录
│  ├─create-dolphinscheduler.sh         DS数据库初始化脚本      
│  ├─upgrade-dolphinscheduler.sh        DS数据库升级脚本                
│  ├─monitor-server.sh                  DS服务监控启动脚本               
│  ├─scp-hosts.sh                       安装文件传输脚本                                                    
│  ├─remove-zk-node.sh                  清理zookeeper缓存文件脚本       
├─ui                                前端WEB资源目录
├─lib                               DS依赖的jar存放目录
├─install.sh                        自动安装DS服务脚本

配置文件详解

序号服务分类配置文件
1启动/关闭DS服务脚本dolphinscheduler-daemon.sh
2数据库连接配置datasource.properties
3zookeeper连接配置zookeeper.properties
4公共[存储]配置common.properties
5API服务配置application-api.properties
6Master服务配置master.properties
7Worker服务配置worker.properties
8Alert 服务配置alert.properties
9Quartz配置quartz.properties
10DS环境变量配置脚本[用于DS安装/启动]install_config.conf
11运行脚本加载环境变量配置文件 [如: JAVA_HOME,HADOOP_HOME, HIVE_HOME …]dolphinscheduler_env.sh
12各服务日志配置文件api服务日志配置文件 : logback-api.xml master服务日志配置文件 : logback-master.xml worker服务日志配置文件 : logback-worker.xml alert服务日志配置文件 : logback-alert.xml
1.dolphinscheduler-daemon.sh [启动/关闭DS服务脚本]

dolphinscheduler-daemon.sh脚本负责DS的启动&关闭. start-all.sh/stop-all.sh最终也是通过dolphinscheduler-daemon.sh对集群进行启动/关闭操作. 目前DS只是做了一个基本的设置,JVM参数请根据各自资源的实际情况自行设置.

默认简化参数如下:

export DOLPHINSCHEDULER_OPTS="
-server 
-Xmx16g 
-Xms1g 
-Xss512k 
-XX:+UseConcMarkSweepGC 
-XX:+CMSParallelRemarkEnabled 
-XX:+UseFastAccessorMethods 
-XX:+UseCMSInitiatingOccupancyOnly 
-XX:CMSInitiatingOccupancyFraction=70
"

不建议设置"-XX:DisableExplicitGC" , DS使用Netty进行通讯,设置该参数,可能会导致内存泄漏.

2.datasource.properties [数据库连接]

在DS中使用Druid对数据库连接进行管理,默认简化配置如下.

参数默认值描述
spring.datasource.driver-class-name数据库驱动
spring.datasource.url数据库连接地址
spring.datasource.username数据库用户名
spring.datasource.password数据库密码
spring.datasource.initialSize5初始连接池数量
spring.datasource.minIdle5最小连接池数量
spring.datasource.maxActive5最大连接池数量
spring.datasource.maxWait60000最大等待时长
spring.datasource.timeBetweenEvictionRunsMillis60000连接检测周期
spring.datasource.timeBetweenConnectErrorMillis60000重试间隔
spring.datasource.minEvictableIdleTimeMillis300000连接保持空闲而不被驱逐的最小时间
spring.datasource.validationQuerySELECT 1检测连接是否有效的sql
spring.datasource.validationQueryTimeout3检测连接是否有效的超时时间[seconds]
spring.datasource.testWhileIdletrue申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
spring.datasource.testOnBorrowtrue申请连接时执行validationQuery检测连接是否有效
spring.datasource.testOnReturnfalse归还连接时执行validationQuery检测连接是否有效
spring.datasource.defaultAutoCommittrue是否开启自动提交
spring.datasource.keepAlivetrue连接池中的minIdle数量以内的连接,空闲时间超过minEvictableIdleTimeMillis,则会执行keepAlive操作。
spring.datasource.poolPreparedStatementstrue开启PSCache
spring.datasource.maxPoolPreparedStatementPerConnectionSize20要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。
3.zookeeper.properties [zookeeper连接配置]
参数默认值描述
zookeeper.quorumlocalhost:2181zk集群连接信息
zookeeper.dolphinscheduler.root/dolphinschedulerDS在zookeeper存储根目录
zookeeper.session.timeout60000session 超时
zookeeper.connection.timeout30000连接超时
zookeeper.retry.base.sleep100基本重试时间差
zookeeper.retry.max.sleep30000最大重试时间
zookeeper.retry.maxtime10最大重试次数
4.common.properties [hadoop、s3、yarn配置]

common.properties配置文件目前主要是配置hadoop/s3a相关的配置.

参数默认值描述
data.basedir.path/tmp/dolphinscheduler本地工作目录,用于存放临时文件
resource.storage.typeNONE资源文件存储类型: HDFS,S3,NONE
resource.upload.path/dolphinscheduler资源文件存储路径
hadoop.security.authentication.startup.statefalsehadoop是否开启kerberos权限
java.security.krb5.conf.path/opt/krb5.confkerberos配置目录
login.user.keytab.usernamehdfs-mycluster@ESZ.COMkerberos登录用户
login.user.keytab.path/opt/hdfs.headless.keytabkerberos登录用户keytab
kerberos.expire.time2kerberos过期时间,整数,单位为小时
resource.view.suffixstxt,log,sh,conf,cfg,py,java,sql,hql,xml,properties资源中心支持的文件格式
hdfs.root.userhdfs如果存储类型为HDFS,需要配置拥有对应操作权限的用户
fs.defaultFShdfs://mycluster:8020请求地址如果resource.storage.type=S3,该值类似为: s3a://dolphinscheduler. 如果resource.storage.type=HDFS, 如果 hadoop 配置了 HA,需要复制core-site.xml 和 hdfs-site.xml 文件到conf目录
fs.s3a.endpoints3 endpoint地址
fs.s3a.access.keys3 access key
fs.s3a.secret.keys3 secret key
yarn.resourcemanager.ha.rm.idsyarn resourcemanager 地址, 如果resourcemanager开启了HA, 输入HA的IP地址(以逗号分隔),如果resourcemanager为单节点, 该值为空即可
yarn.application.status.addresshttp://ds1:8088/ws/v1/cluster/apps/%s如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname
dolphinscheduler.env.pathenv/dolphinscheduler_env.sh运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME …]
development.statefalse是否处于开发模式
5.application-api.properties [API服务配置]
参数默认值描述
server.port12345api服务通讯端口
server.servlet.session.timeout7200session超时时间
server.servlet.context-path/dolphinscheduler请求路径
spring.servlet.multipart.max-file-size1024MB最大上传文件大小
spring.servlet.multipart.max-request-size1024MB最大请求大小
server.jetty.max-http-post-size5000000jetty服务最大发送请求大小
spring.messages.encodingUTF-8请求编码
spring.jackson.time-zoneGMT+8设置时区
spring.messages.basenamei18n/messagesi18n配置
security.authentication.typePASSWORD权限校验类型
6.master.properties [Master服务配置]
参数默认值描述
master.listen.port5678master监听端口
master.exec.threads100master工作线程数量,用于限制并行的流程实例数量
master.exec.task.num20master每个流程实例的并行任务数量
master.dispatch.task.num3master每个批次的派发任务数量
master.host.selectorLowerWeightmaster host选择器,用于选择合适的worker执行任务,可选值: Random, RoundRobin, LowerWeight
master.heartbeat.interval10master心跳间隔,单位为秒
master.task.commit.retryTimes5任务重试次数
master.task.commit.interval1000任务提交间隔,单位为毫秒
master.max.cpuload.avg-1master最大cpuload均值,只有高于系统cpuload均值时,master服务才能调度任务. 默认值为-1: cpu cores * 2
master.reserved.memory0.3master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G
7.worker.properties [Worker服务配置]
参数默认值描述
worker.listen.port1234worker监听端口
worker.exec.threads100worker工作线程数量,用于限制并行的任务实例数量
worker.heartbeat.interval10worker心跳间隔,单位为秒
worker.max.cpuload.avg-1worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2
worker.reserved.memory0.3worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G
worker.groupsdefaultworker分组配置,逗号分隔,例如’worker.groups=default,test’ worker启动时会根据该配置自动加入对应的分组
8.alert.properties [Alert 告警服务配置]
参数默认值描述
alert.typeEMAIL告警类型
mail.protocolSMTP邮件服务器协议
mail.server.hostxxx.xxx.com邮件服务器地址
mail.server.port25邮件服务器端口
mail.senderxxx@xxx.com发送人邮箱
mail.userxxx@xxx.com发送人邮箱名称
mail.passwd111111发送人邮箱密码
mail.smtp.starttls.enabletrue邮箱是否开启tls
mail.smtp.ssl.enablefalse邮箱是否开启ssl
mail.smtp.ssl.trustxxx.xxx.com邮箱ssl白名单
xls.file.path/tmp/xls邮箱附件临时工作目录
以下为企业微信配置[选填]
enterprise.wechat.enablefalse企业微信是否启用
enterprise.wechat.corp.idxxxxxxx
enterprise.wechat.secretxxxxxxx
enterprise.wechat.agent.idxxxxxxx
enterprise.wechat.usersxxxxxxx
enterprise.wechat.token.urlhttps://qyapi.weixin.qq.com/cgi-bin/gettoken? corpid=corpId&corpsecret=secret
enterprise.wechat.push.urlhttps://qyapi.weixin.qq.com/cgi-bin/message/send? access_token=$token
enterprise.wechat.user.send.msg发送消息格式
enterprise.wechat.team.send.msg群发消息格式
plugin.dir/Users/xx/your/path/to/plugin/dir插件目录
9.quartz.properties [Quartz配置]

这里面主要是quartz配置,请结合实际业务场景&资源进行配置,本文暂时不做展开.

参数默认值描述
org.quartz.jobStore.driverDelegateClassorg.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.driverDelegateClassorg.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.scheduler.instanceNameDolphinScheduler
org.quartz.scheduler.instanceIdAUTO
org.quartz.scheduler.makeSchedulerThreadDaemontrue
org.quartz.jobStore.usePropertiesfalse
org.quartz.threadPool.classorg.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.makeThreadsDaemonstrue
org.quartz.threadPool.threadCount25
org.quartz.threadPool.threadPriority5
org.quartz.jobStore.classorg.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.tablePrefixQRTZ_
org.quartz.jobStore.isClusteredtrue
org.quartz.jobStore.misfireThreshold60000
org.quartz.jobStore.clusterCheckinInterval5000
org.quartz.jobStore.acquireTriggersWithinLocktrue
org.quartz.jobStore.dataSourcemyDs
org.quartz.dataSource.myDs.connectionProvider.classorg.apache.dolphinscheduler.service.quartz.DruidConnectionProvider
10.install_config.conf [DS环境变量配置脚本[用于DS安装/启动]]

install_config.conf这个配置文件比较繁琐,这个文件主要有两个地方会用到.

  • 1.DS集群的自动安装.

调用install.sh脚本会自动加载该文件中的配置.并根据该文件中的内容自动配置上述的配置文件中的内容. 比如:dolphinscheduler-daemon.sh、datasource.properties、zookeeper.properties、common.properties、application-api.properties、master.properties、worker.properties、alert.properties、quartz.properties 等文件.

  • 2.DS集群的启动&关闭.

DS集群在启动&关闭的时候,会加载该配置文件中的masters,workers,alertServer,apiServers等参数,启动/关闭DS集群.

文件内容如下:

# 注意: 该配置文件中如果包含特殊字符,如: `.*[]^${}\+?|()@#&`, 请转义,
#      示例: `[` 转义为 `\[`

# 数据库类型, 目前仅支持 postgresql 或者 mysql
dbtype="mysql"

# 数据库 地址 & 端口
dbhost="192.168.xx.xx:3306"

# 数据库 名称
dbname="dolphinscheduler"


# 数据库 用户名
username="xx"

# 数据库 密码
password="xx"

# Zookeeper地址
zkQuorum="192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181"

# 将DS安装到哪个目录,如: /data1_1T/dolphinscheduler,
installPath="/data1_1T/dolphinscheduler"

# 使用哪个用户部署
# 注意: 部署用户需要sudo 权限, 并且可以操作 hdfs .
#     如果使用hdfs的话,根目录必须使用该用户进行创建.否则会有权限相关的问题.
deployUser="dolphinscheduler"


# 以下为告警服务配置
# 邮件服务器地址
mailServerHost="smtp.exmail.qq.com"

# 邮件服务器 端口
mailServerPort="25"

# 发送者
mailSender="xxxxxxxxxx"

# 发送用户
mailUser="xxxxxxxxxx"

# 邮箱密码
mailPassword="xxxxxxxxxx"

# TLS协议的邮箱设置为true,否则设置为false
starttlsEnable="true"

# 开启SSL协议的邮箱配置为true,否则为false。注意: starttlsEnable和sslEnable不能同时为true
sslEnable="false"

# 邮件服务地址值,同 mailServerHost
sslTrust="smtp.exmail.qq.com"

#业务用到的比如sql等资源文件上传到哪里,可以设置:HDFS,S3,NONE。如果想上传到HDFS,请配置为HDFS;如果不需要资源上传功能请选择NONE。
resourceStorageType="NONE"

# if S3,write S3 address,HA,for example :s3a://dolphinscheduler,
# Note,s3 be sure to create the root directory /dolphinscheduler
defaultFS="hdfs://mycluster:8020"

# 如果resourceStorageType 为S3 需要配置的参数如下:
s3Endpoint="http://192.168.xx.xx:9010"
s3AccessKey="xxxxxxxxxx"
s3SecretKey="xxxxxxxxxx"

# 如果ResourceManager是HA,则配置为ResourceManager节点的主备ip或者hostname,比如"192.168.xx.xx,192.168.xx.xx",否则如果是单ResourceManager或者根本没用到yarn,请配置yarnHaIps=""即可,如果没用到yarn,配置为""
yarnHaIps="192.168.xx.xx,192.168.xx.xx"

# 如果是单ResourceManager,则配置为ResourceManager节点ip或主机名,否则保持默认值即可。
singleYarnIp="yarnIp1"

# 资源文件在 HDFS/S3  存储路径
resourceUploadPath="/dolphinscheduler"


# HDFS/S3  操作用户
hdfsRootUser="hdfs"

# 以下为 kerberos 配置

# kerberos是否开启
kerberosStartUp="false"
# kdc krb5 config file path
krb5ConfPath="$installPath/conf/krb5.conf"
# keytab username
keytabUserName="hdfs-mycluster@ESZ.COM"
# username keytab path
keytabPath="$installPath/conf/hdfs.headless.keytab"


# api 服务端口
apiServerPort="12345"


# 部署DS的所有主机hostname
ips="ds1,ds2,ds3,ds4,ds5"

# ssh 端口 , 默认 22
sshPort="22"

# 部署master服务主机
masters="ds1,ds2"

# 部署 worker服务的主机
# 注意: 每一个worker都需要设置一个worker 分组的名称,默认值为 "default"
workers="ds1:default,ds2:default,ds3:default,ds4:default,ds5:default"

#  部署alert服务主机
alertServer="ds3"

# 部署api服务主机 
apiServers="ds1"
11.dolphinscheduler_env.sh [环境变量配置]

通过类似shell方式提交任务的的时候,会加载该配置文件中的环境变量到主机中. 涉及到的任务类型有: Shell任务、Python任务、Spark任务、Flink任务、Datax任务等等

export HADOOP_HOME=/opt/soft/hadoop
export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
export SPARK_HOME1=/opt/soft/spark1
export SPARK_HOME2=/opt/soft/spark2
export PYTHON_HOME=/opt/soft/python
export JAVA_HOME=/opt/soft/java
export HIVE_HOME=/opt/soft/hive
export FLINK_HOME=/opt/soft/flink
export DATAX_HOME=/opt/soft/datax/bin/datax.py

export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
12.各服务日志配置文件
对应服务服务名称日志文件名
api服务日志配置文件logback-api.xml
master服务日志配置文件logback-master.xml
worker服务日志配置文件logback-worker.xml
alert服务日志配置文件logback-alert.xml

任务总体存储结构

在dolphinscheduler中创建的所有任务都保存在t_ds_process_definition 表中.

该数据库表结构如下表所示:

序号字段类型描述
1idint(11)主键
2namevarchar(255)流程定义名称
3versionint(11)流程定义版本
4release_statetinyint(4)流程定义的发布状态:0 未上线 , 1已上线
5project_idint(11)项目id
6user_idint(11)流程定义所属用户id
7process_definition_jsonlongtext流程定义JSON
8descriptiontext流程定义描述
9global_paramstext全局参数
10flagtinyint(4)流程是否可用:0 不可用,1 可用
11locationstext节点坐标信息
12connectstext节点连线信息
13receiverstext收件人
14receivers_cctext抄送人
15create_timedatetime创建时间
16timeoutint(11)超时时间
17tenant_idint(11)租户id
18update_timedatetime更新时间
19modify_byvarchar(36)修改用户
20resource_idsvarchar(255)资源ids

其中process_definition_json 字段为核心字段, 定义了 DAG 图中的任务信息.该数据以JSON 的方式进行存储.

公共的数据结构如下表.

序号字段类型描述
1globalParamsArray全局参数
2tasksArray流程中的任务集合 [ 各个类型的结构请参考如下章节]
3tenantIdint租户id
4timeoutint超时时间

数据示例:

{
    "globalParams":[
        {
            "prop":"golbal_bizdate",
            "direct":"IN",
            "type":"VARCHAR",
            "value":"${system.biz.date}"
        }
    ],
    "tasks":Array[1],
    "tenantId":0,
    "timeout":0
}

任务结构

各任务类型存储结构详解

Shell节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型SHELL
3nameString名称
4paramsObject自定义参数Json 格式
5rawScriptStringShell脚本
6localParamsArray自定义参数
7resourceListArray资源文件
8descriptionString描述
9runFlagString运行标识
10conditionResultObject条件分支
11successNodeArray成功跳转节点
12failedNodeArray失败跳转节点
13dependenceObject任务依赖与params互斥
14maxRetryTimesString最大重试次数
15retryIntervalString重试间隔
16timeoutObject超时控制
17taskInstancePriorityString任务优先级
18workerGroupStringWorker 分组
19preTasksArray前置任务

节点数据样例:

{
    "type":"SHELL",
    "id":"tasks-80760",
    "name":"Shell Task",
    "params":{
        "resourceList":[
            {
                "id":3,
                "name":"run.sh",
                "res":"run.sh"
            }
        ],
        "localParams":[

        ],
        "rawScript":"echo "This is a shell script""
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

SQL节点

通过 SQL对指定的数据源进行数据查询、更新操作.

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型SQL
3nameString名称
4paramsObject自定义参数Json 格式
5typeString数据库类型
6datasourceInt数据源id
7sqlString查询SQL语句
8udfsStringudf函数UDF函数id,以逗号分隔.
9sqlTypeStringSQL节点类型0 查询 , 1 非查询
10titleString邮件标题
11receiversString收件人
12receiversCcString抄送人
13showTypeString邮件显示类型TABLE 表格 , ATTACHMENT附件
14connParamsString连接参数
15preStatementsArray前置SQL
16postStatementsArray后置SQL
17localParamsArray自定义参数
18descriptionString描述
19runFlagString运行标识
20conditionResultObject条件分支
21successNodeArray成功跳转节点
22failedNodeArray失败跳转节点
23dependenceObject任务依赖与params互斥
24maxRetryTimesString最大重试次数
25retryIntervalString重试间隔
26timeoutObject超时控制
27taskInstancePriorityString任务优先级
28workerGroupStringWorker 分组
29preTasksArray前置任务

节点数据样例:

{
    "type":"SQL",
    "id":"tasks-95648",
    "name":"SqlTask-Query",
    "params":{
        "type":"MYSQL",
        "datasource":1,
        "sql":"select id , namge , age from emp where id =  ${id}",
        "udfs":"",
        "sqlType":"0",
        "title":"xxxx@xxx.com",
        "receivers":"xxxx@xxx.com",
        "receiversCc":"",
        "showType":"TABLE",
        "localParams":[
            {
                "prop":"id",
                "direct":"IN",
                "type":"INTEGER",
                "value":"1"
            }
        ],
        "connParams":"",
        "preStatements":[
            "insert into emp ( id,name ) value (1,'Li' )"
        ],
        "postStatements":[

        ]
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

PROCEDURE[存储过程]节点

节点数据结构如下: 节点数据样例:

SPARK节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型SPARK
3nameString名称
4paramsObject自定义参数Json 格式
5mainClassString运行主类
6mainArgsString运行参数
7othersString其他参数
8mainJarObject程序 jar 包
9deployModeString部署模式local,client,cluster
10driverCoresStringdriver核数
11driverMemoryStringdriver 内存数
12numExecutorsStringexecutor数量
13executorMemoryStringexecutor内存
14executorCoresStringexecutor核数
15programTypeString程序类型JAVA,SCALA,PYTHON
16sparkVersionStringSpark 版本SPARK1 , SPARK2
17localParamsArray自定义参数
18resourceListArray资源文件
19descriptionString描述
20runFlagString运行标识
21conditionResultObject条件分支
22successNodeArray成功跳转节点
23failedNodeArray失败跳转节点
24dependenceObject任务依赖与params互斥
25maxRetryTimesString最大重试次数
26retryIntervalString重试间隔
27timeoutObject超时控制
28taskInstancePriorityString任务优先级
29workerGroupStringWorker 分组
30preTasksArray前置任务

节点数据样例:

{
    "type":"SPARK",
    "id":"tasks-87430",
    "name":"SparkTask",
    "params":{
        "mainClass":"org.apache.spark.examples.SparkPi",
        "mainJar":{
            "id":4
        },
        "deployMode":"cluster",
        "resourceList":[
            {
                "id":3,
                "name":"run.sh",
                "res":"run.sh"
            }
        ],
        "localParams":[

        ],
        "driverCores":1,
        "driverMemory":"512M",
        "numExecutors":2,
        "executorMemory":"2G",
        "executorCores":2,
        "mainArgs":"10",
        "others":"",
        "programType":"SCALA",
        "sparkVersion":"SPARK2"
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

MapReduce(MR)节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型MR
3nameString名称
4paramsObject自定义参数Json 格式
5mainClassString运行主类
6mainArgsString运行参数
7othersString其他参数
8mainJarObject程序 jar 包
9programTypeString程序类型JAVA,PYTHON
10localParamsArray自定义参数
11resourceListArray资源文件
12descriptionString描述
13runFlagString运行标识
14conditionResultObject条件分支
15successNodeArray成功跳转节点
16failedNodeArray失败跳转节点
17dependenceObject任务依赖与params互斥
18maxRetryTimesString最大重试次数
19retryIntervalString重试间隔
20timeoutObject超时控制
21taskInstancePriorityString任务优先级
22workerGroupStringWorker 分组
23preTasksArray前置任务

节点数据样例:

{
    "type":"MR",
    "id":"tasks-28997",
    "name":"MRTask",
    "params":{
        "mainClass":"wordcount",
        "mainJar":{
            "id":5
        },
        "resourceList":[
            {
                "id":3,
                "name":"run.sh",
                "res":"run.sh"
            }
        ],
        "localParams":[

        ],
        "mainArgs":"/tmp/wordcount/input /tmp/wordcount/output/",
        "others":"",
        "programType":"JAVA"
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

Python节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型PYTHON
3nameString名称
4paramsObject自定义参数Json 格式
5rawScriptStringPython脚本
6localParamsArray自定义参数
7resourceListArray资源文件
8descriptionString描述
9runFlagString运行标识
10conditionResultObject条件分支
11successNodeArray成功跳转节点
12failedNodeArray失败跳转节点
13dependenceObject任务依赖与params互斥
14maxRetryTimesString最大重试次数
15retryIntervalString重试间隔
16timeoutObject超时控制
17taskInstancePriorityString任务优先级
18workerGroupStringWorker 分组
19preTasksArray前置任务

节点数据样例:

{
    "type":"PYTHON",
    "id":"tasks-5463",
    "name":"Python Task",
    "params":{
        "resourceList":[
            {
                "id":3,
                "name":"run.sh",
                "res":"run.sh"
            }
        ],
        "localParams":[

        ],
        "rawScript":"print("This is a python script")"
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

Flink节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型FLINK
3nameString名称
4paramsObject自定义参数Json 格式
5mainClassString运行主类
6mainArgsString运行参数
7othersString其他参数
8mainJarObject程序 jar 包
9deployModeString部署模式local,client,cluster
10slotStringslot数量
11taskManagerStringtaskManager数量
12taskManagerMemoryStringtaskManager内存数
13jobManagerMemoryStringjobManager内存数
14programTypeString程序类型JAVA,SCALA,PYTHON
15localParamsArray自定义参数
16resourceListArray资源文件
17descriptionString描述
18runFlagString运行标识
19conditionResultObject条件分支
20successNodeArray成功跳转节点
21failedNodeArray失败跳转节点
22dependenceObject任务依赖与params互斥
23maxRetryTimesString最大重试次数
24retryIntervalString重试间隔
25timeoutObject超时控制
26taskInstancePriorityString任务优先级
27workerGroupStringWorker 分组
38preTasksArray前置任务

节点数据样例:

{
    "type":"FLINK",
    "id":"tasks-17135",
    "name":"FlinkTask",
    "params":{
        "mainClass":"com.flink.demo",
        "mainJar":{
            "id":6
        },
        "deployMode":"cluster",
        "resourceList":[
            {
                "id":3,
                "name":"run.sh",
                "res":"run.sh"
            }
        ],
        "localParams":[

        ],
        "slot":1,
        "taskManager":"2",
        "jobManagerMemory":"1G",
        "taskManagerMemory":"2G",
        "executorCores":2,
        "mainArgs":"100",
        "others":"",
        "programType":"SCALA"
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

HTTP节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型HTTP
3nameString名称
4paramsObject自定义参数Json 格式
5urlString请求地址
6httpMethodString请求方式GET,POST,HEAD,PUT,DELETE
7httpParamsArray请求参数
8httpCheckConditionString校验条件默认响应码200
9conditionString校验内容
10localParamsArray自定义参数
11descriptionString描述
12runFlagString运行标识
13conditionResultObject条件分支
14successNodeArray成功跳转节点
15failedNodeArray失败跳转节点
16dependenceObject任务依赖与params互斥
17maxRetryTimesString最大重试次数
18retryIntervalString重试间隔
19timeoutObject超时控制
20taskInstancePriorityString任务优先级
21workerGroupStringWorker 分组
22preTasksArray前置任务

节点数据样例:

{
    "type":"HTTP",
    "id":"tasks-60499",
    "name":"HttpTask",
    "params":{
        "localParams":[

        ],
        "httpParams":[
            {
                "prop":"id",
                "httpParametersType":"PARAMETER",
                "value":"1"
            },
            {
                "prop":"name",
                "httpParametersType":"PARAMETER",
                "value":"Bo"
            }
        ],
        "url":"https://www.xxxxx.com:9012",
        "httpMethod":"POST",
        "httpCheckCondition":"STATUS_CODE_DEFAULT",
        "condition":""
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

DataX节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型DATAX
3nameString名称
4paramsObject自定义参数Json 格式
5customConfigInt自定义类型0定制 , 1自定义
6dsTypeString源数据库类型
7dataSourceInt源数据库ID
8dtTypeString目标数据库类型
9dataTargetInt目标数据库ID
10sqlStringSQL语句
11targetTableString目标表
12jobSpeedByteInt限流(字节数)
13jobSpeedRecordInt限流(记录数)
14preStatementsArray前置SQL
15postStatementsArray后置SQL
16jsonString自定义配置customConfig=1时生效
17localParamsArray自定义参数customConfig=1时生效
18descriptionString描述
19runFlagString运行标识
20conditionResultObject条件分支
21successNodeArray成功跳转节点
22failedNodeArray失败跳转节点
23dependenceObject任务依赖与params互斥
24maxRetryTimesString最大重试次数
25retryIntervalString重试间隔
26timeoutObject超时控制
27taskInstancePriorityString任务优先级
28workerGroupStringWorker 分组
29preTasksArray前置任务

节点数据样例:

{
    "type":"DATAX",
    "id":"tasks-91196",
    "name":"DataxTask-DB",
    "params":{
        "customConfig":0,
        "dsType":"MYSQL",
        "dataSource":1,
        "dtType":"MYSQL",
        "dataTarget":1,
        "sql":"select id, name ,age from user ",
        "targetTable":"emp",
        "jobSpeedByte":524288,
        "jobSpeedRecord":500,
        "preStatements":[
            "truncate table emp "
        ],
        "postStatements":[
            "truncate table user"
        ]
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

Sqoop节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型SQOOP
3nameString名称
4paramsObject自定义参数JSON 格式
5concurrencyInt并发度
6modelTypeString流向import,export
7sourceTypeString数据源类型
8sourceParamsString数据源参数JSON格式
9targetTypeString目标数据类型
10targetParamsString目标数据参数JSON格式
11localParamsArray自定义参数
12descriptionString描述
13runFlagString运行标识
14conditionResultObject条件分支
15successNodeArray成功跳转节点
16failedNodeArray失败跳转节点
17dependenceObject任务依赖与params互斥
18maxRetryTimesString最大重试次数
19retryIntervalString重试间隔
20timeoutObject超时控制
21taskInstancePriorityString任务优先级
22workerGroupStringWorker 分组
23preTasksArray前置任务

节点数据样例:

{
            "type":"SQOOP",
            "id":"tasks-82041",
            "name":"Sqoop Task",
            "params":{
                "concurrency":1,
                "modelType":"import",
                "sourceType":"MYSQL",
                "targetType":"HDFS",
                "sourceParams":"{"srcType":"MYSQL","srcDatasource":1,"srcTable":"","srcQueryType":"1","srcQuerySql":"selec id , name from user","srcColumnType":"0","srcColumns":"","srcConditionList":[],"mapColumnHive":[{"prop":"hivetype-key","direct":"IN","type":"VARCHAR","value":"hivetype-value"}],"mapColumnJava":[{"prop":"javatype-key","direct":"IN","type":"VARCHAR","value":"javatype-value"}]}",
                "targetParams":"{"targetPath":"/user/hive/warehouse/ods.db/user","deleteTargetDir":false,"fileType":"--as-avrodatafile","compressionCodec":"snappy","fieldsTerminated":",","linesTerminated":"@"}",
                "localParams":[

                ]
            },
            "description":"",
            "runFlag":"NORMAL",
            "conditionResult":{
                "successNode":[
                    ""
                ],
                "failedNode":[
                    ""
                ]
            },
            "dependence":{

            },
            "maxRetryTimes":"0",
            "retryInterval":"1",
            "timeout":{
                "strategy":"",
                "interval":null,
                "enable":false
            },
            "taskInstancePriority":"MEDIUM",
            "workerGroup":"default",
            "preTasks":[

            ]
        }

条件分支节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型SHELL
3nameString名称
4paramsObject自定义参数null
5descriptionString描述
6runFlagString运行标识
7conditionResultObject条件分支
8successNodeArray成功跳转节点
9failedNodeArray失败跳转节点
10dependenceObject任务依赖与params互斥
11maxRetryTimesString最大重试次数
12retryIntervalString重试间隔
13timeoutObject超时控制
14taskInstancePriorityString任务优先级
15workerGroupStringWorker 分组
16preTasksArray前置任务

节点数据样例:

{
    "type":"CONDITIONS",
    "id":"tasks-96189",
    "name":"条件",
    "params":{

    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            "test04"
        ],
        "failedNode":[
            "test05"
        ]
    },
    "dependence":{
        "relation":"AND",
        "dependTaskList":[

        ]
    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[
        "test01",
        "test02"
    ]
}

子流程节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型SHELL
3nameString名称
4paramsObject自定义参数Json 格式
5processDefinitionIdInt流程定义id
6descriptionString描述
7runFlagString运行标识
8conditionResultObject条件分支
9successNodeArray成功跳转节点
10failedNodeArray失败跳转节点
11dependenceObject任务依赖与params互斥
12maxRetryTimesString最大重试次数
13retryIntervalString重试间隔
14timeoutObject超时控制
15taskInstancePriorityString任务优先级
16workerGroupStringWorker 分组
17preTasksArray前置任务

节点数据样例:

{
            "type":"SUB_PROCESS",
            "id":"tasks-14806",
            "name":"SubProcessTask",
            "params":{
                "processDefinitionId":2
            },
            "description":"",
            "runFlag":"NORMAL",
            "conditionResult":{
                "successNode":[
                    ""
                ],
                "failedNode":[
                    ""
                ]
            },
            "dependence":{

            },
            "timeout":{
                "strategy":"",
                "interval":null,
                "enable":false
            },
            "taskInstancePriority":"MEDIUM",
            "workerGroup":"default",
            "preTasks":[

            ]
        }

依赖(DEPENDENT)节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型DEPENDENT
3nameString名称
4paramsObject自定义参数Json 格式
5rawScriptStringShell脚本
6localParamsArray自定义参数
7resourceListArray资源文件
8descriptionString描述
9runFlagString运行标识
10conditionResultObject条件分支
11successNodeArray成功跳转节点
12failedNodeArray失败跳转节点
13dependenceObject任务依赖与params互斥
14relationString关系AND,OR
15dependTaskListArray依赖任务清单
16maxRetryTimesString最大重试次数
17retryIntervalString重试间隔
18timeoutObject超时控制
19taskInstancePriorityString任务优先级
20workerGroupStringWorker 分组
21preTasksArray前置任务

节点数据样例:

{
            "type":"DEPENDENT",
            "id":"tasks-57057",
            "name":"DenpendentTask",
            "params":{

            },
            "description":"",
            "runFlag":"NORMAL",
            "conditionResult":{
                "successNode":[
                    ""
                ],
                "failedNode":[
                    ""
                ]
            },
            "dependence":{
                "relation":"AND",
                "dependTaskList":[
                    {
                        "relation":"AND",
                        "dependItemList":[
                            {
                                "projectId":1,
                                "definitionId":7,
                                "definitionList":[
                                    {
                                        "value":8,
                                        "label":"MRTask"
                                    },
                                    {
                                        "value":7,
                                        "label":"FlinkTask"
                                    },
                                    {
                                        "value":6,
                                        "label":"SparkTask"
                                    },
                                    {
                                        "value":5,
                                        "label":"SqlTask-Update"
                                    },
                                    {
                                        "value":4,
                                        "label":"SqlTask-Query"
                                    },
                                    {
                                        "value":3,
                                        "label":"SubProcessTask"
                                    },
                                    {
                                        "value":2,
                                        "label":"Python Task"
                                    },
                                    {
                                        "value":1,
                                        "label":"Shell Task"
                                    }
                                ],
                                "depTasks":"ALL",
                                "cycle":"day",
                                "dateValue":"today"
                            }
                        ]
                    },
                    {
                        "relation":"AND",
                        "dependItemList":[
                            {
                                "projectId":1,
                                "definitionId":5,
                                "definitionList":[
                                    {
                                        "value":8,
                                        "label":"MRTask"
                                    },
                                    {
                                        "value":7,
                                        "label":"FlinkTask"
                                    },
                                    {
                                        "value":6,
                                        "label":"SparkTask"
                                    },
                                    {
                                        "value":5,
                                        "label":"SqlTask-Update"
                                    },
                                    {
                                        "value":4,
                                        "label":"SqlTask-Query"
                                    },
                                    {
                                        "value":3,
                                        "label":"SubProcessTask"
                                    },
                                    {
                                        "value":2,
                                        "label":"Python Task"
                                    },
                                    {
                                        "value":1,
                                        "label":"Shell Task"
                                    }
                                ],
                                "depTasks":"SqlTask-Update",
                                "cycle":"day",
                                "dateValue":"today"
                            }
                        ]
                    }
                ]
            },
            "maxRetryTimes":"0",
            "retryInterval":"1",
            "timeout":{
                "strategy":"",
                "interval":null,
                "enable":false
            },
            "taskInstancePriority":"MEDIUM",
            "workerGroup":"default",
            "preTasks":[

            ]
        }

负载均衡

负载均衡即通过路由算法(通常是集群环境),合理的分摊服务器压力,达到服务器性能的最大优化。

DolphinScheduler-Worker 负载均衡算法

DolphinScheduler-Master 分配任务至 worker,默认提供了三种算法:

加权随机(random)

平滑轮询(roundrobin)

线性负载(lowerweight)

默认配置为线性加权负载。

由于路由是在客户端做的,即 master 服务,因此你可以更改 master.properties 中的 master.host.selector 来配置你所想要的算法。

eg:master.host.selector=random(不区分大小写)

Worker 负载均衡配置

配置文件 worker.properties

权重

上述所有的负载算法都是基于权重来进行加权分配的,权重影响分流结果。你可以在 修改 worker.weight 的值来给不同的机器设置不同的权重。

预热

考虑到 JIT 优化,我们会让 worker 在启动后低功率的运行一段时间,使其逐渐达到最佳状态,这段过程我们称之为预热。感兴趣的同学可以去阅读 JIT 相关的文章。

因此 worker 在启动后,他的权重会随着时间逐渐达到最大(默认十分钟,我们没有提供配置项,如果需要,你可以修改并提交相关的 PR)。

负载均衡算法细述

随机(加权)

该算法比较简单,即在符合的 worker 中随机选取一台(权重会影响他的比重)。

平滑轮询(加权)

加权轮询算法一个明显的缺陷。即在某些特殊的权重下,加权轮询调度会生成不均匀的实例序列,这种不平滑的负载可能会使某些实例出现瞬时高负载的现象,导致系统存在宕机的风险。为了解决这个调度缺陷,我们提供了平滑加权轮询算法。

每台 worker 都有两个权重,即 weight(预热完成后保持不变),current_weight(动态变化),每次路由。都会遍历所有的 worker,使其 current_weight+weight,同时累加所有 worker 的 weight,计为 total_weight,然后挑选 current_weight 最大的作为本次执行任务的 worker,与此同时,将这台 worker 的 current_weight-total_weight。

线性加权(默认算法)

该算法每隔一段时间会向注册中心上报自己的负载信息。我们主要根据两个信息来进行判断

  • load 平均值(默认是 CPU 核数 *2)
  • 可用物理内存(默认是 0.3,单位是 G)

如果两者任何一个低于配置项,那么这台 worker 将不参与负载。(即不分配流量)

你可以在 worker.properties 修改下面的属性来自定义配置

  • worker.max.cpuload.avg=-1 (worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2)
  • worker.reserved.memory=0.3 (worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G)
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐