> 文章列表 > 工作流调度系统 Azkaban使用方法大全(二)

工作流调度系统 Azkaban使用方法大全(二)

工作流调度系统 Azkaban使用方法大全(二)

1 案例

1.1 Hello World

first.project

azkaban-flow-version: 2.0

first.flow

nodes:- name: jobAtype: commandconfig:command: echo "hi 大佬"

打包为zip,即可上传

name:job名称

type:job类型。command表示要执行作业的方式为命令。

config:job配置

1.2 作业依赖案例

需求:JobA和Job执行完了,才能执行 JobC

pojo.project

azkaban-flow-version: 2.0

pojo.flow

nodes:- name: jobCtype: command# JobC 依赖于 JobA 和 JobBdependsOn:- jobA- jobBconfig:command: echo "hi 大佬"- name: jobAtype: commandconfig:command: echo "开始执行 jobA"- name: jobBtype: commandconfig:command: echo "开始执行 jobB"  

dependsOn: 作业依赖

工作流调度系统 Azkaban使用方法大全(二)

1.3 自动失败重试

需求:如果执行任务失败,需要重试3次,重试时间间隔10000ms

pojo.flow

nodes:- name: jobAtype: commandconfig:command: sh /usr/local/azkaban/test/test.shretries: 3retry.backoff: 10000

retries: 重试次数

retry.backoff: 重试的时间间隔

1.4 手动失败重试

需求:JobA->jobF,一次依赖吗,在生产环境,任何job都有可能挂掉,可以根据需求执行想要执行的job。

pojo.flow

nodes:- name: jobAtype: commandconfig:command: echo "执行jobA"- name: jobBtype: commanddependsOn:- jobAconfig:command: echo "执行jobB"- name: jobCtype: commanddependsOn:- jobBconfig:command: echo "执行jobC"- name: jobDtype: commanddependsOn:- jobCconfig:command: echo "执行jobD"- name: jobEtype: commanddependsOn:- jobDconfig:command: echo "执行jobE"- name: jobFtype: commanddependsOn:- jobEconfig:command: echo "执行jobF"  

工作流调度系统 Azkaban使用方法大全(二)

工作流调度系统 Azkaban使用方法大全(二)

或者禁止

工作流调度系统 Azkaban使用方法大全(二)

2 JavaProcess作业案例

JavaProcess类型可以运行一个自定义主类方法,type类型为javaprocess,可用配置为:

Xms: 最小堆
Xmx: 最大堆
classpath: 类路径
java.class: 要运行的java对象,必须包含Main方法
main.args: main方法的参数

案例:

1、新建一个azkaban的maven工程: azkaban_cs

2、创建包名:com.cs

3、创建 AzTest类

package com.cs;public class AzTest {public static void main(String[] args) {System.out.println("java测试AzTest");}
}

4、然后打包: azkaban_cs-1.0-SNAPSHOT.jar

5、testJava.flow

nodes:- name: test_javatype: javaprocessconfig:Xms: 96MXmx: 200Mjava.class: com.cs.AzTest

3 条件工作流

条件工作流功能允许用户自定义执行条件来决定是否运行某些job。条件可以由当前job的父job输出运行时的参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定job执行逻辑时获得更大的灵活性,例如,至于只要父job之一成功,就可以运行当前job。

3.1 运行时参数

1、运行原理

  • 父job将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件。
  • 子job使用 ${jobName:param} 来获取父job输出的参数并定义执行条件。

2、支持的运算符

==、!=、>、<、>=、<=、&& 与、

3、需求案例

  • jobA执行一个shell脚本
  • jobB执行一个shell脚本,但jobB不需要每天都执行,而只需要每周一执行。

工作流调度系统 Azkaban使用方法大全(二)

jobA.sh

#!/bin/bash
echo "执行jobA"
wk=`date +%w`
echo "{\\"wk\\":$wk}" > $JOB_OUTPUT_PROP_FILE

jobB.sh

#!/bin/bash
echo "执行jobB"

condition.flow

nodes:- name: jobAtype: commandconfig:command: sh JobA.sh- name: jobBtype: commanddependsOn:- jobAconfig:command: sh jobB.shcondition: ${jobA:wk} == 1

3.2 预定义宏案例

Azkaban中预置了几个特殊的判断条件,称为预定义宏。

预定义宏会根据所有父job的完成情况进行判断,在决定是否执行。可用的预定义宏入下:

1、all_sucess: 表示父job全部成功才执行(默认)

2、all_done:表示父job全部完成才执行

3、all_failed:表示父job全部失败才执行

4、one_sucess:表示父job至少一个成功才执行

5、one_failed:表示父job至少一个失败才执行

需求:

  • jobA执行一个shell脚本
  • jobB执行一个shell脚本
  • jobC执行一个shell脚本,要求jobA、jobB中有一个成功即可执行。

工作流调度系统 Azkaban使用方法大全(二)

jobA.sh

#!/bin/bash
echo "执行jobA"

jobB.sh

#!/bin/bash
echo "执行jobB"

jobC.sh

#!/bin/bash
echo "执行jobC"

marco.flow

nodes:- name: jobAtype: commandconfig:command: sh jobA.sh- name: jobBtype: commandconfig:command: sh jobB.sh- name: jobCtype: commanddependsOn:- jobA- jobBconfig:command: sh jobC.shcondition: one_success

4 定时执行

工作流调度系统 Azkaban使用方法大全(二)

工作流调度系统 Azkaban使用方法大全(二)

需求:jobA 每隔1分钟执行一次

使用方法同 cron,具体参考:https://www.couragesteak.com/article/189

5 报警

5.1 邮件

修改配置文件

cd /usr/local/azkaban/azkaban-web-server-3.84.4
vim conf/azkaban.properties
mail.sender=发件人邮箱
mail.host=SMTP服务器地址
mail.user=
mail.password=

重启 web server

cd /usr/local/azkaban/azkaban-web-server-3.84.4
bin/shutdown-web.sh
bin/start-web.sh

工作流调度系统 Azkaban使用方法大全(二)

5.2 电话告警

通常邮件报警可能会不及时,因此需要电话来及时通知。可与第三方平台进行集成。

睿象云:https://www.aiops.com/

5 Azkaban 多 Executor 模式注意事项

Azkaban 多 Executor模式指:在集群中多个节点部署 Executor。在这种模式下, Azkaban web server会根据策略,选取其中一个 Executor 去执行任务。

为确保所选的 Executor 能够准确的执行任务,我们须在一下两种方案任选其一,推荐方案二。

方案一:在特定的 Executor 去执行任务

1、在 MySQL 中 azkaban 数据库 executors 表中,查询 Executor的id。

use azkaban;select * from executors;

工作流调度系统 Azkaban使用方法大全(二)

这是任务脚本,可以写为绝对路径。

方案二:在 Executor 所在所有节点部署任务所需的脚本和应用。