2024年重庆甘肃安徽等省职业院校技能大赛_大数据应用开发样题解析-模块B:数据采集-任务一:离线数据采集

任务描述

编写Scala 工程代码, 将MySQL 的ds_db01 库中表order_master、order_detail、coupon_info、coupon_use、product_browse、product_info、customer_inf 、customer_login_log 、order_cart 、customer_level_inf 、 customer_addr 的数据增量抽取到Hive 的ods 库中对应表order_master、order_detail、coupon_info、coupon_use、product_browse、product_info、customer_inf 、customer_login_log 、order_cart 、customer_level_inf 、customer_addr 中(ods 库中部分表没有数据,正常抽取即可)。

任务分析

如果这些表在后续的数据处理和计算中使用到,还情有可原;如果大部分表在后续的数据处理和计算中没有用到,这样的题目可谓是“老太太的裹脚布,又臭又长”。

任务描述中提到,“(ods 库中部分表没有数据,正常抽取即可)”,意思也就是,在ods库中部分表是没有存量数据的,对于这些表直接全量抽取即可;对于有存量数据的,则需要增量抽取。

但是,这里赛方挖了一个比赛环境的坑,即存量数据表(Hive表)是如何创建的?大家都知道,Hive的存储格式和Spark的存储格式是有所不同的,如果不知道存量数据Hive表的存储格式,则在使用Spark做ETL增量写入的时候,则会遇到格式不兼容的问题,从而造成写入失败。这时可能会遇到了类似下面这样的错误信息:

org.apache.spark.sql.AnalysisException: The format of the existing table ods.order_master is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;

综合以往赛方的出题水平,猜测赛方会使用Hive CLI手工创建ODS中的存量数据表(如果有的话),那么就涉及到一个问题,赛方创建的Hive存量表,会使用什么存储格式呢?

如果Hive表是用命令行创建的ods中的hive表,那么创建的时候可以指定不同的存储格式:TextFile、SequenceFile、RCFile、ORC、Parquet。默认情况下,不指定的话,是TextFile。

而Spark 的df.write默认的format格式是parquet + snappy。如果表是用hive命令行创建的,就不符合格式,所以就会报错。如果表是提前不存在的,那么就不会有什么问题。

那么,怎么知道已经存在的hive表是如何创建的呢?

在hive cli命令行下,执行如下命令:

hive> show create table ods.xxxx;

其中xxxx是表名称。这会显示创建表的完整语句,其中包括存储格式。

如果遇到这种情况,解决方法如下:

1)将format设置为Hive,则无论hive建表的时候使用的fileformat使用的是哪一种,都是没有关系的:

df.write.format("hive").mode("append").saveAsTable("ods.xxxx")

2)还有一种方式,就是先将DataFrame注册成临时表,然后通过SQL的方式执行insert语句进行插入:

df.createOrReplaceTempView("temp_tb")
spark.sql("insert into ods.xxxx select * from temp_tb")

准备工作

本案例在小白学苑PBCP2023平台上测试运行,当移植到官方竞赛平台时,请酌情调整。

为了测试运行能正常进行,首先进行如下准备工作:

1)启动Hadoop集群:

# start-dfs.sh
# start-yarn.sh

2)启动Hive元数据服务:

新打开一个终端窗口,运行如下命令,启动Hive元数据服务:

# hive --service metastore

不要关闭窗口,保持服务处于运行状态。

3)使用Hive CLI,创建本示例中要用到的Hive ODS数据库ss2024_ds_ods:

hive> create database ss2024_ds_ods;

4)单击右侧目录,可快速跳转到相应的子任务部分。

  • 子任务1参考解析

  • 子任务2参考解析

  • 子任务3参考解析

  • 子任务4参考解析

  • 子任务5参考解析

  • 子任务6参考解析

  • 子任务7参考解析

  • 子任务8参考解析

  • 子任务9参考解析

  • 子任务10参考解析

  • 子任务11参考解析

子任务1

子任务1描述

1、抽取ds_db01 库中order_master 的增量数据进入Hive 的ods 库中表order_master。根据ods.order_master 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段 为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.order_master命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;

子任务1分析

首先,探索MySQL ds_db01库中order_master表中的数据。

使用如下命令,查看order_master表的结构:

mysql> desc order_master;
+--------------------+------------------+------+-----+-------------------+-----------------------------+
| Field             | Type             | Null | Key | Default           | Extra                       |
+--------------------+------------------+------+-----+-------------------+-----------------------------+
| order_id           | int(10) unsigned | NO   | PRI | NULL             | auto_increment             |
| order_sn           | varchar(100)     | NO   |     | NULL             |                             |
| customer_id       | int(10) unsigned | NO   |     | NULL             |                             |
| shipping_user     | varchar(10)     | NO   |     | NULL             |                             |
| province           | varchar(200)     | NO   |     | NULL             |                             |
| city               | varchar(200)     | NO   |     | NULL             |                             |
| address           | varchar(100)     | NO   |     | NULL             |                             |
| order_source       | tinyint(4)       | NO   |     | NULL             |                             |
| payment_method     | tinyint(4)       | NO   |     | NULL             |                             |
| order_money       | decimal(8,2)     | NO   |     | NULL             |                             |
| district_money     | decimal(8,2)     | NO   |     | 0.00             |                             |
| shipping_money     | decimal(8,2)     | NO   |     | 0.00             |                             |
| payment_money     | decimal(8,2)     | NO   |     | 0.00             |                             |
| shipping_comp_name | varchar(10)     | YES |     | NULL             |                             |
| shipping_sn       | varchar(50)     | YES |     | NULL             |                             |
| create_time       | varchar(100)     | NO   |     | NULL             |                             |
| shipping_time     | varchar(100)     | NO   |     | NULL             |                             |
| pay_time           | varchar(100)     | NO   |     | NULL             |                             |
| receive_time       | varchar(100)     | NO   |     | NULL             |                             |
| order_status       | varchar(50)     | YES |     | NULL             |                             |
| order_point       | int(10) unsigned | NO   |     | 0                 |                             |
| invoice_title     | varchar(100)     | YES |     | NULL             |                             |
| modified_time     | timestamp       | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+--------------------+------------------+------+-----+-------------------+-----------------------------+
23 rows in set (0.14 sec)

使用如下命令,查看order_master表中前5条记录:

