Spark合并Hive ODS离线数据和HBase实时数据,并存入Hive DWD层

任务描述

抽取 ods 库中表 table4 最新分区的数据,并结合 HBase 中 table4 offline表中的数据合并抽取到 dwd 库中 fact table4 的分区表,分区字段为etl date 且值与 ods 库的相对应表该值相等,并添加 dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd modify_time 四列,其中dwd_insert_user、dwd_modify_user 均填写“userl”,dwd_insert_time、dwd_modify_time 均填写当前操作时间(年月日必须是今天,时分秒只需在比赛时间范围内即可),抽取 HBase 中的数据时,只抽取 XXXX 年 XX 月XX日的数据 (以 rowkey 为准),并进行数据类型转换。使用 hive cli 查询modified_time为XXXX年XX月XX号当天的数据,并按照id进行升序排序,将结果截图复制粘贴至客户端桌面[Release\模块 D 提交结果docx]中对应的任务序号下;“

任务分析

这是今年(2024年)各省的省赛中普遍出现的一个任务需求。该任务的实质是模拟实现大数据架构中应用Spark对离线链路数据(Hive ODS)和实时链路数据(HBase)实现T+1的数据合并。

在这个任务中,涉及以下技术点:

  • Spark + Hive整合。

  • Spark 读写Hive表。

  • Spark + HBase整合。

  • Spark读写HBase表。

  • Spark查询HBase表时应用过滤器。

Spark整合HBase

HBase依赖于Zookeeper和Hadoop,Spark读取HBase数据需要连接到Zookeeper,通过Zookeeper访问HBase分区表(region)。在Spark中整合HBase有两种方式:

(1)第一种是将hbase-site.xml文件拷贝到项目的resources目录下。

(2)第二种是在HBaseConfiguration实例中设置。

在本教程使用的是第二种方式。

鉴于比赛中赛方可能不提供spark-hbase连接器,因此本示例使用HBase自带的API。请注意,在HBase 中有两个相关的包:org.apache.hadoop.hbase.mapred 和 org.apache.hadoop.hbase.mapreduce。其中前者使用旧式 API,后者使用新式API。 在本示例中使用新的API,使用 saveAsNewAPIHadoopDataset()方法将数据写入HBase,使用 newAPIHadoopRDD从HBase读取数据到RDD中。

本案例思路

本案例教程在PBCP2023平台上进行测试,应用内置电商数据中的订单表order_master作为示例数据。

在这个示例中,将按以下步骤展开:

(1)先从MySQL的order_master中读取2022-04-01这天的数据加载到Hive ODS中,用作存量历史数据;

(2)再从MySQL的order_master中读取2022-04-02这天的数据存储到HBase表中,模拟实时链路新增数据。

(3)编写Spark作业,从Hive ODS中加载离线批处理数据(2022-04-01这天的数据),从HBase中加载实时链路T+1数据(2022-04-02这天的数据),然后将这两者合并, 并按任务要求进行相应的转换和增加字段,最后写入到Hive 的DWD层中。

快速链接跳转

本教程包含以下几个部分,单击目录可快速跳转到相应内容:

  • 第一部分:准备Hive存量数据和HBase新增数据

  • 第二部分:合并Hive存量数据和HBase新增数据

  • 第三部分:可能遇到的异常及解决方法

准备工作

1)启动大数据环境

首先启动Hadoop集群。在终端窗口运行如下命令:

# start-all.sh

启动HBase集群。在终端窗口运行如下命令:

# start-hbase.sh

2)启动Hive Metastore服务

Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:

# hive --service metastore

这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:

# nohup hive --service metastore &

这个命令将启动Hive Metastore服务,并在后台持续运行。

3)创建HBase表

使用hbase shell,进入到hbase命令行。打开终端窗口,执行如下命令:

# hbase shell

使用create命令,创建HBase表:

hbase> create 'hbase_order_master','ods'

使用scan命令,扫描HBase表(查看HBase表内容):

hbase> scan 'hbase_order_master'

第一部分:准备Hive存量数据和HBase新增数据

要在Spark中访问HBase,需要在pom.xml中添加如下依赖:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-mapreduce</artifactId>
    <version>${hbase.version}</version>
</dependency>

编写一个Spark程序,先抽取mysql中的order_master订单表中2022-04-01这天的数据加载到Hive ODS中,用作存量历史数据; 再抽取mysql中的order_master订单表中2022-04-02这天的数据存储到HBase表中,模拟实时链路新增数据。

实现代码如下:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.time.format.DateTimeFormatter

