2024年重庆甘肃安徽等省职业院校技能大赛_大数据应用开发样题解析-模块D:离线数据处理-任务二:离线指标计算

环境说明

Hive 的配置文件位于主节点/opt/module/hive-3.1.2/conf/

Spark 任务在Yarn 上用Client 运行,方便观察日志;

ClickHouse 的jdbc 连接端口8123,用户名/密码:default/123456

命令行客户端(tcp)端口9001;

建议使用gson 解析json 数据。

本任务共有3个子任务组成。单击以下链接,可快速跳转到相应的子任务部分:

  • 离线指标计算子任务1

  • 离线指标计算子任务2

  • 离线指标计算子任务3

子任务1

子任务1描述

1、编写Scala 工程代码,根据dwd 的订单表dwd.fact_order_master,求各省份下单时间为2022 年的支付转化率,并将计算结果按照下述表结构写入clickhouse 的ds_result 库的payment_cvr 表。 在Linux 的clickhouse 命令行中根据ranking 字段查询出转化率前三的省份,将SQL 语句与执行结果截图粘贴至客户端桌面【Release\模块D 提交结果.docx】中对应的任务序号下;

注:支付转化率= 完成支付的订单数/ 已下单数。

字段 类型 中文含义 备注
province string 省份名  
creat_order int 已下单数  
payment int 已支付的订单数  
payCVR float64 支付转化率 四舍五入保留三位小数
ranking int 转化率排名  

子任务1分析

(1) 什么是订单的支付转化率?

简单来说,支付转化率是在统计时间内,完成支付的订单数/ 已下单数,即已下单订单转化为实际支付订单的比例。

支付转化率的计算公式如下:

支付转化率 = 完成支付的订单数/ 已下单数

例如,假设有100个已下单的订单,最终其中30个完成了付款,则可知支付转化率=30/100=30%。

(2) 订单的状态都有哪些?

总共有5种订单状态,分别是:

  • 已下单

  • 已付款

  • 已发货

  • 已签收

  • 已退款

因此,我们只需要统计”已下单”和”已付款”两种类型的订单即可。

准备工作

在ClickHouse中创建名为ds_result的数据库(如果没有该数据库的话),命令如下:

xueai :) create database ds_result;

因spark jdbc的方式不支持在clickhouse中自动创建表结构,这里在插入前需要提前创建payment_cvr表:

xueai :) CREATE TABLE IF NOT EXISTS ds_result.payment_cvr(
  province String,
  creat_order UInt64,
  payment UInt64,
  payCVR Float64,
  ranking UInt64
)
ENGINE = MergeTree()
ORDER BY ranking;

子任务1实现

参考实现代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window

