2024年福建省大数据应用开发赛项样题解析-模块C:实时数据处理-任务二:实时指标计算

任务要求:编写Scala 工程代码,使用Flink 消费Kafka 中dwd 层的Topic数据,表结构与离线数据表结构相同。

本任务共有两个子任务组成。点击正文链接,可快速跳转到相应子任务实现部分:

  • 查看子任务1的实现

  • 查看子任务2的实现

子任务1

子任务1描述

1.使用Flink 消费kafka 中的数据,统计商品的UV 和PV,将结果写入HBase 中的表ads:online_uv_pv 中。使用Hive cli 查询ads.pv_uv_result 表,查询出10 条数据,将结果截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下;

子任务1分析

首先了解题目中相关术语的概念和含义。

(1) 商品的UV:可以理解为独立商品数,即单一商品数统计。但是这里题意模糊,所以假定任务是想要实时统计全部商品的UV。

(2) 商品的PV:可以理解为商品总数。因为任务描述中没有明确是统计每个商品的PV,还是全部商品的PV,因此这里假定是要统计订单中全部商品数量的累加。

(3) 商品信息位于商品明细表(order_detail)中。我们另外创建一个fact_order_detail表,然后实时数据获取路径为:insert fact_order_detail => maxwell binlog => Kafka fact_order_detail主题。

(4) Flink流程序消费Kafka fact_order_detail主题数据,实时统计UV和PV,写入HBase表。

(5) 用Hive cli查询HBase,说明要为HBase映射Hive表(多么奇葩的做法啊!)

注意,结果写入HBase中时,rowkey的规则是什么?任务描述中没有给出任何说明。 本参考实现中采取的规则是:rowkey = 随机数(0-9)+ yyyyMMddHHmmssSSS。比赛时以具体要求为准。

将Flink流数据写入到HBase表的方法有多种,其中主要有以下两种方式:

  • (1)直接调用HBase Java API,通过编码构造行键(rowkey)和列,然后以put的方式将数据写入HBase表;

  • (2)使用Flink的Table API和SQL,以临时表的形式写入HBase。

其中第二种方法更加简单、流畅。本示例采用第二种方法,即使用Flink Table API/SQL的形式将流数据写入到HBase表。

子任务1参考实现

请按以下步骤操作。

1)首先,在HBase中,创建命名空间(即任务描述中的数据库)和要写入的表,语句如下:

hbase> create_namespace 'ads'
hbase> create 'ads:online_uv_pv','cf'

2)创建Hive外表

在Hive中创建HBase对应的外表,语句如下:

hive> CREATE DATABASE ads;

hive> CREATE EXTERNAL TABLE ads.online_uv_pv(
    key STRING,
    uv BIGINT,
    pv BIGINT
) ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,cf:uv,cf:pv')
tblproperties('hbase.table.name'='ads:online_uv_pv','hbase.table.default.storage.type'='binary');

3)开发环境准备

(1) 在IDEA中创建一个Maven项目。

(2) 打开项目中的pom.xml文件,配置依赖,添加如下依赖项:

    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <flink.version>1.14.0</flink.version>
      <target.java.version>1.8</target.java.version>
      <scala.binary.version>2.12</scala.binary.version>
      <scala.version>2.12.11</scala.version>
      <maven.compiler.source>8</maven.compiler.source>
      <maven.compiler.target>8</maven.compiler.target>
      <log4j.version>2.12.1</log4j.version>
  </properties>

  <dependencies>
      <!-- Apache Flink dependencies -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <!-- 如果使用的是scala语言... -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-common</artifactId>
          <version>${flink.version}</version>
      </dependency>

      <!-- Scala Library, provided by Flink as well. -->
      <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
          <scope>provided</scope>
      </dependency>

      <!-- Add connector dependencies here. They must be in the default scope (compile). -->
      <!-- Example:-->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
      </dependency>

      <!--HBase -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-hbase-base_2.12</artifactId>
          <version>1.14.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-hbase-2.2_2.12</artifactId>
          <version>1.14.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>2.2.3</version>
      </dependency>

      <!--JDBC连接器-->
      <!-->flink-connector-jdbc flink版本需在1.11.0之后<!-->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.48</version>
      </dependency>

      <!-- Add logging framework, to produce console output when running in the IDE. -->
      <!-- These dependencies are excluded from the application JAR by default. -->
      <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-slf4j-impl</artifactId>
          <version>${log4j.version}</version>
          <scope>runtime</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-api</artifactId>
          <version>${log4j.version}</version>
          <scope>runtime</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-core</artifactId>
          <version>${log4j.version}</version>
          <scope>runtime</scope>
      </dependency>
  </dependencies>
  1. 创建Flink流处理程序Task02Job1.scala,编辑代码如下:

package com.ss2023

import com.google.gson.JsonParser
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
//import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._

/**
*
* 本版本使用 流API和表API混合的方式
*/

object Task02Job1 {

// case class,流事件类型
case class Product(pid: String)

def main(args: Array[String]) {
  // 设置流执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 设置并行度
  env.setParallelism(1)

  // 流表执行环境
  val tableEnvironment = StreamTableEnvironment.create(env)

  // kafka source
  val source = KafkaSource.builder[String]
    .setBootstrapServers("192.168.190.139:9092")     // 请替换为自己的IP
    .setTopics("fact_order_detail")
    .setGroupId("group-test")
    .setStartingOffsets(OffsetsInitializer.earliest)
    .setValueOnlyDeserializer(new SimpleStringSchema)
    .build

  // 创建Sink表
  val product_pv_uv_hbase_create =
    """
      |CREATE TEMPORARY TABLE product_pv_uv_hbase (
      |   row_key STRING,
      |   cf ROW <uv BIGINT, pv BIGINT>,
      |   PRIMARY KEY (row_key) NOT ENFORCED
      |) WITH ('connector' = 'hbase-2.2',
      |       'table-name' = 'ads:online_uv_pv',
      |       'zookeeper.quorum' = '192.168.190.139:2181'
      | )
      |""".stripMargin
  tableEnvironment.executeSql("DROP TABLE IF EXISTS product_pv_uv_hbase")
  tableEnvironment.executeSql(product_pv_uv_hbase_create)

  // 读取源数据,并转换
  val dataStream = env
    // kafka源
    .fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source")
    // map转换
    .map(line => {
val jsonObj = JsonParser.parseString(line).getAsJsonObject.getAsJsonObject("product_id") // 取商品id字段        
      Product(jsonObj.toString)       // 构造Product对象
    })

  // 流转表,返回一个Table
  val inputTable = tableEnvironment.fromDataStream(dataStream)

  // 将Table对象注册为视图并查询它
  tableEnvironment.createTemporaryView("InputTable", inputTable)

  // 使用SQL计算rowKey,UV 和 PV
  // 其中rowkey使用随机数(0-9)+yyyyMMddHHmmssSSS
  tableEnvironment.sqlQuery(
    """
      |select concat(cast(rand_integer(10) as string), date_format(now(),'yyyyMMddHHmmssSSS')) as row_key,
      |row(count(distinct(pid)), count(pid)) as cf
      |from InputTable
      |""".stripMargin)
    .executeInsert("product_pv_uv_hbase")   // 将表写入HBase表

  // TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
}

}

子任务1部署准备

1). Flink安装目录的lib文件夹下jar包问题:

(1.1) 将flink-sql-connector-hbase-2.2_2.12-1.14.0.jar包移出lib目录;(如果有这个包的话)

(1.2) 将以下两个jar包拷贝到lib目录下:

  • flink-connector-hbase-2.2_2.12-1.14.0.jar

  • hbase-client-2.2.3.jar

(1.3) 编辑$FLINK_HOME/conf/flink-conf.yaml文件,在文件的最后增加如下一行:

classloader.resolve-order: parent-first

注:Flink版本迭代太快,“萝卜快了不洗泥”,一堆的兼容性问题。以上配置是为了解决可能会出现的包冲突问题。

2). 将hbase-site.xml文件拷贝到项目的resources目录下,并在其中添加如下内容:

<property>
  <name>hbase.defaults.for.version.skip</name>
  <value>true</value>
</property>

3). 在Linux的/etc/profile配置文件中,加入下面这两项:

export HADOOP_CLASSPATH=`hadoop classpath`
export HBASE_CLASSPATH=/opt/module/hbase-2.2.3/lib/*

然后执行以下语句:

source /etc/profile

子任务1执行步骤

请首先确保已经启动了大数据运行环境(在PBCP2023中是执行startall.sh脚本,或者分别启动HDFS、YARN、HBase、Zookeeper、Kafka等),然后按以下步骤执行。

  1. 启动Hive元数据服务,并保持运行。打开一个终端窗口,执行如下命令:

# hive --service metastore
  1. 创建Kafka主题fact_order_detail:

# cd /opt/module/kafka_2.12-2.4.1/

# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic fact_order_detail

# 查看主题
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
  1. 项目打包。在IDEA中,打开Termianl终端,执行如下命令打包:

mvn clean package

然后将打好的jar包(位于项目的target目录下)拷贝到Linux中。

  1. 将Flink流处理jar包部署到YARN集群上运行(yarn-per-job模式),命令如下:

# cd /opt/module/flink-1.14.0

# export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_CLASSPATH
# ./bin/flink run -m yarn-cluster -t yarn-per-job --class com.ss2023.Task02Job1 xxx.jar

请将上述命令中的xxx.jar替换为你部署的jar包名称,类名替换为你自己写的类名。

  1. 执行数据生成器程序,生成实时订单明细数据发往Kafka。

注:因为样题中对此数据说明模糊不清,未明确如何发送订单明细数据,所以这里使用PBCP自定义的数据生成器程序模拟生成订单明细数据并发往Kafka的fact_order_detail主题。在比赛时正式任务书中应当有明确的说明(例如在实时数据采集阶段同时采集订单明细数据到Kafka等),大家相应进行修改即可。

  1. HBase中查看数据

打开hbase shell,执行以下命令,查看插入到person表中的数据:

hbase(main):001:0> scan 'ads:online_uv_pv'
  1. 在Hive中执行Hive QL查询语句,查询写入HBase中的数据。

hive> select * from ads.online_uv_pv limit 10;

子任务2

子任务2描述

2.使用Flink 消费kafka 中的数据,统计商城每分钟的GMV,将结果存入redis 中,key 为store_gmv,使用redis cli 以get key方式获取store_gmv 值,将每次截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下(每分钟查询一次,至少查询3 次)。

子任务2分析

术语GMV

GMV(全称Gross Merchandise Volume),即商品交易总额,是成交总额(一定时间段内)的意思。

请注意GMV和实际交易额的区别。

GMV:那些只要下单付款的订单金额,就会被计入到GMV当中,管你取消不取消订单、拒不拒收货物、退不退货呢;

实际交易额:只有买家收到货并确认收货,事后也不退款的订单金额,才能被计入到实际交易额中。

GMV在电商中是一个非常重要的指标,但不同的电商对于GMV的算法完全不一样;算法不同,计算出的数据也千差万别。

根据题意,我们可以得出以下几点:

(1) 使用订单表order_master进行统计:只取payment_money字段即可。

(2) 只统计订单状态(order_status)为“已付款”的订单的支付金额。

(3) 统计“每分钟”的GMV,意味着使用1分钟大小的滚动窗口。

子任务2实现参考

Flink流处理程序实现代码如下:

package com.ss2023

import com.google.gson.JsonParser
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
//flink需要手动添加隐式转换,implicit
import org.apache.flink.api.scala._

/**
*
* fact_order_master Topic中数据的JSON格式如下:
* {
* "order_id":6097,
* "order_sn":"2022111417659492",
* "customer_id":583,
* "shipping_user":"龚桂芳",
* "province":"上海市",
* "city":"上海市",
* "address":"上海市上海市真光路12889755号3层",
* "order_source":1,
* "payment_method":3,
* "order_money":4125.78,
* "district_money":0.00,
* "shipping_money":39.06,
* "payment_money":4164.84,
* "shipping_comp_name":"韵达",
* "shipping_sn":"9846757521358",
* "create_time":"20221110023759",
* "shipping_time":"20221110132159",
* "pay_time":"20221110032759",
* "receive_time":"20221112203359",
* "order_status":"已签收",
* "order_point":416,
* "invoice_title":"雨林木风计算机传媒有限公司",
* "modified_time":"2022-11-12 12:33:59"
* }
*
*/

