编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

dolphinscheduler集成数据质量任务

wxchong 2024-09-12 22:02:33 开源技术 11 ℃ 0 评论

参考官方文档:

https://dolphinscheduler.apache.org/zh-cn/docs/3.1.5/guide/data-quality


  1. Spark on yarn部署


1.1 配置Spark运行环境

/home/hadoop/spark-3.0.0/conf/spark-env.sh添加环境变量

YARN_CONF_DIR=/home/hadoop/hadoop-3.2.4/etc/hadoop

export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hn01:8020/spark/logs
-Dspark.history.retainedApplications=30"

hdfs创建目录:hadoop fs -mkdir -p /spark/logs/

1.2 配置worker节点

/home/hadoop/spark-3.0.0/conf/slaves配置

hn08
hn09
hn10

1.3 配置环境变量

[tomtop@ql1.tt.com ~]# vim /home/hadoop/.bashrc

#SPARK_HOME环境变量

export SPARK_HOME=/home/hadoop/spark-3.0.0

export PATH=$PATH:$SPARK_HOME/bin

[tomtop@ql1.tt.com ~]$ source /home/hadoop/.bashrc

1.4 Spark添加配置

/home/hadoop/spark-3.0.0/conf/spark-defaults.conf添加配置

spark.yarn.jars  hdfs://hn01:8020/spark/jars/*
spark.master                     yarn
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hn01:8020/spark/logs

spark.yarn.historyServer.address=hn08:18080
spark.history.ui.port=18080
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

spark.shuffle.service.enabled true
spark.shuffle.service.port 7337
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors 4
spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s

1.5 配置依赖spark jar包

当Spark Application应用提交运行在YARN上时,默认情况下,每次提交应用都需要将依赖Spark相关jar包上传到YARN 集群中,为了节省提交时间和存储空间,将Spark相关jar包上传到HDFS目录中,设置属性告知Spark Application应用。


排除hive依赖,spark目录执行:

mkdir mkdir hive-spark-jars
mv jars/*hive*.jar hive-spark-jars/

添加hive版本依赖,spark目录执行:

cd hive-spark-jars
cp hive-storage-api-2.7.1.jar spark-hive_2.12-3.0.0.jar ../jars/
cp ~/apache-hive-3.1.3-bin/lib/hive-common-3.1.2.jar jars/
cd jars
[hadoop@hn08 ~/spark-3.0.0/jars]$ ll | grep hive

spark jar包上传至hdfs

hadoop fs -mkdir -p /spark/jars/
hadoop fs -put spark-3.0.0/jars/*.jar  /spark/jars/

hive相关jar包仅保留这三个。


~/hadoop-3.2.4/share/hadoop/yarn/lib增加jar包:spark-3.0.0-yarn-shuffle.jar

cp ~/spark-3.0.0/yarn/spark-3.0.0-yarn-shuffle.jar ~/hadoop-3.2.4/share/hadoop/yarn/lib/

1.6 分发各节点

 xsync.sh ~/spark-3.0.0

1.7 启动SPARK历史服务

hn08上启动SPARK历史服务,spark目录/sbin下执行:

添加执行权限:chmod +x *.sh
启动历史服务:./start-history-server.sh

历史服务UI:http://hn08:18080/


  1. dolphinscheduler增加配置
  1. /home/hadoop/dolphinscheduler/bin/envdolphinscheduler_env.sh文件增加配置:
export SPARK_HOME2=${SPARK_HOME2:-/home/hadoop/spark-3.0.0}
  1. /home/hadoop/dolphinscheduler/worker-server/conf/common.properties

添加配置信息:

data-quality.jar.name=dolphinscheduler-data-quality-3.1.5.jar
  • 这里的data-quality.jar.name请根据实际打包的名称来填写。
  1. /home/hadoop/dolphinscheduler/worker-server/conf/和/home/hadoop/dolphinscheduler/worker-server/conf/的common.properties修改配置对应hdfs命名服务
resource.hdfs.fs.defaultFS=hdfs://tTCluster:8020

jvm配置:

[hadoop@ql1.tt.com ~/module/dolphinscheduler/bin/env]$ vim dolphinscheduler_env.sh

export JAVA_OPTS="
-server
-Xmx16g
-Xms1g
-Xss512k
-XX:+UseConcMarkSweepGC
-XX:+CMSParallelRemarkEnabled
-XX:+UseFastAccessorMethods
-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSInitiatingOccupancyFraction=70
-Duser.timezone=${SPRING_JACKSON_TIME_ZONE}-XX:+PrintGCDetails
-Xloggc:gc.log
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=dump.hprof
"
含义如下:
-server:这是JVM的一种运行模式,它会优化代码,使其运行更快,但是启动速度会稍慢。
-Xmx10g:设置JVM最大可用内存为16GB。
-Xms1g:设置JVM初始内存大小为1GB。
-Xss512k:设置每个线程的堆栈大小为512KB。
-XX:+UseConcMarkSweepGC:使用CMS垃圾收集器。
-XX:+CMSParallelRemarkEnabled:并行清除阶段,多线程并行清除。
-XX:+UseFastAccessorMethods:原始类型的快速优化访问。
-XX:+UseCMSInitiatingOccupancyOnly:仅在到达阈值时才在CMS中启动标记过程。
-XX:CMSInitiatingOccupancyFraction=70:设置CMS收集器在堆内存使用达到70%时开始收集。
-Duser.timezone=${SPRING_JACKSON_TIME_ZONE}:设置用户的时区。
-XX:+PrintGCDetails:打印垃圾收集的详细信息。
-Xloggc:gc.log:将垃圾收集日志输出到名为gc.log的文件。
-XX:+HeapDumpOnOutOfMemoryError:在内存溢出时生成堆转储文件。
-XX:HeapDumpPath=dump.hprof:设置堆转储文件的路径为dump.hprof。


  1. 重启dolphinscheduler集群服务

/home/hadoop/dolphinscheduler/bin下执行命令:

# 一键停止集群所有服务
./stop-all.sh

# 一键开启集群所有服务
./start-all.sh

# 查看集群服务状态
./status-all.sh


  1. 数据质量任务测试

dim_user_l空值检查staff_name:





检查逻辑详解

https://dolphinscheduler.apache.org/zh-cn/docs/3.1.5/guide/data-quality

  • 校验公式:[校验方式][操作符][阈值],如果结果为真,则表明数据不符合期望,执行失败策略
  • 校验方式:[Expected-Actual][期望值-实际值][Actual-Expected][实际值-期望值][Actual/Expected][实际值/期望值]x100%[(Expected-Actual)/Expected][(期望值-实际值)/期望值]x100%
  • 操作符:=、>、>=、<、<=、!=
  • 期望值类型固定值日均值周均值月均值最近7天均值最近30天均值源表总行数目标表总行数
  • 例子校验方式为:[Actual/Expected][实际值/期望值]x100%[操作符]:>[阈值]:0.2期望值类型:源表总行数
  • 假设实际值为10,操作符为 >, 期望值为20,那么结果 10 / 20 > 0.2 为真,那就意味列为空的行数据已经超过阈值,任务被判定为失败,执行失败策略告警

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表