2-1.大数据国赛第2套任务B-子任务一数据抽取

MySQL表结构分析

表结构分析

user. _info表

sku_ _info表

base. _province表

base_ region表

order_ info表

order_ detail表

任务要求1

1.1实现流程概要

1.2任务分解

任务要求2

2.1创建ODS层表

2.2创建MySQL表映射

2.3增量导入数据

2.4查看分区

任务要求3

3.1创建ODS层表

3.2创建MySQL表映射

3.3增量导入数据

3.4查看分区

任务要求4

4.1创建ODS层表

4.2创建MySQL表映射

4.3增量导入数据

4.4查看分区

任务要求5

5.1创建ODS层表

5.2创建MySQL表映射

5.3增量导入数据

5.4查看分区

任务要求6

6.1创建ODS层表

6.2创建MySQL表映射

6.3增量导入数据

6.4查看分区

 

 

MySQL表结构分析

抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这

两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.user_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

表结构分析

表关系示意图如下:

user. _info表

DROP TABLE IF EXISTS user_info;
CREATE TABLE user_info(
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '编号' ,
  `login_name` VARCHAR(200)   COMMENT '用户名称' ,
  `nick_name` VARCHAR(200)   COMMENT '用户昵称' ,
  `passwd` VARCHAR(200)   COMMENT '用户密码' ,
  `name` VARCHAR(200)   COMMENT '用户姓名' ,
  `phone_num` VARCHAR(200)   COMMENT '手机号' ,
  `email` VARCHAR(200)   COMMENT '邮箱' ,
  `head_img` VARCHAR(200)   COMMENT '头像' ,
  `user_level` VARCHAR(200)   COMMENT '用户级别' ,
  `birthday` DATE   COMMENT '用户生日' ,
  `gender` VARCHAR(1)   COMMENT '性别 M男,F女' ,
  `create_time` DATETIME   COMMENT '创建时间' ,
  `operate_time` DATETIME   COMMENT '修改时间' ,
  PRIMARY KEY (id)
) COMMENT = '用户表';

sku_ _info表

DROP TABLE IF EXISTS sku_info;
CREATE TABLE sku_info(
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)' ,
  `spu_id` BIGINT   COMMENT 'spuid' ,
  `price` DECIMAL(10)   COMMENT '价格' ,
  `sku_name` VARCHAR(200)   COMMENT 'sku名称' ,
  `sku_desc` VARCHAR(2000)   COMMENT '商品规格描述' ,
  `weight` DECIMAL(10,2)   COMMENT '重量' ,
  `tm_id` BIGINT   COMMENT '品牌(冗余)' ,
  `category3_id` BIGINT   COMMENT '三级分类id(冗余)' ,
  `sku_default_img` VARCHAR(200)   COMMENT '默认显示图片(冗余)' ,
  `create_time` DATETIME   COMMENT '创建时间' ,
  PRIMARY KEY (id)
) COMMENT = '库存单元表';

“Sku” 和 “Spu” 是与商品相关的两个重要概念,通常在零售和电子商务领域中使用。

  • SPU(Standard Product Unit) – 标准产品单元:

    • 定义: SPU 是指标准产品单元,它代表一组具有相似属性的商品。通常,这组商品具有相同的基本特征,但可能存在一些细微的差异,例如颜色、尺寸等。

    • 特点: SPU 是对一类产品的抽象,不同的具体商品属于同一个 SPU,因为它们共享相似的基本特征和用途。

    • 例子: 以手机为例,一种型号的手机(如iPhone 12)可以被视为一个 SPU,因为它有相同的基本特征,如品牌、型号、屏幕大小等。

  • SKU(Stock Keeping Unit) – 存货量单位:

    • 定义: SKU 是指存货量单位,它是对具体商品的唯一标识,通常包含了商品的唯一属性和识别信息,如颜色、尺寸、版本等。

    • 特点: SKU 是对商品的具体变体或实例的描述,每个 SKU 都是唯一的。SKU 可以用于精确地跟踪和管理库存,方便售卖和库存管理。

    • 例子: 如果以iPhone 12为SPU,那么不同颜色、不同存储容量和不同尺寸的iPhone 12就可以分别用不同的 SKU 进行标识,如”iPhone 12 – 红色 – 128GB”、”iPhone 12 – 蓝色 – 256GB”等。

