Scheduling platform: DolphinScheduler

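Internally, DolphinScheduler consists of four modules:

MasterServer, WorkerServer, AlertServer, ApiServer

1. Architecture overview

MasterServer

MasterServer follows a distributed, decentralized design. It is mainly responsible for splitting DAG tasks and for submitting and monitoring tasks, and it also watches the health of the other MasterServer and WorkerServer nodes.

On startup, the MasterServer service registers an ephemeral node with ZooKeeper and handles fault tolerance by watching for changes to ZooKeeper ephemeral nodes. MasterServer exposes its listening service over Netty.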

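The service mainly contains:
  • DistributedQuartz, the distributed scheduling component, which mainly starts and stops scheduled jobs; once Quartz fires a job, a thread pool inside the Master handles the subsequent processing;
  • MasterSchedulerService, a scanning thread that periodically scans the t_ds_command table in the database and performs different business operations depending on the command type;
  • WorkflowExecuteRunnable, which is mainly responsible for splitting DAG tasks, submitting and monitoring tasks, and handling the logic for the various event types;
  • TaskExecuteRunnable, which is mainly responsible for processing and persisting tasks, and for generating task events and submitting them to the workflow's event queue;
  • EventExecuteService, which is mainly responsible for polling the event queues of workflow instances;
  • StateWheelExecuteThread, which is mainly responsible for polling workflow and task timeouts, task retries, and task dependencies, and for generating the corresponding workflow or task events and submitting them to the workflow's event queue;
  • FailoverExecuteThread, which is mainly responsible for the Master and Worker fault-tolerance logic;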
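
The interplay between a polling thread and the workflow event queue can be sketched roughly as follows. This is a minimal, self-contained illustration and not DolphinScheduler's implementation: the class StateWheelSketch, its Event and TrackedTask types, and the methods track, pollOnce, and drainEvents are all hypothetical names. One polling pass turns every overdue task into a timeout event, and a separate consumer drains the queue.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative only: every name here is hypothetical, not a DolphinScheduler class.
class StateWheelSketch {
    enum EventType { TASK_TIMEOUT }

    static final class Event {
        final EventType type;
        final int taskId;

        Event(EventType type, int taskId) {
            this.type = type;
            this.taskId = taskId;
        }
    }

    static final class TrackedTask {
        final int taskId;
        final long startMillis;
        final long timeoutMillis;

        TrackedTask(int taskId, long startMillis, long timeoutMillis) {
            this.taskId = taskId;
            this.startMillis = startMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    private final List<TrackedTask> tracked = new ArrayList<>();
    private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<>();

    // Start watching a task for a timeout.
    synchronized void track(int taskId, long startMillis, long timeoutMillis) {
        tracked.add(new TrackedTask(taskId, startMillis, timeoutMillis));
    }

    // One polling pass: each overdue task yields one event and is no longer tracked.
    synchronized void pollOnce(long nowMillis) {
        Iterator<TrackedTask> it = tracked.iterator();
        while (it.hasNext()) {
            TrackedTask task = it.next();
            if (nowMillis - task.startMillis >= task.timeoutMillis) {
                eventQueue.add(new Event(EventType.TASK_TIMEOUT, task.taskId));
                it.remove();
            }
        }
    }

    // The event-loop side: take everything currently queued.
    List<Event> drainEvents() {
        List<Event> drained = new ArrayList<>();
        Event event;
        while ((event = eventQueue.poll()) != null) {
            drained.add(event);
        }
        return drained;
    }

    public static void main(String[] args) {
        StateWheelSketch wheel = new StateWheelSketch();
        wheel.track(1, 0L, 1_000L);
        wheel.track(2, 0L, 5_000L);
        wheel.pollOnce(1_500L);
        for (Event event : wheel.drainEvents()) {
            System.out.println(event.type + " for task " + event.taskId);
        }
    }
}
```

Passing the current time into pollOnce keeps the sketch deterministic; a real polling thread would read the clock itself and sleep between passes.
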
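Startup steps:

1. Start the RPC service (start the Netty server)

2. Load the plugins

3. Register with ZooKeeper

4. Start the scheduler's periodic task

5. Start the Event processor

6. Start the StateWheel processor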

1. The MasterServer.java class lives at the path below and contains the main method
https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
}

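2. After the constructor has run, the run method is invoked and starts each component
/**
* run master server
* The @PostConstruct annotation is typically used within the Spring framework.
* Order in which the annotated method runs during bean initialization:
* Constructor -> @Autowired (dependency injection) -> @PostConstruct (annotated method)
*/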
@PostConstruct
public void run() throws SchedulerException {
// 1.init rpc server
this.masterRPCServer.start();

// 2.install task plugin
this.taskPluginManager.loadPlugin();

// 3.self tolerant
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);

// 4. MasterSchedulerService: start the scheduler's periodic task
this.masterSchedulerBootstrap.init();
this.masterSchedulerBootstrap.start();

// 5. start the Event processor
this.eventExecuteService.start();
this.failoverExecuteThread.start();

// CRUD operations
this.schedulerApi.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!ServerLifeCycleManager.isStopped()) {
close("MasterServer shutdownHook");
}
}));
}

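3. Register the metadata in ZooKeeper. Notably, an ephemeral path is used here: if the service disconnects or its session expires along the way, the ephemeral path is deleted automatically after a short while.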
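
To make the ephemeral-node behaviour concrete, here is a small in-memory model of it. It does not use the ZooKeeper client at all. The class EphemeralRegistrySketch, its methods, and the paths in main are hypothetical names chosen for illustration. The point it shows is that every node belongs to a session, and when the session expires the node disappears and listeners are told, which is where failover logic would hook in.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Consumer;

// In-memory model of ephemeral nodes; hypothetical names, no real ZooKeeper calls.
class EphemeralRegistrySketch {
    // path -> id of the session that owns the node
    private final Map<String, String> nodes = new TreeMap<>();
    private final List<Consumer<String>> removalListeners = new ArrayList<>();

    void registerEphemeral(String sessionId, String path) {
        nodes.put(path, sessionId);
    }

    // Listeners play the role of watchers on the parent path.
    void onNodeRemoved(Consumer<String> listener) {
        removalListeners.add(listener);
    }

    // Session expiry removes every node owned by that session, then notifies listeners.
    void expireSession(String sessionId) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = nodes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(sessionId)) {
                removed.add(entry.getKey());
                it.remove();
            }
        }
        for (String path : removed) {
            for (Consumer<String> listener : removalListeners) {
                listener.accept(path);
            }
        }
    }

    Set<String> livePaths() {
        return new TreeSet<>(nodes.keySet());
    }

    public static void main(String[] args) {
        EphemeralRegistrySketch registry = new EphemeralRegistrySketch();
        registry.onNodeRemoved(path -> System.out.println("node gone, start failover for " + path));
        registry.registerEphemeral("session-a", "/example/master/host-a"); // made-up path
        registry.registerEphemeral("session-b", "/example/master/host-b"); // made-up path
        registry.expireSession("session-a");
        System.out.println("still alive: " + registry.livePaths());
    }
}
```
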

WorkerServer

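WorkerServer also follows a distributed, decentralized design. It is mainly responsible for executing tasks and providing the log service. On startup, the WorkerServer service registers an ephemeral node with ZooKeeper and maintains a heartbeat. WorkerServer exposes its listening service over Netty.

The service contains:
  • WorkerManagerThread, which is mainly responsible for submitting from the task queue: it keeps taking tasks off the queue and hands them to a thread pool for processing;
  • TaskExecuteThread, which is mainly responsible for the task execution flow and performs the actual processing according to the task type;
  • RetryReportTaskStatusThread, which periodically reports task status to the Master until the Master replies with an ack for that status, so that task status is not lost;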
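
The report-until-acknowledged idea can be sketched as below. This is an assumption-laden illustration, not the Worker's real code: RetryReportSketch, its Sender interface, and the methods report, ack, and retryOnce are hypothetical. Each status stays in a pending map and is resent on every retry pass until an ack removes it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "resend until acked"; not DolphinScheduler's implementation.
class RetryReportSketch {
    // Stand-in for the network call to the Master.
    interface Sender {
        void send(int taskId, String status);
    }

    private final Map<Integer, String> pending = new ConcurrentHashMap<>();
    private final Sender sender;

    RetryReportSketch(Sender sender) {
        this.sender = sender;
    }

    // First report: remember the status until it is acknowledged.
    void report(int taskId, String status) {
        pending.put(taskId, status);
        sender.send(taskId, status);
    }

    // The Master acknowledged this task's status, so stop resending it.
    void ack(int taskId) {
        pending.remove(taskId);
    }

    // One retry pass: resend everything that has not been acknowledged yet.
    void retryOnce() {
        pending.forEach((taskId, status) -> sender.send(taskId, status));
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<String> wire = new ArrayList<>();
        RetryReportSketch reporter = new RetryReportSketch((id, status) -> wire.add(id + ":" + status));
        reporter.report(7, "SUCCESS");
        reporter.retryOnce();   // no ack yet, so the status is sent again
        reporter.ack(7);
        reporter.retryOnce();   // nothing left to resend
        System.out.println(wire + ", pending=" + reporter.pendingCount());
    }
}
```

A real retry thread would call retryOnce on a timer; the trade-off is that the Master may see the same status more than once and has to treat repeated reports as harmless.
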
Startup steps:

1. Start the RPC service (start the Netty server)

2. Register with ZooKeeper

3. Maintain the WorkerServer node status

4. Start TaskExecuteThread

5. Start RetryReportTaskStatusThread

@PostConstruct
public void run() {
// Start the Netty service; ports are also exposed along the way for talking to other processes
this.workerRpcServer.start();
this.workerRpcClient.start();
this.taskPluginManager.loadPlugin();

// Register with ZooKeeper; workerRegistryClient calls into the ZooKeeper code
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();

// Start a component: WorkerManagerThread mainly manages the tasks sent over by the Master
this.workerManagerThread.start();

// Start a component: messageRetryRunner mainly sends feedback to the Master
this.messageRetryRunner.start();

/*
* registry hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!ServerLifeCycleManager.isStopped()) {
close("WorkerServer shutdown hook");
}
}));
}

ZooKeeper

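The ZooKeeper service: the MasterServer and WorkerServer nodes in the system both rely on ZooKeeper for cluster management and fault tolerance. The system also uses ZooKeeper for event listening and distributed locks. We once implemented the queue on Redis as well, but because we want DolphinScheduler to depend on as few components as possible, the Redis implementation was eventually removed.

AlertServer

Provides the alerting service; alert plugins supply a rich set of alerting channels.

ApiServer

The API layer, mainly responsible for handling requests from the front-end UI layer. The service exposes a unified RESTful API to serve external requests.

UI

The front-end pages of the system, providing its various visual operation interfaces.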


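2. Task execution flow

First, starting from the UI, let us look at how a workflow actually runs.

1. After a series of calls, some parameters are parsed, and once parsing completes a record is generated (Create Command)

Clicking Run on a workflow in the UI's workflow page executes the following code:

DolphinScheduler-api: ExecutorController.java → startProcessInstance(execService.execProcessInstance()) → createCommand() → writes to table t_ds_command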
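
The hand-off through the command table can be pictured with a small in-memory stand-in. This is only a sketch under stated assumptions: CommandTableSketch and its Command type are hypothetical, the list stands in for the t_ds_command table, and the two-argument createCommand is a drastic simplification of the real method. The API side writes a row, and a scanner later reads the oldest rows and removes each one after it has been handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a command table; not the real DAO code.
class CommandTableSketch {
    static final class Command {
        final int id;
        final String commandType;
        final long processDefinitionCode;

        Command(int id, String commandType, long processDefinitionCode) {
            this.id = id;
            this.commandType = commandType;
            this.processDefinitionCode = processDefinitionCode;
        }
    }

    private final List<Command> table = new ArrayList<>();
    private final AtomicInteger nextId = new AtomicInteger(1);

    // API side: insert one row and return the number of rows written.
    synchronized int createCommand(String commandType, long processDefinitionCode) {
        table.add(new Command(nextId.getAndIncrement(), commandType, processDefinitionCode));
        return 1;
    }

    // Scanner side: read the oldest commands, at most `limit` of them.
    synchronized List<Command> findCommands(int limit) {
        return new ArrayList<>(table.subList(0, Math.min(limit, table.size())));
    }

    // Once a command has been turned into a workflow instance, drop the row.
    synchronized boolean deleteCommand(int id) {
        return table.removeIf(command -> command.id == id);
    }

    public static void main(String[] args) {
        CommandTableSketch commands = new CommandTableSketch();
        commands.createCommand("START_PROCESS", 1001L); // made-up definition code
        for (Command command : commands.findCommands(10)) {
            System.out.println("handling command " + command.id + " of type " + command.commandType);
            commands.deleteCommand(command.id);
        }
    }
}
```
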

--------------DolphinScheduler-API module, /controller/ExecutorController.java--------------
--------------------------------------------------------------------------------------------
---startProcessInstance() in ExecutorController.java calls execService.execProcessInstance()---
--------------------------------------------------------------------------------------------

public Result startProcessInstance(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionCode") long processDefinitionCode,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "version", required = false) Integer version) {

if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
}
Map<String, String> startParamMap = null;
if (startParams != null) {
startParamMap = JSONUtils.toMap(startParams);
}

if (complementDependentMode == null) {
complementDependentMode = ComplementDependentMode.OFF_MODE;
}

Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, testFlag,
complementDependentMode, version);
return returnDataList(result);
}


--------------------------------------------------------------------------------------------
-----------execProcessInstance() in ExecutorServiceImpl.java calls createCommand()-----------
--------------------------------------------------------------------------------------------
@Override
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> execProcessInstance(User loginUser, long projectCode, long processDefinitionCode,
String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType,
Integer warningGroupId, RunMode runMode,
Priority processInstancePriority, String workerGroup,
Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun, int testFlag,
ComplementDependentMode complementDependentMode, Integer version) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_START);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// timeout is invalid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
log.warn("Parameter timeout is invalid, timeout:{}.", timeout);
putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
return result;
}
ProcessDefinition processDefinition;
if (null != version) {
processDefinition = processService.findProcessDefinition(processDefinitionCode, version);
} else {
processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
}
// check process define release state
this.checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode,
processDefinition.getVersion());
// check current version whether include startNodeList
checkStartNodeList(startNodeList, processDefinitionCode, processDefinition.getVersion());
if (!checkTenantSuitable(processDefinition)) {
log.error(
"There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
processDefinition.getCode(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
}

checkScheduleTimeNumExceed(commandType, cronTime);
checkMasterExists();

long triggerCode = CodeGenerateUtils.getInstance().genCode();

/**
* create command
*/
int create =
this.createCommand(triggerCode, commandType, processDefinition.getCode(), taskDependType,
failureStrategy,
startNodeList,
cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
workerGroup,
environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
complementDependentMode);

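2. Polling for commands

MasterServer repeatedly scans the table for records, pulls them out, and finally builds workflow instances from them.

Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.

Dolphinscheduler-server: MasterSchedulerBootstrap.java → scheduleProcess() → findCommands() → command2ProcessInstance() → creates the ProcessInstance (processService.handleCommand())

Build the DAG and dispatch it to the worker

this.masterSchedulerBootstrap.start(): once the Master has started, it keeps pulling commands, parses them, and then dispatches the work to Workers.

Take a look at its run method: scheduleProcess looks for a command (findOneCommand); once one is found, it is parsed, a workflow instance is created, and the instance is handed to a WorkflowExecuteThread thread for execution.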

The MasterSchedulerBootstrap.java class

public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils
.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
}

@Override
public synchronized void start() {
log.info("Master schedule bootstrap starting..");
super.start();
workflowEventLooper.start();
log.info("Master schedule bootstrap started...");
}

@Override
public void run() {
while (!ServerLifeCycleManager.isStopped()) {
try {
if (!ServerLifeCycleManager.isRunning()) {
// the current server is not at running status, cannot consume command.
log.warn("The current server {} is not at running status, cannot consumes commands.",
this.masterAddress);
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
// todo: if the workflow event queue is much, we need to handle the back pressure
boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
log.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
// indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}

List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
// indicate that the command transform to processInstance error, sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}

Once started, the run method of WorkflowExecuteThread submits a node

The node is placed in a priority queue, the parameters needed for processing are built, and it is dispatched over Netty

  • WorkflowExecuteThread.java → startProcess() → buildFlowDag() → submitPostNode() → submitStandByTask() → submitTaskExec()
  • TaskProcessor.java → dispatchTask() → put into TaskPriorityQueue
  • TaskPriorityQueueConsumer.java → dispatch()
  • ExecutorDispatcher.java → dispatch()
  • NettyExecutorManager.java → execute() → build command → doExecute() → nettyRemotingClient.send()
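
The queue-then-dispatch step in the chain above can be sketched with the JDK's PriorityBlockingQueue. Everything else here is an assumption made for illustration: TaskDispatchSketch, TaskPriority, and the Dispatcher interface are hypothetical, the rule that a lower number means higher priority is assumed, and the Netty send is replaced by a callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical sketch of priority-ordered dispatch; the network send is a callback.
class TaskDispatchSketch {
    static final class TaskPriority implements Comparable<TaskPriority> {
        final int priority; // assumption: lower number = dispatched earlier
        final int taskId;

        TaskPriority(int priority, int taskId) {
            this.priority = priority;
            this.taskId = taskId;
        }

        @Override
        public int compareTo(TaskPriority other) {
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Integer.compare(taskId, other.taskId);
        }
    }

    // Stand-in for sending the task to a worker; returns false when the send fails.
    interface Dispatcher {
        boolean dispatch(TaskPriority task);
    }

    private final PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>();

    void submit(TaskPriority task) {
        queue.put(task);
    }

    // Consumer side: take up to batchSize tasks in priority order, re-queue failed ones.
    int consumeBatch(int batchSize, Dispatcher dispatcher) {
        List<TaskPriority> failed = new ArrayList<>();
        int dispatched = 0;
        for (int i = 0; i < batchSize; i++) {
            TaskPriority task = queue.poll();
            if (task == null) {
                break;
            }
            if (dispatcher.dispatch(task)) {
                dispatched++;
            } else {
                failed.add(task);
            }
        }
        queue.addAll(failed);
        return dispatched;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        TaskDispatchSketch sketch = new TaskDispatchSketch();
        sketch.submit(new TaskPriority(2, 10));
        sketch.submit(new TaskPriority(0, 11));
        sketch.submit(new TaskPriority(1, 12));
        sketch.consumeBatch(10, task -> {
            System.out.println("dispatching task " + task.taskId + " (priority " + task.priority + ")");
            return true;
        });
    }
}
```

Failed tasks are collected and re-queued after the batch, so one unreachable worker does not make the loop spin on the same task within a single pass.
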

3. Troubleshooting scheduling problems

1. Symptom: the workflow instance shows as running, but there are no task instances

Solution:

Check the logs

View the master log
tail -f /opt/dolphinscheduler/logs/dolphinscheduler-master.log

View the worker log
tail -f /opt/dolphinscheduler/logs/dolphinscheduler-worker.log

View the api log

tail -f /opt/dolphinscheduler/logs/dolphinscheduler-api-server.log

View the alert log

tail -f /opt/dolphinscheduler/logs/dolphinscheduler-alert.log

Confirm that the services are running:

ps -aux |grep

# master service

ps -aux |grep MasterServer

# worker service

ps -aux |grep WorkerServer

# logger service
ps -aux |grep LoggerServer

# alert service
ps -aux |grep AlertServer
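
The same check can be scripted. The sketch below is an optional convenience and not part of DolphinScheduler: ServiceCheckSketch and its methods are hypothetical. It uses the JDK's ProcessHandle API (Java 9 or later) to read the command lines of running processes, then reports which of the expected server names from the commands above do not appear in any of them. On some systems the command line of other users' processes is not visible, in which case a service may be reported as missing even though it is running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: which expected service names appear in no running command line?
class ServiceCheckSketch {
    // Pure function so it can be checked without touching real processes.
    static List<String> missingServices(List<String> commandLines, List<String> expected) {
        List<String> missing = new ArrayList<>();
        for (String name : expected) {
            boolean found = commandLines.stream().anyMatch(cmd -> cmd.contains(name));
            if (!found) {
                missing.add(name);
            }
        }
        return missing;
    }

    // Command lines of the processes visible to this JVM; empty string when not readable.
    static List<String> currentCommandLines() {
        return ProcessHandle.allProcesses()
                .map(process -> process.info().commandLine().orElse(""))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> expected = List.of("MasterServer", "WorkerServer", "LoggerServer", "AlertServer");
        System.out.println("not found: " + missingServices(currentCommandLines(), expected));
    }
}
```
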

Restart the services:

Restart all services in the cluster

/opt/dolphinscheduler/bin/stop-all.sh

/opt/dolphinscheduler/bin/start-all.sh

Restart only the WorkerServer service

/opt/dolphinscheduler/bin/dolphinscheduler-daemon.sh stop worker-server

/opt/dolphinscheduler/bin/dolphinscheduler-daemon.sh start worker-server

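2. A workflow instance is stuck in "ready to stop", so the workflow cannot be deleted

Connect to the dolph database

-- Workflow
SELECT * FROM t_ds_process_definition
WHERE name = 'ads_spk_t_customer_analyze_recharge';

-- Workflow instances (the workflow execution list)
-- Find the instances in "ready to stop". Process instance states: 0 submitted successfully, 1 running, 2 ready to pause, 3 paused, 4 ready to stop, 5 stopped, 6 failed, 7 succeeded, 8 needs fault tolerance, 9 killed, 10 waiting for thread, 11 waiting for dependencies to complete

SELECT t.* FROM t_ds_process_instance t
LEFT JOIN t_ds_process_definition t1
ON t1.code = t.process_definition_code
WHERE t1.name = 'ads_spk_t_customer_analyze_recharge'
AND t.state = '4';

-- Delete the instances that are in "ready to stop"
DELETE FROM t_ds_process_instance
WHERE state = '4'
AND name LIKE 'ads_spk_t_customer_analyze_recharge%';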
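
When reading rows from t_ds_process_instance it helps to decode the numeric state column. The enum below is a sketch built only from the state list in the SQL comment above; the constant names and the InstanceStateSketch class are illustrative and may not match the identifiers DolphinScheduler itself uses.

```java
// Illustrative decoding of the numeric state column; constant names are assumptions.
class InstanceStateSketch {
    enum State {
        SUBMITTED_SUCCESS(0),
        RUNNING(1),
        READY_PAUSE(2),
        PAUSE(3),
        READY_STOP(4),
        STOP(5),
        FAILURE(6),
        SUCCESS(7),
        NEED_FAULT_TOLERANCE(8),
        KILL(9),
        WAITING_THREAD(10),
        WAITING_DEPEND(11);

        final int code;

        State(int code) {
            this.code = code;
        }

        // Look up the state for a value read from the database.
        static State fromCode(int code) {
            for (State state : values()) {
                if (state.code == code) {
                    return state;
                }
            }
            throw new IllegalArgumentException("Unknown state code: " + code);
        }

        // "Ready to ..." states are transitional; an instance left in one of them is stuck.
        boolean isTransitional() {
            return this == READY_PAUSE || this == READY_STOP;
        }
    }

    public static void main(String[] args) {
        State state = State.fromCode(4);
        System.out.println(state + " transitional=" + state.isTransitional());
    }
}
```
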

References

GitHub: https://github.com/apache/dolphinscheduler/blob/dev/docs/docs/zh/architecture/design.md

https://zhuanlan.zhihu.com/p/512387735



Scheduling platform: DolphinScheduler
http://yuting0907.github.io/posts/be95ec5b.html
Author
Echo Yu
Published
March 1, 2023
License