mysql> select * from order_master limit 5;
+----------+------------------+-------------+---------------+-----------+--------------------+------------------------------------------------------------------------------------------+--------------+----------------+-------------+----------------+----------------+---------------+--------------------+---------------+----------------+----------------+----------------+----------------+--------------+-------------+-----------------------------------+---------------------+
| order_id | order_sn         | customer_id | shipping_user | province | city               | address                                                                                 | order_source | payment_method | order_money | district_money | shipping_money | payment_money | shipping_comp_name | shipping_sn   | create_time   | shipping_time | pay_time       | receive_time   | order_status | order_point | invoice_title                     | modified_time       |
+----------+------------------+-------------+---------------+-----------+--------------------+------------------------------------------------------------------------------------------+--------------+----------------+-------------+----------------+----------------+---------------+--------------------+---------------+----------------+----------------+----------------+----------------+--------------+-------------+-----------------------------------+---------------------+
|       1 | 2022031631349116 |       9097 | 孙燕         | 上海市   | 上海市             | 上海市上海市水城南路854598号17层                                                         |           1 |             3 |     2177.28 |       1306.37 |         22.12 |       893.03 | 韵达               | 9339334312635 | 20220317170357 | NULL           | NULL           | NULL           | 已下单       |         89 | 南康信息有限公司                 | 2022-03-17 17:03:57 |
|       2 | 2022031631349116 |       9097 | 孙燕         | 上海市   | 上海市             | 上海市上海市水城南路854598号17层                                                         |           1 |             3 |     2177.28 |       1306.37 |         22.12 |       893.03 | 韵达               | 9339334312635 | 20220317170357 | NULL           | 20220317214157 | NULL           | 已付款       |         89 | 南康信息有限公司                 | 2022-03-17 21:41:57 |
|       3 | 2022031631349116 |       9097 | 孙燕         | 上海市   | 上海市             | 上海市上海市水城南路854598号17层                                                         |           1 |             3 |     2177.28 |       1306.37 |         22.12 |       893.03 | 韵达               | 9339334312635 | 20220317170357 | 20220319035357 | 20220317214157 | NULL           | 已发货       |         89 | 南康信息有限公司                 | 2022-03-19 03:53:57 |
|       4 | 2022031631349116 |       9097 | 孙燕         | 上海市   | 上海市             | 上海市上海市水城南路854598号17层                                                         |           1 |             3 |     2177.28 |       1306.37 |         22.12 |       893.03 | 韵达               | 9339334312635 | 20220317170357 | 20220319035357 | 20220317214157 | 20220320231157 | 已签收       |         89 | 南康信息有限公司                 | 2022-03-20 23:11:57 |
|       5 | 2022031625520775 |       18577 | 尹柳         | 浙江省   | 浙江省宁波市       | 浙江省宁波市中山东路10832665号世纪东方广场一楼F10162665号商铺21层                       |           1 |             4 |     3277.42 |       1310.97 |         47.82 |       2014.27 | 韵达               | 4897398474485 | 20220317041801 | NULL           | NULL           | NULL           | 已下单       |         201 | 时空盒数字传媒有限公司           | 2022-03-17 04:18:01 |
+----------+------------------+-------------+---------------+-----------+--------------------+------------------------------------------------------------------------------------------+--------------+----------------+-------------+----------------+----------------+---------------+--------------------+---------------+----------------+----------------+----------------+----------------+--------------+-------------+-----------------------------------+---------------------+
5 rows in set (0.00 sec)

使用如下命令,找出order_master表中订单的modified_time字段值表示的时间范围:

mysql> select min(modified_time),max(modified_time) from order_master;
+---------------------+---------------------+
| min(modified_time) | max(modified_time) |
+---------------------+---------------------+
| 2022-03-17 02:59:00 | 2022-09-16 09:44:51 |
+---------------------+---------------------+

假设Hive 的ODS库的存量表中存储了2022-03-17这一天的数据。为此,我们需要在Hive中手工创建ods.order_master存量表。我们将按以下步骤实现:

1)在hive cli中,开户动态分区支持。

动态分区默认是没有开启。可以通过执行以下命令开启动态分区。

hive> set hive.exec.dynamic.partition.mode=nonstrict;       // 分区模式,默认strict        
hive> set hive.exec.dynamic.partition=true;                       // 开启动态分区,默认false
hive> set hive.exec.max.dynamic.partitions=1000; //最大动态分区数, 设为1000

2)在Hive中,创建ODS数据库,命令如下:

hive> create database ss2024_ds_ods;

3)在hive cli中,创建存量表order_master(分区表)。Hive SQL命令如下:

hive> create table if not exists ss2024_ds_ods.order_master(
order_id bigint,
order_sn string,
customer_id bigint,
shipping_user string,
province string,
city string,
address string,
order_source int,
payment_method int,
order_money decimal(8,2),
district_money decimal(8,2),
shipping_money decimal(8,2),
payment_money decimal(8,2),
shipping_comp_name string,
shipping_sn string,
create_time string,
shipping_time string,
pay_time string,
receive_time string,
order_status string,
order_point bigint,
invoice_title string,
modified_time timestamp
)
partitioned by (etl_date string)
row format delimited
fields terminated by ','
null defined as 'NULL'
stored as textfile
tblproperties("serialization.null.format"='NULL');

4)在Hive CLI中,手工加载存量数据(即2022-03-17这一天的订单数据)到order_master表中:

将存量数据加载到Hive的ods.order_master分区表中:

hive> load data local inpath '/opt/data/order_master_20220317.csv' overwrite into table ss2024_ds_ods.order_master partition(etl_date='20231224');

子任务1实现

首先,创建一个Spark Maven项目(名称等自己任意取),并配置好jdk、scala sdk、hive集成,以及pom.xml文件等。

编写Spark ETL程序,从mysql的ds_db01.order_master表中抽取增量数据到Hive的ods.order_master表中。

抽取路径 jdbc -> dataframe -> hive ods。实现代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()

    // (1) 计算出ODS库中存量数据的最大时间
  val max_modified_time_sql = "select max(modified_time) from ss2024_ds_ods.order_master"

  val df1 = spark.sql(max_modified_time_sql)
  val max_time = df1.first.getAs[java.sql.Timestamp](0)     // 2022-03-17 23:32:18.0
  // max_time

  // (2) ETL抽取

  // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // 增量查询的SQL语句
  val increment_order_master_sql = s"select * from order_master where modified_time > '${max_time}'"

  // ETL管道
  spark.read.format("jdbc")               // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver")   // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url)   // 连接url
    .option("query", increment_order_master_sql)   // 要读取的表
    .option("user", "root")   // 连接账户
    .option("password","admin")   // 连接密码
    .load()                                                                   // 加载
    .withColumn("etl_date",lit("20231225"))               // 增加分区列
    // 写入Hive ODS
    .write
    .partitionBy("etl_date")     // 指定分区
    .format("hive")                 // 注意这里
    .mode("append")               // 追加
    .saveAsTable("ss2024_ds_ods.order_master")  

    spark.stop()
}
}

子任务2

子任务2描述