object CalculationJob01{

def main(args: Array[String]): Unit = {

  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("Data Calculation")
    .// 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", true)
    .enableHiveSupport()
    .getOrCreate()  

// 加载dwd的fact_order_master表,并过滤出2022年已下单和已付款的订单
spark.table("ss2024_ds_dwd.fact_order_master").createOrReplaceTempView("fact_order_master_tb")

val sql1 =
  """
  |select
  |   order_id,order_sn,order_status,province,create_time
  |from
  |   fact_order_master_tb
  |where
  |   (order_status='已下单' or order_status='已付款') and substr(create_time,0,4)='2022'
  |""".stripMargin

// 将过滤出的数据注册到临时视图
spark.sql(sql1).createOrReplaceTempView("order_2022_tb")

// 分别统计各省已下单数和各省已付款单数,然后join连接
val sql2 =
  """
  |with tb1 as(
  | select province, count(1) as creat_order
  | from   order_2022_tb
  | where order_status='已下单'
  | group by province
  |), tb2 as(
  | select province, count(1) as payment
  | from   order_2022_tb
  | where order_status='已付款'
  | group by province
  |)
  |select tb1.province, creat_order, payment
  |from tb1 join tb2 on tb1.province=tb2.province
  |""".stripMargin

spark.sql(sql2).show

/*
+--------+-----------+-------+
|province|creat_order|payment|
+--------+-----------+-------+
| 浙江省|     14727| 14727|
| 贵州省|       1557|   1557|
| 广东省|       279|   279|
| 上海市|     47652| 47651|
| 江苏省|     20945| 20945|
+--------+-----------+-------+
*/

// 从上面的输出可知,样本数据有点问题,计算的支付转换率都是100%
// 因此,人为制造不同的支付转化率(演示目的,比赛时的数据应当不会如此)
val grouped_order_df = spark.sql(sql2).withColumn("payment", $"payment"-(floor(rand() * 250) + 100))

grouped_order_df.show
/*
+--------+-----------+-------+
|province|creat_order|payment|
+--------+-----------+-------+
| 浙江省|     14727| 14554|
| 贵州省|       1557|   1451|
| 广东省|       279|     12|
| 上海市|     47652| 47394|
| 江苏省|     20945| 20842|
+--------+-----------+-------+
*/

// 统计支付转化率
val result_order_df = grouped_order_df.withColumn("payCVR", round($"payment"/$"creat_order",3))

// 计算排名-使用窗口函数
val windowSpec = Window.orderBy($"payCVR".desc)
val rank_order_df = result_order_df.withColumn("ranking",rank().over(windowSpec))

rank_order_df.show()
/*
+--------+-----------+-------+------+-------+
|province|creat_order|payment|payCVR|ranking|
+--------+-----------+-------+------+-------+
| 上海市|     47652| 47394| 0.995|     1|
| 江苏省|     20945| 20842| 0.995|     1|
| 浙江省|     14727| 14554| 0.988|     3|
| 贵州省|       1557|   1451| 0.932|     4|
| 广东省|       279|     12| 0.043|     5|
+--------+-----------+-------+------+-------+
*/

  // 将计算结果写入clickhouse 的ds_result 库的payment_cvr 表
  // 使用官方clickhouse驱动程序和连接信息
  val ckUrl = "jdbc:clickhouse://192.168.190.139:8123/ds_result" // 数据库连接url
  val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver" // 驱动程序
  val ckUser = "default"
  val ckPassword = ""
  val ckTable = "ds_result.payment_cvr"

  val props = new Properties
  props.put("driver", ckDriver)
  props.put("user", ckUser)
  props.put("password", ckPassword)

  // 写入clickhouse
  result.write.mode("append").jdbc(ckUrl,ckTable,props)
}
}

代码执行城后,在Linux 的clickhouse 命令行中根据ranking 字段查询出转化率前三的省份:

xueai :) select * from ds_result.payment_cvr where ranking<=3; 

查询结果如下:

xueai :) select * from ds_result.payment_cvr where ranking<=3; SELECT * FROM ds_result.payment_cvr WHERE ranking <= 3 ┌─province─┬─creat_order─┬─payment─┬─payCVR─┬─ranking─┐ │ 上海市 │ 47652 │ 47508 │ 0.997 │ 1 │ │ 浙江省 │ 14727 │ 14590 │ 0.991 │ 2 │ │ 江苏省 │ 20945 │ 20657 │ 0.986 │ 3 │ └──────────┴─────────────┴─────────┴────────┴─────────┘ 3 rows in set. Elapsed: 1.397 sec. 

子任务2

子任务2描述

2、编写Scala 工程代码,根据dwd 的fact_order_master 表最新分区关联fact_order_detail 表,计算所有订单中各商品所有订单(若该订单存在“已退款”状态则该订单不做计算,其余情况都参与计算)总销售金额(购买商品单价*购买商品数量)排名,并将计算结果按照下述表结构写入clickhouse的ds_result 库的sales_amount_rank 表。然后在Linux 的clickhouse 命令行中根据sales_rank 升序查询前5 行,将SQL 语句与执行结果截图粘贴至客户端桌面【Release\模块D 提交结果.docx】中对应的任务序号下;

sales_amount_rank 表结构:

字段 类型 中文含义 备注
product_id int 订单商品ID  
sales_amount float64 商品总销售金额 四舍五入保留两位小数
product_totalcnt int 商品销售总数  
sales_rank int 销售金额排名  

子任务2分析