在电商业务中,SPU 和 SKU 通常用于描述商品的层次结构和变体。SPU表示一类商品,而SKU则表示这类商品中的具体变体,有助于商家更好地管理商品库存、进行销售和满足客户的个性化需求。

base. _province表

DROP TABLE IF EXISTS base_province;
CREATE TABLE base_province(
  `id` BIGINT   COMMENT 'id' ,
  `name` VARCHAR(20)   COMMENT '省名称' ,
  `region_id` VARCHAR(20)   COMMENT '大区id' ,
  `area_code` VARCHAR(20)   COMMENT '行政区位码' ,
  `iso_code` VARCHAR(20)   COMMENT '国际编码'
) COMMENT = '省份';

base_ region表

DROP TABLE IF EXISTS base_region;
CREATE TABLE base_region(
  `id` VARCHAR(20)   COMMENT '大区id' ,
  `region_name` VARCHAR(20)   COMMENT '大区名称'
) COMMENT = '地区';

order_ info表

DROP TABLE IF EXISTS order_info;
CREATE TABLE order_info(
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '编号' ,
  `consignee` VARCHAR(100)   COMMENT '收货人' ,
  `consignee_tel` VARCHAR(20)   COMMENT '收件人电话' ,
  `total_amount` DECIMAL(16,2)   COMMENT '金额(手动新增字段)' ,
  `final_total_amount` DECIMAL(16,2)   COMMENT '总金额' ,
  `order_status` VARCHAR(20)   COMMENT '订单状态' ,
  `user_id` BIGINT   COMMENT '用户id' ,
  `delivery_address` VARCHAR(1000)   COMMENT '送货地址' ,
  `order_comment` VARCHAR(200)   COMMENT '订单备注' ,
  `out_trade_no` VARCHAR(50)   COMMENT '订单交易编号(第三方支付用)' ,
  `trade_body` VARCHAR(200)   COMMENT '订单描述(第三方支付用)' ,
  `create_time` DATETIME   COMMENT '创建时间' ,
  `operate_time` DATETIME   COMMENT '操作时间' ,
  `expire_time` DATETIME   COMMENT '失效时间' ,
  `tracking_no` VARCHAR(100)   COMMENT '物流单编号' ,
  `parent_order_id` BIGINT   COMMENT '父订单编号' ,
  `img_url` VARCHAR(200)   COMMENT '图片路径' ,
  `province_id` INT   COMMENT '地区' ,
  `benefit_reduce_amount` DECIMAL(16,2)   COMMENT '优惠金额' ,
  `original_total_amount` DECIMAL(16,2)   COMMENT '原价金额' ,
  `feight_fee` DECIMAL(16,2)   COMMENT '运费' ,
  `payment_way` VARCHAR(30)   COMMENT '支付方式' ,
  PRIMARY KEY (id)
) COMMENT = '订单表 订单表';

order_ detail表

DROP TABLE IF EXISTS order_detail;
CREATE TABLE order_detail(
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '编号' ,
  `order_id` BIGINT   COMMENT '订单编号' ,
  `sku_id` BIGINT   COMMENT 'sku_id' ,
  `sku_name` VARCHAR(200)   COMMENT 'sku名称(冗余)' ,
  `img_url` VARCHAR(200)   COMMENT '图片名称(冗余)' ,
  `order_price` DECIMAL(10,2)   COMMENT '购买价格(下单时sku价格)' ,
  `sku_num` VARCHAR(200)   COMMENT '购买个数' ,
  `create_time` DATETIME   COMMENT '创建时间' ,
  PRIMARY KEY (id)
) COMMENT = '订单明细表';

任务要求1

抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.user_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

1.1实现流程概要

1.2任务分解

创建ods_ds_hudi.user_info表

