任务描述
抽取 ods 库中表 table4 最新分区的数据,并结合 HBase 中 table4 offline表中的数据合并抽取到 dwd 库中 fact table4 的分区表,分区字段为etl date 且值与 ods 库的相对应表该值相等,并添加 dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd modify_time 四列,其中dwd_insert_user、dwd_modify_user 均填写“userl”,dwd_insert_time、dwd_modify_time 均填写当前操作时间(年月日必须是今天,时分秒只需在比赛时间范围内即可),抽取 HBase 中的数据时,只抽取 XXXX 年 XX 月XX日的数据 (以 rowkey 为准),并进行数据类型转换。使用 hive cli 查询modified_time为XXXX年XX月XX号当天的数据,并按照id进行升序排序,将结果截图复制粘贴至客户端桌面[Release\模块 D 提交结果docx]中对应的任务序号下;“
任务分析
这是今年(2024年)各省的省赛中普遍出现的一个任务需求。该任务的实质是模拟实现大数据架构中应用Spark对离线链路数据(Hive ODS)和实时链路数据(HBase)实现T+1的数据合并。
在这个任务中,涉及以下技术点:
-
Spark + Hive整合。
-
Spark 读写Hive表。
-
Spark + HBase整合。
-
Spark读写HBase表。
-
Spark查询HBase表时应用过滤器。
Spark整合HBase
HBase依赖于Zookeeper和Hadoop,Spark读取HBase数据需要连接到Zookeeper,通过Zookeeper访问HBase分区表(region)。在Spark中整合HBase有两种方式:
(1)第一种是将hbase-site.xml文件拷贝到项目的resources目录下。
(2)第二种是在HBaseConfiguration实例中设置。
在本教程使用的是第二种方式。
鉴于比赛中赛方可能不提供spark-hbase连接器,因此本示例使用HBase自带的API。请注意,在HBase 中有两个相关的包:org.apache.hadoop.hbase.mapred 和 org.apache.hadoop.hbase.mapreduce。其中前者使用旧式 API,后者使用新式API。 在本示例中使用新的API,使用 saveAsNewAPIHadoopDataset()方法将数据写入HBase,使用 newAPIHadoopRDD从HBase读取数据到RDD中。
本案例思路
本案例教程在PBCP2023平台上进行测试,应用内置电商数据中的订单表order_master作为示例数据。
在这个示例中,将按以下步骤展开:
(1)先从MySQL的order_master中读取2022-04-01这天的数据加载到Hive ODS中,用作存量历史数据;
(2)再从MySQL的order_master中读取2022-04-02这天的数据存储到HBase表中,模拟实时链路新增数据。
(3)编写Spark作业,从Hive ODS中加载离线批处理数据(2022-04-01这天的数据),从HBase中加载实时链路T+1数据(2022-04-02这天的数据),然后将这两者合并, 并按任务要求进行相应的转换和增加字段,最后写入到Hive 的DWD层中。
快速链接跳转
本教程包含以下几个部分,单击目录可快速跳转到相应内容:
准备工作
1)启动大数据环境
首先启动Hadoop集群。在终端窗口运行如下命令:
# start-all.sh
启动HBase集群。在终端窗口运行如下命令:
# start-hbase.sh
2)启动Hive Metastore服务
Spark读写Hive表,需要访问Metastore服务。在终端中执行如下命令:
# hive --service metastore
这将保持Hive Metastore服务一直运行,请勿关闭终端。如果要将其作为后台服务启动,则可以使用下面的命令:
# nohup hive --service metastore &
这个命令将启动Hive Metastore服务,并在后台持续运行。
3)创建HBase表
使用hbase shell,进入到hbase命令行。打开终端窗口,执行如下命令:
# hbase shell
使用create命令,创建HBase表:
hbase> create 'hbase_order_master','ods'
使用scan命令,扫描HBase表(查看HBase表内容):
hbase> scan 'hbase_order_master'
第一部分:准备Hive存量数据和HBase新增数据
要在Spark中访问HBase,需要在pom.xml中添加如下依赖:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>${hbase.version}</version>
</dependency>
编写一个Spark程序,先抽取mysql中的order_master订单表中2022-04-01这天的数据加载到Hive ODS中,用作存量历史数据; 再抽取mysql中的order_master订单表中2022-04-02这天的数据存储到HBase表中,模拟实时链路新增数据。
实现代码如下:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.time.format.DateTimeFormatter
object ReadWriteHBaseDemo1 {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("Spark and HBase Demo")
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 需要根据分区值,覆盖原来的分区时,需要配置的参数
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
// hive兼容格式存储
.config("spark.sql.parquet.writeLegacyFormat", "true")
.enableHiveSupport()
.getOrCreate()
// jdbc url
val db_url = "jdbc:mysql://192.168.190.139:3306/ds_db01?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
// 1)读取ds_db01.order_master中2022-04-01的数据,写入Hive ODS,模拟存量批数据
val df_first_day = spark.read
.format("jdbc")
.option("url", db_url)
.option("query", "select * from order_master where date(modified_time)='2022-04-01'")
.option("user", "root") // 注意替换为自己的mysql用户
.option("password", "admin") // 注意替换为自己的mysql密码
.load()
.withColumn("etl_date",lit("20231131"))
.limit(10) // 取10条
// 分区写入Hive ODS
df_first_day.write
.format("parquet")
.mode("overwrite") // 覆盖
.partitionBy("etl_date") // 指定分区
.saveAsTable("ods.order_master")
// 2)读取ds_db01.order_master中2022-04-02的数据,写入HBase表,模拟实时增量数据
val df_second_day = spark.read
.format("jdbc")
.option("url", db_url)
.option("query", "select * from order_master where date(modified_time)='2022-04-02'")
.option("user", "root") // 注意替换为自己的mysql用户
.option("password", "admin") // 注意替换为自己的mysql密码
.load()
.limit(10) // 也取10条
// 将DataFrame(订单数据)存储到HBase 表中
// 事先在HBase shell中创建表:create 'hbase_order_master','ods'
save(df_second_day)
// 停止会话连接
spark.stop()
}
// 使用 saveAsNewAPIHadoopDataset 写入数据
def save(df:DataFrame):Unit = {
val tablename = "hbase_order_master" // 要写入的hbase表名
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum","192.168.190.139") //设置zooKeeper集群地址,也可以通过将hbase-site.xml拷贝到resources目录下
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
// 初始化job,设置输出格式是org.apache.hadoop.hbase.mapred.TableOutputFormat
val jobConf = new JobConf(hbaseConf)
//设置job的输出格式
val job = Job.getInstance(jobConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val rdd = df.rdd.map{ row =>
/* 一个Put对象就是一行记录,在构造方法中指定主键
* 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换
*
* rowkey使用随机数(0-9) + yyyyMMddHHmmssSSS
*
* 注:这里使用订单的 modified_time 字段值和随机数构造rowkey。
* */
// 构造rowkey row.getTimestamp(22).toString
// val currentTime = LocalDateTime.now()
val currentTime = row.getTimestamp(22).toLocalDateTime // 使用订单的 modified_time 字段值
val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
val timestamp = currentTime.format(formatter)
// 构造rowkey
val rowkey = scala.util.Random.nextInt(10).toString + timestamp
// 创建一条Put,添加行键
val put = new Put(Bytes.toBytes(rowkey))
// 添加各列(共23列)
// Put.addColumn 方法接收三个参数:列族,列名,数据
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_id"),Bytes.toBytes(row.getLong(0)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_sn"),Bytes.toBytes(row.getString(1)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("customer_id"),Bytes.toBytes(row.getLong(2)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_user"),Bytes.toBytes(row.getString(3)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("province"),Bytes.toBytes(row.getString(4)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("city"),Bytes.toBytes(row.getString(5)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("address"),Bytes.toBytes(row.getString(6)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_source"),Bytes.toBytes(row.getInt(7)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("payment_method"),Bytes.toBytes(row.getInt(8)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_money"),Bytes.toBytes(row.getDecimal(9)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("district_money"),Bytes.toBytes(row.getDecimal(10)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_money"),Bytes.toBytes(row.getDecimal(11)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("payment_money"),Bytes.toBytes(row.getDecimal(12)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_comp_name"),Bytes.toBytes(row.getString(13)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_sn"),Bytes.toBytes(row.getString(14)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("create_time"),Bytes.toBytes(row.getString(15)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("shipping_time"),Bytes.toBytes(row.getString(16)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("pay_time"),Bytes.toBytes(row.getString(17)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("receive_time"),Bytes.toBytes(row.getString(18)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_status"),Bytes.toBytes(row.getString(19)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("order_point"),Bytes.toBytes(row.getLong(20)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("invoice_title"),Bytes.toBytes(row.getString(21)))
put.addColumn(Bytes.toBytes("ods"),Bytes.toBytes("modified_time"),Bytes.toBytes(row.getTimestamp(22).toString))
(new ImmutableBytesWritable, put)
}
// 存储到HBase
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
}
运行以上程序,然后分别到Hive Cli和 hbase shell中查看写入的数据。
第二部分:合并Hive存量数据和HBase新增数据
再编写一个Spark程序,分别从Hive ODS加载离线数据,从HBase中加载实时链路存量数据,然后合并这两个数据集,并根据任务要求进行适当的数据类型转换和数据处理。实现的代码如下:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.CompareOperator
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object ReadWriteHBaseDemo2 {
def main(args: Array[String]): Unit = {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
// 设置Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark and HBase Demo")
// 设置使用Kryo方式序列化,目的是为了方便将数据返回到driver端测试(如无此需要,可不设置)
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
// 注册使用Kryo进行序列化的类
sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable],classOf[Result]))
// 创建SparkSession实例
val spark = SparkSession.builder()
.config(conf = sparkConf)
// 解决时区问题
// .config("spark.sql.session.timeZone", "Asia/Shanghai") // UTC或CST或Asia/Shanghai
// 打开Hive动态分区的标志
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 需要根据分区值,覆盖原来的分区时,需要配置的参数
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
// hive兼容格式存储
.config("spark.sql.parquet.writeLegacyFormat", "true")
.enableHiveSupport()
.getOrCreate()
// 1)从Hive ODS读取存量订单数据
val hive_ods_order_master = spark.table("ods.order_master")
// hive_ods_order_master.printSchema()
// hive_ods_order_master.show()
println("hive ods ------------------------------------------")
/*
root
|-- order_id: long (nullable = true)
|-- order_sn: string (nullable = true)
|-- customer_id: long (nullable = true)
|-- shipping_user: string (nullable = true)
|-- province: string (nullable = true)
|-- city: string (nullable = true)
|-- address: string (nullable = true)
|-- order_source: integer (nullable = true)
|-- payment_method: integer (nullable = true)
|-- order_money: decimal(8,2) (nullable = true)
|-- district_money: decimal(8,2) (nullable = true)
|-- shipping_money: decimal(8,2) (nullable = true)
|-- payment_money: decimal(8,2) (nullable = true)
|-- shipping_comp_name: string (nullable = true)
|-- shipping_sn: string (nullable = true)
|-- create_time: string (nullable = true)
|-- shipping_time: string (nullable = true)
|-- pay_time: string (nullable = true)
|-- receive_time: string (nullable = true)
|-- order_status: string (nullable = true)
|-- order_point: long (nullable = true)
|-- invoice_title: string (nullable = true)
|-- modified_time: timestamp (nullable = true)
|-- etl_date: string (nullable = true)
+--------+----------------+-----------+-------------+--------+-------------------------------+---------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+----------------------+-------------------+--------+
|order_id| order_sn|customer_id|shipping_user|province| city| address|order_source|payment_method|order_money|district_money|shipping_money|payment_money|shipping_comp_name| shipping_sn| create_time| shipping_time| pay_time| receive_time|order_status|order_point| invoice_title| modified_time|etl_date|
+--------+----------------+-----------+-------------+--------+-------------------------------+---------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+----------------------+-------------------+--------+
| 2063|2022032686959471| 7193| 卢鹏| 上海市| 上海市| 上海市上海市黄浦区九江路59587...| 1| 1| 7426.38| 742.64| 18.90| 6702.64| 中通|1177680592862|20220327090656|20220329023556|20220328030856|20220330193056| 已退款| 670| 群英网络有限公司|2022-04-01 03:26:56|20231131|
| 2140|2022032684573934| 15572| 雷建| 上海市| 上海市| 上海市上海市吴江路1288278号20层| 1| 2| 1510.76| 604.30| 12.46| 918.92| 顺丰|5086878998859|20220327083423|20220328114623|20220328063023|20220331015823| 已退款| 92| 易动力网络有限公司|2022-04-01 05:11:23|20231131|
| 2296|2022032845756607| 10681| 张桂兰| 上海市| 上海市| 上海市上海市长寿路11182266...| 1| 2| 7026.05| 1405.21| 30.09| 5650.93| 顺丰|9906913860087|20220328161815|20220329152015|20220329112315|20220401053815| 已签收| 565| 银嘉信息有限公司|2022-04-01 05:38:15|20231131|
| 2312|2022032987273268| 16796| 侯涛| 浙江省|浙江省杭州江干区景昙路18-262...| 浙江省杭州江干区景昙路18-262...| 1| 4| 707.45| 95.00| 17.69| 630.14| 韵达|2046035743206|20220329125319|20220330152619|20220330020919|20220401162419| 已签收| 63|济南亿次元网络有限公司|2022-04-01 16:24:19|20231131|
| 2326|2022032763122037| 19980| 魏金凤| 浙江省| 浙江省杭州市| 浙江省杭州市西湖区古墩路58884...| 1| 3| 2228.43| 1559.90| 13.91| 682.44| 中通|1577073377562|20220328041523|20220329193723|20220329073623|20220401004923| 已签收| 68| 南康网络有限公司|2022-04-01 00:49:23|20231131|
| 2342|2022032894717891| 8972| 符强| 上海市| 上海市| 上海市上海市沪青平公路288820...| 1| 4| 6173.30| 4321.31| 10.36| 1862.35| 顺丰|9695161512380|20220328163430|20220329123530|20220329091330|20220401044230| 已签收| 186| 方正科技信息有限公司|2022-04-01 04:42:30|20231131|
| 2350|2022032887041347| 4597| 姜岩| 江苏省| 江苏省无锡市| 江苏省无锡市南长区南长街20121...| 1| 4| 4314.53| 2157.27| 41.27| 2198.54| 中通|5283447205985|20220328185032|20220329161332|20220329062132|20220401042032| 已签收| 220| 恩悌信息有限公司|2022-04-01 04:20:32|20231131|
| 2354|2022032733529686| 14993| 尹晶| 上海市| 上海市| 上海市上海市四川北路1661767...| 1| 2| 8535.21| 95.00| 20.41| 8460.62| 中通|3976165724549|20220328090635|20220329220435|20220329084235|20220401015335| 已签收| 846| 立信电子传媒有限公司|2022-04-01 01:53:35|20231131|
| 2362|2022032714983348| 7925| 罗淑华| 上海市| 上海市| 上海市上海市常德路8008918号8层| 1| 4| 5286.17| 80.00| 15.18| 5221.35| 韵达|9110286972135|20220328050040|20220329180940|20220329094040|20220401044540| 已签收| 522| 易动力科技有限公司|2022-04-01 04:45:40|20231131|
| 2370|2022032814968220| 14159| 邢涛| 上海市| 上海市|上海市上海市嘉定区佳通路31弄34...| 1| 2| 6690.56| 80.00| 42.28| 6652.84| 顺丰|9638866501792|20220328052340|20220330002240|20220329091840|20220401041340| 已签收| 665| 凌颖信息科技有限公司|2022-04-01 04:13:40|20231131|
+--------+----------------+-----------+-------------+--------+-------------------------------+---------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+----------------------+-------------------+--------+
*/
// 2)从HBase表中读取数据到DataFrame中
val hbase_ods_order_master = read(spark)
// 金额相关的列,数据类型转换(Decimal(38,18) -> Decimal(8,2)
.withColumn("order_money", col("order_money").cast(DecimalType(8,2)))
.withColumn("district_money", col("district_money").cast(DecimalType(8,2)))
.withColumn("shipping_money", col("shipping_money").cast(DecimalType(8,2)))
.withColumn("payment_money", col("payment_money").cast(DecimalType(8,2)))
// modified_time列,数据类型转换(String -> Timestamp
.withColumn("modified_time", col("modified_time").cast(TimestampType))
// 增加分区列
.withColumn("etl_date",lit("20231131"))
// hbase_ods_order_master.printSchema()
// hbase_ods_order_master.show()
println("hbase ods ------------------------------------------")
/*
root
|-- order_id: long (nullable = false)
|-- order_sn: string (nullable = true)
|-- customer_id: long (nullable = false)
|-- shipping_user: string (nullable = true)
|-- province: string (nullable = true)
|-- city: string (nullable = true)
|-- address: string (nullable = true)
|-- order_source: integer (nullable = false)
|-- payment_method: integer (nullable = false)
|-- order_money: decimal(8,2) (nullable = true)
|-- district_money: decimal(8,2) (nullable = true)
|-- shipping_money: decimal(8,2) (nullable = true)
|-- payment_money: decimal(8,2) (nullable = true)
|-- shipping_comp_name: string (nullable = true)
|-- shipping_sn: string (nullable = true)
|-- create_time: string (nullable = true)
|-- shipping_time: string (nullable = true)
|-- pay_time: string (nullable = true)
|-- receive_time: string (nullable = true)
|-- order_status: string (nullable = true)
|-- order_point: long (nullable = false)
|-- invoice_title: string (nullable = true)
|-- modified_time: timestamp (nullable = true)
|-- etl_date: string (nullable = false)
+--------+----------------+-----------+-------------+--------+------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+
|order_id| order_sn|customer_id|shipping_user|province| city| address|order_source|payment_method|order_money|district_money|shipping_money|payment_money|shipping_comp_name| shipping_sn| create_time| shipping_time| pay_time| receive_time|order_status|order_point| invoice_title| modified_time|etl_date|
+--------+----------------+-----------+-------------+--------+------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+
| 2334|2022032859756405| 7370| 黄丽娟| 江苏省| 江苏省苏州市| 江苏省苏州市现代大道1699409...| 1| 1| 2229.52| 100.00| 11.01| 2140.53| 中通|4278051015984|20220329082028|20220330154728|20220330073228|20220402042228| 已签收| 214|惠派国际公司信息有限公司|2022-04-02 04:22:28|20231131|
| 2403|2022032788808370| 4541| 阎桂香| 江苏省| 江苏省无锡市| 江苏省无锡市解放东路1000464...| 1| 2| 1052.92| 210.58| 13.67| 856.01| 顺丰|4476220310056|20220328173448|20220330022648|20220329012648|20220401031848| 已退款| 86| 昊嘉科技有限公司|2022-04-02 05:40:48|20231131|
| 2459|2022032955711624| 15084| 黄云| 上海市| 上海市| 上海市上海市水城南路207986号17层| 1| 2| 1720.08| 1204.06| 14.02| 530.04| 顺丰|5130441519061|20220329122210|20220331020110|20220329234010|20220402020310| 已签收| 53| 新格林耐特传媒有限公司|2022-04-02 02:03:10|20231131|
| 2501|2022032965004110| 7594| 谢帆| 贵州省| 贵州省贵阳市|贵州省贵阳市云岩区红边门店普陀路1...| 1| 5| 7990.56| 6392.45| 13.42| 1611.53| 顺丰|8154839863090|20220330170823|20220331215023|20220330233023|20220402213123| 已签收| 161| 合联电子传媒有限公司|2022-04-02 21:31:23|20231131|
| 2529|2022032866253715| 15300| 姚博| 上海市| 上海市| 上海市上海市嘉定区佳通路31弄39...| 1| 4| 515.96| 257.98| 27.30| 285.28| 韵达|4472053332463|20220329183828|20220330215828|20220329191128|20220402034728| 已签收| 29| 济南亿次元信息有限公司|2022-04-02 03:47:28|20231131|
| 2513|2022032810875003| 6753| 卿柳| 上海市| 上海市| 上海市上海市辛耕路1299124号14层| 1| 3| 2888.15| 2021.71| 17.57| 884.02| 顺丰|0370967098533|20220329062624|20220330114924|20220330080924|20220402002524| 已签收| 88| 良诺信息有限公司|2022-04-02 00:25:24|20231131|
| 2435|2022032844826283| 19014| 张桂兰| 江苏省|江苏省江苏省淮安市|江苏省江苏省淮安市清河区翔宇中路1...| 1| 1| 2602.38| 80.00| 40.23| 2562.61| 韵达|1484782939199|20220329050759|20220330202559|20220330085359|20220402030259| 已签收| 256| 网新恒天网络有限公司|2022-04-02 03:02:59|20231131|
| 2322|2022032881798097| 18548| 陈丹丹| 江苏省| 江苏省溧阳市| 江苏省溧阳市天目路123956号上...| 1| 2| 2043.64| 100.00| 14.60| 1958.24| 韵达|8595786486688|20220328061921|20220329121521|20220328194921|20220331172121| 已退款| 196| 彩虹信息有限公司|2022-04-02 03:58:21|20231131|
| 2553|2022032935583382| 6329| 刘婷| 江苏省| 江苏省苏州市| 江苏省苏州市金鸡湖路885370号...| 1| 3| 1734.24| 1387.39| 45.83| 392.68| 韵达|3143365276943|20220330091539|20220331200739|20220330190339|20220402162639| 已签收| 39| MBP软件信息有限公司|2022-04-02 16:26:39|20231131|
| 2537|2022032818405314| 11139| 张旭| 上海市| 上海市| 上海市上海市浦东新区杨高南路428...| 1| 3| 1448.48| 100.00| 28.55| 1377.03| 顺丰|9631168546997|20220329185733|20220330201433|20220329191033|20220402035433| 已签收| 138| 飞利信信息有限公司|2022-04-02 03:54:33|20231131|
+--------+----------------+-----------+-------------+--------+------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+
*/
// 3)合并存量数据和增量数据,写入Hive DWD层
val fact_order_master = hive_ods_order_master
// 垂直合并
.union(hbase_ods_order_master)
// 增加新的列
.withColumn("dwd_insert_user",lit("user1"))
.withColumn("dwd_insert_time",date_trunc("second",current_timestamp())) // spark 2.3.0
.withColumn("dwd_modify_user",lit("user1"))
.withColumn("dwd_modify_time",date_trunc("second",current_timestamp())) // spark 2.3.0
fact_order_master.printSchema()
fact_order_master.show()
println("合并后 ------------------------------------------")
/*
root
|-- order_id: long (nullable = true)
|-- order_sn: string (nullable = true)
|-- customer_id: long (nullable = true)
|-- shipping_user: string (nullable = true)
|-- province: string (nullable = true)
|-- city: string (nullable = true)
|-- address: string (nullable = true)
|-- order_source: integer (nullable = true)
|-- payment_method: integer (nullable = true)
|-- order_money: decimal(8,2) (nullable = true)
|-- district_money: decimal(8,2) (nullable = true)
|-- shipping_money: decimal(8,2) (nullable = true)
|-- payment_money: decimal(8,2) (nullable = true)
|-- shipping_comp_name: string (nullable = true)
|-- shipping_sn: string (nullable = true)
|-- create_time: string (nullable = true)
|-- shipping_time: string (nullable = true)
|-- pay_time: string (nullable = true)
|-- receive_time: string (nullable = true)
|-- order_status: string (nullable = true)
|-- order_point: long (nullable = true)
|-- invoice_title: string (nullable = true)
|-- modified_time: timestamp (nullable = true)
|-- etl_date: string (nullable = true)
|-- dwd_insert_user: string (nullable = false)
|-- dwd_insert_time: timestamp (nullable = true)
|-- dwd_modify_user: string (nullable = false)
|-- dwd_modify_time: timestamp (nullable = true)
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+---------------+-------------------+---------------+-------------------+
|order_id| order_sn|customer_id|shipping_user|province| city| address|order_source|payment_method|order_money|district_money|shipping_money|payment_money|shipping_comp_name| shipping_sn| create_time| shipping_time| pay_time| receive_time|order_status|order_point| invoice_title| modified_time|etl_date|dwd_insert_user| dwd_insert_time|dwd_modify_user| dwd_modify_time|
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+---------------+-------------------+---------------+-------------------+
| 2063|2022032686959471| 7193| 卢鹏| 上海市| 上海市| 上海市上海市黄浦区九江路59587...| 1| 1| 7426.38| 742.64| 18.90| 6702.64| 中通|1177680592862|20220327090656|20220329023556|20220328030856|20220330193056| 已退款| 670| 群英网络有限公司|2022-04-01 03:26:56|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2140|2022032684573934| 15572| 雷建| 上海市| 上海市| 上海市上海市吴江路1288278号20层| 1| 2| 1510.76| 604.30| 12.46| 918.92| 顺丰|5086878998859|20220327083423|20220328114623|20220328063023|20220331015823| 已退款| 92| 易动力网络有限公司|2022-04-01 05:11:23|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2296|2022032845756607| 10681| 张桂兰| 上海市| 上海市| 上海市上海市长寿路11182266...| 1| 2| 7026.05| 1405.21| 30.09| 5650.93| 顺丰|9906913860087|20220328161815|20220329152015|20220329112315|20220401053815| 已签收| 565| 银嘉信息有限公司|2022-04-01 05:38:15|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2312|2022032987273268| 16796| 侯涛| 浙江省|浙江省杭州江干区景昙路18-262...| 浙江省杭州江干区景昙路18-262...| 1| 4| 707.45| 95.00| 17.69| 630.14| 韵达|2046035743206|20220329125319|20220330152619|20220330020919|20220401162419| 已签收| 63| 济南亿次元网络有限公司|2022-04-01 16:24:19|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2326|2022032763122037| 19980| 魏金凤| 浙江省| 浙江省杭州市| 浙江省杭州市西湖区古墩路58884...| 1| 3| 2228.43| 1559.90| 13.91| 682.44| 中通|1577073377562|20220328041523|20220329193723|20220329073623|20220401004923| 已签收| 68| 南康网络有限公司|2022-04-01 00:49:23|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2342|2022032894717891| 8972| 符强| 上海市| 上海市| 上海市上海市沪青平公路288820...| 1| 4| 6173.30| 4321.31| 10.36| 1862.35| 顺丰|9695161512380|20220328163430|20220329123530|20220329091330|20220401044230| 已签收| 186| 方正科技信息有限公司|2022-04-01 04:42:30|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2350|2022032887041347| 4597| 姜岩| 江苏省| 江苏省无锡市| 江苏省无锡市南长区南长街20121...| 1| 4| 4314.53| 2157.27| 41.27| 2198.54| 中通|5283447205985|20220328185032|20220329161332|20220329062132|20220401042032| 已签收| 220| 恩悌信息有限公司|2022-04-01 04:20:32|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2354|2022032733529686| 14993| 尹晶| 上海市| 上海市| 上海市上海市四川北路1661767...| 1| 2| 8535.21| 95.00| 20.41| 8460.62| 中通|3976165724549|20220328090635|20220329220435|20220329084235|20220401015335| 已签收| 846| 立信电子传媒有限公司|2022-04-01 01:53:35|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2362|2022032714983348| 7925| 罗淑华| 上海市| 上海市| 上海市上海市常德路8008918号8层| 1| 4| 5286.17| 80.00| 15.18| 5221.35| 韵达|9110286972135|20220328050040|20220329180940|20220329094040|20220401044540| 已签收| 522| 易动力科技有限公司|2022-04-01 04:45:40|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2370|2022032814968220| 14159| 邢涛| 上海市| 上海市| 上海市上海市嘉定区佳通路31弄34...| 1| 2| 6690.56| 80.00| 42.28| 6652.84| 顺丰|9638866501792|20220328052340|20220330002240|20220329091840|20220401041340| 已签收| 665| 凌颖信息科技有限公司|2022-04-01 04:13:40|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2529|2022032866253715| 15300| 姚博| 上海市| 上海市| 上海市上海市嘉定区佳通路31弄39...| 1| 4| 515.96| 257.98| 27.30| 285.28| 韵达|4472053332463|20220329183828|20220330215828|20220329191128|20220402034728| 已签收| 29| 济南亿次元信息有限公司|2022-04-02 03:47:28|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2513|2022032810875003| 6753| 卿柳| 上海市| 上海市| 上海市上海市辛耕路1299124号14层| 1| 3| 2888.15| 2021.71| 17.57| 884.02| 顺丰|0370967098533|20220329062624|20220330114924|20220330080924|20220402002524| 已签收| 88| 良诺信息有限公司|2022-04-02 00:25:24|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2435|2022032844826283| 19014| 张桂兰| 江苏省| 江苏省江苏省淮安市|江苏省江苏省淮安市清河区翔宇中路1...| 1| 1| 2602.38| 80.00| 40.23| 2562.61| 韵达|1484782939199|20220329050759|20220330202559|20220330085359|20220402030259| 已签收| 256| 网新恒天网络有限公司|2022-04-02 03:02:59|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2322|2022032881798097| 18548| 陈丹丹| 江苏省| 江苏省溧阳市| 江苏省溧阳市天目路123956号上...| 1| 2| 2043.64| 100.00| 14.60| 1958.24| 韵达|8595786486688|20220328061921|20220329121521|20220328194921|20220331172121| 已退款| 196| 彩虹信息有限公司|2022-04-02 03:58:21|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2553|2022032935583382| 6329| 刘婷| 江苏省| 江苏省苏州市| 江苏省苏州市金鸡湖路885370号...| 1| 3| 1734.24| 1387.39| 45.83| 392.68| 韵达|3143365276943|20220330091539|20220331200739|20220330190339|20220402162639| 已签收| 39| MBP软件信息有限公司|2022-04-02 16:26:39|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2537|2022032818405314| 11139| 张旭| 上海市| 上海市| 上海市上海市浦东新区杨高南路428...| 1| 3| 1448.48| 100.00| 28.55| 1377.03| 顺丰|9631168546997|20220329185733|20220330201433|20220329191033|20220402035433| 已签收| 138| 飞利信信息有限公司|2022-04-02 03:54:33|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2334|2022032859756405| 7370| 黄丽娟| 江苏省| 江苏省苏州市| 江苏省苏州市现代大道1699409...| 1| 1| 2229.52| 100.00| 11.01| 2140.53| 中通|4278051015984|20220329082028|20220330154728|20220330073228|20220402042228| 已签收| 214|惠派国际公司信息有限公司|2022-04-02 04:22:28|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2403|2022032788808370| 4541| 阎桂香| 江苏省| 江苏省无锡市| 江苏省无锡市解放东路1000464...| 1| 2| 1052.92| 210.58| 13.67| 856.01| 顺丰|4476220310056|20220328173448|20220330022648|20220329012648|20220401031848| 已退款| 86| 昊嘉科技有限公司|2022-04-02 05:40:48|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2459|2022032955711624| 15084| 黄云| 上海市| 上海市| 上海市上海市水城南路207986号17层| 1| 2| 1720.08| 1204.06| 14.02| 530.04| 顺丰|5130441519061|20220329122210|20220331020110|20220329234010|20220402020310| 已签收| 53| 新格林耐特传媒有限公司|2022-04-02 02:03:10|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
| 2501|2022032965004110| 7594| 谢帆| 贵州省| 贵州省贵阳市|贵州省贵阳市云岩区红边门店普陀路1...| 1| 5| 7990.56| 6392.45| 13.42| 1611.53| 顺丰|8154839863090|20220330170823|20220331215023|20220330233023|20220402213123| 已签收| 161| 合联电子传媒有限公司|2022-04-02 21:31:23|20231131| user1|2023-12-11 11:43:45| user1|2023-12-11 11:43:45|
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+--------+---------------+-------------------+---------------+-------------------+
*/
println("正在写入 Hive DWD层 ...")
fact_order_master.write
.format("parquet")
.mode("overwrite") // 覆盖
.partitionBy("etl_date") // 指定分区
.saveAsTable("dwd.fact_order_master") // 保存到Hive表
println("已完成写入 Hive DWD层")
// 测试
spark.table("dwd.fact_order_master").show()
println("测试 hive dwd ------------------------------------------")
/*
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+---------------+-------------------+---------------+-------------------+--------+
|order_id| order_sn|customer_id|shipping_user|province| city| address|order_source|payment_method|order_money|district_money|shipping_money|payment_money|shipping_comp_name| shipping_sn| create_time| shipping_time| pay_time| receive_time|order_status|order_point| invoice_title| modified_time|dwd_insert_user| dwd_insert_time|dwd_modify_user| dwd_modify_time|etl_date|
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+---------------+-------------------+---------------+-------------------+--------+
| 2063|2022032686959471| 7193| 卢鹏| 上海市| 上海市| 上海市上海市黄浦区九江路59587...| 1| 1| 7426.38| 742.64| 18.90| 6702.64| 中通|1177680592862|20220327090656|20220329023556|20220328030856|20220330193056| 已退款| 670| 群英网络有限公司|2022-04-01 03:26:56| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2140|2022032684573934| 15572| 雷建| 上海市| 上海市| 上海市上海市吴江路1288278号20层| 1| 2| 1510.76| 604.30| 12.46| 918.92| 顺丰|5086878998859|20220327083423|20220328114623|20220328063023|20220331015823| 已退款| 92| 易动力网络有限公司|2022-04-01 05:11:23| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2296|2022032845756607| 10681| 张桂兰| 上海市| 上海市| 上海市上海市长寿路11182266...| 1| 2| 7026.05| 1405.21| 30.09| 5650.93| 顺丰|9906913860087|20220328161815|20220329152015|20220329112315|20220401053815| 已签收| 565| 银嘉信息有限公司|2022-04-01 05:38:15| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2312|2022032987273268| 16796| 侯涛| 浙江省|浙江省杭州江干区景昙路18-262...| 浙江省杭州江干区景昙路18-262...| 1| 4| 707.45| 95.00| 17.69| 630.14| 韵达|2046035743206|20220329125319|20220330152619|20220330020919|20220401162419| 已签收| 63| 济南亿次元网络有限公司|2022-04-01 16:24:19| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2326|2022032763122037| 19980| 魏金凤| 浙江省| 浙江省杭州市| 浙江省杭州市西湖区古墩路58884...| 1| 3| 2228.43| 1559.90| 13.91| 682.44| 中通|1577073377562|20220328041523|20220329193723|20220329073623|20220401004923| 已签收| 68| 南康网络有限公司|2022-04-01 00:49:23| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2342|2022032894717891| 8972| 符强| 上海市| 上海市| 上海市上海市沪青平公路288820...| 1| 4| 6173.30| 4321.31| 10.36| 1862.35| 顺丰|9695161512380|20220328163430|20220329123530|20220329091330|20220401044230| 已签收| 186| 方正科技信息有限公司|2022-04-01 04:42:30| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2350|2022032887041347| 4597| 姜岩| 江苏省| 江苏省无锡市| 江苏省无锡市南长区南长街20121...| 1| 4| 4314.53| 2157.27| 41.27| 2198.54| 中通|5283447205985|20220328185032|20220329161332|20220329062132|20220401042032| 已签收| 220| 恩悌信息有限公司|2022-04-01 04:20:32| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2354|2022032733529686| 14993| 尹晶| 上海市| 上海市| 上海市上海市四川北路1661767...| 1| 2| 8535.21| 95.00| 20.41| 8460.62| 中通|3976165724549|20220328090635|20220329220435|20220329084235|20220401015335| 已签收| 846| 立信电子传媒有限公司|2022-04-01 01:53:35| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2362|2022032714983348| 7925| 罗淑华| 上海市| 上海市| 上海市上海市常德路8008918号8层| 1| 4| 5286.17| 80.00| 15.18| 5221.35| 韵达|9110286972135|20220328050040|20220329180940|20220329094040|20220401044540| 已签收| 522| 易动力科技有限公司|2022-04-01 04:45:40| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2370|2022032814968220| 14159| 邢涛| 上海市| 上海市| 上海市上海市嘉定区佳通路31弄34...| 1| 2| 6690.56| 80.00| 42.28| 6652.84| 顺丰|9638866501792|20220328052340|20220330002240|20220329091840|20220401041340| 已签收| 665| 凌颖信息科技有限公司|2022-04-01 04:13:40| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2334|2022032859756405| 7370| 黄丽娟| 江苏省| 江苏省苏州市| 江苏省苏州市现代大道1699409...| 1| 1| 2229.52| 100.00| 11.01| 2140.53| 中通|4278051015984|20220329082028|20220330154728|20220330073228|20220402042228| 已签收| 214|惠派国际公司信息有限公司|2022-04-02 04:22:28| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2403|2022032788808370| 4541| 阎桂香| 江苏省| 江苏省无锡市| 江苏省无锡市解放东路1000464...| 1| 2| 1052.92| 210.58| 13.67| 856.01| 顺丰|4476220310056|20220328173448|20220330022648|20220329012648|20220401031848| 已退款| 86| 昊嘉科技有限公司|2022-04-02 05:40:48| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2459|2022032955711624| 15084| 黄云| 上海市| 上海市| 上海市上海市水城南路207986号17层| 1| 2| 1720.08| 1204.06| 14.02| 530.04| 顺丰|5130441519061|20220329122210|20220331020110|20220329234010|20220402020310| 已签收| 53| 新格林耐特传媒有限公司|2022-04-02 02:03:10| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2501|2022032965004110| 7594| 谢帆| 贵州省| 贵州省贵阳市|贵州省贵阳市云岩区红边门店普陀路1...| 1| 5| 7990.56| 6392.45| 13.42| 1611.53| 顺丰|8154839863090|20220330170823|20220331215023|20220330233023|20220402213123| 已签收| 161| 合联电子传媒有限公司|2022-04-02 21:31:23| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2529|2022032866253715| 15300| 姚博| 上海市| 上海市| 上海市上海市嘉定区佳通路31弄39...| 1| 4| 515.96| 257.98| 27.30| 285.28| 韵达|4472053332463|20220329183828|20220330215828|20220329191128|20220402034728| 已签收| 29| 济南亿次元信息有限公司|2022-04-02 03:47:28| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2513|2022032810875003| 6753| 卿柳| 上海市| 上海市| 上海市上海市辛耕路1299124号14层| 1| 3| 2888.15| 2021.71| 17.57| 884.02| 顺丰|0370967098533|20220329062624|20220330114924|20220330080924|20220402002524| 已签收| 88| 良诺信息有限公司|2022-04-02 00:25:24| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2435|2022032844826283| 19014| 张桂兰| 江苏省| 江苏省江苏省淮安市|江苏省江苏省淮安市清河区翔宇中路1...| 1| 1| 2602.38| 80.00| 40.23| 2562.61| 韵达|1484782939199|20220329050759|20220330202559|20220330085359|20220402030259| 已签收| 256| 网新恒天网络有限公司|2022-04-02 03:02:59| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2322|2022032881798097| 18548| 陈丹丹| 江苏省| 江苏省溧阳市| 江苏省溧阳市天目路123956号上...| 1| 2| 2043.64| 100.00| 14.60| 1958.24| 韵达|8595786486688|20220328061921|20220329121521|20220328194921|20220331172121| 已退款| 196| 彩虹信息有限公司|2022-04-02 03:58:21| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2553|2022032935583382| 6329| 刘婷| 江苏省| 江苏省苏州市| 江苏省苏州市金鸡湖路885370号...| 1| 3| 1734.24| 1387.39| 45.83| 392.68| 韵达|3143365276943|20220330091539|20220331200739|20220330190339|20220402162639| 已签收| 39| MBP软件信息有限公司|2022-04-02 16:26:39| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
| 2537|2022032818405314| 11139| 张旭| 上海市| 上海市| 上海市上海市浦东新区杨高南路428...| 1| 3| 1448.48| 100.00| 28.55| 1377.03| 顺丰|9631168546997|20220329185733|20220330201433|20220329191033|20220402035433| 已签收| 138| 飞利信信息有限公司|2022-04-02 03:54:33| user1|2023-12-11 12:30:19| user1|2023-12-11 12:30:19|20231131|
+--------+----------------+-----------+-------------+--------+-------------------------------+------------------------------------+------------+--------------+-----------+--------------+--------------+-------------+------------------+-------------+--------------+--------------+--------------+--------------+------------+-----------+------------------------+-------------------+---------------+-------------------+---------------+-------------------+--------+
*/
// 停止会话连接
spark.stop()
}
// 使用 newAPIHadoopRDD 读取数据
def read(spark:SparkSession):DataFrame = {
// 1) 创建HbaseConfiguration配置信息
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "192.168.190.139") //设置zooKeeper集群地址,也可以通过将hbase-site.xml拷贝到resources目录下
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181
hbaseConf.set("hbase.master", "192.168.190.139:16000") // 设置HBase HMaster
// 设置从HBase那张表读取数据
val tablename = "hbase_order_master" // 要读取的hbase表名
hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename) // 注意:读取时使用TableInputFormat
// 2)添加filter => TableMapReduceUtil.convertScanToString(getScan)
hbaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(getScan))
// 3)调用SparkContext中newAPIHadoopRDD读取表中的数据,构建RDD
val resultRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(
hbaseConf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
// 测试获取的数据
// println(s"count = ${resultRDD.count()}")
// resultRDD.foreach(println)
// resultRDD.take(2).foreach(println)
/*
resultRDD.take(2).foreach{
case (_, result) =>
println(s"rowKey = ${Bytes.toString(result.getRow)}")
for (cell <- result.rawCells()) {
val cf = Bytes.toString(CellUtil.cloneFamily(cell)) // 列族
val colum = Bytes.toString(CellUtil.cloneQualifier(cell)) // 列限定符
val value = Bytes.toString(CellUtil.cloneValue(cell)) // 列值
println(s"\t$cf:$colum = $value")
}
}
*/
// 4)对获取的值进行过滤,并转换为DataFrame返回
import spark.implicits._
resultRDD
.map { t =>
val result = t._2 // Result
// 通过列族和列名获取列
val order_id = Bytes.toLong(result.getValue("ods".getBytes, "order_id".getBytes))
val order_sn = Bytes.toString(result.getValue("ods".getBytes, "order_sn".getBytes))
val customer_id = Bytes.toLong(result.getValue("ods".getBytes, "customer_id".getBytes))
val shipping_user = Bytes.toString(result.getValue("ods".getBytes, "shipping_user".getBytes))
val province = Bytes.toString(result.getValue("ods".getBytes, "province".getBytes))
val city = Bytes.toString(result.getValue("ods".getBytes, "city".getBytes))
val address = Bytes.toString(result.getValue("ods".getBytes, "address".getBytes))
val order_source = Bytes.toInt(result.getValue("ods".getBytes, "order_source".getBytes))
val payment_method = Bytes.toInt(result.getValue("ods".getBytes, "payment_method".getBytes))
val order_money = Bytes.toBigDecimal(result.getValue("ods".getBytes, "order_money".getBytes))
val district_money = Bytes.toBigDecimal(result.getValue("ods".getBytes, "district_money".getBytes))
val shipping_money = Bytes.toBigDecimal(result.getValue("ods".getBytes, "shipping_money".getBytes))
val payment_money = Bytes.toBigDecimal(result.getValue("ods".getBytes, "payment_money".getBytes))
val shipping_comp_name = Bytes.toString(result.getValue("ods".getBytes, "shipping_comp_name".getBytes))
val shipping_sn = Bytes.toString(result.getValue("ods".getBytes, "shipping_sn".getBytes))
val create_time = Bytes.toString(result.getValue("ods".getBytes, "create_time".getBytes))
val shipping_time = Bytes.toString(result.getValue("ods".getBytes, "shipping_time".getBytes))
val pay_time = Bytes.toString(result.getValue("ods".getBytes, "pay_time".getBytes))
val receive_time = Bytes.toString(result.getValue("ods".getBytes, "receive_time".getBytes))
val order_status = Bytes.toString(result.getValue("ods".getBytes, "order_status".getBytes))
val order_point = Bytes.toLong(result.getValue("ods".getBytes, "order_point".getBytes))
val invoice_title = Bytes.toString(result.getValue("ods".getBytes, "invoice_title".getBytes))
val modified_time = Bytes.toString(result.getValue("ods".getBytes, "modified_time".getBytes))
// 构造为OrderMaster对象实例
OrderMaster(order_id,
order_sn,
customer_id,
shipping_user,
province,
city,
address,
order_source,
payment_method,
order_money,
district_money,
shipping_money,
payment_money,
shipping_comp_name,
shipping_sn,
create_time,
shipping_time,
pay_time,
receive_time,
order_status,
order_point,
invoice_title,
modified_time)
}
// 转换为DataFrame
.toDF()
}
// case class
case class OrderMaster(
order_id: Long,
order_sn: String,
customer_id: Long,
shipping_user: String,
province: String,
city: String,
address: String,
order_source: Int,
payment_method: Int,
order_money: BigDecimal, // BigDecimal 默认为(38,18)
district_money: BigDecimal,
shipping_money: BigDecimal,
payment_money: BigDecimal,
shipping_comp_name: String,
shipping_sn: String,
create_time: String,
shipping_time: String,
pay_time: String,
receive_time: String,
order_status: String,
order_point: Long,
invoice_title: String,
modified_time: String // 注意这个字段
)
/* 创建scan过滤, 过滤出rowkey包含'20220402'内容的行
* 假设rowkey格式为:创建日期_发布日期_ID_TITLE
* 目标:查找 rowkey中包含 20220402 的数据
* 使用行过滤器:new RowFilter(CompareOp.EQUAL , new SubstringComparator("20220402"))
* 返回 org.apache.hadoop.hbase.client.Scan
*/
def getScan: Scan = {
// 创建Scan
val scan = new Scan()
// 指定行过滤器(行键必须包含子字符串"20220402")
val rowFilter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator("20220402"))
// scan 设置过滤器
scan.setFilter(rowFilter)
scan.withStartRow(Bytes.toBytes("020220402000000000")) // 起始行键
scan.withStopRow(Bytes.toBytes("920220402235959999")) // 结束行键
scan
}
}
运行以上程序,然后到Hive Cli中查看DWD中写入的数据。
第三部分:可能遇到的异常及解决方法
在Spark集成HBase过程中,可能会遇到各种异常。以下是一些常见异常及解决方法。
异常1:
java.lang.NoSuchMethodError: org.apache.hadoop.security.HadoopKerberosName.setRuleMechanism(Ljava/lang/String;)V
解决方法:
在pom.xml文件中,添加如下依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>3.1.2</version>
</dependency>
异常2:
Exception in thread “main” java.lang.NoClassDefFoundError: com/fasterxml/jackson/core/exc/InputCoercionException
解决方法:
在pom.xml文件中,添加如下依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.5</version>
</dependency>
异常3:在hbase shell中查看表内容时,看不到中文。
这其实不是异常,因为HBase默认是查看编码以后的中文。
解决方法:使用以下方法查看hbase表:
hbase> scan 'hbase_master_order',{FORMATTER => 'toString'}
1 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
2 本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
3 本站资源大多存储在云盘,如发现链接失效,请联系我们我们会第一时间更新。
暂无评论内容