2、抽取ds_db01 库中order_detail 的增量数据进入Hive 的ods 库中表order_detail。根据ods.order_detail 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.order_detail命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;

子任务2分析

首先,探索MySQL ds_db01库中order_detail表中的数据。

使用如下命令,查看order_detail表的结构:

mysql> desc order_detail;
+-----------------+------------------+------+-----+-------------------+-----------------------------+
| Field           | Type             | Null | Key | Default           | Extra                       |
+-----------------+------------------+------+-----+-------------------+-----------------------------+
| order_detail_id | int(10) unsigned | NO   | PRI | NULL             | auto_increment             |
| order_sn       | varchar(100)     | NO   |     | NULL             |                             |
| product_id     | int(10) unsigned | NO   |     | NULL             |                             |
| product_name   | varchar(50)     | NO   |     | NULL             |                             |
| product_cnt     | int(11)         | NO   |     | 1                 |                             |
| product_price   | decimal(8,2)     | NO   |     | NULL             |                             |
| average_cost   | decimal(8,2)     | NO   |     | NULL             |                             |
| weight         | float           | YES |     | NULL             |                             |
| fee_money       | decimal(8,2)     | NO   |     | 0.00             |                             |
| w_id           | int(10) unsigned | NO   |     | NULL             |                             |
| create_time     | varchar(200)     | NO   |     | NULL             |                             |
| modified_time   | timestamp       | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-----------------+------------------+------+-----+-------------------+-----------------------------+
12 rows in set (0.00 sec)

使用如下命令,查看order_master表中前5条记录:

mysql> select * from order_detail limit 5;
+-----------------+------------------+------------+--------------------------------------------------------------+-------------+---------------+--------------+---------+-----------+------+----------------+---------------------+
| order_detail_id | order_sn         | product_id | product_name                                                 | product_cnt | product_price | average_cost | weight | fee_money | w_id | create_time   | modified_time       |
+-----------------+------------------+------------+--------------------------------------------------------------+-------------+---------------+--------------+---------+-----------+------+----------------+---------------------+
|               1 | 2022031631349116 |       752 | 勒姆森无线蓝牙耳机头戴式重低音电脑通用                       |           4 |       544.32 |         0.00 | 4.94507 |     82.49 | 118 | 20220317094052 | 2022-03-18 09:17:52 |
|               2 | 2022031625520775 |     14307 | 新品无线蓝牙耳机卡通女生可爱运动跑步适用                     |           1 |       775.58 |         0.00 | 3.29853 |     72.56 | 1285 | 20220317182852 | 2022-03-18 09:14:52 |
|               3 | 2022031625520775 |       5775 | ipad蓝牙键盘鼠标苹果华为手机平板女生可爱                     |           4 |       625.46 |         0.00 | 9.80734 |     44.76 | 837 | 20220317142959 | 2022-03-18 03:03:59 |
|               4 | 2022031650972261 |     11237 | 兽角族网红发光学生头戴式女生猫耳朵无线蓝                     |           1 |       871.79 |         0.00 | 5.10348 |     24.87 | 1227 | 20220317140212 | 2022-03-18 07:36:12 |
|               5 | 2022031650972261 |     11553 | 狂热者无线大电量蓝牙耳机通用运动tws                         |           4 |       940.35 |         0.00 | 8.49109 |     38.26 | 381 | 20220317141414 | 2022-03-18 06:08:14 |
+-----------------+------------------+------------+--------------------------------------------------------------+-------------+---------------+--------------+---------+-----------+------+----------------+---------------------+
5 rows in set (0.03 sec)

使用如下命令,找出order_detail表中订单明细的modified_time字段值表示的时间范围:

mysql> select min(modified_time),max(modified_time) from order_detail;
+---------------------+---------------------+
| min(modified_time) | max(modified_time) |
+---------------------+---------------------+
| 2022-03-17 19:25:09 | 2022-09-12 14:45:45 |
+---------------------+---------------------+

假设Hive 的ODS库的存量表中存储了2022-03-17这一天的数据。为此,我们需要在Hive中手工创建ods.order_detail存量表。我们将按以下步骤实现:

1)在hive cli中,创建存量表order_detail(分区表)。Hive SQL命令如下:

hive> create table if not exists ss2024_ds_ods.order_detail(
order_detail_id bigint,
order_sn string,
product_id bigint,
product_name string,
product_cnt bigint,
product_price decimal(8,2),
average_cost decimal(8,2),
weight float,
fee_money decimal(8,2),
w_id bigint,
create_time string,
modified_time timestamp
)
partitioned by (etl_date string)
row format delimited
fields terminated by ','
null defined as 'NULL'
stored as textfile
tblproperties("serialization.null.format"='NULL');

2)将存量数据加载到Hive的ods.order_detail分区表中:

hive> load data local inpath '/opt/data/order_detail_20220317.csv' overwrite into table ss2024_ds_ods.order_detail partition(etl_date='20231224');

子任务2实现

编写Spark ETL程序,从mysql的ds_db01.order_detail表中抽取增量数据到Hive的ods.order_detail表中。

实现代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()

    // (1) 计算出ODS库中存量数据的最大时间
    val max_modified_time_sql = "select max(modified_time) from ss2024_ds_ods.order_detail"

    val df1 = spark.sql(max_modified_time_sql)
    val max_time = df1.first.getAs[java.sql.Timestamp](0)     // 2022-03-17 23:48:20.0

    // (2) mysql中执行增量查询的语句

    // 定义数据库连接url
    val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

    // 增量查询的SQL语句
    val increment_order_detail_sql = s"select * from order_detail where modified_time > '${max_time}'"

    // 增量查询
    val increment_order_detail_df = spark.read.format("jdbc")               // jdbc数据源
      .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
      .option("url", db_url) // 连接url
      .option("query", increment_order_detail_sql) // 要读取的表
      .option("user", "root") // 连接账户
      .option("password","admin") // 连接密码
      .load()                                                           // 加载

    // 分区追加写入Hive ODS
    // 可以将查询出的增量数据注册为临时表,然后执行inset into ... select ...语句插入Hive ODS存量分区表中:
    increment_order_detail_df.createOrReplaceTempView("temp_tb")
    spark.sql("insert into ss2024_ds_ods.order_detail partition(etl_date='20241225') select * from temp_tb")

    spark.stop()
}
}

子任务3

子任务3描述

3、抽取ds_db01 库中coupon_info 的增量数据进入Hive 的ods 库中表coupon_info,根据ods.coupon_info 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段 为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.coupon_info命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;

子任务3分析

前两个子任务已经演示了增量写入Hive ODS表的方法。从本子任务开始,在后续的子任务中,为简单起见,均假设为全量抽取(即Hive ODS库中不存在相应的存量数据表)。

