任务描述
编写Scala 工程代码, 将MySQL 的ds_db01 库中表order_master、order_detail、coupon_info、coupon_use、product_browse、product_info、customer_inf 、customer_login_log 、order_cart 、customer_level_inf 、 customer_addr 的数据增量抽取到Hive 的ods 库中对应表order_master、order_detail、coupon_info、coupon_use、product_browse、product_info、customer_inf 、customer_login_log 、order_cart 、customer_level_inf 、customer_addr 中(ods 库中部分表没有数据,正常抽取即可)。
任务分析
如果这些表在后续的数据处理和计算中使用到,还情有可原;如果大部分表在后续的数据处理和计算中没有用到,这样的题目可谓是“老太太的裹脚布,又臭又长”。
任务描述中提到,“(ods 库中部分表没有数据,正常抽取即可)”,意思也就是,在ods库中部分表是没有存量数据的,对于这些表直接全量抽取即可;对于有存量数据的,则需要增量抽取。
但是,这里赛方挖了一个比赛环境的坑,即存量数据表(Hive表)是如何创建的?大家都知道,Hive的存储格式和Spark的存储格式是有所不同的,如果不知道存量数据Hive表的存储格式,则在使用Spark做ETL增量写入的时候,则会遇到格式不兼容的问题,从而造成写入失败。这时可能会遇到了类似下面这样的错误信息:
org.apache.spark.sql.AnalysisException: The format of the existing table ods.order_master is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
综合以往赛方的出题水平,猜测赛方会使用Hive CLI手工创建ODS中的存量数据表(如果有的话),那么就涉及到一个问题,赛方创建的Hive存量表,会使用什么存储格式呢?
如果Hive表是用命令行创建的ods中的hive表,那么创建的时候可以指定不同的存储格式:TextFile、SequenceFile、RCFile、ORC、Parquet。默认情况下,不指定的话,是TextFile。
而Spark 的df.write默认的format格式是parquet + snappy。如果表是用hive命令行创建的,就不符合格式,所以就会报错。如果表是提前不存在的,那么就不会有什么问题。
那么,怎么知道已经存在的hive表是如何创建的呢?
在hive cli命令行下,执行如下命令:
hive> show create table ods.xxxx;
其中xxxx是表名称。这会显示创建表的完整语句,其中包括存储格式。
如果遇到这种情况,解决方法如下:
1)将format设置为Hive,则无论hive建表的时候使用的fileformat使用的是哪一种,都是没有关系的:
df.write.format("hive").mode("append").saveAsTable("ods.xxxx")
2)还有一种方式,就是先将DataFrame注册成临时表,然后通过SQL的方式执行insert语句进行插入:
df.createOrReplaceTempView("temp_tb")
spark.sql("insert into ods.xxxx select * from temp_tb")
准备工作
本案例在小白学苑PBCP2023平台上测试运行,当移植到官方竞赛平台时,请酌情调整。
为了测试运行能正常进行,首先进行如下准备工作:
1)启动Hadoop集群:
# start-dfs.sh
# start-yarn.sh
2)启动Hive元数据服务:
新打开一个终端窗口,运行如下命令,启动Hive元数据服务:
# hive --service metastore
不要关闭窗口,保持服务处于运行状态。
3)使用Hive CLI,创建本示例中要用到的Hive ODS数据库ss2024_ds_ods:
hive> create database ss2024_ds_ods;
4)单击右侧目录,可快速跳转到相应的子任务部分。
-
参考解析
-
参考解析
-
参考解析
-
参考解析
-
参考解析
-
参考解析
-
参考解析
-
参考解析
-
参考解析
-
参考解析
-
参考解析
子任务1
子任务1描述
1、抽取ds_db01 库中order_master 的增量数据进入Hive 的ods 库中表order_master。根据ods.order_master 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段 为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.order_master命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;
子任务1分析
首先,探索MySQL ds_db01库中order_master表中的数据。
使用如下命令,查看order_master表的结构:
mysql> desc order_master;
+--------------------+------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+--------------------+------------------+------+-----+-------------------+-----------------------------+
| order_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| order_sn | varchar(100) | NO | | NULL | |
| customer_id | int(10) unsigned | NO | | NULL | |
| shipping_user | varchar(10) | NO | | NULL | |
| province | varchar(200) | NO | | NULL | |
| city | varchar(200) | NO | | NULL | |
| address | varchar(100) | NO | | NULL | |
| order_source | tinyint(4) | NO | | NULL | |
| payment_method | tinyint(4) | NO | | NULL | |
| order_money | decimal(8,2) | NO | | NULL | |
| district_money | decimal(8,2) | NO | | 0.00 | |
| shipping_money | decimal(8,2) | NO | | 0.00 | |
| payment_money | decimal(8,2) | NO | | 0.00 | |
| shipping_comp_name | varchar(10) | YES | | NULL | |
| shipping_sn | varchar(50) | YES | | NULL | |
| create_time | varchar(100) | NO | | NULL | |
| shipping_time | varchar(100) | NO | | NULL | |
| pay_time | varchar(100) | NO | | NULL | |
| receive_time | varchar(100) | NO | | NULL | |
| order_status | varchar(50) | YES | | NULL | |
| order_point | int(10) unsigned | NO | | 0 | |
| invoice_title | varchar(100) | YES | | NULL | |
| modified_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+--------------------+------------------+------+-----+-------------------+-----------------------------+
23 rows in set (0.14 sec)
使用如下命令,查看order_master表中前5条记录:
mysql> select * from order_master limit 5;
+----------+------------------+-------------+---------------+-----------+--------------------+------------------------------------------------------------------------------------------+--------------+----------------+-------------+----------------+----------------+---------------+--------------------+---------------+----------------+----------------+----------------+----------------+--------------+-------------+-----------------------------------+---------------------+
| order_id | order_sn | customer_id | shipping_user | province | city | address | order_source | payment_method | order_money | district_money | shipping_money | payment_money | shipping_comp_name | shipping_sn | create_time | shipping_time | pay_time | receive_time | order_status | order_point | invoice_title | modified_time |
+----------+------------------+-------------+---------------+-----------+--------------------+------------------------------------------------------------------------------------------+--------------+----------------+-------------+----------------+----------------+---------------+--------------------+---------------+----------------+----------------+----------------+----------------+--------------+-------------+-----------------------------------+---------------------+
| 1 | 2022031631349116 | 9097 | 孙燕 | 上海市 | 上海市 | 上海市上海市水城南路854598号17层 | 1 | 3 | 2177.28 | 1306.37 | 22.12 | 893.03 | 韵达 | 9339334312635 | 20220317170357 | NULL | NULL | NULL | 已下单 | 89 | 南康信息有限公司 | 2022-03-17 17:03:57 |
| 2 | 2022031631349116 | 9097 | 孙燕 | 上海市 | 上海市 | 上海市上海市水城南路854598号17层 | 1 | 3 | 2177.28 | 1306.37 | 22.12 | 893.03 | 韵达 | 9339334312635 | 20220317170357 | NULL | 20220317214157 | NULL | 已付款 | 89 | 南康信息有限公司 | 2022-03-17 21:41:57 |
| 3 | 2022031631349116 | 9097 | 孙燕 | 上海市 | 上海市 | 上海市上海市水城南路854598号17层 | 1 | 3 | 2177.28 | 1306.37 | 22.12 | 893.03 | 韵达 | 9339334312635 | 20220317170357 | 20220319035357 | 20220317214157 | NULL | 已发货 | 89 | 南康信息有限公司 | 2022-03-19 03:53:57 |
| 4 | 2022031631349116 | 9097 | 孙燕 | 上海市 | 上海市 | 上海市上海市水城南路854598号17层 | 1 | 3 | 2177.28 | 1306.37 | 22.12 | 893.03 | 韵达 | 9339334312635 | 20220317170357 | 20220319035357 | 20220317214157 | 20220320231157 | 已签收 | 89 | 南康信息有限公司 | 2022-03-20 23:11:57 |
| 5 | 2022031625520775 | 18577 | 尹柳 | 浙江省 | 浙江省宁波市 | 浙江省宁波市中山东路10832665号世纪东方广场一楼F10162665号商铺21层 | 1 | 4 | 3277.42 | 1310.97 | 47.82 | 2014.27 | 韵达 | 4897398474485 | 20220317041801 | NULL | NULL | NULL | 已下单 | 201 | 时空盒数字传媒有限公司 | 2022-03-17 04:18:01 |
+----------+------------------+-------------+---------------+-----------+--------------------+------------------------------------------------------------------------------------------+--------------+----------------+-------------+----------------+----------------+---------------+--------------------+---------------+----------------+----------------+----------------+----------------+--------------+-------------+-----------------------------------+---------------------+
5 rows in set (0.00 sec)
使用如下命令,找出order_master表中订单的modified_time字段值表示的时间范围:
mysql> select min(modified_time),max(modified_time) from order_master;
+---------------------+---------------------+
| min(modified_time) | max(modified_time) |
+---------------------+---------------------+
| 2022-03-17 02:59:00 | 2022-09-16 09:44:51 |
+---------------------+---------------------+
假设Hive 的ODS库的存量表中存储了2022-03-17这一天的数据。为此,我们需要在Hive中手工创建ods.order_master存量表。我们将按以下步骤实现:
1)在hive cli中,开户动态分区支持。
动态分区默认是没有开启。可以通过执行以下命令开启动态分区。
hive> set hive.exec.dynamic.partition.mode=nonstrict; // 分区模式,默认strict
hive> set hive.exec.dynamic.partition=true; // 开启动态分区,默认false
hive> set hive.exec.max.dynamic.partitions=1000; //最大动态分区数, 设为1000
2)在Hive中,创建ODS数据库,命令如下:
hive> create database ss2024_ds_ods;
3)在hive cli中,创建存量表order_master(分区表)。Hive SQL命令如下:
hive> create table if not exists ss2024_ds_ods.order_master(
order_id bigint,
order_sn string,
customer_id bigint,
shipping_user string,
province string,
city string,
address string,
order_source int,
payment_method int,
order_money decimal(8,2),
district_money decimal(8,2),
shipping_money decimal(8,2),
payment_money decimal(8,2),
shipping_comp_name string,
shipping_sn string,
create_time string,
shipping_time string,
pay_time string,
receive_time string,
order_status string,
order_point bigint,
invoice_title string,
modified_time timestamp
)
partitioned by (etl_date string)
row format delimited
fields terminated by ','
null defined as 'NULL'
stored as textfile
tblproperties("serialization.null.format"='NULL');
4)在Hive CLI中,手工加载存量数据(即2022-03-17这一天的订单数据)到order_master表中:
将存量数据加载到Hive的ods.order_master分区表中:
hive> load data local inpath '/opt/data/order_master_20220317.csv' overwrite into table ss2024_ds_ods.order_master partition(etl_date='20231224');
子任务1实现
首先,创建一个Spark Maven项目(名称等自己任意取),并配置好jdk、scala sdk、hive集成,以及pom.xml文件等。
编写Spark ETL程序,从mysql的ds_db01.order_master表中抽取增量数据到Hive的ods.order_master表中。
抽取路径 jdbc -> dataframe -> hive ods。实现代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// (1) 计算出ODS库中存量数据的最大时间
val max_modified_time_sql = "select max(modified_time) from ss2024_ds_ods.order_master"
val df1 = spark.sql(max_modified_time_sql)
val max_time = df1.first.getAs[java.sql.Timestamp](0) // 2022-03-17 23:32:18.0
// max_time
// (2) ETL抽取
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// 增量查询的SQL语句
val increment_order_master_sql = s"select * from order_master where modified_time > '${max_time}'"
// ETL管道
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("query", increment_order_master_sql) // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("append") // 追加
.saveAsTable("ss2024_ds_ods.order_master")
spark.stop()
}
}
子任务2
子任务2描述
2、抽取ds_db01 库中order_detail 的增量数据进入Hive 的ods 库中表order_detail。根据ods.order_detail 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.order_detail命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;
子任务2分析
首先,探索MySQL ds_db01库中order_detail表中的数据。
使用如下命令,查看order_detail表的结构:
mysql> desc order_detail;
+-----------------+------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+-----------------+------------------+------+-----+-------------------+-----------------------------+
| order_detail_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| order_sn | varchar(100) | NO | | NULL | |
| product_id | int(10) unsigned | NO | | NULL | |
| product_name | varchar(50) | NO | | NULL | |
| product_cnt | int(11) | NO | | 1 | |
| product_price | decimal(8,2) | NO | | NULL | |
| average_cost | decimal(8,2) | NO | | NULL | |
| weight | float | YES | | NULL | |
| fee_money | decimal(8,2) | NO | | 0.00 | |
| w_id | int(10) unsigned | NO | | NULL | |
| create_time | varchar(200) | NO | | NULL | |
| modified_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-----------------+------------------+------+-----+-------------------+-----------------------------+
12 rows in set (0.00 sec)
使用如下命令,查看order_master表中前5条记录:
mysql> select * from order_detail limit 5;
+-----------------+------------------+------------+--------------------------------------------------------------+-------------+---------------+--------------+---------+-----------+------+----------------+---------------------+
| order_detail_id | order_sn | product_id | product_name | product_cnt | product_price | average_cost | weight | fee_money | w_id | create_time | modified_time |
+-----------------+------------------+------------+--------------------------------------------------------------+-------------+---------------+--------------+---------+-----------+------+----------------+---------------------+
| 1 | 2022031631349116 | 752 | 勒姆森无线蓝牙耳机头戴式重低音电脑通用 | 4 | 544.32 | 0.00 | 4.94507 | 82.49 | 118 | 20220317094052 | 2022-03-18 09:17:52 |
| 2 | 2022031625520775 | 14307 | 新品无线蓝牙耳机卡通女生可爱运动跑步适用 | 1 | 775.58 | 0.00 | 3.29853 | 72.56 | 1285 | 20220317182852 | 2022-03-18 09:14:52 |
| 3 | 2022031625520775 | 5775 | ipad蓝牙键盘鼠标苹果华为手机平板女生可爱 | 4 | 625.46 | 0.00 | 9.80734 | 44.76 | 837 | 20220317142959 | 2022-03-18 03:03:59 |
| 4 | 2022031650972261 | 11237 | 兽角族网红发光学生头戴式女生猫耳朵无线蓝 | 1 | 871.79 | 0.00 | 5.10348 | 24.87 | 1227 | 20220317140212 | 2022-03-18 07:36:12 |
| 5 | 2022031650972261 | 11553 | 狂热者无线大电量蓝牙耳机通用运动tws | 4 | 940.35 | 0.00 | 8.49109 | 38.26 | 381 | 20220317141414 | 2022-03-18 06:08:14 |
+-----------------+------------------+------------+--------------------------------------------------------------+-------------+---------------+--------------+---------+-----------+------+----------------+---------------------+
5 rows in set (0.03 sec)
使用如下命令,找出order_detail表中订单明细的modified_time字段值表示的时间范围:
mysql> select min(modified_time),max(modified_time) from order_detail;
+---------------------+---------------------+
| min(modified_time) | max(modified_time) |
+---------------------+---------------------+
| 2022-03-17 19:25:09 | 2022-09-12 14:45:45 |
+---------------------+---------------------+
假设Hive 的ODS库的存量表中存储了2022-03-17这一天的数据。为此,我们需要在Hive中手工创建ods.order_detail存量表。我们将按以下步骤实现:
1)在hive cli中,创建存量表order_detail(分区表)。Hive SQL命令如下:
hive> create table if not exists ss2024_ds_ods.order_detail(
order_detail_id bigint,
order_sn string,
product_id bigint,
product_name string,
product_cnt bigint,
product_price decimal(8,2),
average_cost decimal(8,2),
weight float,
fee_money decimal(8,2),
w_id bigint,
create_time string,
modified_time timestamp
)
partitioned by (etl_date string)
row format delimited
fields terminated by ','
null defined as 'NULL'
stored as textfile
tblproperties("serialization.null.format"='NULL');
2)将存量数据加载到Hive的ods.order_detail分区表中:
hive> load data local inpath '/opt/data/order_detail_20220317.csv' overwrite into table ss2024_ds_ods.order_detail partition(etl_date='20231224');
子任务2实现
编写Spark ETL程序,从mysql的ds_db01.order_detail表中抽取增量数据到Hive的ods.order_detail表中。
实现代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// (1) 计算出ODS库中存量数据的最大时间
val max_modified_time_sql = "select max(modified_time) from ss2024_ds_ods.order_detail"
val df1 = spark.sql(max_modified_time_sql)
val max_time = df1.first.getAs[java.sql.Timestamp](0) // 2022-03-17 23:48:20.0
// (2) mysql中执行增量查询的语句
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// 增量查询的SQL语句
val increment_order_detail_sql = s"select * from order_detail where modified_time > '${max_time}'"
// 增量查询
val increment_order_detail_df = spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("query", increment_order_detail_sql) // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
// 分区追加写入Hive ODS
// 可以将查询出的增量数据注册为临时表,然后执行inset into ... select ...语句插入Hive ODS存量分区表中:
increment_order_detail_df.createOrReplaceTempView("temp_tb")
spark.sql("insert into ss2024_ds_ods.order_detail partition(etl_date='20241225') select * from temp_tb")
spark.stop()
}
}
子任务3
子任务3描述
3、抽取ds_db01 库中coupon_info 的增量数据进入Hive 的ods 库中表coupon_info,根据ods.coupon_info 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段 为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.coupon_info命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;
子任务3分析
前两个子任务已经演示了增量写入Hive ODS表的方法。从本子任务开始,在后续的子任务中,为简单起见,均假设为全量抽取(即Hive ODS库中不存在相应的存量数据表)。
如果在比赛中遇到Hive ODS库中已经有了存量表的情况,请参考前两个子任务的增量抽取和追加写入方法。
在MySQL中查看源表coupon_info的数据结构和数据内容:
mysql> desc coupon_info;
+------------------+------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------------------+------+-----+-------------------+-----------------------------+
| coupon_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| coupon_name | varchar(20) | NO | | NULL | |
| coupon_type | tinyint(4) | NO | | NULL | |
| condition_amount | int(10) unsigned | NO | | NULL | |
| condition_num | int(10) unsigned | NO | | NULL | |
| activity_id | varchar(20) | NO | | NULL | |
| benefit_amount | decimal(8,2) | NO | | 0.00 | |
| benefit_discount | decimal(8,2) | NO | | 0.00 | |
| modified_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+------------------+------------------+------+-----+-------------------+-----------------------------+
9 rows in set (0.01 sec)
mysql> select * from coupon_info limit 5;
+-----------+---------------+-------------+------------------+---------------+---------------+----------------+------------------+---------------------+
| coupon_id | coupon_name | coupon_type | condition_amount | condition_num | activity_id | benefit_amount | benefit_discount | modified_time |
+-----------+---------------+-------------+------------------+---------------+---------------+----------------+------------------+---------------------+
| 1 | 1折优惠券 | 1 | 1000 | 3 | 6569962031230 | 0.00 | 0.10 | 2022-09-09 06:23:07 |
| 2 | 2折优惠券 | 1 | 2000 | 3 | 4238315543752 | 0.00 | 0.20 | 2022-09-06 14:58:07 |
| 3 | 3折优惠券 | 1 | 3000 | 3 | 685749005648 | 0.00 | 0.30 | 2022-09-05 09:06:07 |
| 4 | 4折优惠券 | 1 | 4000 | 3 | 8020425915927 | 0.00 | 0.40 | 2022-09-07 10:19:07 |
| 5 | 5折优惠券 | 1 | 5000 | 3 | 4552013390624 | 0.00 | 0.50 | 2022-09-06 01:24:07 |
+-----------+---------------+-------------+------------------+---------------+---------------+----------------+------------------+---------------------+
5 rows in set (0.00 sec)
子任务3实现
编写Spark ETL程序,从mysql的ds_db01.coupon_info表中抽取增量数据到Hive的ods.coupon_info表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// ETL管道
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("dbtable", "coupon_info") // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 分区追加写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("overwrite") // 覆盖写入
.saveAsTable("ss2024_ds_ods.coupon_info")
spark.stop()
}
}
子任务4
子任务4描述
4、抽取ds_db01 库中coupon_use 的增量数据进入Hive 的ods 库中表coupon_use,增量字段取ods.coupon_use 表中get_time、used_time、pay_time 中的最大者,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用Hive Cli 查询最新分区数据总条数,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;
子任务4分析
在MySQL中查看源表coupon_use的数据结构和数据内容:
mysql> desc coupon_use;
+---------------+------------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+------------------+------+-----+---------+----------------+
| coupon_use_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| coupon_id | int(10) unsigned | NO | | NULL | |
| customer_id | int(10) unsigned | NO | | NULL | |
| order_sn | varchar(100) | NO | | NULL | |
| coupon_status | varchar(20) | NO | | NULL | |
| get_time | varchar(200) | NO | | NULL | |
| used_time | varchar(200) | NO | | NULL | |
| pay_time | varchar(200) | YES | | NULL | |
+---------------+------------------+------+-----+---------+----------------+
8 rows in set (0.04 sec)
mysql> select * from coupon_use limit 5;
+---------------+-----------+-------------+------------------+---------------+----------------+----------------+----------------+
| coupon_use_id | coupon_id | customer_id | order_sn | coupon_status | get_time | used_time | pay_time |
+---------------+-----------+-------------+------------------+---------------+----------------+----------------+----------------+
| 1 | 15 | 9097 | 0 | 未使用 | 20220315034155 | NULL | NULL |
| 2 | 6 | 9097 | 2022031631349116 | 已使用 | 20220308181755 | 20220317204555 | 20220317204555 |
| 3 | 12 | 18577 | 0 | 未使用 | 20220308214353 | NULL | NULL |
| 4 | 4 | 18577 | 2022031625520775 | 已使用 | 20220313163159 | 20220318015359 | 20220318015359 |
| 5 | 0 | 14486 | 0 | 未使用 | 20220312112310 | NULL | NULL |
+---------------+-----------+-------------+------------------+---------------+----------------+----------------+----------------+
5 rows in set (0.10 sec)
请注意两点:
1)“增量字段取ods.coupon_use 表中get_time、used_time、pay_time 中的最大者”,有人会问如果地增量抽取,这怎么实现?可以试一下:greatest(max(get_time),max(used_time),max(pay_time))。
2)get_time、used_time、pay_time三个字段中可能有NULL字符串值。是否要处理?如果后续没有涉及什么计算,可以不处理。
子任务4实现
编写Spark ETL程序,从mysql的ds_db01.coupon_info表中抽取增量数据到Hive的ods.coupon_info表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// ETL管道
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("dbtable", "coupon_use") // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 分区写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("overwrite") // 覆盖写入
.saveAsTable("ss2024_ds_ods.coupon_use")
spark.stop()
}
}
子任务5
子任务5描述
5、抽取ds_db01 库中product_browse 的增量数据进入Hive 的ods 库中表product_browse,根据ods.product_browse 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd ) 。使用hive cli 执行show partitions ods.product_browse 命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下。
子任务5分析
在MySQL中查看源表product_browse的数据结构和数据内容:
mysql> desc product_browse;
+---------------+------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+------------------+------+-----+-------------------+-----------------------------+
| log_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| product_id | int(10) unsigned | NO | | NULL | |
| customer_id | int(10) unsigned | NO | | NULL | |
| gen_order | int(10) unsigned | NO | | NULL | |
| order_sn | varchar(100) | NO | | NULL | |
| modified_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------------+------------------+------+-----+-------------------+-----------------------------+
6 rows in set (0.00 sec)
mysql> select * from product_browse limit 5;
+--------+------------+-------------+-----------+------------------+---------------------+
| log_id | product_id | customer_id | gen_order | order_sn | modified_time |
+--------+------------+-------------+-----------+------------------+---------------------+
| 1 | 752 | 9097 | 0 | 0 | 2022-03-15 00:54:50 |
| 2 | 12218 | 9097 | 0 | 0 | 2022-03-15 00:24:53 |
| 3 | 10756 | 9097 | 0 | 0 | 2022-03-15 14:26:55 |
| 4 | 14307 | 18577 | 1 | 2022031625520775 | 2022-03-15 04:59:50 |
| 5 | 8047 | 18577 | 0 | 0 | 2022-03-15 16:38:53 |
+--------+------------+-------------+-----------+------------------+---------------------+
5 rows in set (0.14 sec)
这里假设Hive ODS库中没有product_browse的存量数据表,因此仅做全量抽取。
子任务5实现
编写Spark ETL程序,从mysql的ds_db01.product_browse表中抽取增量数据到Hive的ods.product_browse表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// ETL管道
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("dbtable", "product_browse") // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 分区写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("overwrite") // 覆盖写入
.saveAsTable("ss2024_ds_ods.product_browse")
spark.stop()
}
}
子任务6
子任务6描述
6、抽取ds_db01 库中product_info 的增量数据进入Hive 的ods 库中表product_info,根据ods.product_info 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.product_info命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;
子任务6分析
在MySQL中查看源表product_browse的数据结构和数据内容:
mysql> desc product_info;
+-------------------+--------------------------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+--------------------------------------+------+-----+-------------------+-----------------------------+
| product_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| product_core | char(16) | NO | | NULL | |
| product_name | varchar(200) | NO | | NULL | |
| bar_code | varchar(50) | NO | | NULL | |
| brand_id | int(10) unsigned | NO | | NULL | |
| one_category_id | smallint(5) unsigned | NO | | NULL | |
| two_category_id | smallint(5) unsigned | NO | | NULL | |
| three_category_id | smallint(5) unsigned | NO | | NULL | |
| supplier_id | int(10) unsigned | NO | | NULL | |
| price | decimal(8,2) | NO | | NULL | |
| average_cost | decimal(18,2) | NO | | NULL | |
| publish_status | tinyint(4) | NO | | 0 | |
| audit_status | tinyint(4) | NO | | 0 | |
| weight | float | YES | | NULL | |
| length | float | YES | | NULL | |
| height | float | YES | | NULL | |
| width | float | YES | | NULL | |
| color_type | enum('red','yellow','blue','blacks') | YES | | NULL | |
| production_date | datetime | NO | | NULL | |
| shelf_life | int(11) | NO | | NULL | |
| descript | text | NO | | NULL | |
| indate | timestamp | NO | | CURRENT_TIMESTAMP | |
| modified_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-------------------+--------------------------------------+------+-----+-------------------+-----------------------------+
23 rows in set (0.05 sec)
mysql> select * from product_info limit 5;
+------------+---------------+--------------------------------------------------------------+---------------+----------+-----------------+-----------------+-------------------+-------------+--------+--------------+----------------+--------------+--------+--------+--------+-------+------------+---------------------+------------+-----------------------------------------------------------------------------------------------------------------+---------------------+---------------------+
| product_id | product_core | product_name | bar_code | brand_id | one_category_id | two_category_id | three_category_id | supplier_id | price | average_cost | publish_status | audit_status | weight | length | height | width | color_type | production_date | shelf_life | descript | indate | modified_time |
+------------+---------------+--------------------------------------------------------------+---------------+----------+-----------------+-----------------+-------------------+-------------+--------+--------------+----------------+--------------+--------+--------+--------+-------+------------+---------------------+------------+-----------------------------------------------------------------------------------------------------------------+---------------------+---------------------+
| 1 | 1571120092163 | 蓝牙耳机无线双耳vivo适用苹果入耳式5.0 | 1080918117235 | 7 | 1 | 5 | 8 | 792 | 983.47 | 123.82 | 0 | 1 | 6.11 | 5.35 | 2.06 | 5.21 | yellow | 2022-01-16 00:51:05 | 36 | 其实部分时候.行业日期国内地区但是以后帮助. | 2022-08-28 05:13:45 | 2022-09-04 22:56:45 |
| 2 | 2255059528717 | 无线蓝牙耳机挂耳式单耳超长待机续航听歌入 | 3973831768388 | 7 | 1 | 5 | 8 | 792 | 728.01 | 466.48 | 1 | 1 | 3.4 | 0.52 | 7.06 | 3.06 | blacks | 2022-01-14 18:28:56 | 36 | 选择就是任何只有.美国通过投资的人如何详细. | 2022-08-30 19:12:45 | 2022-09-05 04:20:45 |
| 3 | 5366373717125 | 2020爆款真无线蓝牙耳机双耳学生可爱运动跑 | 2660524385069 | 7 | 1 | 5 | 8 | 792 | 687.65 | 232.00 | 0 | 0 | 7.24 | 6.4 | 6.22 | 9.47 | blacks | 2022-04-30 08:25:52 | 12 | 报告直接提高这么. | 2022-08-30 11:14:45 | 2022-09-09 15:19:45 |
| 4 | 4472671214875 | 夏新无线蓝牙耳机5.1单双耳一对迷你隐形 | 5825790330138 | 7 | 1 | 5 | 8 | 792 | 962.75 | 199.66 | 0 | 1 | 9.58 | 4.49 | 9.19 | 9.71 | blacks | 2022-08-25 01:18:23 | 12 | 怎么事情他们数据这么一直继续.出来产品经验.有关上海女人内容增加阅读投资中国. | 2022-09-01 05:44:45 | 2022-09-06 21:36:45 |
| 5 | 6614983799697 | 新品无线蓝牙耳机卡通女生可爱运动跑步适用 | 1422282276239 | 7 | 1 | 5 | 8 | 792 | 699.05 | 168.48 | 0 | 1 | 3.3 | 4.54 | 6.58 | 8.86 | blue | 2022-05-24 02:45:11 | 12 | 两个网站发展一次不能.项目觉得推荐. | 2022-09-03 15:21:45 | 2022-09-10 12:12:45 |
+------------+---------------+--------------------------------------------------------------+---------------+----------+-----------------+-----------------+-------------------+-------------+--------+--------------+----------------+--------------+--------+--------+--------+-------+------------+---------------------+------------+-----------------------------------------------------------------------------------------------------------------+---------------------+---------------------+
5 rows in set (0.28 sec)
这里假设Hive ODS库中没有product_info的存量数据表,因此仅做全量抽取。
子任务6实现
编写Spark ETL程序,从mysql的ds_db01.product_info表中抽取增量数据到Hive的ods.product_info表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// 从数据库中抽取,并增加分区列
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("dbtable", "product_info") // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 分区写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("overwrite") // 覆盖写入
.saveAsTable("ss2024_ds_ods.product_info")
spark.stop()
}
}
子任务7
子任务7描述
7、抽取ds_db01 库中customer_inf 的增量数据进入Hive 的ods 库中表customer_inf,根据ods.customer_inf 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.customer_inf命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;
子任务7分析
在MySQL中查看源表customer_inf的数据结构和数据内容:
mysql> desc customer_inf;
+--------------------+------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+--------------------+------------------+------+-----+-------------------+-----------------------------+
| customer_inf_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| customer_id | int(10) unsigned | NO | | NULL | |
| customer_name | varchar(20) | NO | | NULL | |
| identity_card_type | tinyint(4) | NO | | 1 | |
| identity_card_no | varchar(20) | YES | | NULL | |
| mobile_phone | varchar(50) | YES | | NULL | |
| customer_email | varchar(50) | YES | | NULL | |
| gender | char(1) | YES | | NULL | |
| customer_point | int(11) | NO | | 0 | |
| register_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| birthday | datetime | YES | | NULL | |
| customer_level | tinyint(4) | NO | | 1 | |
| customer_money | decimal(8,2) | NO | | 0.00 | |
| modified_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+--------------------+------------------+------+-----+-------------------+-----------------------------+
14 rows in set (0.12 sec)
mysql> select * from customer_inf limit 5;
+-----------------+-------------+---------------+--------------------+--------------------+--------------+-------------------------+--------+----------------+---------------------+---------------------+----------------+----------------+---------------------+
| customer_inf_id | customer_id | customer_name | identity_card_type | identity_card_no | mobile_phone | customer_email | gender | customer_point | register_time | birthday | customer_level | customer_money | modified_time |
+-----------------+-------------+---------------+--------------------+--------------------+--------------+-------------------------+--------+----------------+---------------------+---------------------+----------------+----------------+---------------------+
| 1 | 0 | 鞠桂芝 | 1 | 511325198211210472 | 18572811239 | songjuan@gmail.com | | 10564 | 2022-08-16 08:48:36 | 1984-07-15 00:00:00 | 2 | 13675.00 | 2022-08-22 03:45:36 |
| 2 | 1 | 郝桂花 | 2 | 533324198602022478 | 18654171793 | yanshi@yahoo.com | | 988 | 2022-08-18 17:52:36 | 1907-01-21 00:00:00 | 5 | 15431.00 | 2022-08-23 13:22:36 |
| 3 | 2 | 王志强 | 3 | 451223193609298673 | 15387045553 | jingmeng@gmail.com | | 10264 | 2022-08-16 12:01:36 | 1964-05-12 00:00:00 | 5 | 23947.00 | 2022-08-21 18:23:36 |
| 4 | 3 | 朱健 | 3 | 421087199311093328 | 13923458889 | weicheng@yahoo.com | M | 1335 | 2022-08-16 07:48:36 | 1995-01-18 00:00:00 | 1 | 24922.00 | 2022-08-21 11:01:36 |
| 5 | 4 | 徐桂芝 | 3 | 152900194809225063 | 13453563818 | tianxiuying@hotmail.com | M | 4973 | 2022-08-15 13:04:36 | 1931-11-03 00:00:00 | 3 | 22209.00 | 2022-08-23 09:59:36 |
+-----------------+-------------+---------------+--------------------+--------------------+--------------+-------------------------+--------+----------------+---------------------+---------------------+----------------+----------------+---------------------+
5 rows in set (0.13 sec)
这里假设Hive ODS库中没有customer_inf的存量数据表,因此仅做全量抽取。
子任务7实现
编写Spark ETL程序,从mysql的ds_db01.customer_inf表中抽取增量数据到Hive的ods.customer_inf表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// ETL管道
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("dbtable", "customer_inf") // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 分区写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("overwrite") // 覆盖写入
.saveAsTable("ss2024_ds_ods.customer_inf")
spark.stop()
}
}
子任务8
子任务8描述
8、抽取ds_db01 库中customer_login_log 的增量数据进入Hive 的ods 库中表customer_login_log,根据ods.customer_login_log 表中login_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.customer_login_log 命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;
子任务8分析
在MySQL中查看源表customer_login_log的数据结构和数据内容:
mysql> desc customer_login_log;
+-------------+------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+------------------+------+-----+-------------------+-----------------------------+
| login_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| customer_id | int(10) unsigned | NO | | NULL | |
| login_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| login_ip | varchar(100) | NO | | NULL | |
| login_type | tinyint(4) | NO | | NULL | |
+-------------+------------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.05 sec)
mysql> select * from customer_login_log limit 5;
+----------+-------------+---------------------+-----------------+------------+
| login_id | customer_id | login_time | login_ip | login_type |
+----------+-------------+---------------------+-----------------+------------+
| 1 | 12132 | 2022-03-16 23:09:48 | 146.23.254.255 | 0 |
| 2 | 13449 | 2022-03-16 22:19:48 | 8.148.174.180 | 0 |
| 3 | 7203 | 2022-03-16 22:23:48 | 84.35.22.35 | 0 |
| 4 | 4639 | 2022-03-16 19:28:48 | 52.194.3.131 | 0 |
| 5 | 9023 | 2022-03-16 21:34:49 | 201.111.203.250 | 0 |
+----------+-------------+---------------------+-----------------+------------+
5 rows in set (0.16 sec)
这里假设Hive ODS库中没有customer_login_log的存量数据表,因此仅做全量抽取。
子任务8实现
编写Spark ETL程序,从mysql的ds_db01.customer_login_log表中抽取增量数据到Hive的ods.customer_login_log表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// ETL管道
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("dbtable", "customer_login_log") // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 分区写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("overwrite") // 覆盖写入
.saveAsTable("ss2024_ds_ods.customer_login_log")
spark.stop()
}
}
子任务9
子任务9描述
9、抽取ds_db01 库中order_cart 的增量数据进入Hive 的ods 库中表order_cart,根据ods.order_cart 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为 etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行show partitions ods.order_cart 命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;
子任务9分析
在MySQL中查看源表product_browse的数据结构和数据内容:
mysql> desc order_cart;
+----------------+------------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+----------------+------------------+------+-----+---------+----------------+
| cart_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| customer_id | int(10) unsigned | NO | | NULL | |
| product_id | int(10) unsigned | NO | | NULL | |
| product_amount | int(11) | NO | | NULL | |
| price | decimal(8,2) | NO | | NULL | |
| add_time | varchar(200) | NO | | NULL | |
| modified_time | timestamp | YES | | NULL | |
+----------------+------------------+------+-----+---------+----------------+
7 rows in set (0.19 sec)
mysql> select * from order_cart limit 5;
+---------+-------------+------------+----------------+--------+----------------+---------------------+
| cart_id | customer_id | product_id | product_amount | price | add_time | modified_time |
+---------+-------------+------------+----------------+--------+----------------+---------------------+
| 1 | 9097 | 752 | 4 | 544.32 | 20220317064851 | 2022-03-21 06:17:51 |
| 2 | 18577 | 14307 | 11 | 775.58 | 20220317082751 | 2022-03-21 18:33:51 |
| 3 | 18577 | 8047 | 9 | 945.32 | 20220318072056 | 2022-03-21 06:53:56 |
| 4 | 18577 | 5775 | 9 | 625.46 | 20220317123558 | 2022-03-21 08:59:58 |
| 5 | 14486 | 11237 | 16 | 871.79 | 20220317012711 | 2022-03-21 14:45:11 |
+---------+-------------+------------+----------------+--------+----------------+---------------------+
5 rows in set (0.17 sec)
这里假设Hive ODS库中没有order_cart的存量数据表,因此仅做全量抽取。
子任务9实现
编写Spark ETL程序,从mysql的ds_db01.order_cart表中抽取增量数据到Hive的ods.order_cart表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// 从数据库中抽取,并增加分区列
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("dbtable", "order_cart") // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 分区写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("overwrite") // 覆盖写入
.saveAsTable("ss2024_ds_ods.order_cart")
spark.stop()
}
}
子任务10
子任务10描述
10、抽取ds_db01 库中customer_addr 的增量数据进入Hive 的ods 库中表customer_addr,根据ods.customer_addr 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd ) 。使用hive cli 执行show partitions ods.customer_addr 命令,将执行结果截图粘贴至客户端桌面【Release\模块B提交结果.docx】中对应的任务序号下;
子任务10分析
在MySQL中查看源表customer_addr的数据结构和数据内容:
mysql> desc customer_addr;
+------------------+------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------------------+------+-----+-------------------+-----------------------------+
| customer_addr_id | int(10) unsigned | NO | PRI | NULL | auto_increment |
| customer_id | int(10) unsigned | NO | | NULL | |
| zip | int(11) | NO | | NULL | |
| province | varchar(200) | NO | | NULL | |
| city | varchar(200) | NO | | NULL | |
| address | varchar(200) | NO | | NULL | |
| is_default | tinyint(4) | NO | | NULL | |
| modified_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+------------------+------------------+------+-----+-------------------+-----------------------------+
8 rows in set (0.15 sec)
mysql> select * from customer_addr limit 5;
+------------------+-------------+--------+-----------+--------------------+-----------------------------------------------------------------------------------------+------------+---------------------+
| customer_addr_id | customer_id | zip | province | city | address | is_default | modified_time |
+------------------+-------------+--------+-----------+--------------------+-----------------------------------------------------------------------------------------+------------+---------------------+
| 1 | 1 | 877100 | 上海市 | 上海市 | 上海市上海市肇嘉浜路2124598号明珠大酒店大堂2层 | 1 | 2022-08-26 21:59:12 |
| 2 | 2 | 209549 | 江苏省 | 江苏省无锡市 | 江苏省无锡市南长区红星路229-73009号20层 | 0 | 2022-08-21 08:34:12 |
| 3 | 3 | 810666 | 上海市 | 上海市 | 上海市上海市南京西路20663863号01C商铺19层 | 1 | 2022-08-24 06:58:12 |
| 4 | 4 | 417668 | 江苏省 | 江苏省南京市 | 江苏省南京市中山路3215585号鼓楼医院南扩新院区门诊大楼一楼18层 | 0 | 2022-08-27 16:53:12 |
| 5 | 5 | 442733 | 江苏省 | 江苏省南京市 | 江苏省南京市玄武区徐庄软件园苏宁大道11014号苏宁总部大楼1F18层 | 0 | 2022-08-26 17:21:12 |
+------------------+-------------+--------+-----------+--------------------+-----------------------------------------------------------------------------------------+------------+---------------------+
5 rows in set (0.16 sec)
这里假设Hive ODS库中没有customer_addr的存量数据表,因此仅做全量抽取。
子任务10实现
编写Spark ETL程序,从mysql的ds_db01.customer_addr表中抽取增量数据到Hive的ods.customer_addr表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// ETL管道
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("dbtable", "customer_addr") // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 分区写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("overwrite") // 覆盖写入
.saveAsTable("ss2024_ds_ods.customer_addr")
spark.stop()
}
}
子任务11
子任务11描述
11、抽取ds_db01 库中customer_level_inf 的增量数据进入Hive 的ods 库中表customer_level_inf , 根据ods.customer_level_inf 表中modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变, 同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli 执行showpartitions ods.customer_level_inf 命令,将执行结果截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下。
子任务11分析
在MySQL中查看源表customer_level_inf的数据结构和数据内容:
mysql> desc customer_level_inf;
+----------------+------------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+----------------+------------------+------+-----+-------------------+-----------------------------+
| customer_level | tinyint(4) | NO | PRI | NULL | auto_increment |
| level_name | varchar(10) | NO | | NULL | |
| min_point | int(10) unsigned | NO | | 0 | |
| max_point | int(10) unsigned | NO | | 0 | |
| modified_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+----------------+------------------+------+-----+-------------------+-----------------------------+
5 rows in set (0.13 sec)
mysql> select * from customer_level_inf limit 5;
+----------------+--------------+-----------+-----------+---------------------+
| customer_level | level_name | min_point | max_point | modified_time |
+----------------+--------------+-----------+-----------+---------------------+
| 1 | 普通会员 | 0 | 10000 | 2022-09-10 16:30:12 |
| 2 | 青铜 | 10000 | 20000 | 2022-09-10 16:31:12 |
| 3 | 白银 | 20000 | 30000 | 2022-09-10 16:32:12 |
| 4 | 黄金 | 30000 | 40000 | 2022-09-10 16:33:12 |
+----------------+--------------+-----------+-----------+---------------------+
4 rows in set (0.20 sec)
这里假设Hive ODS库中没有customer_level_inf的存量数据表,因此仅做全量抽取。
子任务11实现
编写Spark ETL程序,从mysql的ds_db01.customer_level_inf表中抽取增量数据到Hive的ods.customer_level_inf表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object EtlJob {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("ETL Job")
// 兼容Hive Parquet存储格式
.config("spark.sql.parquet.writeLegacyFormat", "true")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 启用动态分区覆盖模式(根据分区值,覆盖原来的分区)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()
// 定义数据库连接url
val db_url = "jdbc:mysql://localhost:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai"
// ETL管道
spark.read.format("jdbc") // jdbc数据源
.option("driver", "com.mysql.jdbc.Driver") // 数据库驱动程序类名 如果是mysql 7/8,则为com.mysql.cj.jdbc.Driver
.option("url", db_url) // 连接url
.option("dbtable", "customer_level_inf") // 要读取的表
.option("user", "root") // 连接账户
.option("password","admin") // 连接密码
.load() // 加载
.withColumn("etl_date",lit("20231225")) // 增加分区列
// 分区写入Hive ODS
.write
.partitionBy("etl_date") // 指定分区
.format("hive") // 注意这里
.mode("overwrite") // 覆盖写入
.saveAsTable("ss2024_ds_ods.customer_level_inf")
spark.stop()
}
}
1 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
2 本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
3 本站资源大多存储在云盘,如发现链接失效,请联系我们我们会第一时间更新。
暂无评论内容