环境说明
Flink 任务在Yarn 上用per job 模式(即Job 分离模式,不采用Session 模式),方便Yarn 回收资源;
建议使用gson 解析json 数据。
任务描述
编写Java 工程代码,使用Flink 消费Kafka 中Topic 为ods_mall_log 和ods_mall_data 的数据并进行相应的数据统计计算(使用Processing Time)。
任务分析
注意任务描述中的两个关键词:一是”编写Java工程代码”,二是”使用Processing Time”(即使用处理时间)。
这里对出题方提出表扬,因为出题方终于听取了老师们意见,在本次的后续子任务描述中,不但纠正了原来一些明显的错误描述,而且应老师们的强烈要求,给出了比较详细的表结构(虽然仍然没有给出业务解释),这样参赛学员终于可以有一些数据线索可循。
本任务共有三个子任务组成:
子任务1
子任务1描述
1、使用Flink 消费Kafka 中topic 为ods_mall_data 的数据,根据数据中不同的表将数据分别分发至kafka 的DWD 层的fact_order_master 、 fact_order_detail 的Topic 中(只获取data 的内容,具体的内容格式请自查,其分区数均为2),其他的表则无需处理。使用Kafka 自带的消费者消 费fact_order_master(Topic)的前1 条数据,将结果截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下;
fact_order_master 表结构,存储位置:Kafka,存储格式:json
字段 | 类型 | 中文含义(和MySQL中相同) | 备注 |
---|---|---|---|
order_id | int | ||
order_sn | string | ||
customer_id | int | ||
shipping_user | string | ||
province | string | ||
city | string | ||
address | string | ||
order_source | int | ||
payment_method | int | ||
order_money | double | ||
district_money | double | ||
shipping_money | double | ||
payment_money | double | ||
shipping_comp_name | string | ||
shipping_sn | string | ||
create_time | timestamp | ||
shipping_time | timestamp | ||
pay_time | timestamp | ||
receive_time | timestamp | ||
order_status | string | ||
order_point | int | ||
invoice_title | string | ||
modified_time | timestamp |
fact_order_detail 表结构,存储位置:Kafka,存储格式:json
字段 | 类型 | 中文含义(和MySQL中相同) | 备注 |
---|---|---|---|
order_detail_id | int | ||
order_sn | string | ||
product_id | int | ||
product_name | string | ||
product_cnt | int | ||
product_price | double | ||
average_cost | double | ||
weight | double | ||
fee_money | double | ||
w_id | int | ||
create_time | timestamp | ||
modified_time | timestamp |
子任务1分析
这个任务我们可以归纳如下:
-
编写一个Flink流处理程序,既将Kafka当作数据源,也将Kafka作为Data Sink;
-
需要同时写多个Kafka Topic。
提醒大家注意的是,因为出题方并未给出实时数据生成器发送到Kafka 中topic 为ods_mall_data 主题的数据格式,但是却指定下游的kafka 的fact_order_master和fact_order_detail 主题数据格式为json,因此在正式比赛时,大家要灵活应变,先用Kafka自带的消费者脚本查看ods_mall_data主题中数据的格式,再相应进行解析。如果ods_mall_data主题中的数据格式就是json格式,则我们直接抽取相应的data部分发往下游即可(下面的示例实现中即假定是这样的);但是如果ods_mall_data主题中的数据格式就是json格式,则需要你先解析出各个字段,再构造为json格式发往下游(构造json格式字符串的方法可参考下一个子任务中的实现)。
子任务1实现
-
首先,在IntelliJ IDEA中创建一个Flink Maven项目,取名为Flink140Example。
参考教程:。
-
接下来,创建一个Flink流处理程序Task01Job.java,编辑代码如下:
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class Task01Job {
public static void main(String[] args) throws Exception {
// 获得流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("192.168.190.139:9092") // 如果是集群,则为"host1:9092,host2:9092,host3:9092"
.setTopics("ods_mall_data")
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest()) // 注意,比赛时应为 .latest()
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// kafka sink
Properties properties = new Properties();
properties.setProperty("transaction.timeout.ms", "7200000"); // 2 hours
// KafkaSink 允许将记录流写入一个或多个 Kafka 主题。
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("192.168.190.139:9092") // 如果是集群,则为"host1:9092,host2:9092,host3:9092"
.setKafkaProducerConfig(properties)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopicSelector(new TopicSelector<String>() {
@Override
public String apply(String s) {
if(s.contains("order_detail_id")){
return "fact_order_detail";
}
else{
return "fact_order_master";
}
}
})
.setValueSerializationSchema(new SimpleStringSchema())
.build()
).build();
// 流处理管道
env
// 指定Kafka数据源
.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
// 只需要order_master和order_detail表中的数据
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String line) throws Exception {
return line.contains("fact_order_master") || line.contains("fact_order_detail");
}
})
// 提取data部分
.map(new MapFunction<String, String>() {
@Override
public String map(String line) throws Exception {
JsonObject jsonObj = JsonParser.parseString(line).getAsJsonObject().getAsJsonObject("data");
return jsonObj.toString();
}
})
// .print();
.sinkTo(kafkaSink);
// execute program
env.execute("Flink Streaming Task01");
}
}
使用mvn clean package命令(或任何其他自己熟悉的方式)将项目打包。
执行步骤
请首先确保已经启动了大数据运行环境(执行startall.sh脚本),然后按以下步骤执行。
1)创建Kafka主题Topic:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic fact_order_master
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic fact_order_detail
2)查看主题:
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3)执行Kafka自带的消费者脚本,监听fact_order_master Topic:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-console-consumer.sh --bootstrap-server xueai:9092 --topic fact_order_master --from-beginning
4)执行Kafka自带的消费者脚本,监听fact_order_detail Topic:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-console-consumer.sh --bootstrap-server xueai:9092 --topic fact_order_detail --from-beginning
5)将Flink流处理jar包部署到YARN集群上运行:
# cd /opt/module/flink-1.14.0
# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task01Job xxx.jar
请将上述命令中的xxx.jar替换为你部署的jar包名称。
Flink on Yarn Per-Job提交命令格式如下:
./bin/flink run \
# 指定yarn的Per-job模式,-t等价于-Dexecution.target
-t yarn-per-job \
# yarn应用的自定义name
-Dyarn.application.name=consumerDemo \
# 未指定并行度时的默认并行度值, 该值默认为1
-Dparallelism.default=3 \
# JobManager进程的内存
-Djobmanager.memory.process.size=2048mb \
# TaskManager进程的内存
-Dtaskmanager.memory.process.size=2048mb \
# 每个TaskManager的slot数目, 最佳配比是和vCores保持一致
-Dtaskmanager.numberOfTaskSlots=2 \
# 防止日志中文乱码
-Denv.java.opts="-Dfile.encoding=UTF-8" \
# 支持火焰图, Flink1.13新特性, 默认为false, 开发和测试环境可以开启, 生产环境建议关闭
-Drest.flamegraph.enabled=true \
# 入口类
-c xxxx.MainClass \
# 提交Job的jar包
xxxx.jar
如果要退出任务,使用如下命令格式:
./bin/flink cancel \
-t yarn-per-job \
# Yarn的ApplicationID值, 可以通过Yarn的webUI直接查看
-Dyarn.application.id=${YarnApplicationID} \
# Flink的JobID, 可以通过Yarn找到Flink的WebUI进行查看
${FlinkJobID}
也可以通过以下命令查看:
./bin/flink list \
-t yarn-per-job \
-Dyarn.application.id=${yarnApplicationID}
6)查看两个消费者脚本运行的窗口,观察各自的输出内容。
子任务2
子任务2描述
2、使用Flink 消费Kafka 中topic 为ods_mall_log 的数据,根据数据中不同的表前缀区分,过滤出product_browse 的数据,将数据分别分发至kafka的DWD 层log_product_browse 的Topic 中,其分区数为2,其他的表则无需处理。使用Kafka 自带的消费者消费log_product_browse(Topic)的前1条数据,将结果截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下。
log_product_browse 表结构,存储位置:Kafka,存储格式:json
字段 | 类型 | 中文含义(和MySQL中相同) | 备注 |
---|---|---|---|
log_id | long | 自增长id | 可以使用随机数(0-9)+MMddHHmmssSSS 代替 |
product_id | string | ||
customer_id | int | ||
gen_order | int | ||
order_sn | string | ||
modified_time | timestamp |
子任务2分析
与子任务1类似,编写Flink流处理程序,既将Kafka作为数据源,也将Kafka作为Data Sink。整个数据流为:实时数据生成器-> flume-> Kafka ods_mall_log主题-> Flink程序 -> Kafka log_product_browse主题。
这个任务描述,挖了一个不大不小的坑(不是技术上的坑,是供应商公司设备环境的坑)。因为它指明写入到Kafka log_product_browse主题的数据存储格式是json,但就是不告诉你从Kafka ods_mall_log主题中获取到的数据格式是什么,也不告诉你数据生成器生成的数据格式。所以大家对技术要活学活用,在比赛时先使用Kafka自带的消费者脚本获取ods_mall_log主题中的数据,查看其格式,然后进行相应的数据解析,并转换为JSON格式的字符串发往下游的Kafka log_product_browse主题。
假设实时数据生成器发出的日志类数据格式如下:
数据发送开始:product_browse:(2730|18456|1|2023042739287771|'20230419230057');
customer_balance_Log:(8248|1|2023042744627979|'20230423031057'|9626.24);
product_browse:(5438|11696|0|0|'20230419193256');
product_browse:(6045|16691|1|2023042737193353|'20230419103857");
customer_point_Log:(16691|0|0|10|'20230421100256');
product_browse:(9378|11940|1|2023042721662101|'20230419225958');
customer_login_log:(8248|'20230421045357'|'40.253.78.255'|0);
product_browse:(6544|8248|1|2023042744627979|'20230419233957');
product_browse:(13916|11696|0|0|'20230419174756');
product_browse:(6085|18456|1|2023042739287771|'20230419200157');
product_browse:(2726|8248|0|0|'20230420014757');
customer_login_log:(18456|'20230421055957'|'199.209.123.4'|0);
product_browse:(7692|8248|1|2023042744627979|'20230420030757");
customer_balance_log:(16691|1|2023042737193353|'20230423025657'|5044.44);
product_browse:(6684|11696|1|2023042785336981|'20230419231056');
product_browse:(2923|11696|0|0|'20230420002956');
product_browse:(4753|18456|1|2023042739287771|'20230419221957');
product_browse:(3235|8248|1|2023042744627979|'20230420014757');
product_browse:(8008|11696|1|2023042785336981|'20230420010456');
customer_point_log:(16691|0|2023042737193353|504|'20230423025657");
product_browse:(14426|16691|1|2023042737193353|'20230419235457');
观察以上数据,(对比mysql中的product_browse表)可以发现:
-
不包含log_id字段。
-
modified_time字段变成了字符串’yyyyMMddHHmmss’。
-
各字段的含义(对应的字段)如下:product_id|customer_id|gen_order|order_sn|modified_time。
我们假设这些数据直接发往了Kafka ods_mall_log主题,则我们需要从该主题中获取这些格式的数据,并过滤出其中表前缀为product_browse的数据,然后将数据分别分发至kafka的log_product_browse主题中。
子任务2实现
-
另创建一个Flink流处理程序Task02Job.java,编辑代码如下:
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import java.util.Properties;
public class Task02Job {
// 实体类
static class ProductBrowse{
String log_id;
String product_id;
long customer_id;
long gen_order;
String order_sn;
String modified_time;
ProductBrowse(){}
public ProductBrowse(String log_id, String product_id, long customer_id, long gen_order, String order_sn, String modified_time) {
this.log_id = log_id;
this.product_id = product_id;
this.customer_id = customer_id;
this.gen_order = gen_order;
this.order_sn = order_sn;
this.modified_time = modified_time;
}
}
public static void main(String[] args) throws Exception {
// 获得流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 源和sink名称
String source_topic = "ods_mall_log";
String sink_topic = "log_product_browse";
// kafka source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("192.168.190.139:9092") // 如果是集群,则为"host1:9092,host2:9092,host3:9092"
.setTopics(source_topic)
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// kafka sink
Properties properties = new Properties();
properties.setProperty("transaction.timeout.ms", "7200000"); // 2 hours
// KafkaSink 允许将记录流写入一个或多个 Kafka 主题。
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("192.168.190.139:9092") // 如果是集群,则为"host1:9092,host2:9092,host3:9092"
.setKafkaProducerConfig(properties)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(sink_topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
).build();
// 流处理管道
env
// 指定Kafka数据源
.fromSource(source, WatermarkStrategy.<String>noWatermarks(), "Kafka Source")
// 只分发product_browse日志数据
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String line) throws Exception {
return line.contains("product_browse");
}
})
/* 解析数据行,构造为json格式发往下游:
上游数据:product_browse:(2730|18456|1|2023042739287771|'20230419230057')
各字段含义: product_id|customer_id|gen_order|order_sn|modified_time
下游数据:log_id long 自增长id 可以使用随机数(0-9)+MMddHHmmssSSS 代替
product_id string
customer_id int
gen_order int
order_sn string
modified_time timestamp
这个题的log_id有点问题,如果log_id的字段是long类型,那么随机数0怎么办?
所以这里我们假设log_id是String类型
*/
.map(new MapFunction() {
@Override
public String map(String value) throws Exception {
// 解析数据
String data = value.split(":")[1]; // (2730|18456|1|2023042739287771|'20230419230057')
String content = data.substring(1,data.length()-1); // 2730|18456|1|2023042739287771|'20230419230057'
String[] arr = content.split("\\|");
// 构造log_id
SimpleDateFormat formatter= new SimpleDateFormat("MMddHHmmssSSS");
String log_id = (new Random()).nextInt(10) + "+" + formatter.format(new Date(System.currentTimeMillis()));
// 构造json字符串
TestGson.ProductBrowse productBrowse = new TestGson.ProductBrowse(log_id,arr[0],Long.parseLong(arr[1]),Long.parseLong(arr[2]),arr[3],arr[4].substring(1, arr[4].length()-1));
Gson gson = new Gson();
// System.out.println(gson.toJson(productBrowse));
return gson.toJson(productBrowse);
}
})
// .print();
.sinkTo(kafkaSink);
// execute program
env.execute("Flink Streaming Task02");
}
}
使用mvn clean package命令(或任何自己熟悉的方式)将项目打包。
执行步骤
请首先确保已经启动了大数据运行环境(执行startall.sh脚本),然后按以下步骤执行。
1)创建Kafka主题Topic:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic log_product_browse
2)查看主题:
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3)执行Kafka自带的消费者脚本,监听log_product_browse Topic:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-console-consumer.sh --bootstrap-server xueai:9092 --topic log_product_browse --from-beginning
4)将Flink流处理jar包部署到YARN集群上运行:
# cd /opt/module/flink-1.14.0
# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task02Job xxx.jar
请将上述命令中的xxx.jar替换为你部署的jar包名称。
5)查看消费者脚本运行的窗口,观察输出内容。
子任务3
子任务3描述
3、在任务1 、2 进行的同时, 需要将order_master 、order_detail 、product_browse 备份至HBase 中(若Int 类型长度不够,可使用BigInt 或Long 类型代替),使用HBase Shell 查看ods:order_master 表的任意2 条数据,查看字段为row_key 与shipping_user、查看ods:order_detail 表的任意2 条数据, 查看字段为row_key 与product_name 、查看ods:product_browse 表的任意2 条数据,查看字段为row_key 与order_sn。 将结果分别截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下(截图中不能有乱码)。
三个HBase 中的数据结构描述如下。
ods:order_master 数据结构如下:
字段 | 类型 | 中文含义(和MySQL中相同) | 备注 |
---|---|---|---|
rowkey | string | rowkey | 可以使用随机数(0-9)+yyyyMMddHHmmssSSS(date 的格式)代替 |
Info | 列族名 | ||
order_id | int | ||
order_sn | string | ||
customer_id | int | ||
shipping_user | string | ||
province | string | ||
city | string | ||
address | string | ||
order_source | int | ||
payment_method | int | ||
order_money | double | ||
district_money | double | ||
shipping_money | double | ||
payment_money | double | ||
shipping_comp_name | string | ||
shipping_sn | string | ||
create_time | string | ||
shipping_time | string | ||
pay_time | string | ||
receive_time | string | ||
order_status | string | ||
order_point | int | ||
invoice_title | string | ||
modified_time | string |
ods:order_detail 数据结构如下:
字段 | 类型 | 中文含义(和MySQL中相同) | 备注 |
---|---|---|---|
rowkey | string | rowkey | 可以使用随机数(0-9)+yyyyMMddHHmmssSSS(date 的格式)代替 |
Info | 列族名 | ||
order_detail_id | int | ||
order_sn | string | ||
product_id | int | ||
product_name | string | ||
product_cnt | int | ||
product_price | double | ||
average_cost | double | ||
weight | double | ||
fee_money | double | ||
w_id | int | ||
create_time | string | ||
modified_time | string |
ods:product_browse 数据结构如下:
字段 | 类型 | 中文含义(和MySQL中相同) | 备注 |
---|---|---|---|
rowkey | string | rowkey | 该字段使用logid 进行拆分,将log_id 拆分为随机数和MMddHHmmssSSS 两块,在其中插入yyyy(date 的格式) 最终格式为:随机数(0-9)+yyyy+MMddHHmmssSSS |
Info | 列族名 | ||
log_id | int | 该字段缺失,使用随机数(0-9)+MMddHHmmssSSS(date 的格式) | |
order_sn | string | ||
product_id | string | ||
customer_id | string | ||
gen_order | int | ||
modified_time | double |
注意,ods:product_browse 表的数据结构有以下几个问题:
(1)字段顺序与原始数据顺序不一致。
(2)多个字段的数据类型与原始数据类型不一致,如log_id,customer_id,modified_time(特别不理解,怎么就变成double类型的了呢?)等。
(3)无法理解备注中rowkey和log_id这两个字段间的关系。一方面说log_id字段缺失,要求使用随机数(0-9)+ MMddHHmmssSSS(date 的格式)– 个人理解为填充,但数据类型又规定是int;另一方面,rowkey字段使用logid进行拆分,吧啦吧啦 — 实在无法理解出题人表达的是什么意思。
子任务3分析
根据题意,编写Flink程序:
-
-
读取Kafka fact_order_master主题,写入HBase的ods:order_master表中;
-
-
-
读取Kafka fact_order_detail主题,写入HBase的ods:order_detail表中;
-
-
-
读取Kafka log_product_browse主题,写入HBase的ods:product_browse表中。
-
注意,HBase中使用了命名空间ods,三个hbase表都位于该ods表空间中。
要完成这样的表写入,使用Flink Table API更加简单和优雅 。因此本任务我们使用Flink Table API/Flink SQL来实现。
子任务3实现
1)首先,在HBase中,创建命名空间ods(HBase有两个预定义命名空间: hbase和default)。打开hbase shell,执行语句如下:
hbase> create_namespace 'ods'
其他与命名空间有关的操作命令如下:
-
查看命名空间ods: describe_namespace ‘ods’
-
查看当前有哪命名空间 list_namespace
-
显示指定命名空间中所有的表 list_namespace_tables ‘ods’
-
在ods命名空间中创建tb1表 create ‘ods:tb1’, ‘cf1’
-
删除namespace(只有空的namespace才可以删除) drop_namespace ‘ods’
2)然后,在HBase中,创建要写入的三张表。在hbase shell中,执行语句如下:
hbase> create 'ods:order_master','info'
hbase> create 'ods:order_detail','info'
hbase> create 'ods:product_browse','info'
-
另创建一个Flink流处理程序Task03Job_1.java,编辑代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.*;
public class Task03Job_1 {
public static void main(String[] args) throws Exception {
// 获得流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 表执行环境
final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
// 创建源表
String source_kafka_table_create =
"CREATE TEMPORARY TABLE order_master_source ( " +
" order_id BIGINT," +
" order_sn STRING," +
" customer_id BIGINT," +
" shipping_user STRING," +
" province STRING," +
" city STRING," +
" address STRING," +
" order_source INTEGER," +
" payment_method INTEGER," +
" order_money DOUBLE," +
" district_money DOUBLE," +
" shipping_money DOUBLE," +
" payment_money DOUBLE," +
" shipping_comp_name STRING," +
" shipping_sn STRING," +
" create_time STRING," +
" shipping_time STRING," +
" pay_time STRING," +
" receive_time STRING," +
" order_status STRING," +
" order_point INTEGER," +
" invoice_title STRING," +
" modified_time STRING" +
") WITH ('connector'='kafka'," +
" 'topic'='fact_order_master'," +
" 'properties.bootstrap.servers'='192.168.190.139:9092'," +
" 'properties.group.id'='group-test'," +
" 'format'='json'," +
" 'scan.startup.mode'='latest-offset'" +
")";
tableEnvironment.executeSql("DROP TABLE IF EXISTS order_master_source");
tableEnvironment.executeSql(source_kafka_table_create);
// 创建Sink表
String sink_hbase_table_create =
"CREATE TEMPORARY TABLE order_master_sink (" +
" row_key STRING," +
" info ROW<order_id BIGINT," +
" order_sn STRING," +
" customer_id BIGINT," +
" shipping_user STRING," +
" province STRING," +
" city STRING," +
" address STRING," +
" order_source INTEGER," +
" payment_method INTEGER," +
" order_money DOUBLE," +
" district_money DOUBLE," +
" shipping_money DOUBLE," +
" payment_money DOUBLE," +
" shipping_comp_name STRING," +
" shipping_sn STRING," +
" create_time STRING," +
" shipping_time STRING," +
" pay_time STRING," +
" receive_time STRING," +
" order_status STRING," +
" order_point INTEGER," +
" invoice_title STRING," +
" modified_time STRING>," +
" PRIMARY KEY (row_key) NOT ENFORCED" +
") WITH ('connector' = 'hbase-2.2'," +
" 'table-name' = 'ods:order_master'," +
" 'zookeeper.quorum' = '192.168.190.139:2181'" +
" )";
tableEnvironment.executeSql("DROP TABLE IF EXISTS order_master_sink");
tableEnvironment.executeSql(sink_hbase_table_create);
// ETL管道
tableEnvironment
// 加载源表到Table中
.from("order_master_source")
// 增加row_key列(rowkey使用随机数(0-9)+yyyyMMddHHmmssSSS(date 的格式))
.select(concat(randInteger(10).cast(DataTypes.STRING()),"+", dateFormat(currentTimestamp(),"yyyyMMddHHmmssSSS")).as("row_key"),
row($("order_id"),$("order_sn"),$("customer_id"),$("shipping_user")
,$("province"),$("city"),$("address"), $("order_source")
,$("payment_method"),$("order_money"),$("district_money")
,$("shipping_money"),$("payment_money"),$("shipping_comp_name")
,$("shipping_sn"),$("create_time"),$("shipping_time")
,$("pay_time"),$("receive_time"),$("order_status")
,$("order_point"),$("invoice_title"),$("modified_time")).as("info"))
// 将表写入HBase表
.executeInsert("order_master_sink");
// TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
// env.execute("Flink Streaming Task03")
}
}
-
另创建一个Flink流处理程序Task03Job_2.java,编辑代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.*;
public class Task03Job_2 {
public static void main(String[] args) throws Exception {
// 获得流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 表执行环境
final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
// 创建源表
String source_kafka_table_create =
"CREATE TEMPORARY TABLE order_detail_source ( " +
" order_detail_id BIGINT," +
" order_sn STRING," +
" product_id BIGINT," +
" product_name STRING," +
" product_cnt INTEGER," +
" product_price DOUBLE," +
" average_cost DOUBLE," +
" weight DOUBLE," +
" fee_money DOUBLE," +
" w_id INTEGER," +
" create_time STRING," +
" modified_time STRING" +
") WITH ('connector'='kafka'," +
" 'topic'='fact_order_detail'," +
" 'properties.bootstrap.servers'='192.168.190.139:9092'," +
" 'properties.group.id'='group-test'," +
" 'format'='json'," +
" 'scan.startup.mode'='latest-offset'" +
")";
tableEnvironment.executeSql("DROP TABLE IF EXISTS order_detail_source");
tableEnvironment.executeSql(source_kafka_table_create);
// 创建Sink表
String sink_hbase_table_create =
"CREATE TEMPORARY TABLE order_detail_sink (" +
" row_key STRING," +
" info ROW<order_detail_id BIGINT," +
" order_sn STRING," +
" product_id BIGINT," +
" product_name STRING," +
" product_cnt INTEGER," +
" product_price DOUBLE," +
" average_cost DOUBLE," +
" weight DOUBLE," +
" fee_money DOUBLE," +
" w_id INTEGER," +
" create_time STRING," +
" modified_time STRING>," +
" PRIMARY KEY (row_key) NOT ENFORCED" +
") WITH ('connector' = 'hbase-2.2'," +
" 'table-name' = 'ods:order_detail'," +
" 'zookeeper.quorum' = '192.168.190.139:2181'" +
" )";
tableEnvironment.executeSql("DROP TABLE IF EXISTS order_detail_sink");
tableEnvironment.executeSql(sink_hbase_table_create);
// ETL管道
tableEnvironment
// 加载源表到Table中
.from("order_detail_source")
// 增加row_key列(rowkey使用随机数(0-9)+yyyyMMddHHmmssSSS(date 的格式))
.select(concat(randInteger(10).cast(DataTypes.STRING()),"+", dateFormat(currentTimestamp(),"yyyyMMddHHmmssSSS")).as("row_key"),
row($("order_detail_id"),$("order_sn"),$("product_id")
,$("product_name"),$("product_cnt"),$("product_price")
,$("average_cost"),$("weight"),$("fee_money")
,$("w_id"),$("create_time"),$("modified_time")).as("info"))
// 将表写入HBase表
.executeInsert("order_detail_sink");
// TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
// env.execute("Flink Streaming Task03")
}
}
-
另创建一个Flink流处理程序Task03Job_3.java,编辑代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.*;
import org.apache.flink.table.api.*;
public class Task03Job_3 {
public static void main(String[] args) throws Exception {
// 获得流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 表执行环境
final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
// 创建源表
String source_kafka_table_create =
"CREATE TEMPORARY TABLE product_browse_source ( " +
" log_id BIGINT," +
" order_sn STRING," +
" product_id BIGINT," +
" customer_id STRING," +
" gen_order BIGINT" +
" modified_time STRING" +
") WITH ('connector'='kafka'," +
" 'topic'='log_product_browse'," +
" 'properties.bootstrap.servers'='192.168.190.139:9092'," +
" 'properties.group.id'='group-test'," +
" 'format'='json'," +
" 'scan.startup.mode'='latest-offset'" +
")";
tableEnvironment.executeSql("DROP TABLE IF EXISTS product_browse_source");
tableEnvironment.executeSql(source_kafka_table_create);
// 创建Sink表
String sink_hbase_table_create =
"CREATE TEMPORARY TABLE product_browse_sink (" +
" row_key STRING," +
" info ROW<log_id BIGINT," +
" order_sn STRING," +
" product_id BIGINT," +
" customer_id STRING," +
" gen_order BIGINT" +
" modified_time STRING>," +
" PRIMARY KEY (row_key) NOT ENFORCED" +
") WITH ('connector' = 'hbase-2.2'," +
" 'table-name' = 'ods:product_browse'," +
" 'zookeeper.quorum' = '192.168.190.139:2181'" +
" )";
tableEnvironment.executeSql("DROP TABLE IF EXISTS product_browse_sink");
tableEnvironment.executeSql(sink_hbase_table_create);
// ETL管道
tableEnvironment
// 加载源表到Table中
.from("product_browse_source")
// 增加row_key列(注意这里row_key的构造规则,它来自于对log_id字段的拆分,而log_id字段来自上一任务中的构造(用+连接随机数和日期))
// 备注中的+号到底是表意还是表形?出题人你多说明一下会S吗?
.select(concat(splitIndex($("log_id").toString(),"+",0),"+","2024","+",splitIndex($("log_id").toString(),"+",1)).as("row_key"),
row($("log_id"),$("order_sn"),$("product_id"),$("customer_id"),$("gen_order"), $("modified_time")).as("info"))
// 将表写入HBase表
.executeInsert("product_browse_sink");
// TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
// env.execute("Flink Streaming Task03")
}
}
执行步骤
请首先确保已经启动了大数据运行环境(执行startall.sh脚本),然后按以下步骤执行。
将Flink流处理jar包部署到YARN集群上运行:
# cd /opt/module/flink-1.14.0
# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task03Job_1 xxx.jar
# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task03Job_2 xxx.jar
# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task03Job_3 xxx.jar
请将上述命令中的xxx.jar替换为你部署的jar包名称。
查询执行结果
根据任务描述,需要使用HBase Shell :
(1) 查看ods:order_master 表的任意2 条数据,查看字段为row_key 与shipping_user。语句如下:
hbase> scan 'ods:order_master', {COLUMNS => 'info:shipping_user', LIMIT => 2, FORMATTER => 'toString'}
(2) 查看ods:order_detail 表的任意2 条数据, 查看字段为row_key 与product_name。语句如下:
hbase> scan 'ods:order_detail', {COLUMNS => 'info:product_name', LIMIT => 2, FORMATTER => 'toString'}
(3) 查看ods:product_browse 表的任意2 条数据,查看字段为row_key 与order_sn。语句如下:
hbase> scan 'ods:product_browse', {COLUMNS => 'info:order_sn', LIMIT => 2, FORMATTER => 'toString'}
1 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
2 本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
3 本站资源大多存储在云盘,如发现链接失效,请联系我们我们会第一时间更新。
暂无评论内容