object ReadWriteHBaseDemo1 {

def main(args: Array[String]): Unit = {

  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("Spark and HBase Demo")
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 需要根据分区值,覆盖原来的分区时,需要配置的参数
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    // hive兼容格式存储
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    .enableHiveSupport()
    .getOrCreate()

  // jdbc url
  val db_url = "jdbc:mysql://192.168.190.139:3306/ds_db01?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"

  // 1)读取ds_db01.order_master中2022-04-01的数据,写入Hive ODS,模拟存量批数据
  val df_first_day = spark.read
    .format("jdbc")
    .option("url", db_url)
    .option("query", "select * from order_master where date(modified_time)='2022-04-01'")
    .option("user", "root")         // 注意替换为自己的mysql用户
    .option("password", "admin")   // 注意替换为自己的mysql密码
    .load()
    .withColumn("etl_date",lit("20231131"))
    .limit(10)   // 取10条

  // 分区写入Hive ODS
  df_first_day.write
    .format("parquet")
    .mode("overwrite")     // 覆盖
    .partitionBy("etl_date")   // 指定分区
    .saveAsTable("ods.order_master")

  // 2)读取ds_db01.order_master中2022-04-02的数据,写入HBase表,模拟实时增量数据
  val df_second_day = spark.read
    .format("jdbc")
    .option("url", db_url)
    .option("query", "select * from order_master where date(modified_time)='2022-04-02'")
    .option("user", "root")         // 注意替换为自己的mysql用户
    .option("password", "admin")   // 注意替换为自己的mysql密码
    .load()
    .limit(10)   // 也取10条

  // 将DataFrame(订单数据)存储到HBase 表中
  // 事先在HBase shell中创建表:create 'hbase_order_master','ods'
  save(df_second_day)

  // 停止会话连接
  spark.stop()
}

// 使用 saveAsNewAPIHadoopDataset 写入数据
def save(df:DataFrame):Unit = {
  val tablename = "hbase_order_master" // 要写入的hbase表名

  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum","192.168.190.139") //设置zooKeeper集群地址,也可以通过将hbase-site.xml拷贝到resources目录下
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")       //设置zookeeper连接端口,默认2181
  hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

  // 初始化job,设置输出格式是org.apache.hadoop.hbase.mapred.TableOutputFormat
  val jobConf = new JobConf(hbaseConf)
  //设置job的输出格式
  val job = Job.getInstance(jobConf)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Result])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

  val rdd = df.rdd.map{ row =>
    /* 一个Put对象就是一行记录,在构造方法中指定主键
      * 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换
      *
      * rowkey使用随机数(0-9) + yyyyMMddHHmmssSSS
      *
      * 注:这里使用订单的 modified_time 字段值和随机数构造rowkey。
      * */
    // 构造rowkey row.getTimestamp(22).toString
//     val currentTime = LocalDateTime.now()
    val currentTime = row.getTimestamp(22).toLocalDateTime   // 使用订单的 modified_time 字段值
    val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
    val timestamp = currentTime.format(formatter)

    // 构造rowkey
    val rowkey = scala.util.Random.nextInt(10).toString + timestamp

    // 创建一条Put,添加行键
    val put = new Put(Bytes.toBytes(rowkey))

    // 添加各列(共23列)
    // Put.addColumn 方法接收三个参数:列族,列名,数据
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_id"),Bytes.toBytes(row.getLong(0)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_sn"),Bytes.toBytes(row.getString(1)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("customer_id"),Bytes.toBytes(row.getLong(2)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_user"),Bytes.toBytes(row.getString(3)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("province"),Bytes.toBytes(row.getString(4)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("city"),Bytes.toBytes(row.getString(5)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("address"),Bytes.toBytes(row.getString(6)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_source"),Bytes.toBytes(row.getInt(7)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("payment_method"),Bytes.toBytes(row.getInt(8)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_money"),Bytes.toBytes(row.getDecimal(9)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("district_money"),Bytes.toBytes(row.getDecimal(10)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_money"),Bytes.toBytes(row.getDecimal(11)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("payment_money"),Bytes.toBytes(row.getDecimal(12)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_comp_name"),Bytes.toBytes(row.getString(13)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_sn"),Bytes.toBytes(row.getString(14)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("create_time"),Bytes.toBytes(row.getString(15)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_time"),Bytes.toBytes(row.getString(16)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("pay_time"),Bytes.toBytes(row.getString(17)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("receive_time"),Bytes.toBytes(row.getString(18)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_status"),Bytes.toBytes(row.getString(19)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_point"),Bytes.toBytes(row.getLong(20)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("invoice_title"),Bytes.toBytes(row.getString(21)))
    put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("modified_time"),Bytes.toBytes(row.getTimestamp(22).toString))
    (new ImmutableBytesWritable, put)
  }
  // 存储到HBase
  rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
}

运行以上程序,然后分别到Hive Cli和 hbase shell中查看写入的数据。

第二部分:合并Hive存量数据和HBase新增数据

再编写一个Spark程序,分别从Hive ODS加载离线数据,从HBase中加载实时链路存量数据,然后合并这两个数据集,并根据任务要求进行适当的数据类型转换和数据处理。实现的代码如下:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.CompareOperator
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object ReadWriteHBaseDemo2 {

def main(args: Array[String]): Unit = {
  // 屏蔽不必要的日志显示在终端上
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  // 设置Spark配置信息
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark and HBase Demo")

  // 设置使用Kryo方式序列化,目的是为了方便将数据返回到driver端测试(如无此需要,可不设置)
  sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
  // 注册使用Kryo进行序列化的类
  sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable],classOf[Result]))

  // 创建SparkSession实例
  val spark = SparkSession.builder()
    .config(conf = sparkConf)
    // 解决时区问题
//     .config("spark.sql.session.timeZone", "Asia/Shanghai") // UTC或CST或Asia/Shanghai
    // 打开Hive动态分区的标志
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    // 需要根据分区值,覆盖原来的分区时,需要配置的参数
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    // hive兼容格式存储
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    .enableHiveSupport()
    .getOrCreate()

  // 1)从Hive ODS读取存量订单数据
  val hive_ods_order_master = spark.table("ods.order_master")

//   hive_ods_order_master.printSchema()
//   hive_ods_order_master.show()
  println("hive ods ------------------------------------------")
  /*
root
|-- order_id: long (nullable = true)
|-- order_sn: string (nullable = true)
|-- customer_id: long (nullable = true)
|-- shipping_user: string (nullable = true)
|-- province: string (nullable = true)
|-- city: string (nullable = true)
|-- address: string (nullable = true)
|-- order_source: integer (nullable = true)
|-- payment_method: integer (nullable = true)
|-- order_money: decimal(8,2) (nullable = true)
|-- district_money: decimal(8,2) (nullable = true)
|-- shipping_money: decimal(8,2) (nullable = true)
|-- payment_money: decimal(8,2) (nullable = true)
|-- shipping_comp_name: string (nullable = true)
|-- shipping_sn: string (nullable = true)
|-- create_time: string (nullable = true)
|-- shipping_time: string (nullable = true)
|-- pay_time: string (nullable = true)
|-- receive_time: string (nullable = true)
|-- order_status: string (nullable = true)
|-- order_point: long (nullable = true)
|-- invoice_title: string (nullable = true)
|-- modified_time: timestamp (nullable = true)
|-- etl_date: string (nullable = true)

+--------+----------------+-----------+-------------+--------+-------------------------------+---------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+----------------------+-------------------+--------+
|order_id|       order_sn|customer_id|shipping_user|province|                           city|                         address|order_source|payment_method|order_money|district_money|shipping_money|payment_money|shipping_comp_name| shipping_sn|   create_time| shipping_time|     pay_time| receive_time|order_status|order_point|         invoice_title|     modified_time|etl_date|
+--------+----------------+-----------+-------------+--------+-------------------------------+---------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+----------------------+-------------------+--------+
|   2063|2022032686959471|       7193|         卢鹏| 上海市|                         上海市| 上海市上海市黄浦区九江路59587...|           1|             1|   7426.38|       742.64|         18.90|     6702.64|             中通|1177680592862|20220327090656|20220329023556|20220328030856|20220330193056|     已退款|       670|     群英网络有限公司|2022-04-01 03:26:56|20231131|
|   2140|2022032684573934|     15572|         雷建| 上海市|                         上海市| 上海市上海市吴江路1288278号20层|           1|             2|   1510.76|       604.30|         12.46|       918.92|             顺丰|5086878998859|20220327083423|20220328114623|20220328063023|20220331015823|     已退款|         92|   易动力网络有限公司|2022-04-01 05:11:23|20231131|
|   2296|2022032845756607|     10681|       张桂兰| 上海市|                         上海市|   上海市上海市长寿路11182266...|           1|             2|   7026.05|       1405.21|         30.09|     5650.93|             顺丰|9906913860087|20220328161815|20220329152015|20220329112315|20220401053815|     已签收|       565|     银嘉信息有限公司|2022-04-01 05:38:15|20231131|
|   2312|2022032987273268|     16796|         侯涛| 浙江省|浙江省杭州江干区景昙路18-262...| 浙江省杭州江干区景昙路18-262...|           1|             4|     707.45|         95.00|         17.69|       630.14|             韵达|2046035743206|20220329125319|20220330152619|20220330020919|20220401162419|     已签收|         63|济南亿次元网络有限公司|2022-04-01 16:24:19|20231131|
|   2326|2022032763122037|     19980|       魏金凤| 浙江省|                   浙江省杭州市| 浙江省杭州市西湖区古墩路58884...|           1|             3|   2228.43|       1559.90|         13.91|       682.44|             中通|1577073377562|20220328041523|20220329193723|20220329073623|20220401004923|     已签收|         68|     南康网络有限公司|2022-04-01 00:49:23|20231131|
|   2342|2022032894717891|       8972|         符强| 上海市|                         上海市| 上海市上海市沪青平公路288820...|           1|             4|   6173.30|       4321.31|         10.36|     1862.35|             顺丰|9695161512380|20220328163430|20220329123530|20220329091330|20220401044230|     已签收|       186| 方正科技信息有限公司|2022-04-01 04:42:30|20231131|
|   2350|2022032887041347|       4597|         姜岩| 江苏省|                   江苏省无锡市| 江苏省无锡市南长区南长街20121...|           1|             4|   4314.53|       2157.27|         41.27|     2198.54|             中通|5283447205985|20220328185032|20220329161332|20220329062132|20220401042032|     已签收|       220|     恩悌信息有限公司|2022-04-01 04:20:32|20231131|
|   2354|2022032733529686|     14993|         尹晶| 上海市|                         上海市|   上海市上海市四川北路1661767...|           1|             2|   8535.21|         95.00|         20.41|     8460.62|             中通|3976165724549|20220328090635|20220329220435|20220329084235|20220401015335|     已签收|       846| 立信电子传媒有限公司|2022-04-01 01:53:35|20231131|
|   2362|2022032714983348|       7925|       罗淑华| 上海市|                         上海市|   上海市上海市常德路8008918号8层|           1|             4|   5286.17|         80.00|         15.18|     5221.35|             韵达|9110286972135|20220328050040|20220329180940|20220329094040|20220401044540|     已签收|       522|   易动力科技有限公司|2022-04-01 04:45:40|20231131|
|   2370|2022032814968220|     14159|         邢涛| 上海市|                         上海市|上海市上海市嘉定区佳通路31弄34...|           1|             2|   6690.56|         80.00|         42.28|     6652.84|             顺丰|9638866501792|20220328052340|20220330002240|20220329091840|20220401041340|     已签收|       665| 凌颖信息科技有限公司|2022-04-01 04:13:40|20231131|
+--------+----------------+-----------+-------------+--------+-------------------------------+---------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+----------------------+-------------------+--------+

    */

  // 2)从HBase表中读取数据到DataFrame中
  val hbase_ods_order_master = read(spark)
    // 金额相关的列,数据类型转换(Decimal(38,18) -> Decimal(8,2)
    .withColumn("order_money", col("order_money").cast(DecimalType(8,2)))
    .withColumn("district_money", col("district_money").cast(DecimalType(8,2)))
    .withColumn("shipping_money", col("shipping_money").cast(DecimalType(8,2)))
    .withColumn("payment_money", col("payment_money").cast(DecimalType(8,2)))
    // modified_time列,数据类型转换(String -> Timestamp
    .withColumn("modified_time", col("modified_time").cast(TimestampType))
    // 增加分区列
    .withColumn("etl_date",lit("20231131"))

//   hbase_ods_order_master.printSchema()
//   hbase_ods_order_master.show()
  println("hbase ods ------------------------------------------")
  /*
root
|-- order_id: long (nullable = false)
|-- order_sn: string (nullable = true)
|-- customer_id: long (nullable = false)
|-- shipping_user: string (nullable = true)
|-- province: string (nullable = true)
|-- city: string (nullable = true)
|-- address: string (nullable = true)
|-- order_source: integer (nullable = false)
|-- payment_method: integer (nullable = false)
|-- order_money: decimal(8,2) (nullable = true)
|-- district_money: decimal(8,2) (nullable = true)
|-- shipping_money: decimal(8,2) (nullable = true)
|-- payment_money: decimal(8,2) (nullable = true)
|-- shipping_comp_name: string (nullable = true)
|-- shipping_sn: string (nullable = true)
|-- create_time: string (nullable = true)
|-- shipping_time: string (nullable = true)
|-- pay_time: string (nullable = true)
|-- receive_time: string (nullable = true)
|-- order_status: string (nullable = true)
|-- order_point: long (nullable = false)
|-- invoice_title: string (nullable = true)
|-- modified_time: timestamp (nullable = true)
|-- etl_date: string (nullable = false)

+--------+----------------+-----------+-------------+--------+------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+
|order_id|       order_sn|customer_id|shipping_user|province|             city|                             address|order_source|payment_method|order_money|district_money|shipping_money|payment_money|shipping_comp_name| shipping_sn|   create_time| shipping_time|     pay_time| receive_time|order_status|order_point|           invoice_title|     modified_time|etl_date|
+--------+----------------+-----------+-------------+--------+------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+
|   2334|2022032859756405|       7370|       黄丽娟| 江苏省|     江苏省苏州市|     江苏省苏州市现代大道1699409...|           1|             1|   2229.52|       100.00|         11.01|     2140.53|             中通|4278051015984|20220329082028|20220330154728|20220330073228|20220402042228|     已签收|       214|惠派国际公司信息有限公司|2022-04-02 04:22:28|20231131|
|   2403|2022032788808370|       4541|       阎桂香| 江苏省|     江苏省无锡市|     江苏省无锡市解放东路1000464...|           1|             2|   1052.92|       210.58|         13.67|       856.01|             顺丰|4476220310056|20220328173448|20220330022648|20220329012648|20220401031848|     已退款|         86|       昊嘉科技有限公司|2022-04-02 05:40:48|20231131|
|   2459|2022032955711624|     15084|         黄云| 上海市|           上海市|   上海市上海市水城南路207986号17层|           1|             2|   1720.08|       1204.06|         14.02|       530.04|             顺丰|5130441519061|20220329122210|20220331020110|20220329234010|20220402020310|     已签收|         53| 新格林耐特传媒有限公司|2022-04-02 02:03:10|20231131|
|   2501|2022032965004110|       7594|         谢帆| 贵州省|     贵州省贵阳市|贵州省贵阳市云岩区红边门店普陀路1...|           1|             5|   7990.56|       6392.45|         13.42|     1611.53|             顺丰|8154839863090|20220330170823|20220331215023|20220330233023|20220402213123|     已签收|       161|   合联电子传媒有限公司|2022-04-02 21:31:23|20231131|
|   2529|2022032866253715|     15300|         姚博| 上海市|           上海市|   上海市上海市嘉定区佳通路31弄39...|           1|             4|     515.96|       257.98|         27.30|       285.28|             韵达|4472053332463|20220329183828|20220330215828|20220329191128|20220402034728|     已签收|         29| 济南亿次元信息有限公司|2022-04-02 03:47:28|20231131|
|   2513|2022032810875003|       6753|         卿柳| 上海市|           上海市|     上海市上海市辛耕路1299124号14层|           1|             3|   2888.15|       2021.71|         17.57|       884.02|             顺丰|0370967098533|20220329062624|20220330114924|20220330080924|20220402002524|     已签收|         88|       良诺信息有限公司|2022-04-02 00:25:24|20231131|
|   2435|2022032844826283|     19014|       张桂兰| 江苏省|江苏省江苏省淮安市|江苏省江苏省淮安市清河区翔宇中路1...|           1|             1|   2602.38|         80.00|         40.23|     2562.61|             韵达|1484782939199|20220329050759|20220330202559|20220330085359|20220402030259|     已签收|       256|   网新恒天网络有限公司|2022-04-02 03:02:59|20231131|
|   2322|2022032881798097|     18548|       陈丹丹| 江苏省|     江苏省溧阳市|     江苏省溧阳市天目路123956号上...|           1|             2|   2043.64|       100.00|         14.60|     1958.24|             韵达|8595786486688|20220328061921|20220329121521|20220328194921|20220331172121|     已退款|       196|       彩虹信息有限公司|2022-04-02 03:58:21|20231131|
|   2553|2022032935583382|       6329|         刘婷| 江苏省|     江苏省苏州市|     江苏省苏州市金鸡湖路885370号...|           1|             3|   1734.24|       1387.39|         45.83|       392.68|             韵达|3143365276943|20220330091539|20220331200739|20220330190339|20220402162639|     已签收|         39|     MBP软件信息有限公司|2022-04-02 16:26:39|20231131|
|   2537|2022032818405314|     11139|         张旭| 上海市|           上海市| 上海市上海市浦东新区杨高南路428...|           1|             3|   1448.48|       100.00|         28.55|     1377.03|             顺丰|9631168546997|20220329185733|20220330201433|20220329191033|20220402035433|     已签收|       138|     飞利信信息有限公司|2022-04-02 03:54:33|20231131|
+--------+----------------+-----------+-------------+--------+------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+

  */

  // 3)合并存量数据和增量数据,写入Hive DWD层
  val fact_order_master = hive_ods_order_master
    // 垂直合并
    .union(hbase_ods_order_master)
    // 增加新的列
    .withColumn("dwd_insert_user",lit("user1"))
    .withColumn("dwd_insert_time",date_trunc("second",current_timestamp()))     // spark 2.3.0
    .withColumn("dwd_modify_user",lit("user1"))
    .withColumn("dwd_modify_time",date_trunc("second",current_timestamp()))     // spark 2.3.0

  fact_order_master.printSchema()
  fact_order_master.show()
  println("合并后 ------------------------------------------")
  /*
root
|-- order_id: long (nullable = true)
|-- order_sn: string (nullable = true)
|-- customer_id: long (nullable = true)
|-- shipping_user: string (nullable = true)
|-- province: string (nullable = true)
|-- city: string (nullable = true)
|-- address: string (nullable = true)
|-- order_source: integer (nullable = true)
|-- payment_method: integer (nullable = true)
|-- order_money: decimal(8,2) (nullable = true)
|-- district_money: decimal(8,2) (nullable = true)
|-- shipping_money: decimal(8,2) (nullable = true)
|-- payment_money: decimal(8,2) (nullable = true)
|-- shipping_comp_name: string (nullable = true)
|-- shipping_sn: string (nullable = true)
|-- create_time: string (nullable = true)
|-- shipping_time: string (nullable = true)
|-- pay_time: string (nullable = true)
|-- receive_time: string (nullable = true)
|-- order_status: string (nullable = true)
|-- order_point: long (nullable = true)
|-- invoice_title: string (nullable = true)
|-- modified_time: timestamp (nullable = true)
|-- etl_date: string (nullable = true)
|-- dwd_insert_user: string (nullable = false)
|-- dwd_insert_time: timestamp (nullable = true)
|-- dwd_modify_user: string (nullable = false)
|-- dwd_modify_time: timestamp (nullable = true)

+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+---------------+-------------------+---------------+-------------------+
|order_id|       order_sn|customer_id|shipping_user|province|                           city|                             address|order_source|payment_method|order_money|district_money|shipping_money|payment_money|shipping_comp_name| shipping_sn|   create_time| shipping_time|     pay_time| receive_time|order_status|order_point|           invoice_title|     modified_time|etl_date|dwd_insert_user|   dwd_insert_time|dwd_modify_user|   dwd_modify_time|
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+---------------+-------------------+---------------+-------------------+
|   2063|2022032686959471|       7193|         卢鹏| 上海市|                         上海市|   上海市上海市黄浦区九江路59587...|           1|             1|   7426.38|       742.64|         18.90|     6702.64|             中通|1177680592862|20220327090656|20220329023556|20220328030856|20220330193056|     已退款|       670|       群英网络有限公司|2022-04-01 03:26:56|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2140|2022032684573934|     15572|         雷建| 上海市|                         上海市|     上海市上海市吴江路1288278号20层|           1|             2|   1510.76|       604.30|         12.46|       918.92|             顺丰|5086878998859|20220327083423|20220328114623|20220328063023|20220331015823|     已退款|         92|     易动力网络有限公司|2022-04-01 05:11:23|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2296|2022032845756607|     10681|       张桂兰| 上海市|                         上海市|       上海市上海市长寿路11182266...|           1|             2|   7026.05|       1405.21|         30.09|     5650.93|             顺丰|9906913860087|20220328161815|20220329152015|20220329112315|20220401053815|     已签收|       565|       银嘉信息有限公司|2022-04-01 05:38:15|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2312|2022032987273268|     16796|         侯涛| 浙江省|浙江省杭州江干区景昙路18-262...|     浙江省杭州江干区景昙路18-262...|           1|             4|     707.45|         95.00|         17.69|       630.14|             韵达|2046035743206|20220329125319|20220330152619|20220330020919|20220401162419|     已签收|         63| 济南亿次元网络有限公司|2022-04-01 16:24:19|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2326|2022032763122037|     19980|       魏金凤| 浙江省|                   浙江省杭州市|   浙江省杭州市西湖区古墩路58884...|           1|             3|   2228.43|       1559.90|         13.91|       682.44|             中通|1577073377562|20220328041523|20220329193723|20220329073623|20220401004923|     已签收|         68|       南康网络有限公司|2022-04-01 00:49:23|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2342|2022032894717891|       8972|         符强| 上海市|                         上海市|     上海市上海市沪青平公路288820...|           1|             4|   6173.30|       4321.31|         10.36|     1862.35|             顺丰|9695161512380|20220328163430|20220329123530|20220329091330|20220401044230|     已签收|       186|   方正科技信息有限公司|2022-04-01 04:42:30|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2350|2022032887041347|       4597|         姜岩| 江苏省|                   江苏省无锡市|   江苏省无锡市南长区南长街20121...|           1|             4|   4314.53|       2157.27|         41.27|     2198.54|             中通|5283447205985|20220328185032|20220329161332|20220329062132|20220401042032|     已签收|       220|       恩悌信息有限公司|2022-04-01 04:20:32|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2354|2022032733529686|     14993|         尹晶| 上海市|                         上海市|     上海市上海市四川北路1661767...|           1|             2|   8535.21|         95.00|         20.41|     8460.62|             中通|3976165724549|20220328090635|20220329220435|20220329084235|20220401015335|     已签收|       846|   立信电子传媒有限公司|2022-04-01 01:53:35|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2362|2022032714983348|       7925|       罗淑华| 上海市|                         上海市|     上海市上海市常德路8008918号8层|           1|             4|   5286.17|         80.00|         15.18|     5221.35|             韵达|9110286972135|20220328050040|20220329180940|20220329094040|20220401044540|     已签收|       522|     易动力科技有限公司|2022-04-01 04:45:40|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2370|2022032814968220|     14159|         邢涛| 上海市|                         上海市|   上海市上海市嘉定区佳通路31弄34...|           1|             2|   6690.56|         80.00|         42.28|     6652.84|             顺丰|9638866501792|20220328052340|20220330002240|20220329091840|20220401041340|     已签收|       665|   凌颖信息科技有限公司|2022-04-01 04:13:40|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2529|2022032866253715|     15300|         姚博| 上海市|                         上海市|   上海市上海市嘉定区佳通路31弄39...|           1|             4|     515.96|       257.98|         27.30|       285.28|             韵达|4472053332463|20220329183828|20220330215828|20220329191128|20220402034728|     已签收|         29| 济南亿次元信息有限公司|2022-04-02 03:47:28|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2513|2022032810875003|       6753|         卿柳| 上海市|                         上海市|     上海市上海市辛耕路1299124号14层|           1|             3|   2888.15|       2021.71|         17.57|       884.02|             顺丰|0370967098533|20220329062624|20220330114924|20220330080924|20220402002524|     已签收|         88|       良诺信息有限公司|2022-04-02 00:25:24|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2435|2022032844826283|     19014|       张桂兰| 江苏省|             江苏省江苏省淮安市|江苏省江苏省淮安市清河区翔宇中路1...|           1|             1|   2602.38|         80.00|         40.23|     2562.61|             韵达|1484782939199|20220329050759|20220330202559|20220330085359|20220402030259|     已签收|       256|   网新恒天网络有限公司|2022-04-02 03:02:59|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2322|2022032881798097|     18548|       陈丹丹| 江苏省|                   江苏省溧阳市|     江苏省溧阳市天目路123956号上...|           1|             2|   2043.64|       100.00|         14.60|     1958.24|             韵达|8595786486688|20220328061921|20220329121521|20220328194921|20220331172121|     已退款|       196|       彩虹信息有限公司|2022-04-02 03:58:21|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2553|2022032935583382|       6329|         刘婷| 江苏省|                   江苏省苏州市|     江苏省苏州市金鸡湖路885370号...|           1|             3|   1734.24|       1387.39|         45.83|       392.68|             韵达|3143365276943|20220330091539|20220331200739|20220330190339|20220402162639|     已签收|         39|     MBP软件信息有限公司|2022-04-02 16:26:39|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2537|2022032818405314|     11139|         张旭| 上海市|                         上海市| 上海市上海市浦东新区杨高南路428...|           1|             3|   1448.48|       100.00|         28.55|     1377.03|             顺丰|9631168546997|20220329185733|20220330201433|20220329191033|20220402035433|     已签收|       138|     飞利信信息有限公司|2022-04-02 03:54:33|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2334|2022032859756405|       7370|       黄丽娟| 江苏省|                   江苏省苏州市|     江苏省苏州市现代大道1699409...|           1|             1|   2229.52|       100.00|         11.01|     2140.53|             中通|4278051015984|20220329082028|20220330154728|20220330073228|20220402042228|     已签收|       214|惠派国际公司信息有限公司|2022-04-02 04:22:28|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2403|2022032788808370|       4541|       阎桂香| 江苏省|                   江苏省无锡市|     江苏省无锡市解放东路1000464...|           1|             2|   1052.92|       210.58|         13.67|       856.01|             顺丰|4476220310056|20220328173448|20220330022648|20220329012648|20220401031848|     已退款|         86|       昊嘉科技有限公司|2022-04-02 05:40:48|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2459|2022032955711624|     15084|         黄云| 上海市|                         上海市|   上海市上海市水城南路207986号17层|           1|             2|   1720.08|       1204.06|         14.02|       530.04|             顺丰|5130441519061|20220329122210|20220331020110|20220329234010|20220402020310|     已签收|         53| 新格林耐特传媒有限公司|2022-04-02 02:03:10|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
|   2501|2022032965004110|       7594|         谢帆| 贵州省|                   贵州省贵阳市|贵州省贵阳市云岩区红边门店普陀路1...|           1|             5|   7990.56|       6392.45|         13.42|     1611.53|             顺丰|8154839863090|20220330170823|20220331215023|20220330233023|20220402213123|     已签收|       161|   合联电子传媒有限公司|2022-04-02 21:31:23|20231131|         user1|2023-12-11 11:43:45|         user1|2023-12-11 11:43:45|
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+---------------+-------------------+---------------+-------------------+
    */

  println("正在写入 Hive DWD层 ...")
  fact_order_master.write
    .format("parquet")
    .mode("overwrite")     // 覆盖
    .partitionBy("etl_date")   // 指定分区
    .saveAsTable("dwd.fact_order_master") // 保存到Hive表
  println("已完成写入 Hive DWD层")

  // 测试
  spark.table("dwd.fact_order_master").show()
  println("测试 hive dwd ------------------------------------------")
  /*
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+---------------+-------------------+---------------+-------------------+--------+
|order_id|       order_sn|customer_id|shipping_user|province|                           city|                             address|order_source|payment_method|order_money|district_money|shipping_money|payment_money|shipping_comp_name| shipping_sn|   create_time| shipping_time|     pay_time| receive_time|order_status|order_point|           invoice_title|     modified_time|dwd_insert_user|   dwd_insert_time|dwd_modify_user|   dwd_modify_time|etl_date|
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+---------------+-------------------+---------------+-------------------+--------+
|   2063|2022032686959471|       7193|         卢鹏| 上海市|                         上海市|   上海市上海市黄浦区九江路59587...|           1|             1|   7426.38|       742.64|         18.90|     6702.64|             中通|1177680592862|20220327090656|20220329023556|20220328030856|20220330193056|     已退款|       670|       群英网络有限公司|2022-04-01 03:26:56|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2140|2022032684573934|     15572|         雷建| 上海市|                         上海市|     上海市上海市吴江路1288278号20层|           1|             2|   1510.76|       604.30|         12.46|       918.92|             顺丰|5086878998859|20220327083423|20220328114623|20220328063023|20220331015823|     已退款|         92|     易动力网络有限公司|2022-04-01 05:11:23|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2296|2022032845756607|     10681|       张桂兰| 上海市|                         上海市|       上海市上海市长寿路11182266...|           1|             2|   7026.05|       1405.21|         30.09|     5650.93|             顺丰|9906913860087|20220328161815|20220329152015|20220329112315|20220401053815|     已签收|       565|       银嘉信息有限公司|2022-04-01 05:38:15|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2312|2022032987273268|     16796|         侯涛| 浙江省|浙江省杭州江干区景昙路18-262...|     浙江省杭州江干区景昙路18-262...|           1|             4|     707.45|         95.00|         17.69|       630.14|             韵达|2046035743206|20220329125319|20220330152619|20220330020919|20220401162419|     已签收|         63| 济南亿次元网络有限公司|2022-04-01 16:24:19|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2326|2022032763122037|     19980|       魏金凤| 浙江省|                   浙江省杭州市|   浙江省杭州市西湖区古墩路58884...|           1|             3|   2228.43|       1559.90|         13.91|       682.44|             中通|1577073377562|20220328041523|20220329193723|20220329073623|20220401004923|     已签收|         68|       南康网络有限公司|2022-04-01 00:49:23|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2342|2022032894717891|       8972|         符强| 上海市|                         上海市|     上海市上海市沪青平公路288820...|           1|             4|   6173.30|       4321.31|         10.36|     1862.35|             顺丰|9695161512380|20220328163430|20220329123530|20220329091330|20220401044230|     已签收|       186|   方正科技信息有限公司|2022-04-01 04:42:30|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2350|2022032887041347|       4597|         姜岩| 江苏省|                   江苏省无锡市|   江苏省无锡市南长区南长街20121...|           1|             4|   4314.53|       2157.27|         41.27|     2198.54|             中通|5283447205985|20220328185032|20220329161332|20220329062132|20220401042032|     已签收|       220|       恩悌信息有限公司|2022-04-01 04:20:32|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2354|2022032733529686|     14993|         尹晶| 上海市|                         上海市|     上海市上海市四川北路1661767...|           1|             2|   8535.21|         95.00|         20.41|     8460.62|             中通|3976165724549|20220328090635|20220329220435|20220329084235|20220401015335|     已签收|       846|   立信电子传媒有限公司|2022-04-01 01:53:35|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2362|2022032714983348|       7925|       罗淑华| 上海市|                         上海市|     上海市上海市常德路8008918号8层|           1|             4|   5286.17|         80.00|         15.18|     5221.35|             韵达|9110286972135|20220328050040|20220329180940|20220329094040|20220401044540|     已签收|       522|     易动力科技有限公司|2022-04-01 04:45:40|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2370|2022032814968220|     14159|         邢涛| 上海市|                         上海市|   上海市上海市嘉定区佳通路31弄34...|           1|             2|   6690.56|         80.00|         42.28|     6652.84|             顺丰|9638866501792|20220328052340|20220330002240|20220329091840|20220401041340|     已签收|       665|   凌颖信息科技有限公司|2022-04-01 04:13:40|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2334|2022032859756405|       7370|       黄丽娟| 江苏省|                   江苏省苏州市|     江苏省苏州市现代大道1699409...|           1|             1|   2229.52|       100.00|         11.01|     2140.53|             中通|4278051015984|20220329082028|20220330154728|20220330073228|20220402042228|     已签收|       214|惠派国际公司信息有限公司|2022-04-02 04:22:28|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2403|2022032788808370|       4541|       阎桂香| 江苏省|                   江苏省无锡市|     江苏省无锡市解放东路1000464...|           1|             2|   1052.92|       210.58|         13.67|       856.01|             顺丰|4476220310056|20220328173448|20220330022648|20220329012648|20220401031848|     已退款|         86|       昊嘉科技有限公司|2022-04-02 05:40:48|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2459|2022032955711624|     15084|         黄云| 上海市|                         上海市|   上海市上海市水城南路207986号17层|           1|             2|   1720.08|       1204.06|         14.02|       530.04|             顺丰|5130441519061|20220329122210|20220331020110|20220329234010|20220402020310|     已签收|         53| 新格林耐特传媒有限公司|2022-04-02 02:03:10|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2501|2022032965004110|       7594|         谢帆| 贵州省|                   贵州省贵阳市|贵州省贵阳市云岩区红边门店普陀路1...|           1|             5|   7990.56|       6392.45|         13.42|     1611.53|             顺丰|8154839863090|20220330170823|20220331215023|20220330233023|20220402213123|     已签收|       161|   合联电子传媒有限公司|2022-04-02 21:31:23|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2529|2022032866253715|     15300|         姚博| 上海市|                         上海市|   上海市上海市嘉定区佳通路31弄39...|           1|             4|     515.96|       257.98|         27.30|       285.28|             韵达|4472053332463|20220329183828|20220330215828|20220329191128|20220402034728|     已签收|         29| 济南亿次元信息有限公司|2022-04-02 03:47:28|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2513|2022032810875003|       6753|         卿柳| 上海市|                         上海市|     上海市上海市辛耕路1299124号14层|           1|             3|   2888.15|       2021.71|         17.57|       884.02|             顺丰|0370967098533|20220329062624|20220330114924|20220330080924|20220402002524|     已签收|         88|       良诺信息有限公司|2022-04-02 00:25:24|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2435|2022032844826283|     19014|       张桂兰| 江苏省|             江苏省江苏省淮安市|江苏省江苏省淮安市清河区翔宇中路1...|           1|             1|   2602.38|         80.00|         40.23|     2562.61|             韵达|1484782939199|20220329050759|20220330202559|20220330085359|20220402030259|     已签收|       256|   网新恒天网络有限公司|2022-04-02 03:02:59|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2322|2022032881798097|     18548|       陈丹丹| 江苏省|                   江苏省溧阳市|     江苏省溧阳市天目路123956号上...|           1|             2|   2043.64|       100.00|         14.60|     1958.24|             韵达|8595786486688|20220328061921|20220329121521|20220328194921|20220331172121|     已退款|       196|       彩虹信息有限公司|2022-04-02 03:58:21|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2553|2022032935583382|       6329|         刘婷| 江苏省|                   江苏省苏州市|     江苏省苏州市金鸡湖路885370号...|           1|             3|   1734.24|       1387.39|         45.83|       392.68|             韵达|3143365276943|20220330091539|20220331200739|20220330190339|20220402162639|     已签收|         39|     MBP软件信息有限公司|2022-04-02 16:26:39|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
|   2537|2022032818405314|     11139|         张旭| 上海市|                         上海市| 上海市上海市浦东新区杨高南路428...|           1|             3|   1448.48|       100.00|         28.55|     1377.03|             顺丰|9631168546997|20220329185733|20220330201433|20220329191033|20220402035433|     已签收|       138|     飞利信信息有限公司|2022-04-02 03:54:33|         user1|2023-12-11 12:30:19|         user1|2023-12-11 12:30:19|20231131|
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+---------------+-------------------+---------------+-------------------+--------+
  */

  // 停止会话连接
  spark.stop()
}