如果在比赛中遇到Hive ODS库中已经有了存量表的情况,请参考前两个子任务的增量抽取和追加写入方法。

在MySQL中查看源表coupon_info的数据结构和数据内容:

mysql> desc coupon_info;
+------------------+------------------+------+-----+-------------------+-----------------------------+
| Field           | Type             | Null | Key | Default           | Extra                       |
+------------------+------------------+------+-----+-------------------+-----------------------------+
| coupon_id       | int(10) unsigned | NO   | PRI | NULL             | auto_increment             |
| coupon_name     | varchar(20)     | NO   |     | NULL             |                             |
| coupon_type     | tinyint(4)       | NO   |     | NULL             |                             |
| condition_amount | int(10) unsigned | NO   |     | NULL             |                             |
| condition_num   | int(10) unsigned | NO   |     | NULL             |                             |
| activity_id     | varchar(20)     | NO   |     | NULL             |                             |
| benefit_amount   | decimal(8,2)     | NO   |     | 0.00             |                             |
| benefit_discount | decimal(8,2)     | NO   |     | 0.00             |                             |
| modified_time   | timestamp       | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+------------------+------------------+------+-----+-------------------+-----------------------------+
9 rows in set (0.01 sec)

mysql> select * from coupon_info limit 5;
+-----------+---------------+-------------+------------------+---------------+---------------+----------------+------------------+---------------------+
| coupon_id | coupon_name   | coupon_type | condition_amount | condition_num | activity_id   | benefit_amount | benefit_discount | modified_time       |
+-----------+---------------+-------------+------------------+---------------+---------------+----------------+------------------+---------------------+
|         1 | 1折优惠券     |           1 |             1000 |             3 | 6569962031230 |           0.00 |             0.10 | 2022-09-09 06:23:07 |
|         2 | 2折优惠券     |           1 |             2000 |             3 | 4238315543752 |           0.00 |             0.20 | 2022-09-06 14:58:07 |
|         3 | 3折优惠券     |           1 |             3000 |             3 | 685749005648 |           0.00 |             0.30 | 2022-09-05 09:06:07 |
|         4 | 4折优惠券     |           1 |             4000 |             3 | 8020425915927 |           0.00 |             0.40 | 2022-09-07 10:19:07 |
|         5 | 5折优惠券     |           1 |             5000 |             3 | 4552013390624 |           0.00 |             0.50 | 2022-09-06 01:24:07 |
+-----------+---------------+-------------+------------------+---------------+---------------+----------------+------------------+---------------------+
5 rows in set (0.00 sec)

子任务3实现

编写Spark ETL程序,从mysql的ds_db01.coupon_info表中抽取增量数据到Hive的ods.coupon_info表中。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()
 
    // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // ETL管道
  spark.read.format("jdbc")             // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url)           // 连接url
    .option("dbtable", "coupon_info")       // 要读取的表
    .option("user", "root")           // 连接账户
    .option("password","admin")       // 连接密码
    .load()                                               // 加载
    .withColumn("etl_date",lit("20231225")) // 增加分区列
    // 分区追加写入Hive ODS
    .write
    .partitionBy("etl_date")     // 指定分区
    .format("hive")             // 注意这里
    .mode("overwrite")           // 覆盖写入
    .saveAsTable("ss2024_ds_ods.coupon_info")
   
  spark.stop()
}
}

子任务4

子任务4描述

4、抽取ds_db01 库中coupon_use 的增量数据进入Hive 的ods 库中表coupon_use,增量字段取ods.coupon_use 表中get_time、used_time、pay_time 中的最大者,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用Hive Cli 查询最新分区数据总条数,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;

子任务4分析

在MySQL中查看源表coupon_use的数据结构和数据内容:

mysql> desc coupon_use;
+---------------+------------------+------+-----+---------+----------------+
| Field         | Type             | Null | Key | Default | Extra         |
+---------------+------------------+------+-----+---------+----------------+
| coupon_use_id | int(10) unsigned | NO   | PRI | NULL   | auto_increment |
| coupon_id     | int(10) unsigned | NO   |     | NULL   |               |
| customer_id   | int(10) unsigned | NO   |     | NULL   |               |
| order_sn     | varchar(100)     | NO   |     | NULL   |               |
| coupon_status | varchar(20)     | NO   |     | NULL   |               |
| get_time     | varchar(200)     | NO   |     | NULL   |               |
| used_time     | varchar(200)     | NO   |     | NULL   |               |
| pay_time     | varchar(200)     | YES |     | NULL   |               |
+---------------+------------------+------+-----+---------+----------------+
8 rows in set (0.04 sec)

mysql> select * from coupon_use limit 5;
+---------------+-----------+-------------+------------------+---------------+----------------+----------------+----------------+
| coupon_use_id | coupon_id | customer_id | order_sn         | coupon_status | get_time       | used_time     | pay_time       |
+---------------+-----------+-------------+------------------+---------------+----------------+----------------+----------------+
|             1 |       15 |       9097 | 0               | 未使用       | 20220315034155 | NULL           | NULL           |
|             2 |         6 |       9097 | 2022031631349116 | 已使用       | 20220308181755 | 20220317204555 | 20220317204555 |
|             3 |       12 |       18577 | 0               | 未使用       | 20220308214353 | NULL           | NULL           |
|             4 |         4 |       18577 | 2022031625520775 | 已使用       | 20220313163159 | 20220318015359 | 20220318015359 |
|             5 |         0 |       14486 | 0               | 未使用       | 20220312112310 | NULL           | NULL           |
+---------------+-----------+-------------+------------------+---------------+----------------+----------------+----------------+
5 rows in set (0.10 sec)

请注意两点:

1)“增量字段取ods.coupon_use 表中get_time、used_time、pay_time 中的最大者”,有人会问如果地增量抽取,这怎么实现?可以试一下:greatest(max(get_time),max(used_time),max(pay_time))。

2)get_time、used_time、pay_time三个字段中可能有NULL字符串值。是否要处理?如果后续没有涉及什么计算,可以不处理。

子任务4实现

编写Spark ETL程序,从mysql的ds_db01.coupon_info表中抽取增量数据到Hive的ods.coupon_info表中。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()

      // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // ETL管道
  spark.read.format("jdbc")             // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url) // 连接url
    .option("dbtable", "coupon_use") // 要读取的表
    .option("user", "root") // 连接账户
    .option("password","admin") // 连接密码
    .load()                                     // 加载
    .withColumn("etl_date",lit("20231225"))   // 增加分区列
    // 分区写入Hive ODS
    .write
    .partitionBy("etl_date")   // 指定分区
    .format("hive")               // 注意这里
    .mode("overwrite")         // 覆盖写入
    .saveAsTable("ss2024_ds_ods.coupon_use")

  spark.stop()
}
}

