环境说明
Flink 任务在Yarn 上用per job 模式(即Job 分离模式,不采用Session 模式),方便Yarn 回收资源;
建议使用gson 解析json 数据。
任务描述
编写Java 工程代码,使用Flink 消费Kafka 中dwd 层的Topic 数据,表结构与离线数据表结构相同,时间语义使用Processing Time。
任务分析
注意任务描述中的两个关键词:一是”编写Java工程代码”,二是”使用Processing Time”(即使用处理时间)。
本任务共有两个子任务组成:
子任务1
子任务1描述
1、使用Flink 消费kafka 中log_product_browse 主题的数据,统计商品的UV(浏览用户量)和PV(商品浏览量),将结果写入HBase 中的表ads:online_uv_pv 中。使用Hive cli(没写错)查询ads.pv_uv_result 表按照product_id 和pv 进行降序排序,查询出10 条数据,将结果截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下;
其中,表空间为:ads,rowkey 为:计算时的年月日时分秒+商品id,列族为:info,列名为:商品id,商品名称,uv,pv,modified_time
例:Rowkey:2022-10-24 16:47:38-13645
子任务1分析
根据题意,编写Flink程序,读取log_product_browse主题的数据,统计uv和pv值,写入HBase的ads:online_uv_pv表中;
注意,HBase中使用了命名空间ads,hbase表都位于该ads表空间中。
在该任务描述中,涉及到几个行业术语,解释如下:
-
(1) 商品的UV:UV是指独立访问用户数,即Unique Visitor。同一个用户多次浏览商品,UV数仍计为1。因此这里的UV指的是浏览商品的唯一用户量。00:00-24:00内相同的用户只被计算一次。
-
(2) 商品的PV:PV是指页面访问量,即Page View,用户每次对商品的浏览均被记录,用户对同一商品的多次访问要进行累计。因此这里的PV指的是用户对商品的浏览量的累计。
-
(3) 用Hive cli查询HBase,说明要为HBase映射Hive表(”没写错”这句话,有故弄玄虚之嫌。直接说明映射Hive表不好吗?在公司你这样写需求试试,喷不S你!)
注意,这个任务需求说明仍有几点容易产生歧义的地方:
-
(1) 关于行键的说明,”rowkey 为:计算时的年月日时分秒+商品id”,接着给出的示例为”例:Rowkey:2022-10-24 16:47:38-13645″,怎么理解其中的”+”就是”-“?
-
(2) 这是一个聚合统计,但没有给出统计周期,那也就是直接事件驱动了-来一个数据就统计一次,这样做好吗?
-
(3) 需求描述中是”统计商品的UV(浏览用户量)和PV(商品浏览量)”,但是存储到HBase表中的列又包含“商品id,商品名称,uv,pv,modified_time”,这岂不是要统计每个商品的uv和pv?你描写任务需求时说清楚点会s吗?
-
(4) 上面的问题(3)又带来了新的问题:如果写入到HBase表中包含商品名称,那么因为Kafka的log_product_browse主题中的数据只包含有商品id,并不包含商品名称,因此我们还需要引入一个商品信息维表。对于每条流式数据,可以关联该维表数据源,以便为实时计算提供数据关联查询(根据商品id获取商品名称)。
另外,当我们使用uv、pv这两个统计指标时,通常隐含(预设)一个场景的问题:按天统计 pv、uv。即统计每天从0点到当前时间的pv和uv,第二天0点重新开始计数第二天的。大家可以回忆一下(如果你曾经做过自媒体或电商行业,在后台查看平台统计你店铺的uv和pv),是不是这样?
亦或,因为比赛时间不会跨天(即在同一天内完成比赛),并且任务需求中说的是“时间语义使用Processing Time”使用处理时间,因此赛方隐含(预设)的场景是当天的uv、pv统计?所以不必考虑跨天的问题,也就不必考虑累加uv、pv清零问题和状态问题??
所以,作为赛题,不应该这样不专业和这样的模糊不清,而应该明确业务场景问题。否则,评分规则该怎么定?或者我们怎么知道评分规则中是如何判断得分点呢?
面对这种需求描述模糊的问题,真的很让人抓狂。我们暂且认为出题方只考虑了比赛那个时间段的uv、pv计算(从“时间语义使用Processing Time”这句中推测),而不考虑真实的生产环境,那么也就不必使用窗口函数和触发器了,仅简单地基于事件驱动实时统计累加量即可。
数据源和数据格式分析
本子任务涉及到读取Kafka中log_product_browse 主题的数据。该数据来自于实时数据生成器,生成的初始数据格式大致如下:
product_browse:(2730|18456|1|2023042739287771|'20230419230057');
product_browse:(5438|11696|0|0|'20230419193256');
product_browse:(6045|16691|1|2023042737193353|'20230419103857");
product_browse:(9378|11940|1|2023042721662101|'20230419225958');
product_browse:(6544|8248|1|2023042744627979|'20230419233957');
product_browse:(13916|11696|0|0|'20230419174756');
product_browse:(6085|18456|1|2023042739287771|'20230419200157');
product_browse:(2726|8248|0|0|'20230420014757');
product_browse:(7692|8248|1|2023042744627979|'20230420030757");
product_browse:(6684|11696|1|2023042785336981|'20230419231056');
product_browse:(2923|11696|0|0|'20230420002956');
product_browse:(4753|18456|1|2023042739287771|'20230419221957');
product_browse:(3235|8248|1|2023042744627979|'20230420014757');
product_browse:(8008|11696|1|2023042785336981|'20230420010456');
product_browse:(14426|16691|1|2023042737193353|'20230419235457');
观察以上初始数据,可以得知以下几点信息:
-
(1)不包含log_id字段。
-
(2)modified_time字段变成了字符串’yyyyMMddHHmmss’。
-
(3)各字段的含义如下:product_id|customer_id|gen_order|order_sn|modified_time。
然后经过上一个子任务的实时数据处理,写入Kafka的log_product_browse 主题。根据任务描述可知,Kafka的log_product_browse 主题的存储格式为json,每条json字符串的格式大致如下:
{
"log_id":"随机数(0-9)+MMddHHmmssSSS",
"product_id":"2730",
"customer_id":18456,
"gen_order":1,
"order_sn":"2023042739287771",
"modified_time":"20230419230057"
}
不过请注意,不同省在比赛时这个格式也有可能会有调整,但方法是相同的。知晓原理,掌握方法,万变不离其踪,大家学会随机应变即可。
子任务1实现
要完成这样的表写入,使用Flink Table API更加简单和优雅 。因此本任务我们使用Flink Table API/Flink SQL来实现。
1)首先,在HBase中,创建命名空间ads(HBase有两个预定义命名空间: hbase和default)。打开hbase shell,执行语句如下:
hbase> create_namespace 'ads'
2)然后,在HBase中,创建要写入的数据表online_uv_pv。在hbase shell中,执行语句如下:
hbase> create 'ads:online_uv_pv','info'
-
接下来,创建一个Flink流处理程序Task04Job.java,编辑代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Task04Job {
public static void main(String[] args) {
// 获得流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 表执行环境
final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
// 创建关联维表(来自mysql中的product_info表)
String source_jdbc_table_create = "CREATE TABLE product_info_source (" +
" product_id BIGINT," +
" product_core STRING," +
" product_name STRING," +
" bar_code STRING," +
" brand_id BIGINT," +
" one_category_id INTEGER," +
" two_category_id INTEGER," +
" three_category_id INTEGER," +
" supplier_id BIGINT," +
" price DOUBLE," +
" average_cost DOUBLE," +
" publish_status INTEGER," +
" audit_status INTEGER," +
" weight DOUBLE," +
" length DOUBLE," +
" height DOUBLE," +
" width DOUBLE," +
" color_type INTEGER," +
" production_date STRING," +
" shelf_life BIGINT," +
" descript STRING," +
" indate STRING," +
" modified_time STRING," +
" PRIMARY KEY (product_id) NOT ENFORCED" +
") WITH (" +
" 'connector'='jdbc'," +
" 'username'='root'," +
" 'password'='123456'," +
" 'url'='jdbc:mysql://192.168.190.139:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true'," +
" 'table-name'='product_info'" +
")";
tableEnvironment.executeSql("DROP TABLE IF EXISTS product_info_source");
tableEnvironment.executeSql(source_jdbc_table_create);
// 只取product_id 和 product_name 两个字段
Table product_dim_table = tableEnvironment.sqlQuery("select product_id, product_name from product_info_source");
// 注册到一个临时表中
tableEnvironment.createTemporaryView("product_dim_table", product_dim_table);
// 创建源表
String source_kafka_table_create =
"CREATE TEMPORARY TABLE product_browse_source ( " +
" log_id BIGINT," +
" product_id STRING," +
" customer_id BIGINT," +
" gen_order BIGINT" +
" order_sn STRING," +
" modified_time STRING" +
") WITH ('connector'='kafka'," +
" 'topic'='log_product_browse'," +
" 'properties.bootstrap.servers'='192.168.190.139:9092'," +
" 'properties.group.id'='group-test'," +
" 'format'='json'," +
" 'scan.startup.mode'='latest-offset'" +
")";
tableEnvironment.executeSql("DROP TABLE IF EXISTS product_browse_source");
tableEnvironment.executeSql(source_kafka_table_create);
// 创建Sink表
String sink_hbase_table_create =
"CREATE TEMPORARY TABLE product_browse_sink (" +
" row_key STRING," +
" info ROW<log_id BIGINT," +
" order_sn STRING," +
" product_id BIGINT," +
" customer_id STRING," +
" gen_order BIGINT" +
" modified_time STRING>," +
" PRIMARY KEY (row_key) NOT ENFORCED" +
") WITH ('connector' = 'hbase-2.2'," +
" 'table-name' = 'ads:online_uv_pv'," +
" 'zookeeper.quorum' = '192.168.190.139:2181'" +
" )";
tableEnvironment.executeSql("DROP TABLE IF EXISTS product_browse_sink");
tableEnvironment.executeSql(sink_hbase_table_create);
// ETL管道
// rowkey 为:计算时的年月日时分秒+商品id,列族为:info,列名为:商品id,商品名称,uv,pv,modified_time
// 例:Rowkey:2022-10-24 16:47:38-13645
// 加载源表,关联维表,获取每个商品id的商品名称,并计算UV(distinct+count)和PV(count)
String querySql =
"select concat(date_format(now(),'yyyy-MM-dd HH:mm:ss'), '-', product_id) as row_key," +
" row(product_id, product_name, count(distinct(customer_id)), count(product_id)) as info " +
"from (select s.product_id, d.product_name, s.modified_time" +
" from product_browse_source s join product_dim_table d" +
" on s.product_id=d.product_id" +
" ) tmp_tb " +
"group by product_id, product_name";
tableEnvironment.sqlQuery(querySql).executeInsert("product_pv_uv_hbase"); // 将表写入HBase表
// TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
}
}
-
使用mvn clean package命令(或任何其他自己熟悉的方式)将项目打包。
执行步骤
请首先确保已经启动了大数据运行环境(执行startall.sh脚本),然后按以下步骤执行。
将Flink流处理jar包部署到YARN集群上运行:
# cd /opt/module/flink-1.14.0
# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task04Job xxx.jar
请将上述命令中的xxx.jar替换为你部署的jar包名称。
使用Hive cli查询ads.pv_uv_result 表
首先,创建Hive外表,关联到HBase的ads:online_uv_pv表。在hive cli中,执行如下SQL语句:
hive> CREATE DATABASE ads;
hive> CREATE EXTERNAL TABLE ads.pv_uv_result(
rowkey STRING,
product_id STRING,
product_name STRING,
uv BIGINT,
pv BIGINT,
modified_time STRING
) ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':rowkey,info:product_id,info:product_name,info:uv,info:pv,info:modified_time')
tblproperties('hbase.table.name'='ads:online_uv_pv','hbase.table.default.storage.type'='binary');
在Hive中执行Hive QL查询语句,查询写入HBase中的数据(按照product_id 和pv 进行降序排序,查询出10 条数据):
hive> select * from ads.pv_uv_result order by product_id desc, pv desc limit 10;
请注意,如果执行中遇到疑问或错误,请参考。
附:pom.xml文件参考
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.0</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.11</scala.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 如果使用的是scala语言... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!--HBase -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-base_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.3</version>
</dependency>
<!--JDBC连接器-->
<!-->flink-connector-jdbc flink版本需在1.11.0之后<!-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
子任务2
子任务2描述
2、使用Flink 消费kafka 中fact_order_detail 主题的数据,统计商城每分钟的GMV(结果四舍五入保留两位小数),将结果存入redis 中(value 为字符串格式,仅存GMV),key 为store_gmv,使用redis cli 以get key 方式获取store_gmv 值,将每次截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下(每分钟查询一次,至少查询3 次)。
(GMV:所有订单金额,购买商品单价*购买商品数量,包括已下单未付款)
子任务2分析
术语GMV:
GMV(全称Gross Merchandise Volume),即商品交易总额 ,是成交总额(一定时间段内)的意思。GMV在电商中是一个非常重要的指标,但不同的电商对于GMV的算法完全不一样;算法不同,计算出的数据也千差万别。
GMV和实际交易额的区别:
-
GMV:那些只要下单付款的订单金额,就会被计入到GMV当中,管你取消不取消订单、拒不拒收货物、退不退货呢;
-
实际交易额:只有买家收到货并确认收货,事后也不退款的订单金额,才能被计入到实际交易额中。
任务分析:
-
(1) 统计“每分钟”的GMV,意味着使用1分钟大小的滚动窗口(处理时间)。
-
(2) 实际上,本任务直接使用订单表order_master进行统计即可。不太理解为什么要用order_detail表。
根据任务书中的描述,kafka 中fact_order_detail 主题的数据以json格式存储,大致如下:
{
"order_detail_id":1,
"order_sn":"2022111496083548",
"product_id":5380,
"product_name":"无线5.0蓝牙耳机双耳入耳式迷你跑步运动",
"product_cnt":3,
"product_price":702.53,
"average_cost":0.00,
"weight":4.89577,
"fee_money":25.80,
"w_id":1291,
"create_time":"20221109153354",
"modified_time":"2022-11-10 04:55:54"
}
完成本任务,只涉及其中的product_cnt和product_price字段。
子任务2实现
另创建一个Flink流处理程序Task05Job.java,编辑代码如下:
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class Task05Job {
// 输入订单事件类型(只取订单状态和支付金额)
public static class OrderDetailEvent{
int productCnt; // 购买商品数量
double productPrice; // 购买商品单价
public OrderDetailEvent() {}
public OrderDetailEvent(int productCnt,double productPrice) {
this.productCnt = productCnt;
this.productPrice = productPrice;
}
@Override
public String toString() {
return "OrderDetailEvent{" +
"productCnt=" + productCnt +
", productPrice=" + productPrice +
'}';
}
}
public static void main(String[] args) throws Exception {
// 获得流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// kafka source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("192.168.190.139:9092") // 注意,如果是kafka集群,请使用broker列表
.setTopics("fact_order_detail") // kafka topic
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 定义redis sink的配置 (默认端口号6379)
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setMaxTotal(1000)
.setMaxIdle(32)
.setTimeout(10*1000)
.setHost("192.168.190.139")
.setPort(6379)
.build();
// 流处理管道
DataStream<Double> orderStream = env
// 指定Kafka数据源
.fromSource(source, WatermarkStrategy.<String>noWatermarks(), "Kafka Source")
.map(new MapFunction<String, OrderDetailEvent>() {
@Override
public OrderDetailEvent map(String line) throws Exception {
// 转换为OrderDetailEvent对象
// 注意,比赛时根据接收到的json数据自行进行调整
JsonObject jsonObj = JsonParser.parseString(line).getAsJsonObject();
int productCnt = jsonObj.get("product_cnt").getAsInt(); // 商品数量
double productPrice = jsonObj.get("product_price").getAsDouble(); // 商品金额
// System.out.println("product_cnt:" + productCnt + ", product_price:" + productPrice);
return new OrderDetailEvent(productCnt,productPrice);
}
})
// 转换为keyed stream
.keyBy(new KeySelector<OrderDetailEvent, Integer>() {
@Override
public Integer getKey(OrderDetailEvent value) throws Exception {
return 0;
}
})
// 指定窗口
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
// 执行窗口聚合函数
.aggregate(new AggGmvTemp());
orderStream.print();
// data sink
orderStream.addSink(new RedisSink<Double>(conf, new RedisSinkMapper()));
// execute program
env.execute("Flink Streaming");
}
// 窗口增量聚合函数: IN, ACC, OUT
static class AggGmvTemp implements AggregateFunction<OrderDetailEvent, Double, Double> {
// 创建初始ACC
@Override
public Double createAccumulator() {
return 0.00;
}
// 累加每个订单项的支付金额
@Override
public Double add(OrderDetailEvent value, Double accumulator) {
return accumulator + value.productCnt * value.productPrice;
}
// 分区合并
@Override
public Double getResult(Double accumulator) {
return accumulator;
}
// 返回已下单订单的总支付金额
@Override
public Double merge(Double acc1, Double acc2) {
return acc1 + acc2;
}
}
// redisMap接口,设置key和value
// Redis Sink 核心类是 RedisMappe 接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法
static class RedisSinkMapper implements RedisMapper<Double> {
// getCommandDescription:设置数据使用的数据结构 HashSet 并设置key的名称
@Override
public RedisCommandDescription getCommandDescription() {
// RedisCommand.SET 指定存储类型
return new RedisCommandDescription(RedisCommand.SET);
}
/**
* 获取 value值 value的数据是键值对
*
* @param event
* @return
*/
//指定key
// 查看所有key:keys * 查看指定key:get store_gmv
@Override
public String getKeyFromData(Double event) {
return "store_gmv";
}
// 指定value
@Override
public String getValueFromData(Double event) {
return String.valueOf(event);
}
}
}
1 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
2 本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
3 本站资源大多存储在云盘,如发现链接失效,请联系我们我们会第一时间更新。
暂无评论内容