// 使用 newAPIHadoopRDD 读取数据
def read(spark:SparkSession):DataFrame = {

  // 1) 创建HbaseConfiguration配置信息
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "192.168.190.139") //设置zooKeeper集群地址,也可以通过将hbase-site.xml拷贝到resources目录下
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181
  hbaseConf.set("hbase.master", "192.168.190.139:16000")     // 设置HBase HMaster

  // 设置从HBase那张表读取数据
  val tablename = "hbase_order_master" // 要读取的hbase表名
  hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename) // 注意:读取时使用TableInputFormat

  // 2)添加filter => TableMapReduceUtil.convertScanToString(getScan)
  hbaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(getScan))

  // 3)调用SparkContext中newAPIHadoopRDD读取表中的数据,构建RDD
  val resultRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(
    hbaseConf,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result]
  )

  // 测试获取的数据
//   println(s"count = ${resultRDD.count()}")
//   resultRDD.foreach(println)
//   resultRDD.take(2).foreach(println)
  /*
  resultRDD.take(2).foreach{
    case (_, result) =>
      println(s"rowKey = ${Bytes.toString(result.getRow)}")
        for (cell <- result.rawCells()) {
        val cf = Bytes.toString(CellUtil.cloneFamily(cell))       // 列族
        val colum = Bytes.toString(CellUtil.cloneQualifier(cell)) // 列限定符
        val value = Bytes.toString(CellUtil.cloneValue(cell))     // 列值
        println(s"\t$cf:$colum = $value")
      }
  }
    */

  // 4)对获取的值进行过滤,并转换为DataFrame返回
  import spark.implicits._
  resultRDD
    .map { t =>
      val result = t._2 // Result
      // 通过列族和列名获取列
      val order_id = Bytes.toLong(result.getValue("ods".getBytes, "order_id".getBytes))
      val order_sn = Bytes.toString(result.getValue("ods".getBytes, "order_sn".getBytes))
      val customer_id = Bytes.toLong(result.getValue("ods".getBytes, "customer_id".getBytes))
      val shipping_user = Bytes.toString(result.getValue("ods".getBytes, "shipping_user".getBytes))
      val province = Bytes.toString(result.getValue("ods".getBytes, "province".getBytes))
      val city = Bytes.toString(result.getValue("ods".getBytes, "city".getBytes))
      val address = Bytes.toString(result.getValue("ods".getBytes, "address".getBytes))
      val order_source = Bytes.toInt(result.getValue("ods".getBytes, "order_source".getBytes))
      val payment_method = Bytes.toInt(result.getValue("ods".getBytes, "payment_method".getBytes))
      val order_money = Bytes.toBigDecimal(result.getValue("ods".getBytes, "order_money".getBytes))
      val district_money = Bytes.toBigDecimal(result.getValue("ods".getBytes, "district_money".getBytes))
      val shipping_money = Bytes.toBigDecimal(result.getValue("ods".getBytes, "shipping_money".getBytes))
      val payment_money = Bytes.toBigDecimal(result.getValue("ods".getBytes, "payment_money".getBytes))
      val shipping_comp_name = Bytes.toString(result.getValue("ods".getBytes, "shipping_comp_name".getBytes))
      val shipping_sn = Bytes.toString(result.getValue("ods".getBytes, "shipping_sn".getBytes))
      val create_time = Bytes.toString(result.getValue("ods".getBytes, "create_time".getBytes))
      val shipping_time = Bytes.toString(result.getValue("ods".getBytes, "shipping_time".getBytes))
      val pay_time = Bytes.toString(result.getValue("ods".getBytes, "pay_time".getBytes))
      val receive_time = Bytes.toString(result.getValue("ods".getBytes, "receive_time".getBytes))
      val order_status = Bytes.toString(result.getValue("ods".getBytes, "order_status".getBytes))
      val order_point = Bytes.toLong(result.getValue("ods".getBytes, "order_point".getBytes))
      val invoice_title = Bytes.toString(result.getValue("ods".getBytes, "invoice_title".getBytes))
      val modified_time = Bytes.toString(result.getValue("ods".getBytes, "modified_time".getBytes))

      // 构造为OrderMaster对象实例
      OrderMaster(order_id,
            order_sn,
            customer_id,
            shipping_user,
            province,
            city,
            address,
            order_source,
            payment_method,
            order_money,
            district_money,
            shipping_money,
            payment_money,
            shipping_comp_name,
            shipping_sn,
            create_time,
            shipping_time,
            pay_time,
            receive_time,
            order_status,
            order_point,
            invoice_title,
            modified_time)
    }
    // 转换为DataFrame
    .toDF()
}