子任务5

子任务5描述

5、抽取ds_db01 库中product_browse 的增量数据进入Hive 的ods 库中表product_browse,根据ods.product_browse 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd ) 。使用hive cli 执行show partitions ods.product_browse 命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下。

子任务5分析

在MySQL中查看源表product_browse的数据结构和数据内容:

mysql> desc product_browse;
+---------------+------------------+------+-----+-------------------+-----------------------------+
| Field         | Type             | Null | Key | Default           | Extra                       |
+---------------+------------------+------+-----+-------------------+-----------------------------+
| log_id       | int(10) unsigned | NO   | PRI | NULL             | auto_increment             |
| product_id   | int(10) unsigned | NO   |     | NULL             |                             |
| customer_id   | int(10) unsigned | NO   |     | NULL             |                             |
| gen_order     | int(10) unsigned | NO   |     | NULL             |                             |
| order_sn     | varchar(100)     | NO   |     | NULL             |                             |
| modified_time | timestamp       | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------------+------------------+------+-----+-------------------+-----------------------------+
6 rows in set (0.00 sec)

mysql> select * from product_browse limit 5;
+--------+------------+-------------+-----------+------------------+---------------------+
| log_id | product_id | customer_id | gen_order | order_sn         | modified_time       |
+--------+------------+-------------+-----------+------------------+---------------------+
|     1 |       752 |       9097 |         0 | 0               | 2022-03-15 00:54:50 |
|     2 |     12218 |       9097 |         0 | 0               | 2022-03-15 00:24:53 |
|     3 |     10756 |       9097 |         0 | 0               | 2022-03-15 14:26:55 |
|     4 |     14307 |       18577 |         1 | 2022031625520775 | 2022-03-15 04:59:50 |
|     5 |       8047 |       18577 |         0 | 0               | 2022-03-15 16:38:53 |
+--------+------------+-------------+-----------+------------------+---------------------+
5 rows in set (0.14 sec)

这里假设Hive ODS库中没有product_browse的存量数据表,因此仅做全量抽取。

子任务5实现

编写Spark ETL程序,从mysql的ds_db01.product_browse表中抽取增量数据到Hive的ods.product_browse表中。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()
   
  // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // ETL管道
  spark.read.format("jdbc")           // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url) // 连接url
    .option("dbtable", "product_browse") // 要读取的表
    .option("user", "root") // 连接账户
    .option("password","admin") // 连接密码
    .load()                                     // 加载
    .withColumn("etl_date",lit("20231225")) // 增加分区列
    // 分区写入Hive ODS
    .write
    .partitionBy("etl_date")     // 指定分区
    .format("hive")                 // 注意这里
    .mode("overwrite")           // 覆盖写入
    .saveAsTable("ss2024_ds_ods.product_browse")

  spark.stop()
}
}

子任务6

子任务6描述

6、抽取ds_db01 库中product_info 的增量数据进入Hive 的ods 库中表product_info,根据ods.product_info 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.product_info命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;

子任务6分析

在MySQL中查看源表product_browse的数据结构和数据内容:

mysql> desc product_info;
+-------------------+--------------------------------------+------+-----+-------------------+-----------------------------+
| Field             | Type                                 | Null | Key | Default           | Extra                       |
+-------------------+--------------------------------------+------+-----+-------------------+-----------------------------+
| product_id       | int(10) unsigned                     | NO   | PRI | NULL             | auto_increment             |
| product_core     | char(16)                             | NO   |     | NULL             |                             |
| product_name     | varchar(200)                         | NO   |     | NULL             |                             |
| bar_code         | varchar(50)                         | NO   |     | NULL             |                             |
| brand_id         | int(10) unsigned                     | NO   |     | NULL             |                             |
| one_category_id   | smallint(5) unsigned                 | NO   |     | NULL             |                             |
| two_category_id   | smallint(5) unsigned                 | NO   |     | NULL             |                             |
| three_category_id | smallint(5) unsigned                 | NO   |     | NULL             |                             |
| supplier_id       | int(10) unsigned                     | NO   |     | NULL             |                             |
| price             | decimal(8,2)                         | NO   |     | NULL             |                             |
| average_cost     | decimal(18,2)                       | NO   |     | NULL             |                             |
| publish_status   | tinyint(4)                           | NO   |     | 0                 |                             |
| audit_status     | tinyint(4)                           | NO   |     | 0                 |                             |
| weight           | float                               | YES |     | NULL             |                             |
| length           | float                               | YES |     | NULL             |                             |
| height           | float                               | YES |     | NULL             |                             |
| width             | float                               | YES |     | NULL             |                             |
| color_type       | enum('red','yellow','blue','blacks') | YES |     | NULL             |                             |
| production_date   | datetime                             | NO   |     | NULL             |                             |
| shelf_life       | int(11)                             | NO   |     | NULL             |                             |
| descript         | text                                 | NO   |     | NULL             |                             |
| indate           | timestamp                           | NO   |     | CURRENT_TIMESTAMP |                             |
| modified_time     | timestamp                           | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-------------------+--------------------------------------+------+-----+-------------------+-----------------------------+
23 rows in set (0.05 sec)