本子任务的重点是“计算所有订单中各商品所有订单总销售金额排名”。分析这个题意,我理解为要求“计算每个商品的总销售金额排名”。即同一类商品在各个订单中都可能有,我们要把所有订单中同一类商品归为一组,然后统计该类商品的总销售金额,最后按总销售金额进行排名。

实现过程可分为两步:

  • 第1步,先统计每个商品在所有订单中的总销售金额(先计算小计金额,再group分组sum汇总金额);

  • 第2步,使用窗口排名函数对总销售金额进行排名。

对总销售金额进行排名,需要用到Spark的窗口排名函数rank或dense_rank。

从上一个子任务的分析中可以得知,总共有5种订单状态。而要统计每个商品的销售指标,需要考虑”已付款”和”已退款”两种状态的订单。

考虑到状态信息位于fact_order_master表中,而商品及商品销售数量、单价均位于fact_order_detail表中,所以需要关联 两个表来获取数据:

  • 1)使用order_sn做为连接列;

  • 2)从order_master表中找出所有订单状态为”已付款”或”已退款”的订单;

  • 3)从”已付款”订单中剔除”已退款”的订单;

  • 4)以order_sn做为连接列,关联order_master和order_detail表,找出所有已付款的订单明细;

  • 5)按product_id进行分组,分组统计每个商品的销售金额和销售数量;

  • 6)对最后的商品销售金额进行排名。

准备工作

在ClickHouse中创建名为ds_result的数据库(如果没有该数据库的话),命令如下:

xueai :) create database ds_result;

因spark jdbc的方式不支持在clickhouse中自动创建表结构,这里在插入前需要提前创建sales_amount_rank表:

xueai :) CREATE TABLE IF NOT EXISTS ds_result.sales_amount_rank(
  product_id UInt64,
  sales_amount Float64,
  product_totalcnt UInt64,
  sales_rank UInt64
)
ENGINE = MergeTree()
ORDER BY sales_rank;

子任务2实现

实现代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window
import java.util.Properties

object CalculationJob02{

def main(args: Array[String]): Unit = {

// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("Data Calculation")
.// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", true)
.enableHiveSupport()
.getOrCreate()

// 加载order_master表中已付款的订单记录
val paymented_orders_df = spark.sql("select order_sn from ss2024_ds_dwd.fact_order_master where order_status='已付款'")

// 加载order_master表中已退款的订单记录
val refunded_orders_df = spark.sql("select order_sn from ss2024_ds_dwd.fact_order_master where order_status='已退款'")

// 从"已付款"订单中剔除"已退款"的订单
val valid_order_df = paymented_orders_df.except(refunded_orders_df)

// 从fact_order_detail中查询出所有已付款订单的明细
val valid_order_detail_df = spark
.table("ss2024_ds_dwd.fact_order_detail")
.select("order_sn","product_id","product_cnt","product_price")
.join(valid_order_df, "order_sn") // 关联到已付款订单编号

// 计算每个商品的小计金额:product_cnt * product_price。
// 以及每个商品的累计销售总金额和销售总数量
val product_sales = valid_order_detail_df
.withColumn("product_subtotal", col("product_cnt") * col("product_price"))
.groupBy("product_id")
.agg(sum("product_subtotal").as("sales_amount"), sum("product_cnt").as("product_totalcnt"))

// 定义窗口规范
val windowSpec = Window.orderBy(col("sales_amount").desc)

// 按销售金额进行排名
val product_sales_rank = product_sales.withColumn("sales_rank",rank().over(windowSpec))
product_sales_rank.show()
/*
+----------+------------+----------------+----------+
|product_id|sales_amount|product_totalcnt|sales_rank|
+----------+------------+----------------+----------+
| 599| 77042.24| 25| 1|
| 11976| 66334.68| 20| 2|
| 9193| 63572.52| 21| 3|
| 9344| 63331.29| 21| 4|
| 8048| 62307.63| 21| 5|
| 7138| 62303.85| 17| 6|
| 644| 61944.12| 18| 7|
| 7122| 61841.95| 19| 8|
| 8095| 61794.16| 17| 9|
| 1083| 61700.94| 18| 10|
| 889| 61529.16| 18| 11|
| 1250| 61260.80| 19| 12|
| 12609| 60857.26| 18| 13|
| 5437| 60768.68| 19| 14|
| 12483| 60382.08| 19| 15|
| 7489| 59980.20| 17| 16|
| 13563| 59397.51| 19| 17|
| 651| 59235.60| 16| 18|
| 14444| 58651.45| 19| 19|
| 6558| 58593.60| 20| 20|
+----------+------------+----------------+----------+
only showing top 20 rows
*/

// 将统计结果写入clickhouse
// 使用官方clickhouse驱动程序和连接信息
val ckUrl = "jdbc:clickhouse://192.168.190.139:8123/ds_result" // 数据库连接url
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver" // 驱动程序
val ckUser = "default"
val ckPassword = ""
val ckTable = "ds_result.sales_amount_rank"

val props = new Properties
props.put("driver", ckDriver)
props.put("user", ckUser)
props.put("password", ckPassword)
product_sales_rank.write.mode("append").jdbc(ckUrl,ckTable ,props)
}
}