建表之前需要判断一下表是否存在,如果不存在才创建。涉及到两个概念,一个是ODS,一个是hudi。

  • 在数据仓库(Data Warehouse)建模中, ODS(Operational Data Store,操作性数据存储)层 作为数据仓库架构的一部分,通常用于整合来自各种操作性系统的数据。这一层的主要目的是将这些数据整合到一个一致的、标准化的格式中,并进行清洗、转换以确保数据的质量。 一般来说ODS层只需要保存从数据库中抽取来的原始数据,不做任何的更改。 DWD (Data Warehouse Detailed Data) 层用于存储原始、详细的事务性数据,这些数据通常来自于企业的各个操作系统和数据源。这一层包含了数据仓库的底层原子数据,记录了每个操作的细节。 DWS层是DWD层的上一层,用于存储已经汇总和聚合过的数据。这一层的数据是经过处理、加工和聚合的,通常用于支持高层次的分析和决策。

  • Hudi(Hadoop Upserts Deletes and Incrementals)是一种开源的数据管理框架,旨在简化和优化Apache Hadoop和Apache Spark等大数据处理引擎上的数据湖(Data Lake)操作。Hudi 提供了一种方式,让用户更方便地实现增量数据存储、更新、删除以及实时查询等操作。

任务中要求分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。也就是说要根据etl_date时间不同,将抽取的数据放到对应etl时间的分区里,那么建表的时候就需要增加etl_date字段,类型为String。 任务要求中提到将id作为primaryKey,operate_time作为preCombineField。建表的时候需要配置表使用hudi,并且将id配置为主键,操作时间作为合并的字段。也就是说hudi会根据主键和操作时间来判断数据是插入还是更新。 建表语句如下:

spark.sql(
 """
   |create table if not exists `ods_ds_hudi`.`user_info` (
   |   `id` int,
   | `login_name` string,
   | `nick_name` string,
   | `passwd` string,
   | `name` string,
   | `phone_num` string,
   | `email` string,
   | `head_img` string,
   | `user_level` string,
   | `birthday` date,
   | `gender` string,
   | `create_time` timestamp,
   | `operate_time` timestamp,
   | `etl_date` string
   | )using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'id',
   | preCombineField = 'operate_time'
   |)
   |partitioned by (`etl_date`);
   |""".stripMargin)

语句中的如下部分是关于hudi的相关配置

using hudi
options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'operate_time'
)
  • using hudi: 这一部分指定了使用 Hudi 进行数据操作。

  • options: 这是 Hudi SQL 的 options 子句,用于指定 Hudi 表的一些配置选项。

  • type = ‘cow’: 这个选项指定了 Hudi 表的写入类型,这里是 ‘cow’,表示 Copy on Write。Copy on Write 是 Hudi 的一种存储引擎,它在写入时会复制数据,保留历史版本。与之相对的是 ‘mor’(Merge on Read),它在写入时只更新最新版本,而历史版本会在查询时进行合并。

  • primaryKey = ‘id’: 这个选项指定了主键,即在 Hudi 表中用于唯一标识每条记录的字段。在这里,主键是 ‘id’。

  • preCombineField = ‘operate_time’: 这个选项指定了用于决定版本顺序的字段。Hudi 会使用这个字段的值来判断历史版本的先后顺序。在这里,使用了 ‘operate_time’ 字段。

综合起来,这条 SQL 语句表示要使用 Hudi 操作数据,表的写入类型为 Copy on Write,主键是 ‘id’,并且版本的顺序是通过 ‘operate_time’ 字段来确定的。这些配置可以根据具体的需求进行调整。

如何在spark中查询MySQL中的数据

需要将MySQL中表映射到sparksql中。