mysql> select * from product_info limit 5;
+------------+---------------+--------------------------------------------------------------+---------------+----------+-----------------+-----------------+-------------------+-------------+--------+--------------+----------------+--------------+--------+--------+--------+-------+------------+---------------------+------------+-----------------------------------------------------------------------------------------------------------------+---------------------+---------------------+
| product_id | product_core | product_name                                                 | bar_code     | brand_id | one_category_id | two_category_id | three_category_id | supplier_id | price | average_cost | publish_status | audit_status | weight | length | height | width | color_type | production_date     | shelf_life | descript                                                                                                       | indate             | modified_time       |
+------------+---------------+--------------------------------------------------------------+---------------+----------+-----------------+-----------------+-------------------+-------------+--------+--------------+----------------+--------------+--------+--------+--------+-------+------------+---------------------+------------+-----------------------------------------------------------------------------------------------------------------+---------------------+---------------------+
|         1 | 1571120092163 | 蓝牙耳机无线双耳vivo适用苹果入耳式5.0                       | 1080918117235 |       7 |               1 |               5 |                 8 |         792 | 983.47 |       123.82 |             0 |           1 |   6.11 |   5.35 |   2.06 | 5.21 | yellow     | 2022-01-16 00:51:05 |         36 | 其实部分时候.行业日期国内地区但是以后帮助.                                                                     | 2022-08-28 05:13:45 | 2022-09-04 22:56:45 |
|         2 | 2255059528717 | 无线蓝牙耳机挂耳式单耳超长待机续航听歌入                     | 3973831768388 |       7 |               1 |               5 |                 8 |         792 | 728.01 |       466.48 |             1 |           1 |   3.4 |   0.52 |   7.06 | 3.06 | blacks     | 2022-01-14 18:28:56 |         36 | 选择就是任何只有.美国通过投资的人如何详细.                                                                     | 2022-08-30 19:12:45 | 2022-09-05 04:20:45 |
|         3 | 5366373717125 | 2020爆款真无线蓝牙耳机双耳学生可爱运动跑                     | 2660524385069 |       7 |               1 |               5 |                 8 |         792 | 687.65 |       232.00 |             0 |           0 |   7.24 |   6.4 |   6.22 | 9.47 | blacks     | 2022-04-30 08:25:52 |         12 | 报告直接提高这么.                                                                                               | 2022-08-30 11:14:45 | 2022-09-09 15:19:45 |
|         4 | 4472671214875 | 夏新无线蓝牙耳机5.1单双耳一对迷你隐形                       | 5825790330138 |       7 |               1 |               5 |                 8 |         792 | 962.75 |       199.66 |             0 |           1 |   9.58 |   4.49 |   9.19 | 9.71 | blacks     | 2022-08-25 01:18:23 |         12 | 怎么事情他们数据这么一直继续.出来产品经验.有关上海女人内容增加阅读投资中国.                                     | 2022-09-01 05:44:45 | 2022-09-06 21:36:45 |
|         5 | 6614983799697 | 新品无线蓝牙耳机卡通女生可爱运动跑步适用                     | 1422282276239 |       7 |               1 |               5 |                 8 |         792 | 699.05 |       168.48 |             0 |           1 |   3.3 |   4.54 |   6.58 | 8.86 | blue       | 2022-05-24 02:45:11 |         12 | 两个网站发展一次不能.项目觉得推荐.                                                                             | 2022-09-03 15:21:45 | 2022-09-10 12:12:45 |
+------------+---------------+--------------------------------------------------------------+---------------+----------+-----------------+-----------------+-------------------+-------------+--------+--------------+----------------+--------------+--------+--------+--------+-------+------------+---------------------+------------+-----------------------------------------------------------------------------------------------------------------+---------------------+---------------------+
5 rows in set (0.28 sec)

这里假设Hive ODS库中没有product_info的存量数据表,因此仅做全量抽取。

子任务6实现

编写Spark ETL程序,从mysql的ds_db01.product_info表中抽取增量数据到Hive的ods.product_info表中。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()

    // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // 从数据库中抽取,并增加分区列
  spark.read.format("jdbc")           // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url) // 连接url
    .option("dbtable", "product_info") // 要读取的表
    .option("user", "root") // 连接账户
    .option("password","admin") // 连接密码
    .load()                                     // 加载
    .withColumn("etl_date",lit("20231225")) // 增加分区列
    // 分区写入Hive ODS
    .write
    .partitionBy("etl_date")     // 指定分区
    .format("hive")                 // 注意这里
    .mode("overwrite")           // 覆盖写入
    .saveAsTable("ss2024_ds_ods.product_info")

  spark.stop()  
}
}

子任务7

子任务7描述

7、抽取ds_db01 库中customer_inf 的增量数据进入Hive 的ods 库中表customer_inf,根据ods.customer_inf 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.customer_inf命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;

子任务7分析

在MySQL中查看源表customer_inf的数据结构和数据内容:

mysql> desc customer_inf;
+--------------------+------------------+------+-----+-------------------+-----------------------------+
| Field             | Type             | Null | Key | Default           | Extra                       |
+--------------------+------------------+------+-----+-------------------+-----------------------------+
| customer_inf_id   | int(10) unsigned | NO   | PRI | NULL             | auto_increment             |
| customer_id       | int(10) unsigned | NO   |     | NULL             |                             |
| customer_name     | varchar(20)     | NO   |     | NULL             |                             |
| identity_card_type | tinyint(4)       | NO   |     | 1                 |                             |
| identity_card_no   | varchar(20)     | YES |     | NULL             |                             |
| mobile_phone       | varchar(50)     | YES |     | NULL             |                             |
| customer_email     | varchar(50)     | YES |     | NULL             |                             |
| gender             | char(1)         | YES |     | NULL             |                             |
| customer_point     | int(11)         | NO   |     | 0                 |                             |
| register_time     | timestamp       | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| birthday           | datetime         | YES |     | NULL             |                             |
| customer_level     | tinyint(4)       | NO   |     | 1                 |                             |
| customer_money     | decimal(8,2)     | NO   |     | 0.00             |                             |
| modified_time     | timestamp       | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+--------------------+------------------+------+-----+-------------------+-----------------------------+
14 rows in set (0.12 sec)

mysql> select * from customer_inf limit 5;
+-----------------+-------------+---------------+--------------------+--------------------+--------------+-------------------------+--------+----------------+---------------------+---------------------+----------------+----------------+---------------------+
| customer_inf_id | customer_id | customer_name | identity_card_type | identity_card_no   | mobile_phone | customer_email         | gender | customer_point | register_time       | birthday           | customer_level | customer_money | modified_time       |
+-----------------+-------------+---------------+--------------------+--------------------+--------------+-------------------------+--------+----------------+---------------------+---------------------+----------------+----------------+---------------------+
|               1 |           0 | 鞠桂芝       |                 1 | 511325198211210472 | 18572811239 | songjuan@gmail.com     |       |         10564 | 2022-08-16 08:48:36 | 1984-07-15 00:00:00 |             2 |       13675.00 | 2022-08-22 03:45:36 |
|               2 |           1 | 郝桂花       |                 2 | 533324198602022478 | 18654171793 | yanshi@yahoo.com       |       |           988 | 2022-08-18 17:52:36 | 1907-01-21 00:00:00 |             5 |       15431.00 | 2022-08-23 13:22:36 |
|               3 |           2 | 王志强       |                 3 | 451223193609298673 | 15387045553 | jingmeng@gmail.com     |       |         10264 | 2022-08-16 12:01:36 | 1964-05-12 00:00:00 |             5 |       23947.00 | 2022-08-21 18:23:36 |
|               4 |           3 | 朱健         |                 3 | 421087199311093328 | 13923458889 | weicheng@yahoo.com     | M     |           1335 | 2022-08-16 07:48:36 | 1995-01-18 00:00:00 |             1 |       24922.00 | 2022-08-21 11:01:36 |
|               5 |           4 | 徐桂芝       |                 3 | 152900194809225063 | 13453563818 | tianxiuying@hotmail.com | M     |           4973 | 2022-08-15 13:04:36 | 1931-11-03 00:00:00 |             3 |       22209.00 | 2022-08-23 09:59:36 |
+-----------------+-------------+---------------+--------------------+--------------------+--------------+-------------------------+--------+----------------+---------------------+---------------------+----------------+----------------+---------------------+
5 rows in set (0.13 sec)

