任务要求:编写Scala 工程代码,使用Flink 消费Kafka 中dwd 层的Topic数据,表结构与离线数据表结构相同。
本任务共有两个子任务组成。点击正文链接,可快速跳转到相应子任务实现部分:
子任务1
子任务1描述
1.使用Flink 消费kafka 中的数据,统计商品的UV 和PV,将结果写入HBase 中的表ads:online_uv_pv 中。使用Hive cli 查询ads.pv_uv_result 表,查询出10 条数据,将结果截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下;
子任务1分析
首先了解题目中相关术语的概念和含义。
(1) 商品的UV:可以理解为独立商品数,即单一商品数统计。但是这里题意模糊,所以假定任务是想要实时统计全部商品的UV。
(2) 商品的PV:可以理解为商品总数。因为任务描述中没有明确是统计每个商品的PV,还是全部商品的PV,因此这里假定是要统计订单中全部商品数量的累加。
(3) 商品信息位于商品明细表(order_detail)中。我们另外创建一个fact_order_detail表,然后实时数据获取路径为:insert fact_order_detail => maxwell binlog => Kafka fact_order_detail主题。
(4) Flink流程序消费Kafka fact_order_detail主题数据,实时统计UV和PV,写入HBase表。
(5) 用Hive cli查询HBase,说明要为HBase映射Hive表(多么奇葩的做法啊!)
注意,结果写入HBase中时,rowkey的规则是什么?任务描述中没有给出任何说明。 本参考实现中采取的规则是:rowkey = 随机数(0-9)+ yyyyMMddHHmmssSSS。比赛时以具体要求为准。
将Flink流数据写入到HBase表的方法有多种,其中主要有以下两种方式:
-
(1)直接调用HBase Java API,通过编码构造行键(rowkey)和列,然后以put的方式将数据写入HBase表;
-
(2)使用Flink的Table API和SQL,以临时表的形式写入HBase。
其中第二种方法更加简单、流畅。本示例采用第二种方法,即使用Flink Table API/SQL的形式将流数据写入到HBase表。
子任务1参考实现
请按以下步骤操作。
1)首先,在HBase中,创建命名空间(即任务描述中的数据库)和要写入的表,语句如下:
hbase> create_namespace 'ads'
hbase> create 'ads:online_uv_pv','cf'
2)创建Hive外表
在Hive中创建HBase对应的外表,语句如下:
hive> CREATE DATABASE ads;
hive> CREATE EXTERNAL TABLE ads.online_uv_pv(
key STRING,
uv BIGINT,
pv BIGINT
) ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,cf:uv,cf:pv')
tblproperties('hbase.table.name'='ads:online_uv_pv','hbase.table.default.storage.type'='binary');
3)开发环境准备
(1) 在IDEA中创建一个Maven项目。
(2) 打开项目中的pom.xml文件,配置依赖,添加如下依赖项:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.0</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.11</scala.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 如果使用的是scala语言... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!--HBase -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-base_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.3</version>
</dependency>
<!--JDBC连接器-->
<!-->flink-connector-jdbc flink版本需在1.11.0之后<!-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
-
创建Flink流处理程序Task02Job1.scala,编辑代码如下:
package com.ss2023
import com.google.gson.JsonParser
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
//import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
/**
*
* 本版本使用 流API和表API混合的方式
*/
object Task02Job1 {
// case class,流事件类型
case class Product(pid: String)
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置并行度
env.setParallelism(1)
// 流表执行环境
val tableEnvironment = StreamTableEnvironment.create(env)
// kafka source
val source = KafkaSource.builder[String]
.setBootstrapServers("192.168.190.139:9092") // 请替换为自己的IP
.setTopics("fact_order_detail")
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest)
.setValueOnlyDeserializer(new SimpleStringSchema)
.build
// 创建Sink表
val product_pv_uv_hbase_create =
"""
|CREATE TEMPORARY TABLE product_pv_uv_hbase (
| row_key STRING,
| cf ROW <uv BIGINT, pv BIGINT>,
| PRIMARY KEY (row_key) NOT ENFORCED
|) WITH ('connector' = 'hbase-2.2',
| 'table-name' = 'ads:online_uv_pv',
| 'zookeeper.quorum' = '192.168.190.139:2181'
| )
|""".stripMargin
tableEnvironment.executeSql("DROP TABLE IF EXISTS product_pv_uv_hbase")
tableEnvironment.executeSql(product_pv_uv_hbase_create)
// 读取源数据,并转换
val dataStream = env
// kafka源
.fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source")
// map转换
.map(line => {
val jsonObj = JsonParser.parseString(line).getAsJsonObject.getAsJsonObject("product_id") // 取商品id字段
Product(jsonObj.toString) // 构造Product对象
})
// 流转表,返回一个Table
val inputTable = tableEnvironment.fromDataStream(dataStream)
// 将Table对象注册为视图并查询它
tableEnvironment.createTemporaryView("InputTable", inputTable)
// 使用SQL计算rowKey,UV 和 PV
// 其中rowkey使用随机数(0-9)+yyyyMMddHHmmssSSS
tableEnvironment.sqlQuery(
"""
|select concat(cast(rand_integer(10) as string), date_format(now(),'yyyyMMddHHmmssSSS')) as row_key,
|row(count(distinct(pid)), count(pid)) as cf
|from InputTable
|""".stripMargin)
.executeInsert("product_pv_uv_hbase") // 将表写入HBase表
// TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
}
}
子任务1部署准备
1). Flink安装目录的lib文件夹下jar包问题:
(1.1) 将flink-sql-connector-hbase-2.2_2.12-1.14.0.jar包移出lib目录;(如果有这个包的话)
(1.2) 将以下两个jar包拷贝到lib目录下:
-
flink-connector-hbase-2.2_2.12-1.14.0.jar
-
hbase-client-2.2.3.jar
(1.3) 编辑$FLINK_HOME/conf/flink-conf.yaml文件,在文件的最后增加如下一行:
classloader.resolve-order: parent-first
注:Flink版本迭代太快,“萝卜快了不洗泥”,一堆的兼容性问题。以上配置是为了解决可能会出现的包冲突问题。
2). 将hbase-site.xml文件拷贝到项目的resources目录下,并在其中添加如下内容:
<property>
<name>hbase.defaults.for.version.skip</name>
<value>true</value>
</property>
3). 在Linux的/etc/profile配置文件中,加入下面这两项:
export HADOOP_CLASSPATH=`hadoop classpath`
export HBASE_CLASSPATH=/opt/module/hbase-2.2.3/lib/*
然后执行以下语句:
source /etc/profile
子任务1执行步骤
请首先确保已经启动了大数据运行环境(在PBCP2023中是执行startall.sh脚本,或者分别启动HDFS、YARN、HBase、Zookeeper、Kafka等),然后按以下步骤执行。
-
启动Hive元数据服务,并保持运行。打开一个终端窗口,执行如下命令:
# hive --service metastore
-
创建Kafka主题fact_order_detail:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic fact_order_detail
# 查看主题
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
-
项目打包。在IDEA中,打开Termianl终端,执行如下命令打包:
mvn clean package
然后将打好的jar包(位于项目的target目录下)拷贝到Linux中。
-
将Flink流处理jar包部署到YARN集群上运行(yarn-per-job模式),命令如下:
# cd /opt/module/flink-1.14.0
# export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_CLASSPATH
# ./bin/flink run -m yarn-cluster -t yarn-per-job --class com.ss2023.Task02Job1 xxx.jar
请将上述命令中的xxx.jar替换为你部署的jar包名称,类名替换为你自己写的类名。
-
执行数据生成器程序,生成实时订单明细数据发往Kafka。
注:因为样题中对此数据说明模糊不清,未明确如何发送订单明细数据,所以这里使用PBCP自定义的数据生成器程序模拟生成订单明细数据并发往Kafka的fact_order_detail主题。在比赛时正式任务书中应当有明确的说明(例如在实时数据采集阶段同时采集订单明细数据到Kafka等),大家相应进行修改即可。
-
HBase中查看数据
打开hbase shell,执行以下命令,查看插入到person表中的数据:
hbase(main):001:0> scan 'ads:online_uv_pv'
-
在Hive中执行Hive QL查询语句,查询写入HBase中的数据。
hive> select * from ads.online_uv_pv limit 10;
子任务2
子任务2描述
2.使用Flink 消费kafka 中的数据,统计商城每分钟的GMV,将结果存入redis 中,key 为store_gmv,使用redis cli 以get key方式获取store_gmv 值,将每次截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下(每分钟查询一次,至少查询3 次)。
子任务2分析
术语GMV
GMV(全称Gross Merchandise Volume),即商品交易总额,是成交总额(一定时间段内)的意思。
请注意GMV和实际交易额的区别。
GMV:那些只要下单付款的订单金额,就会被计入到GMV当中,管你取消不取消订单、拒不拒收货物、退不退货呢;
实际交易额:只有买家收到货并确认收货,事后也不退款的订单金额,才能被计入到实际交易额中。
GMV在电商中是一个非常重要的指标,但不同的电商对于GMV的算法完全不一样;算法不同,计算出的数据也千差万别。
根据题意,我们可以得出以下几点:
(1) 使用订单表order_master进行统计:只取payment_money字段即可。
(2) 只统计订单状态(order_status)为“已付款”的订单的支付金额。
(3) 统计“每分钟”的GMV,意味着使用1分钟大小的滚动窗口。
子任务2实现参考
Flink流处理程序实现代码如下:
package com.ss2023
import com.google.gson.JsonParser
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
//flink需要手动添加隐式转换,implicit
import org.apache.flink.api.scala._
/**
*
* fact_order_master Topic中数据的JSON格式如下:
* {
* "order_id":6097,
* "order_sn":"2022111417659492",
* "customer_id":583,
* "shipping_user":"龚桂芳",
* "province":"上海市",
* "city":"上海市",
* "address":"上海市上海市真光路12889755号3层",
* "order_source":1,
* "payment_method":3,
* "order_money":4125.78,
* "district_money":0.00,
* "shipping_money":39.06,
* "payment_money":4164.84,
* "shipping_comp_name":"韵达",
* "shipping_sn":"9846757521358",
* "create_time":"20221110023759",
* "shipping_time":"20221110132159",
* "pay_time":"20221110032759",
* "receive_time":"20221112203359",
* "order_status":"已签收",
* "order_point":416,
* "invoice_title":"雨林木风计算机传媒有限公司",
* "modified_time":"2022-11-12 12:33:59"
* }
*
*/
object Task03Job2 {
// 输入订单事件类型(只取订单状态和支付金额两个字段)
case class OrderEvent(order_status:String, payment_money: Double)
// 主方法
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置并行度,便于观察
// kafka source
val source = KafkaSource.builder[String]
.setBootstrapServers("192.168.190.139:9092") // 记得改为自己的IP地址
.setTopics("fact_order_master")
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest)
.setValueOnlyDeserializer(new SimpleStringSchema)
.build
// 定义redis sink的配置 (默认端口号6379)
val conf = new FlinkJedisPoolConfig.Builder()
.setMaxTotal(1000)
.setMaxIdle(32)
.setTimeout(10*1000)
.setHost("192.168.190.139") // 记得改为自己的IP地址
.setPort(6379)
.build()
// 流处理管道
val orderStream = env
// 指定Kafka数据源
.fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source")
// 转换为OrderEvent对象
.map(line => {
val jsonObj = JsonParser.parseString(line).getAsJsonObject
val order_status = jsonObj.getAsJsonObject("order_status").toString // 订单状态
val payment_money = jsonObj.getAsJsonObject("payment_money").toDouble // 支付金额
OrderEvent(order_status, payment_money)
})
// 过滤出"已付款"的订单项
.filter(_.order_status=="已付款")
// 分区
.keyBy(_.order_status)
// 指定窗口
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
// 执行窗口聚合函数
.aggregate(new AggAvgTemp, new ProcessAvgTemp)
// orderStream.print()
// Redis Sink
orderStream.addSink(new RedisSink[Double](conf, new RedisSinkMapper))
// execute program
env.execute("Flink Streaming Task03")
}
// 窗口增量聚合函数
class AggAvgTemp extends AggregateFunction[OrderEvent, Double, Double] {
// 创建初始ACC
override def createAccumulator = 0.0
// 累加每个订单的支付金额
override def add(input: OrderEvent, acc: Double) = {
acc + input.payment_money
}
// 分区合并
override def merge(acc1: Double, acc2: Double) = {
acc1 + acc2
}
// 返回已下单订单的总支付金额
override def getResult(acc: Double): Double = acc
}
// 窗口处理函数
class ProcessAvgTemp extends ProcessWindowFunction[Double, Double, String, TimeWindow] {
override def process(id: String,
context: Context,
events: Iterable[Double],
out: Collector[Double]): Unit = {
// 注意,Iterable[Double]将只包含一个读数,
// 即MyReduceFunction计算出的预先聚合的平均值。
val total_pay_money = events.iterator.next
out.collect(total_pay_money)
}
}
// redisMap接口,设置key和value
// Redis Sink 核心类是 RedisMappe 接口,使用时要编写自己的redis操作类实现这个接口中的三个方法
class RedisSinkMapper extends RedisMapper[Double] {
// getCommandDescription:设置数据使用的数据结构 HashSet 并设置key的名称
override def getCommandDescription: RedisCommandDescription = {
// RedisCommand.HSET 指定存储类型
new RedisCommandDescription(RedisCommand.SET)
}
/**
* 获取 value值 value的数据是键值对
*
* @param data
* @return
*/
//指定key
// 查看所有key:keys * 查看指定key:get top3itemamount
override def getKeyFromData(event: Double): String = "store_gmv"
// 指定value
override def getValueFromData(event: Double): String = event.toString
}
}
项目编译并打包。在IDEA中,打开Termianl终端,执行如下命令打包:
mvn clean package
然后将打好的jar包(位于项目的target目录下)拷贝到Linux中。
子任务2执行步骤
请首先确保已经启动了大数据运行环境(执行startall.sh脚本,或者分别启动HDFS、YARN、HBase、Zookeeper、Kafka等),然后按以下步骤执行。
-
创建Kafka主题fact_order_master(如果之前没有创建的话):
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic fact_order_master
# 查看主题
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
-
运行Redis服务器。在命令行切换到Redis安装目录,执行下面的命令,运行Redis服务器,并保持运行:
# cd /opt/module/redis-6.2.6
# ./bin/redis-server ./conf/redis.conf
-
将Flink流处理jar包部署到YARN集群上运行(yarn-per-job模式),命令如下:
# cd /opt/module/flink-1.14.0
# export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_CLASSPATH
# ./bin/flink run -m yarn-cluster -t yarn-per-job --class com.ss2023.Task02Job2 xxx.jar
请将上述命令中的xxx.jar替换为你部署的jar包名称。
(5) 执行数据生成器程序,生成实时订单数据发往Kafka。
(6) 另打开一个终端,切换到Redis安装目录,运行Redis CLI:
# cd /opt/module/redis-6.2.6
# ./bin/redis-cli
(7) 然后在redis命令行,执行查询,查询命令如下:
redis> get store_gmv
1 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
2 本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
3 本站资源大多存储在云盘,如发现链接失效,请联系我们我们会第一时间更新。
暂无评论内容