spark.sql(
 """
   | create temporary view user_info_tmp
   |   using org.apache.spark.sql.jdbc
   |options(
   |   url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
   |   dbtable 'user_info',
   |   user 'root1',
   |   password '123456'
   |)
   |""".stripMargin)
  • create temporary view user_info_tmp: 这部分创建了一个临时视图(temporary view)叫做 “user_info_tmp”。这个视图用于在 Spark 中表示和查询 “user_info” 表的数据。

  • using org.apache.spark.sql.jdbc: 这表明使用了 JDBC 数据源来读取外部数据。具体地,这里使用的是 Spark SQL 的 JDBC 数据源。

  • options(…): 这里指定了连接 JDBC 数据源时的一些配置选项。

    • url ‘jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8’: 指定了 MySQL 数据库的连接信息,包括数据库地址、端口以及其他连接参数。

    • dbtable ‘user_info’: 指定了要读取的数据库表,这里是 “user_info”。

    • user ‘root1’: 指定了连接数据库所使用的用户名。

    • password ‘123456’: 指定了连接数据库所使用的密码。

  • stripMargin: 这是 Scala 语言中的一个特性,用于在多行字符串中去掉每行前面的空格。

获取比赛前一天新增的数据

从MySQL中查询出比赛前一天新增的数据,也就是创建时间是比赛前一天的数据。

select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp where create_time >= '2023-09-01 00:00:00';
获取比赛前一天更新的数据

从MySQL中查询出比赛前一天更新的数据,也就是更新时间是比赛前一天的数据。

select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp where operate_time >= '2023-09-01 00:00:00';
整合新增和更新的查询语句

将更新和新增的查询语句合并成一条sql语句,实现若operate_time为空,则用create_time填充的业务需求,sql如下:

select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp where coalesce(`operate_time`, `create_time`) >= '2023-09-01 00:00:00';

coalesce(…, ”):COALESCE 函数用于返回参数列表中的第一个非空表达式。在这里,如果 ‘operate_time’字段值为空,那么返回 ‘create_time’字段的值。

如何确定更新多少数据

思考两个问题: 第一次导入的时候ods层中没有历史数据,如何处理? 某段时间没有定时导入数据到ods层中,新增和更新的数据时间使用数据抽取的时间是否合理? 以上两种问题就会导致数据导入不完整,故而任务要求中说“根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入”。意思就是说要根据ODS层中历史数据的最近的操作时间(operate_time或create_time谁最新)来确定从MySQL中抽取哪段时间的数据。所以需要从ods层中查询出最大的操作时间。

select
case
  when max(`operate_time`) > max(`create_time`)
  then max(`operate_time`)
  else max(`create_time`)
end
from `ods_ds_hudi`.`user_info`

case when max(operate_time) > max(create_time) then max(operate_time) else max(create_time) end: 这是一个 CASE 语句,用于根据条件判断选择不同的值。具体地,它比较 operate_time 列的最大值和 create_time 列的最大值,如果 operate_time 大于 create_time,则选择 operate_time 的最大值,否则选择 create_time 的最大值。

如果ods层的表中没有数据,那么这时候查到的值为空,这时候需要给个默认值,sql如下:

select
  coalesce(
    case
      when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
      else max(`create_time`)
    end, ''
  )
from `ods_ds_hudi`.`user_info`

coalesce(…, ”):COALESCE 函数用于返回参数列表中的第一个非空表达式。在这里,如果 CASE 语句的结果为空,那么返回空字符串 ” 在sql中日期的比较可以转换成字符串来比较,将查询到的最大时间转换为字符串,sql如下:

select
cast(
  coalesce(
    case
      when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
      else max(`create_time`)
    end, ''
  ) as string
)
from `ods_ds_hudi`.`user_info`

cast(… as string):CAST 函数用于将结果转换为指定的数据类型。在这里,将前面的结果强制转换为字符串类型。 综合起来,整个查询的目的是从 ods_ds_hudi 数据库的 user_info 表中选择一个字符串值,该值是根据比较 operate_time 和 create_time 列的最大值而确定的。如果 operate_time 的最大值大于 create_time 的最大值,则选择 operate_time 的最大值,否则选择 create_time 的最大值。如果这个值为空,那么返回一个空字符串。

从MySQL中查询操作时间大于等于ODS层中最大时间的数据

将查询的ods层中最大的时间作为查询更新数据的参数,将最大时间查询作为子查询放到where条件中,sql如下:

select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp
where
cast(coalesce(`operate_time`, `create_time`) as string) >=
(
   select
    cast(
      coalesce(
        case
          when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
          else max(`create_time`)
        end, ''
      ) as string
    )
   from `ods_ds_hudi`.`user_info`
);

where cast(coalesce(operate_time, create_time) as string) >= …: 这是一个 WHERE 子句,用于过滤数据。它选择了满足条件的行,条件是 operate_time 列或 create_time 列的非空最大值(作为字符串形式)要大于等于子查询的结果。 select cast(coalesce(case when max(operate_time) > max(create_time) then max(operate_time) else max(create_time) end, ”) as string) from ods_ds_hudi.user_info“: 这是一个子查询,它计算了 ods_ds_hudi 数据库中 user_info 表中的 operate_time 和 create_time 列的非空最大值。这个值将被用于与外部查询中的条件比较。

将查询到的更新数据插入到ods_ds_hudi.user_info表中

结合上面的操作,只需要将查询结果直接插入到ods层的表中即可,sql如下:

insert into `ods_ds_hudi`.`user_info` partition(etl_date = '20230901')
select
`id`,
`login_name`,
`nick_name`,
`passwd`,
`name`,
`phone_num`,
`email`,
`head_img`,
`user_level`,
`birthday`,
`gender`,
`create_time`,
`operate_time`
from user_info_tmp
where
cast(coalesce(`operate_time`, `create_time`) as string) >=
(
   select
    cast(
      coalesce(
        case
          when max(`operate_time`) > max(`create_time`) then max(`operate_time`)
          else max(`create_time`)
        end, ''
      ) as string
    )
   from `ods_ds_hudi`.`user_info`
);

insert into ods_ds_hudi.user_info partition(etl_date = ‘20230901’): 这部分指定了数据插入的目标表,即 ods_ds_hudi.user_info,并指定了插入的分区为 etl_date = ‘20230901’。这表示数据将插入到指定分区中。

查看分区

spark.sql("show partitions `ods_ds_hudi`.`user_info` ").show()

任务要求2

抽取shtd_store库中sku_info的增量数据进入Hudi的ods_ds_hudi库中表sku_info。根据ods_ds_hudi.sku_info表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.sku_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

解题思路同任务要求1

2.1创建ODS层表

spark.sql(
 """
   |create table if not exists `ods_ds_hudi`.`sku_info` (
   | `id` int,
   | `spu_id` int,
   | `price` decimal,
   | `sku_name` string,
   | `sku_desc` string,
   | `weight` decimal,
   | `tm_id` int,
   | `category3_id` int,
   | `sku_default_img` string,
   | `create_time` timestamp,
   | `etl_date` string
   | )using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'id',
   | preCombineField = 'create_time'
   |)
   |partitioned by (`etl_date`);
   |""".stripMargin)

2.2创建MySQL表映射

spark.sql(
 """
   | create temporary view sku_info_tmp
   |   using org.apache.spark.sql.jdbc
   |options(
   |   url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
   |   dbtable 'sku_info',
   |   user 'root1',
   |   password '123456'
   |)
   |""".stripMargin)

2.3增量导入数据

spark.sql("alter table `ods_ds_hudi`.`sku_info` drop partition(etl_date = '20230901') ")
spark.sql(
 """
   |insert into `ods_ds_hudi`.`sku_info` partition(etl_date = '20230901')
   |select
   | `id`,
   | `spu_id`,
   | `price`,
   | `sku_name`,
   | `sku_desc`,
   | `weight`,
   | `tm_id`,
   | `category3_id`,
   | `sku_default_img`,
   | `create_time`
   | from sku_info_tmp
   | where
   | cast(`create_time` as string)>(select cast(coalesce(max(`create_time`),'') as string) from `ods_ds_hudi`.`sku_info` )
   |""".stripMargin)

2.4查看分区

spark.sql("show partitions `ods_ds_hudi`.`sku_info`").show()

任务要求3