这里假设Hive ODS库中没有customer_inf的存量数据表,因此仅做全量抽取。

子任务7实现

编写Spark ETL程序,从mysql的ds_db01.customer_inf表中抽取增量数据到Hive的ods.customer_inf表中。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()

    // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // ETL管道
  spark.read.format("jdbc")           // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url) // 连接url
    .option("dbtable", "customer_inf") // 要读取的表
    .option("user", "root") // 连接账户
    .option("password","admin") // 连接密码
    .load()                                     // 加载
    .withColumn("etl_date",lit("20231225"))   // 增加分区列
    // 分区写入Hive ODS
    .write
    .partitionBy("etl_date")     // 指定分区
    .format("hive")                 // 注意这里
    .mode("overwrite")           // 覆盖写入
    .saveAsTable("ss2024_ds_ods.customer_inf")  

  spark.stop()
}
}

子任务8

子任务8描述

8、抽取ds_db01 库中customer_login_log 的增量数据进入Hive 的ods 库中表customer_login_log,根据ods.customer_login_log 表中login_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.customer_login_log 命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;

子任务8分析

在MySQL中查看源表customer_login_log的数据结构和数据内容:

mysql> desc customer_login_log;
+-------------+------------------+------+-----+-------------------+-----------------------------+
| Field       | Type             | Null | Key | Default           | Extra                       |
+-------------+------------------+------+-----+-------------------+-----------------------------+
| login_id   | int(10) unsigned | NO   | PRI | NULL             | auto_increment             |
| customer_id | int(10) unsigned | NO   |     | NULL             |                             |
| login_time | timestamp       | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| login_ip   | varchar(100)     | NO   |     | NULL             |                             |
| login_type | tinyint(4)       | NO   |     | NULL             |                             |
+-------------+------------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.05 sec)

mysql> select * from customer_login_log limit 5;
+----------+-------------+---------------------+-----------------+------------+
| login_id | customer_id | login_time         | login_ip       | login_type |
+----------+-------------+---------------------+-----------------+------------+
|       1 |       12132 | 2022-03-16 23:09:48 | 146.23.254.255 |         0 |
|       2 |       13449 | 2022-03-16 22:19:48 | 8.148.174.180   |         0 |
|       3 |       7203 | 2022-03-16 22:23:48 | 84.35.22.35     |         0 |
|       4 |       4639 | 2022-03-16 19:28:48 | 52.194.3.131   |         0 |
|       5 |       9023 | 2022-03-16 21:34:49 | 201.111.203.250 |         0 |
+----------+-------------+---------------------+-----------------+------------+
5 rows in set (0.16 sec)

这里假设Hive ODS库中没有customer_login_log的存量数据表,因此仅做全量抽取。

子任务8实现

编写Spark ETL程序,从mysql的ds_db01.customer_login_log表中抽取增量数据到Hive的ods.customer_login_log表中。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()

  // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // ETL管道
  spark.read.format("jdbc")       // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url) // 连接url
    .option("dbtable", "customer_login_log") // 要读取的表
    .option("user", "root") // 连接账户
    .option("password","admin") // 连接密码
    .load()                                     // 加载
    .withColumn("etl_date",lit("20231225"))   // 增加分区列
    // 分区写入Hive ODS
    .write
    .partitionBy("etl_date")     // 指定分区
    .format("hive")                 // 注意这里
    .mode("overwrite")           // 覆盖写入
    .saveAsTable("ss2024_ds_ods.customer_login_log")  

  spark.stop()  
}
}

子任务9

子任务9描述

9、抽取ds_db01 库中order_cart 的增量数据进入Hive 的ods 库中表order_cart,根据ods.order_cart 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为 etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.order_cart 命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;

子任务9分析

在MySQL中查看源表product_browse的数据结构和数据内容:

mysql> desc order_cart;
+----------------+------------------+------+-----+---------+----------------+
| Field         | Type             | Null | Key | Default | Extra         |
+----------------+------------------+------+-----+---------+----------------+
| cart_id       | int(10) unsigned | NO   | PRI | NULL   | auto_increment |
| customer_id   | int(10) unsigned | NO   |     | NULL   |               |
| product_id     | int(10) unsigned | NO   |     | NULL   |               |
| product_amount | int(11)         | NO   |     | NULL   |               |
| price         | decimal(8,2)     | NO   |     | NULL   |               |
| add_time       | varchar(200)     | NO   |     | NULL   |               |
| modified_time | timestamp       | YES |     | NULL   |               |
+----------------+------------------+------+-----+---------+----------------+
7 rows in set (0.19 sec)

mysql> select * from order_cart limit 5;
+---------+-------------+------------+----------------+--------+----------------+---------------------+
| cart_id | customer_id | product_id | product_amount | price | add_time       | modified_time       |
+---------+-------------+------------+----------------+--------+----------------+---------------------+
|       1 |       9097 |       752 |             4 | 544.32 | 20220317064851 | 2022-03-21 06:17:51 |
|       2 |       18577 |     14307 |             11 | 775.58 | 20220317082751 | 2022-03-21 18:33:51 |
|       3 |       18577 |       8047 |             9 | 945.32 | 20220318072056 | 2022-03-21 06:53:56 |
|       4 |       18577 |       5775 |             9 | 625.46 | 20220317123558 | 2022-03-21 08:59:58 |
|       5 |       14486 |     11237 |             16 | 871.79 | 20220317012711 | 2022-03-21 14:45:11 |
+---------+-------------+------------+----------------+--------+----------------+---------------------+
5 rows in set (0.17 sec)

这里假设Hive ODS库中没有order_cart的存量数据表,因此仅做全量抽取。

子任务9实现

编写Spark ETL程序,从mysql的ds_db01.order_cart表中抽取增量数据到Hive的ods.order_cart表中。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()

  // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // 从数据库中抽取,并增加分区列
  spark.read.format("jdbc")             // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url) // 连接url
    .option("dbtable", "order_cart") // 要读取的表
    .option("user", "root") // 连接账户
    .option("password","admin") // 连接密码
    .load()                                     // 加载
    .withColumn("etl_date",lit("20231225"))   // 增加分区列
    // 分区写入Hive ODS
    .write
    .partitionBy("etl_date")     // 指定分区
    .format("hive")                 // 注意这里
    .mode("overwrite")           // 覆盖写入
    .saveAsTable("ss2024_ds_ods.order_cart")  

  spark.stop()  
}
}

子任务10

子任务10描述