object Task03Job2 {

// 输入订单事件类型(只取订单状态和支付金额两个字段)
case class OrderEvent(order_status:String, payment_money: Double)

// 主方法
def main(args: Array[String]) {
  // 设置流执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)   // 设置并行度,便于观察

  // kafka source
  val source = KafkaSource.builder[String]
    .setBootstrapServers("192.168.190.139:9092")       // 记得改为自己的IP地址
    .setTopics("fact_order_master")
    .setGroupId("group-test")
    .setStartingOffsets(OffsetsInitializer.earliest)
    .setValueOnlyDeserializer(new SimpleStringSchema)
    .build

  // 定义redis sink的配置 (默认端口号6379)
  val conf = new FlinkJedisPoolConfig.Builder()
    .setMaxTotal(1000)
    .setMaxIdle(32)
    .setTimeout(10*1000)
    .setHost("192.168.190.139")   // 记得改为自己的IP地址
    .setPort(6379)
    .build()

  // 流处理管道
  val orderStream = env
    // 指定Kafka数据源
    .fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source")
    // 转换为OrderEvent对象
    .map(line => {
val jsonObj = JsonParser.parseString(line).getAsJsonObject
      val order_status = jsonObj.getAsJsonObject("order_status").toString   // 订单状态
      val payment_money = jsonObj.getAsJsonObject("payment_money").toDouble // 支付金额
      OrderEvent(order_status, payment_money)
    })
    // 过滤出"已付款"的订单项
    .filter(_.order_status=="已付款")
    // 分区
    .keyBy(_.order_status)
    // 指定窗口
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 执行窗口聚合函数
    .aggregate(new AggAvgTemp, new ProcessAvgTemp)

  // orderStream.print()

  // Redis Sink
  orderStream.addSink(new RedisSink[Double](conf, new RedisSinkMapper))

  // execute program
  env.execute("Flink Streaming Task03")
}

// 窗口增量聚合函数
class AggAvgTemp extends AggregateFunction[OrderEvent, Double, Double] {
  // 创建初始ACC
  override def createAccumulator = 0.0

  // 累加每个订单的支付金额
  override def add(input: OrderEvent, acc: Double) = {
    acc + input.payment_money
  }

  // 分区合并
  override def merge(acc1: Double, acc2: Double) = {
    acc1 + acc2
  }

  // 返回已下单订单的总支付金额
  override def getResult(acc: Double): Double = acc
}

// 窗口处理函数
class ProcessAvgTemp extends ProcessWindowFunction[Double, Double, String, TimeWindow] {
  override def process(id: String,
                        context: Context,
                        events: Iterable[Double],
                        out: Collector[Double]): Unit = {
    // 注意,Iterable[Double]将只包含一个读数,
    // 即MyReduceFunction计算出的预先聚合的平均值。
    val total_pay_money = events.iterator.next
    out.collect(total_pay_money)
  }
}

// redisMap接口,设置key和value
// Redis Sink 核心类是 RedisMappe 接口,使用时要编写自己的redis操作类实现这个接口中的三个方法
class RedisSinkMapper extends RedisMapper[Double] {
  // getCommandDescription:设置数据使用的数据结构 HashSet 并设置key的名称
  override def getCommandDescription: RedisCommandDescription = {
    // RedisCommand.HSET 指定存储类型
    new RedisCommandDescription(RedisCommand.SET)
  }

  /**
    * 获取 value值 value的数据是键值对
    *
    * @param data
    * @return
    */
  //指定key
  // 查看所有key:keys *             查看指定key:get top3itemamount
  override def getKeyFromData(event: Double): String = "store_gmv"

  // 指定value
  override def getValueFromData(event: Double): String = event.toString
}
}

项目编译并打包。在IDEA中,打开Termianl终端,执行如下命令打包:

mvn clean package

然后将打好的jar包(位于项目的target目录下)拷贝到Linux中。

子任务2执行步骤

请首先确保已经启动了大数据运行环境(执行startall.sh脚本,或者分别启动HDFS、YARN、HBase、Zookeeper、Kafka等),然后按以下步骤执行。

  1. 创建Kafka主题fact_order_master(如果之前没有创建的话):

# cd /opt/module/kafka_2.12-2.4.1/

# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic fact_order_master

# 查看主题
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
  1. 运行Redis服务器。在命令行切换到Redis安装目录,执行下面的命令,运行Redis服务器,并保持运行:

# cd /opt/module/redis-6.2.6

# ./bin/redis-server ./conf/redis.conf
  1. 将Flink流处理jar包部署到YARN集群上运行(yarn-per-job模式),命令如下:

# cd /opt/module/flink-1.14.0

# export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_CLASSPATH
# ./bin/flink run -m yarn-cluster -t yarn-per-job --class com.ss2023.Task02Job2 xxx.jar

请将上述命令中的xxx.jar替换为你部署的jar包名称。

(5) 执行数据生成器程序,生成实时订单数据发往Kafka。

(6) 另打开一个终端,切换到Redis安装目录,运行Redis CLI:

# cd /opt/module/redis-6.2.6
# ./bin/redis-cli

(7) 然后在redis命令行,执行查询,查询命令如下:

redis> get store_gmv
© 版权声明
THE END
喜欢就支持一下吧
点赞266赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

夸夸
夸夸
还有吗!没看够!
取消
昵称表情代码图片

    暂无评论内容