Apache Doris在橙联的应用实践

1. 背景

1.1 公司介绍

橙联(Orange Connex)是一家服务全球跨境电商的科技公司,致力于通过市场分析、系统研发及资源整合,为客户提供物流、金融、大数据等多方面的服务产品,为全球跨境电商提供高品质、全方位的服务解决方案。橙联股份由中信产业基金和 eBAY 联合成立,拥有非常丰富的物流服务经验和客户市场资源,旨在为卖家客户提供快捷 、稳定 、成本领先 、可视及可持续的物流服务,让买家客户享有更加优质贴心的服务体验

1.2 目前痛点

随着公司业务的发展和数据的不断增长,基于 MySQL 的传统数仓架构已经无法应对公司数据的快速增长。业务的需求和运营的决策对于数据时效性的要求越来越高,对数仓准实时能力的需求越发强烈

基于 MySQL 的传统离线数仓的数据时效性是 T+1,调度频率以天为单位,无法支撑准实时场景的数据需求。即使将调度频率设置成 x 小时,也只能解决部分时效性要求不高的场景,对于实效性要求很高的场景还是无法支撑

2. 架构与实现

公司目前已搭建完成了基于 Doris 的准实时数仓 1.0 版本,在介绍实时数仓的架构之前,简单介绍公司业务的几个计算场景

公司业务主要计算场景:

场景1. 物流数据宽表

物流数据宽表数据的生成来自于几十张表的关联,以下是物流数据宽表的主要驱动表

​ 1. 轨迹时间表,日均变动数据峰值 100W 条,用于更新订单轨迹节点时间

​ 2. 订单表,日均数据变动数据峰值 25W 条,用于 ascan 和 dscan 节点时间更新

​ 3. 财务流水表,日均数据变动数据峰值 30W 条,该表用于更新订单物流费用

​ 4. 订单异常表,日均数据变动不大,用于更新异常订单状态

​ 5. 订单取消表,日均数据变动不大,用于更新订单状态

​ 6. 发货预报表

​ 7. 揽收表

​ 8. 分拣表

​ 9. 分拣大小包关系表

​ 10. 申报信息表

​ 11. 预计流程 / 预计流程回 call 表

场景2. 财务数据统计

财务日常统计需求,如按天+客户来统计客户物流费用

在统计当天,来自于供应商的部分数据并不能及时回传到业务系统,客户会在发现问题后采取补回传的操作

业务实际发生时间到数据补传时间跨度甚至在 1 个月左右,针对补回传的数据,需要对数据涉及的业务日期的天进行重新统计

场景3. SLA 计算

一条订单有多个轨迹节点,如 A、B、C、D、E

各个节点时间的回传并非严格遵守时间顺序,后续节点的时间会先于前节点时间进入系统

每个节点时间进入系统时,需要计算两个节点之间的 SLA,如 A–B,A–C,B–D 的时间

SLA 计算逻辑复杂,需要考虑不同国家节假日及冬令时/夏令时转换,若 2 个节点跨国家,还需要考虑时区转换

不同订单根据路向,物流产品,内部产品,尾程供应商等6个维度各种组合来判定 SLA 的计算逻辑

举例来说同一考核点,P1 产品需要按 A–B 来考核,P2 产品需要按照 A–C 来考核,目前单一考核点的规则最多达 20+ 种

准实时数仓架构

流批一体准实时数仓数据加工

以上几个业务场景都对数据时效性有较高的要求,为了解决业务场景数据时效性高痛点,我们采用基于 Doris 的流批一体的准实时数仓架构,Doris 的三种数据模型能够全面的覆盖公司当前所有的数据场景,为了满足从业务数据库到 Doris ODS 层的端到端 EOS,我们使用了 Unique 模型,有效的解决了数据的精准一次问题;使用 Aggregate 模型,加速固定模式的报表查询;使用 Duplicate 模型解决既没有主键也没有聚合需求的多维分析场景

以下是业务数据的采集、清洗、流转流程图

对于业务数据源 MySQL 数据的实时获取,我们使用 FlinkCDC 实时获取业务 MySQL 备库的 Binlog 日志,写入到 Kafka 做为中间缓冲,下游 Flink 消费 Kafka 数据,使用 flink-doris-connector 将数据写入 Doris ODS。通过 Flink at least once + Doris Unique 模型,实现数据从 Flink CDC → Kafka → Flink → Doris 端到端 EOS 保证