10、抽取ds_db01 库中customer_addr 的增量数据进入Hive 的ods 库中表customer_addr,根据ods.customer_addr 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd ) 。使用hive cli 执行show partitions ods.customer_addr 命令,将执行结果截图粘贴至客户端桌面【Release\模块B提交结果.docx】中对应的任务序号下;

子任务10分析

在MySQL中查看源表customer_addr的数据结构和数据内容:

mysql> desc customer_addr;
+------------------+------------------+------+-----+-------------------+-----------------------------+
| Field           | Type             | Null | Key | Default           | Extra                       |
+------------------+------------------+------+-----+-------------------+-----------------------------+
| customer_addr_id | int(10) unsigned | NO   | PRI | NULL             | auto_increment             |
| customer_id     | int(10) unsigned | NO   |     | NULL             |                             |
| zip             | int(11)         | NO   |     | NULL             |                             |
| province         | varchar(200)     | NO   |     | NULL             |                             |
| city             | varchar(200)     | NO   |     | NULL             |                             |
| address         | varchar(200)     | NO   |     | NULL             |                             |
| is_default       | tinyint(4)       | NO   |     | NULL             |                             |
| modified_time   | timestamp       | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+------------------+------------------+------+-----+-------------------+-----------------------------+
8 rows in set (0.15 sec)

mysql> select * from customer_addr limit 5;
+------------------+-------------+--------+-----------+--------------------+-----------------------------------------------------------------------------------------+------------+---------------------+
| customer_addr_id | customer_id | zip   | province | city               | address                                                                                 | is_default | modified_time       |
+------------------+-------------+--------+-----------+--------------------+-----------------------------------------------------------------------------------------+------------+---------------------+
|               1 |           1 | 877100 | 上海市   | 上海市             | 上海市上海市肇嘉浜路2124598号明珠大酒店大堂2层                                         |         1 | 2022-08-26 21:59:12 |
|               2 |           2 | 209549 | 江苏省   | 江苏省无锡市       | 江苏省无锡市南长区红星路229-73009号20层                                                 |         0 | 2022-08-21 08:34:12 |
|               3 |           3 | 810666 | 上海市   | 上海市             | 上海市上海市南京西路20663863号01C商铺19层                                               |         1 | 2022-08-24 06:58:12 |
|               4 |           4 | 417668 | 江苏省   | 江苏省南京市       | 江苏省南京市中山路3215585号鼓楼医院南扩新院区门诊大楼一楼18层                           |         0 | 2022-08-27 16:53:12 |
|               5 |           5 | 442733 | 江苏省   | 江苏省南京市       | 江苏省南京市玄武区徐庄软件园苏宁大道11014号苏宁总部大楼1F18层                           |         0 | 2022-08-26 17:21:12 |
+------------------+-------------+--------+-----------+--------------------+-----------------------------------------------------------------------------------------+------------+---------------------+
5 rows in set (0.16 sec)

这里假设Hive ODS库中没有customer_addr的存量数据表,因此仅做全量抽取。

子任务10实现

编写Spark ETL程序,从mysql的ds_db01.customer_addr表中抽取增量数据到Hive的ods.customer_addr表中。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()

  // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // ETL管道
  spark.read.format("jdbc")             // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url)           // 连接url
    .option("dbtable", "customer_addr") // 要读取的表
    .option("user", "root")           // 连接账户
    .option("password","admin")       // 连接密码
    .load()                                     // 加载
    .withColumn("etl_date",lit("20231225"))   // 增加分区列
    // 分区写入Hive ODS
    .write
    .partitionBy("etl_date")     // 指定分区
    .format("hive")                 // 注意这里
    .mode("overwrite")           // 覆盖写入
    .saveAsTable("ss2024_ds_ods.customer_addr")    

  spark.stop()
}
}

子任务11

子任务11描述

11、抽取ds_db01 库中customer_level_inf 的增量数据进入Hive 的ods 库中表customer_level_inf , 根据ods.customer_level_inf 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变, 同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行showpartitions ods.customer_level_inf 命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下。

子任务11分析

在MySQL中查看源表customer_level_inf的数据结构和数据内容:

mysql> desc customer_level_inf;
+----------------+------------------+------+-----+-------------------+-----------------------------+
| Field         | Type             | Null | Key | Default           | Extra                       |
+----------------+------------------+------+-----+-------------------+-----------------------------+
| customer_level | tinyint(4)       | NO   | PRI | NULL             | auto_increment             |
| level_name     | varchar(10)     | NO   |     | NULL             |                             |
| min_point     | int(10) unsigned | NO   |     | 0                 |                             |
| max_point     | int(10) unsigned | NO   |     | 0                 |                             |
| modified_time | timestamp       | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+----------------+------------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.13 sec)

mysql> select * from customer_level_inf limit 5;
+----------------+--------------+-----------+-----------+---------------------+
| customer_level | level_name   | min_point | max_point | modified_time       |
+----------------+--------------+-----------+-----------+---------------------+
|             1 | 普通会员     |         0 |     10000 | 2022-09-10 16:30:12 |
|             2 | 青铜         |     10000 |     20000 | 2022-09-10 16:31:12 |
|             3 | 白银         |     20000 |     30000 | 2022-09-10 16:32:12 |
|             4 | 黄金         |     30000 |     40000 | 2022-09-10 16:33:12 |
+----------------+--------------+-----------+-----------+---------------------+
4 rows in set (0.20 sec)

这里假设Hive ODS库中没有customer_level_inf的存量数据表,因此仅做全量抽取。

子任务11实现

编写Spark ETL程序,从mysql的ds_db01.customer_level_inf表中抽取增量数据到Hive的ods.customer_level_inf表中。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object EtlJob {
def main(args: Array[String]): Unit = {
  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ETL Job")
    // 兼容Hive Parquet存储格式
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()

  // 定义数据库连接url
  val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"

  // ETL管道
  spark.read.format("jdbc")       // jdbc数据源
    .option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
    .option("url", db_url)           // 连接url
    .option("dbtable", "customer_level_inf") // 要读取的表
    .option("user", "root")           // 连接账户
    .option("password","admin")       // 连接密码
    .load()                                     // 加载
    .withColumn("etl_date",lit("20231225"))   // 增加分区列
    // 分区写入Hive ODS
    .write
    .partitionBy("etl_date")     // 指定分区
    .format("hive")                 // 注意这里
    .mode("overwrite")           // 覆盖写入
    .saveAsTable("ss2024_ds_ods.customer_level_inf")

  spark.stop()
}
}
© 版权声明
THE END
喜欢就支持一下吧
点赞211赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

夸夸
夸夸
还有吗!没看够!
取消
昵称表情代码图片

    暂无评论内容