// case class
case class OrderMaster(
                        order_id: Long,
                        order_sn: String,
                        customer_id: Long,
                        shipping_user: String,
                        province: String,
                        city: String,
                        address: String,
                        order_source: Int,
                        payment_method: Int,
                        order_money: BigDecimal,   // BigDecimal 默认为(38,18)
                        district_money: BigDecimal,
                        shipping_money: BigDecimal,
                        payment_money: BigDecimal,
                        shipping_comp_name: String,
                        shipping_sn: String,
                        create_time: String,
                        shipping_time: String,
                        pay_time: String,
                        receive_time: String,
                        order_status: String,
                        order_point: Long,
                        invoice_title: String,
                        modified_time: String   // 注意这个字段
                    )

/* 创建scan过滤, 过滤出rowkey包含'20220402'内容的行
  * 假设rowkey格式为:创建日期_发布日期_ID_TITLE
  * 目标:查找 rowkey中包含 20220402 的数据
  * 使用行过滤器:new RowFilter(CompareOp.EQUAL , new SubstringComparator("20220402"))
  * 返回 org.apache.hadoop.hbase.client.Scan
  */
def getScan: Scan = {
  // 创建Scan
  val scan = new Scan()
  // 指定行过滤器(行键必须包含子字符串"20220402")
  val rowFilter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator("20220402"))
  // scan 设置过滤器
  scan.setFilter(rowFilter)
  scan.withStartRow(Bytes.toBytes("020220402000000000"))   // 起始行键
  scan.withStopRow(Bytes.toBytes("920220402235959999"))   // 结束行键
  scan
}

}

运行以上程序,然后到Hive Cli中查看DWD中写入的数据。

第三部分:可能遇到的异常及解决方法

在Spark集成HBase过程中,可能会遇到各种异常。以下是一些常见异常及解决方法。

异常1:

java.lang.NoSuchMethodError: org.apache.hadoop.security.HadoopKerberosName.setRuleMechanism(Ljava/lang/String;)V

解决方法:

在pom.xml文件中,添加如下依赖:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-auth</artifactId>
  <version>3.1.2</version>
</dependency>

异常2:

Exception in thread “main” java.lang.NoClassDefFoundError: com/fasterxml/jackson/core/exc/InputCoercionException

解决方法:

在pom.xml文件中,添加如下依赖:

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.10.5</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>2.10.5</version>
</dependency>

异常3:在hbase shell中查看表内容时,看不到中文。

这其实不是异常,因为HBase默认是查看编码以后的中文。

解决方法:使用以下方法查看hbase表:

hbase> scan 'hbase_master_order',{FORMATTER => 'toString'}
© 版权声明
THE END
喜欢就支持一下吧
点赞177赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

夸夸
夸夸
还有吗!没看够!
取消
昵称表情代码图片

    暂无评论内容