在准实时数仓的分层设计上,采用 ODS(Operation Data Store 数据准备区,也称为贴源层), 数据细节层 DWD、数据中间层 DWM、数据服务层 DWS,数据应用层 ADS 的分层思想

3. 问题及解决方案

在基于 Doris 的准实时数仓搭建过程中也解决了一些问题

1. longtext 字段数据采用外部表映射方式导入数据到 Doris 报错

不是字段 null 的问题,这个提示有误,实际问题是 detail 字段太大了,不适合利用外部表的方式导入,外部表不支持 longtext 这类超长字段的导入

以下是 MySQL 和 Doris 中的类型匹配情况

MySQL 表结构如下,detail 字段数据长度可达几十 MB

解决方案:采用 FlinkCDC 的方式,成功导入

2. 业务数据库物理删除操作

业务数据库会有物理删除的情况,FlinkCDC 在做数据抽取的时候使用 Debezium 提供的 op 来判断物理删除,Doris 的 ODS 表增加逻辑删除字段 is_deleted_flag 来标识数据是否删除

3. FlinkCDC 抽取数据写入到 Doris 报错 err=-235

FlinkCDC 抽取数据写入到 Doris 报错,err: tablet writer write failed, tablet_id=8165971, txn_id=122211550, err=-235, see more in null

排查问题发现是 Doris 集群挂了以后,在重启时积累了大量的数据写入批次,Doris 内部来不及 compaction 导致了 tablet 的数据版本超过了最大限制(默认500),这里目前暂时解决方案是将 max_tablet_version_num 扩大了 6 倍

未来将升级 Doris 1.1.1 版本,配合最新的 flink-doris-connector 来避免此类问题

Doris 1.1 版本新特性:

Compaction 实时性做出了优化,数据能快速合并,Tablet 数据版本个数维持在 50 以下, Compaction Score 稳定。相比于之前高并发导入频出的 -235 问题,Compaction 合并效率有 10+ 倍提升

CPU 资源消耗上,Doris 1.1.1 针对小文件的 Compaction 进行了策略优化,在上述高并发导入场景,CPU 资源消耗下降 25%

另外对于 QPS 查询延迟的稳定性,Doris 1.1 通过降低 CPU 使用率,减少数据版本的个数,提升了数据整体有序性,从而减少了 SQL 查询的延迟。

Doris 1.1 对高并发导入、秒级别数据同步、数据实时可见等场景都做了针对性优化,大大增加了 Flink + Doris 系统的易用性以及稳定性,节省了集群整体资源

另外我们部署了 Doris 集群 Grafana Dashboard 监控,能及时监控 Doris 集群相关指标

4. SQL执行超时

批处理过程中确实会有一些复杂的任务或者写入数据太多的任务会超时,除了调大 timeout 参数(目前设置为 10 分钟)以外,我们还把任务做了切分。有些写入的 SQL 按照分区字段或者日期区间来分批计算或引入中间层 DWM 进行复杂任务的拆分

未来会使用 Doris 多租户的功能进行资源隔离,减少大作业的资源占用对生产集群其他作业的影响

5. 调度系统存在的问题

我们之前使用 Dolphinscheduler 2.x 对 Doris 进行数据调度,使用中发现当多段 Doris SQL 编辑在同一个 node 时,会出现只执行了第一段 Doris SQL 的情况

以生产的一张宽表调度任务为例,这张宽表有 75 个字段,数据量为几十万,关联表有 15 张,调度在正常情况下不可能 1s ~ 2s 完成,debug 发现调度任务只执行了第一段 Doris SQL

针对次问题 Dolphinscheduler 2.x 官网解释是不支持 MySQL 数据源一次执行多段 SQL,Dolphinscheduler 3.0 修复了此问题,可在任务参数中配置分段执行符号

我们将 Dolphinscheduler 升级到 3.x,并持续观察运行时长,稳定在 1m30s 左右,此类调度问题被解决

结束语

目前我们数仓 1.0 版本已上线,橙联正在形成以 Apache Doris 为核心的数据生态,随着应用的深入,我们也正在创建以 Doris 为基础的数据中台和 DataOps

最后,感谢 Apache Doris 团队给予的支持,祝愿 Apache Doris 社区发展越来越好!

作者:喻婷 橙联 大数据开发工程师


觉得不错的话,支持一根棒棒糖吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

Apache Doris在橙联的应用实践
http://yuting0907.github.io/posts/18e85b64.html
作者
Echo Yu
发布于
2022年7月24日
许可协议