2-3. National Big Data Competition, Set 2, Task B - Subtask 3: Indicator Calculation

Task Requirement 1

1.1 Write the job2 script

1.2 Write the job3 script

1.3 Write the job4 script

1.4 Write the Azkaban scripts

1.5 Package and upload the Azkaban scripts

1.6 Run the Azkaban flow

Task Requirement 2

2.1 Implementation overview

2.2 Create the DWS-layer table

2.3 Aggregate each user's daily consumption

2.4 View the results

Task Requirement 3

3.1 Implementation overview

3.2 Create the DWS-layer table structure

3.3 Aggregate monthly order totals and counts per province

3.4 Rank the results and write them to the table

3.5 View the results

Task Requirement 4

4.1 Implementation overview

4.2 Create the MySQL database and table

4.3 Create the ClickHouse database and table

4.4 Connect to Spark

4.5 Create the DWS table

4.6 Compute average order amounts per province and region

4.7 Compare province and region averages

4.8 Write the results to the DWS-layer table

4.9 Map the MySQL table into Spark

4.10 Write the DWS-layer results to MySQL

4.11 Query the written results

4.12 Query the results in ClickHouse

Task Requirement 1

Using the tables in the dwd_ds_hudi database, compute each province's monthly order count and total order amount, group by year, month, and region_id, and sort by total_amount in descending order to generate the sequence value. Store the results in the province_consumption_day_aggr table of the Hudi dws_ds_hudi database (table structure below). Then, in spark-shell, query the top 5 rows sorted by total order count, total order amount, and province primary key, all in descending order, casting the total order amount field to bigint in the query (to avoid scientific-notation display). Copy the SQL statement into 【Release任务B提交结果.docx】 on the client desktop under the corresponding task number, and paste a screenshot of the execution result there as well.

Field  Type  Meaning  Notes
uuid  string  random string  any unique random string; used as primaryKey
province_id  int  province table primary key
province_name  string  province name
region_id  int  region primary key
region_name  string  region name
total_amount  double  total order amount  total order amount for the month
total_count  int  total order count  total order count for the month; also usable as preCombineField (meaningless as a merge field, since the primary key is randomly generated)
sequence  int  rank
year  int  year the order was created  dynamic partition field
month  int  month the order was created  dynamic partition field

1.1 Write the job2 script

Write the SQL statements for Task 2 into a user_consumption_day_aggr.scala file. The code is as follows:

//(1)
spark.sql(
 """
   |create table if not exists `dws_ds_hudi`.`user_consumption_day_aggr` (
   |`uuid` string comment 'random string',
   |`user_id` int comment 'customer primary key',
   |`user_name` string comment 'customer name',
   |`total_amount` double comment 'total order amount',
   |`total_count` int comment 'total order count',
   |`year` int comment 'year',
   |`month` int comment 'month',
   |`day` int comment 'day'
   |) using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'uuid',
   | preCombineField = 'total_count'
   |)
   |partitioned by (`year`,`month`,`day`);
   |""".stripMargin)

//(2)
spark.sql(
 """ insert overwrite `dws_ds_hudi`.`user_consumption_day_aggr`
   | select
   | uuid() as `uuid`,
   | a.`user_id`,
   | max(b.`name`) as `user_name`,
   | sum(coalesce(a.`total_amount`,0)) as `total_amount`,
   | count(a.`id`) `total_count`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) as `year`,
   | cast(SUBSTRING(a.`create_time`,5,2) as int) as `month`,
   | cast(SUBSTRING(a.`create_time`,8,2) as int) as `day`
   | from `dwd_ds_hudi`.`fact_order_info` a
   | left join
   | (select * from `dwd_ds_hudi`.`dim_user_info` where etl_date='20230901') b
   | on a.`user_id`=b.`id`
   | group by
   | a.`user_id`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) ,
   | cast(SUBSTRING(a.`create_time`,5,2) as int) ,
   | cast(SUBSTRING(a.`create_time`,8,2) as int)
   |""".stripMargin)
//(3)
spark.sql("select * from `dws_ds_hudi`.`user_consumption_day_aggr` order by `user_id` desc, `total_amount` desc limit 5 ").show()

Create a new script file user_consumption_day_aggr.sh with the following content:

/opt/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' < /rgsoft/Desktop/Study/task/user_consumption_day_aggr.scala

1.2 Write the job3 script

Write the SQL statements for Task 3 into a province_consumption_day_aggr.scala file. The code is as follows:

//(1)
spark.sql(
 """
   |create table if not exists `dws_ds_hudi`.`province_consumption_day_aggr` (
   |`uuid` string comment 'random string',
   |`province_id` int comment 'province table primary key',
   |`province_name` string comment 'province name',
   |`region_id` int comment 'region primary key',
   |`region_name` string comment 'region name',
   |`total_amount` double comment 'total order amount',
   |`total_count` int comment 'total order count',
   |`sequence` int comment 'rank',
   |`year` int comment 'year',
   |`month` int comment 'month'
   |) using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'uuid',
   | preCombineField = 'total_count'
   |)
   |partitioned by (`year`,`month`);
   |""".stripMargin)

//(2)
spark.sql(
 """ insert overwrite `dws_ds_hudi`.`province_consumption_day_aggr`
   | select
   | uuid() as `uuid`,
   | d.`province_id`,
   | d.`province_name`,
   | d.`region_id`,
   | d.`region_name` ,
   | d.`total_amount`,
   | d.`total_count`,
   | row_number() over (partition by `year`,`month`,`region_id` order by `total_amount` desc ) as `sequence`,
   | d.`year`,
   | d.`month`
   | from
   | (select
   | a.`province_id`,
   | max(b.`name`) as `province_name`,
   | b.`region_id`,
   | max(c.`region_name`) as region_name ,
   | sum(coalesce(a.`total_amount`,0)) as `total_amount`,
   | count(a.`id`) `total_count`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) as `year`,
   | cast(SUBSTRING(a.`create_time`,5,2) as int) as `month`
   | from `dwd_ds_hudi`.`fact_order_info` a
   | left join
   | (select * from `dwd_ds_hudi`.`dim_province` where etl_date='20230901') b
   | on a.`province_id`=b.`id`
   | left join
   | (select * from `dwd_ds_hudi`.`dim_region` where etl_date='20230901') c
   | on b.`region_id`=c.`id`
   | group by
   | a.`province_id`,
   | b.`region_id`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) ,
   | cast(SUBSTRING(a.`create_time`,5,2) as int)
   | ) d
   |""".stripMargin)
//(3)
spark.sql("select * from `dws_ds_hudi`.`province_consumption_day_aggr`order by `total_count` desc,`total_amount` desc,`province_id` desc limit 5 ").show()

Create a new script file province_consumption_day_aggr.sh with the following content:

/opt/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' < /rgsoft/Desktop/Study/task/province_consumption_day_aggr.scala

1.3 Write the job4 script

Write the SQL statements for Task 4 into a provinceavgcmpregion.scala file. The code is as follows:

//(1)
spark.sql(
 """
   |create table if not exists `dws_ds_hudi`.`provinceavgcmpregion` (
   |`uuid` string,
   |`provinceid` int comment 'province table primary key',
   |`provincename` string comment 'province name',
   |`provinceavgconsumption` double comment 'province average order amount',
   |`region_id` int comment 'region table primary key',
   |`region_name` string comment 'region name',
   |`regionavgconsumption` double comment 'region average order amount',
   |`comparison` string comment 'comparison result'
   |) using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'uuid'
   |)
   |""".stripMargin)

//(2)

spark.sql(
 """
   |insert overwrite `dws_ds_hudi`.`provinceavgcmpregion`
   |select uuid() as `uuid`,a.`province_id`,a.`province_name`,a.`provinceavgconsumption`,a.`region_id`,a.`region_name`,a.`regionavgconsumption`,
   | case
   |   when a.`provinceavgconsumption`=a.`regionavgconsumption` then '相同'
   |   when a.`provinceavgconsumption`>a.`regionavgconsumption` then '高'
   |   else '低'
   | end as `comparison`
   | from
   |(select distinct
   |province_id,
   |province_name,
   |sum( case when year=2020 and month=4 then `total_amount` else 0 end) over(partition by province_id )/sum(case when year=2020 and month=4 then `total_count` else 0 end) over(partition by province_id ) as `provinceavgconsumption`,
   |region_id,
   |region_name,
   |sum(`total_amount`) over(partition by region_id)/sum(`total_count`) over(partition by region_id) as `regionavgconsumption`
   |from `dws_ds_hudi`.`province_consumption_day_aggr`) a
   |
   |""".stripMargin)

//(3)
spark.sql(
 """
   | create temporary view tmp
   |   using org.apache.spark.sql.jdbc
   |options(
   |   url 'jdbc:mysql://127.0.0.1:3306/shtd_result?useSSL=false&useUnicode=true&characterEncoding=utf8',
   |   dbtable 'provinceavgcmpregion',
   |   user 'root1',
   |   password '123456'
   |)
   |""".stripMargin)

//(4)
spark.sql(
 """
   |insert overwrite tmp
   |select
   |`provinceid`,
   |`provincename`,
   |`provinceavgconsumption`,
   |`region_id`,
   |`region_name`,
   |`regionavgconsumption`,
   |`comparison`
   |from `dws_ds_hudi`.`provinceavgcmpregion`
   |""".stripMargin)
//(5)
spark.sql("select * from tmp order by `provinceid` desc,`provinceavgconsumption` desc,`regionavgconsumption` desc limit 5").show()

Create a new script file provinceavgcmpregion.sh with the following content:

/opt/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' < /rgsoft/Desktop/Study/task/provinceavgcmpregion.scala

1.4 Write the Azkaban scripts

Create a new computeindex234.project file to define the workflow version information:

azkaban-flow-version: 2.0

Create a new computeindex234.flow file to define the workflow node information:

nodes:
- name: job1
  type: command
  config:
    command: echo 'start'

- name: job2
  type: command
  dependsOn:
    - job1
  config:
    command: sh user_consumption_day_aggr.sh

- name: job3
  type: command
  dependsOn:
    - job1
  config:
    command: sh province_consumption_day_aggr.sh

- name: job4
  type: command
  dependsOn:
    - job1
  config:
    command: sh provinceavgcmpregion.sh
- name: endjob
  type: command
  dependsOn:
    - job2
    - job3
    - job4
  config:
    command: echo 'end'

1.5 Package and upload the Azkaban scripts

Run the following command in a terminal to compress all of the shell scripts above, together with the Azkaban configuration files, into a single zip archive.

zip  -r computeindex234.zip  computeindex234.*  *.sh

Run the following commands in a terminal to start Azkaban:

cd  /opt/azkaban-solo-server-3.91.0/
bin/start-solo.sh

Switch to the visual desktop.

Open Firefox, browse to http://127.0.0.1:58081/, and log in with username azkaban and password azkaban.

Create a project in Azkaban.

Upload the packaged zip archive into the Azkaban project.

1.6 Run the Azkaban flow

On the project page, click Execute Flow to run the uploaded flow: job1 prints the start marker, job2/job3/job4 run the three computation scripts in parallel, and endjob marks completion once all three have succeeded.

Task Requirement 2

Using the dwd_ds_hudi layer tables, compute each customer's daily order count and total order amount, and store the results in the user_consumption_day_aggr table of the Hudi dws_ds_hudi layer (table structure below). Then, in spark-shell, query the top 5 rows sorted by customer primary key and total order amount, both in descending order. Copy the SQL statement into 【Release任务B提交结果.docx】 on the client desktop under the corresponding task number, and paste a screenshot of the execution result there as well.

Field  Type  Meaning  Notes
uuid  string  random string  any unique random string; used as primaryKey
user_id  int  customer primary key
user_name  string  customer name
total_amount  double  total order amount  total order amount for the day
total_count  int  total order count  total order count for the day; also usable as preCombineField (meaningless as a merge field, since the primary key is randomly generated)
year  int  year the order was created  dynamic partition field
month  int  month the order was created  dynamic partition field
day  int  day the order was created  dynamic partition field

2.1 Implementation overview

Connect to Spark:

spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
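
Once the shell is up, an optional sanity check confirms the Hudi catalog and extensions are wired in and the source databases are visible:

spark.sql("show databases").show()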

2.2 Create the DWS-layer table

Based on the table structure given in the task requirements, create the table in Hudi. The DDL is as follows:

spark.sql(
 """
   |create table if not exists `dws_ds_hudi`.`user_consumption_day_aggr` (
   |`uuid` string comment 'random string',
   |`user_id` int comment 'customer primary key',
   |`user_name` string comment 'customer name',
   |`total_amount` double comment 'total order amount',
   |`total_count` int comment 'total order count',
   |`year` int comment 'year',
   |`month` int comment 'month',
   |`day` int comment 'day'
   |) using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'uuid',
   | preCombineField = 'total_count'
   |)
   |partitioned by (`year`,`month`,`day`);
   |""".stripMargin)

2.3 Aggregate each user's daily consumption

That is, group by user id and by year, month, and day, then sum the order amounts: the result is each user's daily consumption. The year, month, and day have to be pulled out of the order timestamp; here the SUBSTRING function cuts them out of the time string, and the pieces are then cast to integers. Note that these start offsets assume the create_time string layout used by this dataset, so verify them against the actual format before reusing them.

cast(SUBSTRING(a.`create_time`,0,4) as int) -- year --
cast(SUBSTRING(a.`create_time`,5,2) as int) -- month --
cast(SUBSTRING(a.`create_time`,8,2) as int) -- day --
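
Before trusting fixed offsets, it helps to eyeball a few raw create_time values and confirm the layout (an optional check against the same fact table queried below):

spark.sql("select `create_time` from `dwd_ds_hudi`.`fact_order_info` limit 5").show(false)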

The uuid() function generates a random identifier to serve as the primary key; its values are unique for all practical purposes, so it is safe to use as the primary key. The required table structure also includes the user name, which does not live in the order table, so the user table has to be joined in. The complete aggregation SQL is as follows:

spark.sql(
 """ insert overwrite `dws_ds_hudi`.`user_consumption_day_aggr`
   | select
   | uuid() as `uuid`,
   | a.`user_id`,
   | max(b.`name`) as `user_name`,
   | sum(coalesce(a.`total_amount`,0)) as `total_amount`,
   | count(a.`id`) `total_count`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) as `year`,
   | cast(SUBSTRING(a.`create_time`,5,2) as int) as `month`,
   | cast(SUBSTRING(a.`create_time`,8,2) as int) as `day`
   | from `dwd_ds_hudi`.`fact_order_info` a
   | left join
   | (select * from `dwd_ds_hudi`.`dim_user_info` where etl_date='20230901') b
   | on a.`user_id`=b.`id`
   | group by
   | a.`user_id`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) ,
   | cast(SUBSTRING(a.`create_time`,5,2) as int) ,
   | cast(SUBSTRING(a.`create_time`,8,2) as int)
   |""".stripMargin)

2.4 View the results

spark.sql("select * from `dws_ds_hudi`.`user_consumption_day_aggr` order by `user_id` desc, `total_amount` desc limit 5  ").show()

Task Requirement 3

Using the tables in the dwd_ds_hudi database, compute each province's monthly order count and total order amount, group by year, month, and region_id, and sort by total_amount in descending order to generate the sequence value. Store the results in the province_consumption_day_aggr table of the Hudi dws_ds_hudi database (table structure below). Then, in spark-shell, query the top 5 rows sorted by total order count, total order amount, and province primary key, all in descending order, casting the total order amount field to bigint in the query (to avoid scientific-notation display). Copy the SQL statement into 【Release任务B提交结果.docx】 on the client desktop under the corresponding task number, and paste a screenshot of the execution result there as well.

Field  Type  Meaning  Notes
uuid  string  random string  any unique random string; used as primaryKey
province_id  int  province table primary key
province_name  string  province name
region_id  int  region primary key
region_name  string  region name
total_amount  double  total order amount  total order amount for the month
total_count  int  total order count  total order count for the month; also usable as preCombineField (meaningless as a merge field, since the primary key is randomly generated)
sequence  int  rank
year  int  year the order was created  dynamic partition field
month  int  month the order was created  dynamic partition field

3.1 Implementation overview

Connect to spark-shell with the same Hudi configuration as in Task 2, create the DWS-layer result table, aggregate order amounts and counts per province per month, generate the sequence rank with row_number(), write the result to Hudi, and query the top 5 rows.

3.2 Create the DWS-layer table structure

Based on the table structure given in the task requirements, create the table in Hudi. The DDL is as follows:

spark.sql(
 """
   |create table if not exists `dws_ds_hudi`.`province_consumption_day_aggr` (
   |`uuid` string comment 'random string',
   |`province_id` int comment 'province table primary key',
   |`province_name` string comment 'province name',
   |`region_id` int comment 'region primary key',
   |`region_name` string comment 'region name',
   |`total_amount` double comment 'total order amount',
   |`total_count` int comment 'total order count',
   |`sequence` int comment 'rank',
   |`year` int comment 'year',
   |`month` int comment 'month'
   |) using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'uuid',
   | preCombineField = 'total_count'
   |)
   |partitioned by (`year`,`month`);
   |""".stripMargin)

3.3 Aggregate monthly order totals and counts per province

Extract the year and month the same way as in the previous task. The required table structure includes the names and ids of both the province and the region, so the order table must be joined with the province and region tables to obtain them. Grouping by province id, region id, year, and month, then summing the order amounts and counting the orders, gives the numbers we need. The SQL is as follows:

spark.sql(
 """ select
   | a.`province_id`,
   | max(b.`name`) as `province_name`,
   | b.`region_id`,
   | max(c.`region_name`) as region_name ,
   | sum(coalesce(a.`total_amount`,0)) as `total_amount`,
   | count(a.`id`) `total_count`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) as `year`,
   | cast(SUBSTRING(a.`create_time`,5,2) as int) as `month`
   | from `dwd_ds_hudi`.`fact_order_info` a
   | left join
   | (select * from `dwd_ds_hudi`.`dim_province` where etl_date='20230901') b
   | on a.`province_id`=b.`id`
   | left join
   | (select * from `dwd_ds_hudi`.`dim_region` where etl_date='20230901') c
   | on b.`region_id`=c.`id`
   | group by
   | a.`province_id`,
   | b.`region_id`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) ,
   | cast(SUBSTRING(a.`create_time`,5,2) as int)
   |""".stripMargin)

3.4 Rank the results and write them to the table

The previous step produced each province's monthly order amount and order count; generating a rank over that result yields the final output the task asks for. The rank comes from the row_number() function, partitioning by year, month, and region and sorting by order amount in descending order. Example usage of row_number():

row_number() over (partition by `year`,`month`,`region_id` order by `total_amount` desc  ) as `sequence`
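
As a toy illustration (inline made-up rows, not competition data): row_number() restarts at 1 within each partition and follows the order by clause, so region 1's two rows are ranked 1 and 2 by descending amount, while region 2's single row gets rank 1:

spark.sql(
 """ select t.*,
   | row_number() over (partition by region_id order by amount desc) as `sequence`
   | from values (1, 100.0), (1, 250.0), (2, 80.0) as t(region_id, amount)
   |""".stripMargin).show()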

With the ranking in place, integrating it with the SQL from the previous step gives the complete statement:

spark.sql(
 """ insert overwrite `dws_ds_hudi`.`province_consumption_day_aggr`
   | select
   | uuid() as `uuid`,
   | d.`province_id`,
   | d.`province_name`,
   | d.`region_id`,
   | d.`region_name` ,
   | d.`total_amount`,
   | d.`total_count`,
   | row_number() over (partition by `year`,`month`,`region_id` order by `total_amount` desc ) as `sequence`,
   | d.`year`,
   | d.`month`
   | from
   | (select
   | a.`province_id`,
   | max(b.`name`) as `province_name`,
   | b.`region_id`,
   | max(c.`region_name`) as region_name ,
   | sum(coalesce(a.`total_amount`,0)) as `total_amount`,
   | count(a.`id`) `total_count`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) as `year`,
   | cast(SUBSTRING(a.`create_time`,5,2) as int) as `month`
   | from `dwd_ds_hudi`.`fact_order_info` a
   | left join
   | (select * from `dwd_ds_hudi`.`dim_province` where etl_date='20230901') b
   | on a.`province_id`=b.`id`
   | left join
   | (select * from `dwd_ds_hudi`.`dim_region` where etl_date='20230901') c
   | on b.`region_id`=c.`id`
   | group by
   | a.`province_id`,
   | b.`region_id`,
   | cast(SUBSTRING(a.`create_time`,0,4) as int) ,
   | cast(SUBSTRING(a.`create_time`,5,2) as int)
   | ) d
   |""".stripMargin)

3.5 View the results

spark.sql("select * from `dws_ds_hudi`.`province_consumption_day_aggr`order by `total_count` desc,`total_amount` desc,`province_id` desc limit 5 ").show()

Task Requirement 4

Using the tables in the dws_ds_hudi database, compare each province's average order amount for April 2020 against the average order amount of the region the province belongs to (result values 高/低/相同, i.e. higher/lower/same), and store the results in the provinceavgcmpregion table of the ClickHouse database shtd_result (table structure below). Then, in the ClickHouse command line on Linux, query the top 5 rows sorted by province primary key, province average order amount, and region average order amount, all in descending order. Copy the SQL statement into 【Release任务B提交结果.docx】 on the client desktop under the corresponding task number, and paste a screenshot of the execution result there as well.

Field  Type  Meaning  Notes
provinceid  int  province table primary key
provincename  text  province name
provinceavgconsumption  double  province average order amount
region_id  int  region table primary key
region_name  text  region name
regionavgconsumption  double  region average order amount  average order amount of the region the province belongs to
comparison  text  comparison result  result of comparing the province average against its region's average; one of 高/低/相同 (higher/lower/same)

4.1 Implementation overview

Create the result table in MySQL and a ClickHouse table mapped onto it, compute the comparison in Spark and store it in a Hudi DWS table, write that data into MySQL through a Spark JDBC temporary view, and finally query it from ClickHouse.

4.2 Create the MySQL database and table

Run the following command in a terminal to connect to MySQL:

mysql -uadmin -p123456

Run the database creation statement in the MySQL terminal:

create database if not exists shtd_result;

Based on the table structure given in the task requirements, run the table creation statement in the MySQL terminal:

create table if not exists `shtd_result`.`provinceavgcmpregion`(
`provinceid` int comment 'province table primary key',
`provincename` varchar(2000) comment 'province name',
`provinceavgconsumption` double comment 'province average order amount',
`region_id` int comment 'region table primary key',
`region_name` varchar(2000) comment 'region name',
`regionavgconsumption` double comment 'region average order amount',
`comparison` varchar(2000) comment 'comparison result'
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '' ROW_FORMAT = Dynamic;

4.3 Create the ClickHouse database and table

Run the following commands in a terminal to start ClickHouse and check that it is running:

service clickhouse-server start
service clickhouse-server status

Run the following command in a terminal to connect to ClickHouse:

clickhouse-client -u default --port 9800

Run the database creation statement in the ClickHouse terminal:

create database if not exists shtd_result;

Run the table creation statement in the ClickHouse terminal, mapping the table onto MySQL's provinceavgcmpregion table:

create table if not exists shtd_result.provinceavgcmpregion
(provinceid int,
provincename text,
provinceavgconsumption double,
region_id int,
region_name text,
regionavgconsumption double,
comparison text
) ENGINE = MySQL('127.0.0.1:3306','shtd_result','provinceavgcmpregion','root1','123456');

4.4 Connect to Spark

spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

4.5 Create the DWS table

Based on the table structure given in the task requirements, create the dws_ds_hudi.provinceavgcmpregion Hudi table. The DDL is as follows:

spark.sql(
 """
   |create table if not exists `dws_ds_hudi`.`provinceavgcmpregion` (
   |`uuid` string,
   |`provinceid` int comment 'province table primary key',
   |`provincename` string comment 'province name',
   |`provinceavgconsumption` double comment 'province average order amount',
   |`region_id` int comment 'region table primary key',
   |`region_name` string comment 'region name',
   |`regionavgconsumption` double comment 'region average order amount',
   |`comparison` string comment 'comparison result'
   |) using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'uuid'
   |)
   |""".stripMargin)

4.6 Compute average order amounts per province and region

The previous task already aggregated each province's monthly order amounts, so here we only need to query that result table. Per the task requirement, the averages to compute are for April 2020. The province average is April's total order amount divided by April's order count:

sum( case when year=2020 and month=4 then `total_amount` else 0 end) over(partition by province_id )/sum(case when year=2020 and month=4 then `total_count` else 0 end) over(partition by province_id ) as `provinceavgconsumption`

The region average is obtained by partitioning on region id and dividing the summed amount by the summed count. (Note that, as written, this expression spans every month in the table; add the same year/month filter if the requirement is read as April 2020 on the region side as well.)

sum(`total_amount`) over(partition by region_id)/sum(`total_count`) over(partition by  region_id)  as `regionavgconsumption`
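
Window functions emit one output row per input row, which is why the full query below uses select distinct to collapse the result to a single row per province/region combination. A toy sketch with made-up rows shows the effect (two output rows, one per region):

spark.sql(
 """ select distinct region_id,
   | sum(amount) over (partition by region_id) / sum(cnt) over (partition by region_id) as regionavg
   | from values (1, 100.0, 2), (1, 50.0, 1), (2, 80.0, 4) as t(region_id, amount, cnt)
   |""".stripMargin).show()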

The complete SQL is as follows:

spark.sql(
 """
   |select distinct
   |province_id,
   |province_name,
   |sum( case when year=2020 and month=4 then `total_amount` else 0 end) over(partition by province_id )/sum(case when year=2020 and month=4 then `total_count` else 0 end) over(partition by province_id ) as `provinceavgconsumption`,
   |region_id,
   |region_name,
   |sum(`total_amount`) over(partition by region_id)/sum(`total_count`) over(partition by region_id) as `regionavgconsumption`
   |from `dws_ds_hudi`.`province_consumption_day_aggr`
   |""".stripMargin)

4.7 Compare province and region averages

The previous step computed the province and region average order amounts. Per the task requirement, the comparison result must be recorded as 高 (higher), 低 (lower), or 相同 (same), which a case when expression handles. Combined with the SQL from the previous step, the complete statement is:

spark.sql(
 """
   |select uuid() as `uuid`,a.`province_id`,a.`province_name`,a.`provinceavgconsumption`,a.`region_id`,a.`region_name`,a.`regionavgconsumption`,
   | case
   |   when a.`provinceavgconsumption`=a.`regionavgconsumption` then '相同'
   |   when a.`provinceavgconsumption`>a.`regionavgconsumption` then '高'
   |   else '低'
   | end as `comparison`
   | from
   |(select distinct
   |province_id,
   |province_name,
   |sum( case when year=2020 and month=4 then `total_amount` else 0 end) over(partition by province_id )/sum(case when year=2020 and month=4 then `total_count` else 0 end) over(partition by province_id ) as `provinceavgconsumption`,
   |region_id,
   |region_name,
   |sum(`total_amount`) over(partition by region_id)/sum(`total_count`) over(partition by region_id) as `regionavgconsumption`
   |from `dws_ds_hudi`.`province_consumption_day_aggr`) a
   |
   |""".stripMargin)

4.8 Write the results to the DWS-layer table

Write the result of the previous step into the DWS-layer table. The SQL is as follows:

spark.sql(
 """
   |insert overwrite `dws_ds_hudi`.`provinceavgcmpregion`
   |select uuid() as `uuid`,a.`province_id`,a.`province_name`,a.`provinceavgconsumption`,a.`region_id`,a.`region_name`,a.`regionavgconsumption`,
   | case
   |   when a.`provinceavgconsumption`=a.`regionavgconsumption` then '相同'
   |   when a.`provinceavgconsumption`>a.`regionavgconsumption` then '高'
   |   else '低'
   | end as `comparison`
   | from
   |(select distinct
   |province_id,
   |province_name,
   |sum( case when year=2020 and month=4 then `total_amount` else 0 end) over(partition by province_id )/sum(case when year=2020 and month=4 then `total_count` else 0 end) over(partition by province_id ) as `provinceavgconsumption`,
   |region_id,
   |region_name,
   |sum(`total_amount`) over(partition by region_id)/sum(`total_count`) over(partition by region_id) as `regionavgconsumption`
   |from `dws_ds_hudi`.`province_consumption_day_aggr`) a
   |
   |""".stripMargin)

4.9 Map the MySQL table into Spark

The task requires the data to land in ClickHouse. The approach here uses MySQL's provinceavgcmpregion table as an intermediary: Spark writes the result data into the MySQL table, and ClickHouse maps that MySQL table (as set up in 4.3), so the analysis results become queryable from ClickHouse. Spark therefore also needs the MySQL provinceavgcmpregion table mapped in, via a JDBC temporary view:

spark.sql(
 """
   | create temporary view tmp
   |   using org.apache.spark.sql.jdbc
   |options(
   |   url 'jdbc:mysql://127.0.0.1:3306/shtd_result?useSSL=false&useUnicode=true&characterEncoding=utf8',
   |   dbtable 'provinceavgcmpregion',
   |   user 'root1',
   |   password '123456'
   |)
   |""".stripMargin)

4.10 Write the DWS-layer results to MySQL

The DWS-layer provinceavgcmpregion table already holds the computed results; all that remains is to query the DWS-layer data and write it into MySQL. The SQL is as follows:

spark.sql(
 """
   |insert overwrite tmp
   |select
   |`provinceid`,
   |`provincename`,
   |`provinceavgconsumption`,
   |`region_id`,
   |`region_name`,
   |`regionavgconsumption`,
   |`comparison`
   |from `dws_ds_hudi`.`provinceavgcmpregion`
   |""".stripMargin)

4.11 Query the written results

Query the data written to MySQL from Spark. The SQL is as follows:

spark.sql("select * from tmp order by `provinceid` desc,`provinceavgconsumption` desc,`regionavgconsumption` desc limit 5").show()

4.12 Query the results in ClickHouse

Run the following command in a Linux terminal to connect to ClickHouse:

clickhouse-client -u default --port 9800

Run the following query in the ClickHouse terminal to view the results:

select * from shtd_result.provinceavgcmpregion order by provinceid desc, provinceavgconsumption desc, regionavgconsumption desc limit 5;