调度平台dolphinscheduler
dolphinscheduler的内部可分为四个模块:
MasterServer、WorkerServer、AlertServer、ApiServer
1.架构说明
MasterServer
MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。
MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。 MasterServer基于netty提供监听服务。
该服务内主要包含:
- DistributedQuartz分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作;
- MasterSchedulerService是一个扫描线程,定时扫描数据库中的
t_ds_command
表,根据不同的命令类型进行不同的业务操作;() - WorkflowExecuteRunnable主要是负责DAG任务切分、任务提交监控、各种不同事件类型的逻辑处理;
- TaskExecuteRunnable主要负责任务的处理和持久化,并生成任务事件提交到工作流的事件队列;
- EventExecuteService主要负责工作流实例的事件队列的轮询;
- StateWheelExecuteThread主要负责工作流和任务超时、任务重试、任务依赖的轮询,并生成对应的工作流或任务事件提交到工作流的事件队列;
- FailoverExecuteThread主要负责Master容错和Worker容错的相关逻辑;
启动步骤:
1.启动RPC服务(启动Netty 服务端服务)
2.导入Plugin
3.注册到Zookeeper
4.启动scheduler定时任务
5.启动Event处理器
6.启动StateWheel处理器
1 |
|
WorkerServer
WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。 WorkerServer基于netty提供监听服务。
该服务包含:
- WorkerManagerThread主要负责任务队列的提交,不断从任务队列中领取任务,提交到线程池处理;
- TaskExecuteThread主要负责任务执行的流程,根据不同的任务类型进行任务的实际处理;
- RetryReportTaskStatusThread主要负责定时轮询向Master汇报任务的状态,直到Master回复状态的ack,避免任务状态丢失;
启动步骤:
1.启动RPC服务(启动Netty 服务端服务)
2.注册到Zookeeper
3.维护workserver节点状态
4.启动TaskExecuteThread
5.启动RetryReportTaskStatusThread
1 |
|
ZooKeeper
ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁。 我们也曾经基于Redis实现过队列,不过我们希望DolphinScheduler依赖到的组件尽量地少,所以最后还是去掉了Redis实现。
AlertServer
提供告警服务,通过告警插件的方式实现丰富的告警手段。
ApiServer
API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。
UI
系统的前端页面,提供系统的各种可视化操作界面。
2.任务执行流程
首先我们可以从UI界面具体分析一些工作流是如何运行的?
1.通过一系列的执行,最终会解析一些参数,解析完之后会生成一条记录(Create Command)
在UI界面工作流界面点击某个工作流运行后会执行以下代码:
DolphinScheduler-api : ExecutorController.java → startProcessInstance(execService.execProcessInstance()) → createCommand() →写入表 t_ds_command
1 |
|
2.定时任务轮询获取
MasterServer会不定时扫描表里面的记录,然后拉取出来,最后构建流程的实例
Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
Dolphinscheduler-server: MasterSchedulerBootstrap.java → scheduleProcess() → findCommands() → command2ProcessInstance()→ 创建ProcessInstance(processService.handleCommand()
构建DAG并分发到worker
this.masterSchedulerBootstrap.start(),Master启动完之后会不停拉取命令,然后解析完之后分发给Worker
可以看一下它的run方法,Scheduleprocess方法去找一条命令→Findonecommand,找到之后对它进行解析,创建工作流实例,交给WorkflowExecuteThread线程执行,
1 |
|
WorkflowExecuteThread run方法启动之后提交一个节点
放到优先级队列里面,构造处理的一些参数,通过Netty分发
- WorkflowExecuteThread.java → startProcess() → buildFlowDag() → submitPostNode() → submitStandByTask() → submitTaskExec()
- TaskProcessor.java → dispatchTask() → 放入TaskPriorityQueue
- TaskPriorityQueueConsumer.java: → dispatch()
- ExecutorDispatcher.java → dispatch()
- NettyExecutorManager.java → execute() →build command → doExecute() → ettyRemotingClient.send()
3.调度问题排查
1.问题描述:工作流实例显示运行,任务实例没有任务
解决方法:
检查日志
查看master日志
tail -f /opt/dolphinscheduler/logs/dolphinscheduler-master.log
查看worker 日志
tail -f /opt/dolphinscheduler/logs/dolphinscheduler-worker.log
查看api 日志
tail -f /opt/dolphinscheduler/logs/dolphinscheduler-api-server.log
查看告警alert日志
tail -f /opt/dolphinscheduler/logs/dolphinscheduler-alert.log
确认服务启动情况:
确认服务启动情况:
ps -aux |grep
# master服务
ps -aux |grep MasterServer
# worker服务
ps -aux |grep WorkerServer
# logger服务
ps -aux |grep LoggerServer
# alert服务
ps -aux |grep AlertServer
重启服务:
重启集群所有服务
/opt/dolphinscheduler/bin/stop-all.sh
/opt/dolphinscheduler/bin/start-all.sh
单独启动workerServer服务
/opt/dolphinscheduler/bin/dolphinscheduler-daemon.sh stop worker-server
/opt/dolphinscheduler/bin/dolphinscheduler-daemon.sh start worker-server
2.工作流实例准备停止,导致工作流无法删除
进入dolph数据库
– 工作流
SELECT * FROM t_ds_process_definition
where name =’ads_spk_t_customer_analyze_recharge’ ;
– 工作流实例(工作流执行list)
– 查看准备停止的list 流程实例状态:0提交成功1运行2准备暂停3暂停4准备停止5停止6失败7成功8需要容错9杀死10等待线程11等待依赖关系完成
SELECT t.* FROM t_ds_process_instance t
LEFT JOIN t_ds_process_definition t1
ON t1.code =t.process_definition_code
where t1.name =’ads_spk_t_customer_analyze_recharge’
AND t.state =’4’;
– 删除准备停止的实例
delete from t_ds_process_instance
where state =’4’
AND name like ‘ads_spk_t_customer_analyze_recharge%’ ;
参考
github:https://github.com/apache/dolphinscheduler/blob/dev/docs/docs/zh/architecture/design.md
觉得不错的话,支持一根棒棒糖吧 ୧(๑•̀⌄•́๑)૭
wechat pay
alipay