执行以上代码,完成后在Linux 的clickhouse 命令行中根据sales_rank 升序查询前5 行:

xueai :) select * from ds_result.sales_amount_rank limit 5;

可以得到类似下面这样的查询结果:

xueai :) select * from ds_result.sales_amount_rank limit 5;

SELECT *
FROM ds_result.sales_amount_rank
LIMIT 5

┌─product_id─┬─sales_amount─┬─product_totalcnt─┬─sales_rank─┐
│       599 │     77042.24 │               25 │         1 │
│     11976 │     66334.68 │               20 │         2 │
│       9193 │     63572.52 │               21 │         3 │
│       9344 │     63331.29 │               21 │         4 │
│       8048 │     62307.63 │               21 │         5 │
└────────────┴──────────────┴──────────────────┴────────────┘

5 rows in set. Elapsed: 0.019 sec.

子任务3

子任务3描述

1、编写Scala 工程代码,根据dwd 的登录日志表dwd.log_customer_login,求login_time 字段值为2022-08-10 的最近连续三周登录的用户数,并将计算结果按照下述表结构写入clickhouse 的ds_result 库的continuous_3week表。然后在Linux 的clickhouse 命令行中根据active_total 降序查询,将SQL 语句与执行结果截图粘贴至客户端桌面【Release\模块D 提交结果.docx】中对应的任务序号下。

continuous_3week 表结构如下:

字段 类型 中文含义 备注
end_date string 数据统计日期 2022-08-10
active_total int 活跃用户数  
date_range string 统计周期 格式:统计开始时间_结束时间

date_range: 例:假设统计2022 年9 月8 日的连续三周登录用户数,则该字段值 应该为2022-08-22_2022-09-11。

子任务3分析

“求最近连续三周登录的用户数”,解题方法可参考Spark教程中的案例_统计连续三周登录的用户数

类似的任务,在离线指标计算部分经常遇到,解决的思路是相似的:

  • (1)找出最近三周登录的用户记录;

  • (2)进一步筛出这三周每周都登录的用户记录

  • (3)最后对筛出的记录按用户id去重、统计数量。

1)例如,找出最近三周登录的用户记录

登录MySQL,执行如下SQL语句:

mysql> select min(login_time),max(login_time) from customer_login_log;

输出内容如下:

+---------------------+---------------------+
| min(login_time)     | max(login_time)     |
+---------------------+---------------------+
| 2022-03-16 17:58:59 | 2022-09-11 05:35:03 |
+---------------------+---------------------+
1 row in set (0.35 sec)

假定统计日期(即现在)是2022-09-10(恰好是周六),则:

  • (1)确定当前 2022-09-10 是周几,然后求得周日的日期(也就是这三周的最后一天)。

  • (2)拿到 2022-09-10 这周的周日时间后,获取两周前的开始日期(也就是这三周的第一天)。

  • (3)筛选范围,计算每位用户是否符合三周的连续性。连续三周登录的用户:在当前日期之前三周的周活表中,此用户都存在。

任务中要求的是”2022-08-10 的最近连续三周”,即【2022-07-25,2022-08-14】。

2)疑问:

需求中要求的是“求最近连续三周登录的用户数”,得到的结果应该是最近连续三周登录的用户数,这时再“根据active_total降序查询”有什么意义呢???

准备工作

在ClickHouse中创建名为shtd_result的数据库(如果还没有该数据库的话),命令如下:

xueai :) create database ds_result;

因spark jdbc的方式不支持在clickhouse中自动创建表结构,这里在插入前需要提前创建表:

xueai :) CREATE TABLE IF NOT EXISTS ds_result.continuous_3week(
  end_date String,
  active_total UInt64,
  date_range String
)
ENGINE = MergeTree()
ORDER BY active_total;

子任务3实现

实现代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import java.util.Properties

object CalculationJob03{

def main(args: Array[String]): Unit = {

  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("Data Calculation")
    .// 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", true)
    .enableHiveSupport()
    .getOrCreate()  

  // 1)读取数据源
  // 读取DWD中的用户登录日志表数据到DataFrame中,并注册到临时视图中
  spark.table("ss2024_ds_dwd.log_customer_login").createOrReplaceTempView("log_data")

  // 2) 获取2022-08-10 这周的周日时间
  spark.sql(
        """
          |select
          |   customer_id,
          |   date(login_time) log_date,
          |   next_day("2022-08-10", "Sunday") date_end
          |from
          |   log_data
          |""".stripMargin).createOrReplaceTempView("first_data")

  // 3)筛选出最近三周的数据
  // 获取两周前的起始日期,筛选符合要求的数据,并计算每个登录日期所属周的周日日期
  spark.sql(
        """
          |select
          |   *
          |from
          |   (select
          |     customer_id,
          |     log_date,
          |     if(dayofweek(log_date)=1,log_date,next_day(log_date,"sunday")) log_date_week_end,
          |     date_sub(date_end,20) date_begin,
          |     date_end
          |   from
          |     first_data
          |   )t1
          |where
          |   log_date <= date_end
          |   and
          |   log_date >= date_begin
          |""".stripMargin)
      // 按user_id和log_date_week_end去重
      .dropDuplicates("customer_id","log_date_week_end")
      // 注册为临时表
      .createOrReplaceTempView("second_data")

  // 4)然后,统计最近三周连续登录3次的用户数:
  val result = spark.sql(
        """
          |select
          |   end_date,
          |   count(distinct customer_id) active_total,
          |   date_range
          |from
          |   (select
          |         "2022-08-10" end_date,
          |         customer_id,
          |         concat(date_begin,"_",date_end) date_range,
          |         count(*) loging_week_cnt
          |   from
          |         second_data
          |   group by
          |         end_date,
          |         customer_id,
          |         date_range
          |   ) t1
          |where
          |   loging_week_cnt=3
          |group by
          |   end_date,
          |   date_range
          |""".stripMargin)
       
  // 5)将求出的数据存入clickhouse。
  // 然后执行以下代码,将计算结果写入ClickHouse的ds_result库的continuous_3week表中
  // 使用官方clickhouse驱动程序和连接信息
  val ckUrl = "jdbc:clickhouse://192.168.190.139:8123/ds_result" // 数据库连接url
  val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver" // 驱动程序
  val ckUser = "default"
  val ckPassword = ""
  val ckTable = "ds_result.continuous_3week"

  val props = new Properties
  props.put("driver", ckDriver)
  props.put("user", ckUser)
  props.put("password", ckPassword)

  // 写入clickhouse
  result.write.mode("append").jdbc(ckUrl,ckTable,props)
}
}

然后在Linux 的clickhouse 命令行中根据active_total 降序查询:

xueai :) select * from ds_result.continuous_3week order by active_total desc;

查询结果如下:

xueai :) select * from ds_result.continuous_3week;

SELECT *
FROM ds_result.continuous_3week

┌─end_date───┬─active_total─┬─date_range────────────┐
│ 2022-08-10 │           2 │ 2022-07-25_2022-08-14 │
└────────────┴──────────────┴───────────────────────┘

1 rows in set. Elapsed: 0.046 sec.
© 版权声明
THE END
喜欢就支持一下吧
点赞133赞赏 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容