抽取shtd_store库中base_province的增量数据进入Hudi的ods_ds_hudi库中表base_province。根据ods_ds_hudi.base_province表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,create_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.base_province命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

解题思路同任务要求1

3.1创建ODS层表

spark.sql(
 """
   |create table if not exists `ods_ds_hudi`.`base_province` (
   | `id` int,
   | `name` string,
   | `region_id` string,
   | `area_code` string,
   | `iso_code` string,
   | `create_time` timestamp,
   | `etl_date` string
   | )using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'id',
   | preCombineField = 'create_time'
   |)
   |partitioned by (`etl_date`);
   |""".stripMargin)

3.2创建MySQL表映射

spark.sql(
 """
   | create temporary view base_province_tmp
   |   using org.apache.spark.sql.jdbc
   |options(
   |   url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
   |   dbtable 'base_province',
   |   user 'root1',
   |   password '123456'
   |)
   |""".stripMargin)

3.3增量导入数据

spark.sql("alter table `ods_ds_hudi`.`base_province` drop partition(etl_date = '20230901') ")
spark.sql(
 """
   |insert into `ods_ds_hudi`.`base_province` partition(etl_date = '20230901')
   |select
   | `id`,
   | `name`,
   | `region_id`,
   | `area_code`,
   | `iso_code`,
   | current_timestamp as `create_time`
   | from base_province_tmp
   | where
   | cast(`id` as string)>(select cast(coalesce(max(`id`),'') as string) from `ods_ds_hudi`.`base_province` )
   |""".stripMargin)

3.4查看分区

spark.sql("show partitions `ods_ds_hudi`.`base_province`").show()

任务要求4

抽取shtd_store库中base_region的增量数据进入Hudi的ods_ds_hudi库中表base_region。根据ods_ds_hudi.base_region表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,create_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.base_region命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

解题思路同任务要求1

4.1创建ODS层表

spark.sql(
 """
   |create table if not exists `ods_ds_hudi`.`base_region` (
   | `id` string,
   | `region_name` string,
   | `create_time` timestamp,
   | `etl_date` string
   | )using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'id',
   | preCombineField = 'create_time'
   |)
   |partitioned by (`etl_date`);
   |""".stripMargin)

4.2创建MySQL表映射

spark.sql(
 """
   | create temporary view base_region_tmp
   |   using org.apache.spark.sql.jdbc
   |options(
   |   url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
   |   dbtable 'base_region',
   |   user 'root1',
   |   password '123456'
   |)
   |""".stripMargin)

4.3增量导入数据

spark.sql("alter table `ods_ds_hudi`.`base_region` drop partition(etl_date = '20230901') ")
spark.sql(
 """
   |insert into `ods_ds_hudi`.`base_region` partition(etl_date = '20230901')
   |select
   | `id`,
   | `region_name`,
   | current_timestamp as `create_time`
   | from base_region_tmp
   | where
   | cast(`id` as string)>(select cast(coalesce(max(`id`),'') as string) from `ods_ds_hudi`.`base_region` )
   |""".stripMargin)

4.4查看分区

spark.sql("show partitions `ods_ds_hudi`.`base_region`").show()

任务要求5

抽取shtd_store库中order_info的增量数据进入Hudi的ods_ds_hudi库中表order_info,根据ods_ds_hudi.order_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.order_info命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下;

解题思路同任务要求1

5.1创建ODS层表

spark.sql(
 """
   |create table if not exists `ods_ds_hudi`.`order_info` (
   | `id` int,
   | `consignee` string,
   | `consignee_tel` string,
   | `total_amount` decimal,
   | `order_status` string,
   | `user_id` int,
   | `delivery_address` string,
   | `order_comment` string,
   | `out_trade_no` string,
   | `trade_body` string,
   | `create_time` timestamp,
   | `operate_time` timestamp,
   | `expire_time` timestamp,
   | `tracking_no` string,
   | `parent_order_id` int,
   | `img_url` string,
   | `province_id` int,
   | `benefit_reduce_amount` decimal,
   | `original_total_amount` decimal,
   | `feight_fee` decimal,
   | `payment_way` string,
   | `etl_date` string
   | )using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'id',
   | preCombineField = 'operate_time'
   |)
   |partitioned by (`etl_date`);
   |""".stripMargin)

5.2创建MySQL表映射

spark.sql(
 """
   | create temporary view order_info_tmp
   |   using org.apache.spark.sql.jdbc
   |options(
   |   url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
   |   dbtable 'order_info',
   |   user 'root1',
   |   password '123456'
   |)
   |""".stripMargin)

5.3增量导入数据

spark.sql("alter table `ods_ds_hudi`.`order_info` drop partition(etl_date = '20230901') ")
spark.sql(
 """
   |insert into `ods_ds_hudi`.`order_info` partition(etl_date = '20230901')
   |select
   | `id`,
   | `consignee` ,
   | `consignee_tel`,
   | `total_amount`,
   | `order_status`,
   | `user_id`,
   | `delivery_address`,
   | `order_comment`,
   | `out_trade_no`,
   | `trade_body`,
   | `create_time`,
   | `operate_time`,
   | `expire_time`,
   | `tracking_no`,
   | `parent_order_id`,
   | `img_url`,
   | `province_id`,
   | `benefit_reduce_amount`,
   | `original_total_amount`,
   | `feight_fee`,
   | `payment_way`
   | from order_info_tmp
   | where
   | cast(coalesce(`operate_time`,`create_time`) as string) >(select cast(coalesce(case when max(`operate_time`) > max(`create_time`) then max(`operate_time`) else max(`create_time`) end,'') as string) from `ods_ds_hudi`.`order_info` )
   |""".stripMargin)

5.4查看分区

spark.sql("show partitions `ods_ds_hudi`.`order_info`").show()

任务要求6

抽取shtd_store库中order_detail的增量数据进入Hudi的ods_ds_hudi库中表order_detail,根据ods_ds_hudi.order_detail表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,create_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.order_detail命令,将结果截图粘贴至客户端桌面【Release任务B提交结果.docx】中对应的任务序号下。

解题思路同任务要求1

6.1创建ODS层表

spark.sql(
 """
   |create table if not exists `ods_ds_hudi`.`order_detail` (
   | `id` int,
   | `order_id` int,
   | `sku_id` int,
   | `sku_name` string,
   | `img_url` string,
   | `order_price` decimal,
   | `sku_num` int,
   | `create_time` timestamp,
   | `etl_date` string
   | )using hudi
   |options (
   | type = 'cow',
   | primaryKey = 'id',
   | preCombineField = 'create_time'
   |)
   |partitioned by (`etl_date`);
   |""".stripMargin)

6.2创建MySQL表映射

spark.sql(
 """
   | create temporary view order_detail_tmp
   |   using org.apache.spark.sql.jdbc
   |options(
   |   url 'jdbc:mysql://127.0.0.1:3306/shtd_store?useSSL=false&useUnicode=true&characterEncoding=utf8',
   |   dbtable 'order_detail',
   |   user 'root1',
   |   password '123456'
   |)
   |""".stripMargin)

6.3增量导入数据

spark.sql("alter table `ods_ds_hudi`.`order_detail` drop partition(etl_date = '20230901') ")
spark.sql(
 """
   |insert into `ods_ds_hudi`.`order_detail` partition(etl_date = '20230901')
   |select
   | `id`,
   | `order_id`,
   | `sku_id`,
   | `sku_name`,
   | `img_url`,
   | `order_price`,
   | `sku_num`,
   | `create_time`
   | from order_detail_tmp
   | where
   | cast(`create_time` as string) >(select   cast(coalesce(max(`create_time`),'') as string) from `ods_ds_hudi`.`order_detail` )
   |""".stripMargin)

6.4查看分区

spark.sql("show partitions `ods_ds_hudi`.`order_detail`").show()
© 版权声明
THE END
喜欢就支持一下吧
点赞248赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

夸夸
夸夸
还有吗!没看够!
取消
昵称表情代码图片

    暂无评论内容