2024年重庆甘肃安徽等省职业院校技能大赛_大数据应用开发样题解析-模块C:实时数据处理-任务一:实时数据清洗

环境说明

Flink 任务在Yarn 上用per job 模式(即Job 分离模式,不采用Session 模式),方便Yarn 回收资源;

建议使用gson 解析json 数据。

任务描述

编写Java 工程代码,使用Flink 消费Kafka 中Topic 为ods_mall_log 和ods_mall_data 的数据并进行相应的数据统计计算(使用Processing Time)。

任务分析

注意任务描述中的两个关键词:一是”编写Java工程代码”,二是”使用Processing Time”(即使用处理时间)。

这里对出题方提出表扬,因为出题方终于听取了老师们意见,在本次的后续子任务描述中,不但纠正了原来一些明显的错误描述,而且应老师们的强烈要求,给出了比较详细的表结构(虽然仍然没有给出业务解释),这样参赛学员终于可以有一些数据线索可循。

本任务共有三个子任务组成:

  • 实时数据清洗子任务1

  • 实时数据清洗子任务2

  • 实时数据清洗子任务3

子任务1

子任务1描述

1、使用Flink 消费Kafka 中topic 为ods_mall_data 的数据,根据数据中不同的表将数据分别分发至kafka 的DWD 层的fact_order_master 、 fact_order_detail 的Topic 中(只获取data 的内容,具体的内容格式请自查,其分区数均为2),其他的表则无需处理。使用Kafka 自带的消费者消 费fact_order_master(Topic)的前1 条数据,将结果截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下;

fact_order_master 表结构,存储位置:Kafka,存储格式:json

字段 类型 中文含义(和MySQL中相同) 备注
order_id int    
order_sn string    
customer_id int    
shipping_user string    
province string    
city string    
address string    
order_source int    
payment_method int    
order_money double    
district_money double    
shipping_money double    
payment_money double    
shipping_comp_name string    
shipping_sn string    
create_time timestamp    
shipping_time timestamp    
pay_time timestamp    
receive_time timestamp    
order_status string    
order_point int    
invoice_title string    
modified_time timestamp    

fact_order_detail 表结构,存储位置:Kafka,存储格式:json

字段 类型 中文含义(和MySQL中相同) 备注
order_detail_id int    
order_sn string    
product_id int    
product_name string    
product_cnt int    
product_price double    
average_cost double    
weight double    
fee_money double    
w_id int    
create_time timestamp    
modified_time timestamp    

子任务1分析

这个任务我们可以归纳如下:

  • 编写一个Flink流处理程序,既将Kafka当作数据源,也将Kafka作为Data Sink;

  • 需要同时写多个Kafka Topic。

提醒大家注意的是,因为出题方并未给出实时数据生成器发送到Kafka 中topic 为ods_mall_data 主题的数据格式,但是却指定下游的kafka 的fact_order_master和fact_order_detail 主题数据格式为json,因此在正式比赛时,大家要灵活应变,先用Kafka自带的消费者脚本查看ods_mall_data主题中数据的格式,再相应进行解析。如果ods_mall_data主题中的数据格式就是json格式,则我们直接抽取相应的data部分发往下游即可(下面的示例实现中即假定是这样的);但是如果ods_mall_data主题中的数据格式就是json格式,则需要你先解析出各个字段,再构造为json格式发往下游(构造json格式字符串的方法可参考下一个子任务中的实现)。

子任务1实现

  1. 首先,在IntelliJ IDEA中创建一个Flink Maven项目,取名为Flink140Example。

参考教程:使用IntelliJ IDEA+Maven开发Flink项目

  1. 接下来,创建一个Flink流处理程序Task01Job.java,编辑代码如下:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.TopicSelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class Task01Job {
  public static void main(String[] args) throws Exception {
      // 获得流执行环境
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      KafkaSource<String> source = KafkaSource.<String>builder()
              .setBootstrapServers("192.168.190.139:9092") // 如果是集群,则为"host1:9092,host2:9092,host3:9092"
              .setTopics("ods_mall_data")
              .setGroupId("group-test")
              .setStartingOffsets(OffsetsInitializer.earliest()) // 注意,比赛时应为 .latest()
              .setValueOnlyDeserializer(new SimpleStringSchema())
              .build();

      // kafka sink
      Properties properties = new Properties();
      properties.setProperty("transaction.timeout.ms", "7200000"); // 2 hours

      // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。
      KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
              .setBootstrapServers("192.168.190.139:9092") // 如果是集群,则为"host1:9092,host2:9092,host3:9092"
              .setKafkaProducerConfig(properties)
              .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                      .setTopicSelector(new TopicSelector<String>() {
                          @Override
                          public String apply(String s) {
                              if(s.contains("order_detail_id")){
                                  return "fact_order_detail";
                              }
                              else{
                                  return "fact_order_master";
                              }
                          }
                      })
                      .setValueSerializationSchema(new SimpleStringSchema())
                      .build()
              ).build();

      // 流处理管道
      env
              // 指定Kafka数据源
              .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
              // 只需要order_master和order_detail表中的数据
              .filter(new FilterFunction<String>() {
                  @Override
                  public boolean filter(String line) throws Exception {
                      return line.contains("fact_order_master") || line.contains("fact_order_detail");
                  }
              })
              // 提取data部分
              .map(new MapFunction<String, String>() {
                  @Override
                  public String map(String line) throws Exception {
                      JsonObject jsonObj = JsonParser.parseString(line).getAsJsonObject().getAsJsonObject("data");
                      return jsonObj.toString();
                  }
              })
//               .print();
              .sinkTo(kafkaSink);

      // execute program
      env.execute("Flink Streaming Task01");
  }
}

使用mvn clean package命令(或任何其他自己熟悉的方式)将项目打包。

执行步骤

请首先确保已经启动了大数据运行环境(执行startall.sh脚本),然后按以下步骤执行。

1)创建Kafka主题Topic:

# cd /opt/module/kafka_2.12-2.4.1/

# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic fact_order_master

# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic fact_order_detail

2)查看主题:

# ./bin/kafka-topics.sh --list --zookeeper localhost:2181

3)执行Kafka自带的消费者脚本,监听fact_order_master Topic:

# cd /opt/module/kafka_2.12-2.4.1/

# ./bin/kafka-console-consumer.sh --bootstrap-server xueai:9092 --topic fact_order_master --from-beginning

4)执行Kafka自带的消费者脚本,监听fact_order_detail Topic:

# cd /opt/module/kafka_2.12-2.4.1/

# ./bin/kafka-console-consumer.sh --bootstrap-server xueai:9092 --topic fact_order_detail --from-beginning

5)将Flink流处理jar包部署到YARN集群上运行:

# cd /opt/module/flink-1.14.0

# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task01Job xxx.jar

请将上述命令中的xxx.jar替换为你部署的jar包名称。

Flink on Yarn Per-Job提交命令格式如下:

./bin/flink run \
  # 指定yarn的Per-job模式,-t等价于-Dexecution.target
  -t yarn-per-job \
  # yarn应用的自定义name
  -Dyarn.application.name=consumerDemo \
  # 未指定并行度时的默认并行度值, 该值默认为1
  -Dparallelism.default=3 \
  # JobManager进程的内存
  -Djobmanager.memory.process.size=2048mb \
  # TaskManager进程的内存
  -Dtaskmanager.memory.process.size=2048mb \
  # 每个TaskManager的slot数目, 最佳配比是和vCores保持一致
  -Dtaskmanager.numberOfTaskSlots=2 \
  # 防止日志中文乱码
  -Denv.java.opts="-Dfile.encoding=UTF-8" \
  # 支持火焰图, Flink1.13新特性, 默认为false, 开发和测试环境可以开启, 生产环境建议关闭
  -Drest.flamegraph.enabled=true \
  # 入口类
  -c xxxx.MainClass \
  # 提交Job的jar包
  xxxx.jar

如果要退出任务,使用如下命令格式:

./bin/flink cancel \
  -t yarn-per-job \
  # Yarn的ApplicationID值, 可以通过Yarn的webUI直接查看
  -Dyarn.application.id=${YarnApplicationID} \
  # Flink的JobID, 可以通过Yarn找到Flink的WebUI进行查看
  ${FlinkJobID}

也可以通过以下命令查看:

./bin/flink list \
  -t yarn-per-job \
  -Dyarn.application.id=${yarnApplicationID}

6)查看两个消费者脚本运行的窗口,观察各自的输出内容。

子任务2

子任务2描述

2、使用Flink 消费Kafka 中topic 为ods_mall_log 的数据,根据数据中不同的表前缀区分,过滤出product_browse 的数据,将数据分别分发至kafka的DWD 层log_product_browse 的Topic 中,其分区数为2,其他的表则无需处理。使用Kafka 自带的消费者消费log_product_browse(Topic)的前1条数据,将结果截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下。

log_product_browse 表结构,存储位置:Kafka,存储格式:json

字段 类型 中文含义(和MySQL中相同) 备注
log_id long 自增长id 可以使用随机数(0-9)+MMddHHmmssSSS 代替
product_id string    
customer_id int    
gen_order int    
order_sn string    
modified_time timestamp    

子任务2分析

与子任务1类似,编写Flink流处理程序,既将Kafka作为数据源,也将Kafka作为Data Sink。整个数据流为:实时数据生成器-> flume-> Kafka ods_mall_log主题-> Flink程序 -> Kafka log_product_browse主题。

这个任务描述,挖了一个不大不小的坑(不是技术上的坑,是供应商公司设备环境的坑)。因为它指明写入到Kafka log_product_browse主题的数据存储格式是json,但就是不告诉你从Kafka ods_mall_log主题中获取到的数据格式是什么,也不告诉你数据生成器生成的数据格式。所以大家对技术要活学活用,在比赛时先使用Kafka自带的消费者脚本获取ods_mall_log主题中的数据,查看其格式,然后进行相应的数据解析,并转换为JSON格式的字符串发往下游的Kafka log_product_browse主题。

假设实时数据生成器发出的日志类数据格式如下:

   数据发送开始:product_browse:(2730|18456|1|2023042739287771|'20230419230057');
  customer_balance_Log:(8248|1|2023042744627979|'20230423031057'|9626.24);
  product_browse:(5438|11696|0|0|'20230419193256');
  product_browse:(6045|16691|1|2023042737193353|'20230419103857");
  customer_point_Log:(16691|0|0|10|'20230421100256');
  product_browse:(9378|11940|1|2023042721662101|'20230419225958');
  customer_login_log:(8248|'20230421045357'|'40.253.78.255'|0);
  product_browse:(6544|8248|1|2023042744627979|'20230419233957');
  product_browse:(13916|11696|0|0|'20230419174756');
  product_browse:(6085|18456|1|2023042739287771|'20230419200157');
  product_browse:(2726|8248|0|0|'20230420014757');
  customer_login_log:(18456|'20230421055957'|'199.209.123.4'|0);
  product_browse:(7692|8248|1|2023042744627979|'20230420030757");
  customer_balance_log:(16691|1|2023042737193353|'20230423025657'|5044.44);
  product_browse:(6684|11696|1|2023042785336981|'20230419231056');
  product_browse:(2923|11696|0|0|'20230420002956');
  product_browse:(4753|18456|1|2023042739287771|'20230419221957');
  product_browse:(3235|8248|1|2023042744627979|'20230420014757');
  product_browse:(8008|11696|1|2023042785336981|'20230420010456');
  customer_point_log:(16691|0|2023042737193353|504|'20230423025657");
  product_browse:(14426|16691|1|2023042737193353|'20230419235457');

观察以上数据,(对比mysql中的product_browse表)可以发现:

  • 不包含log_id字段。

  • modified_time字段变成了字符串’yyyyMMddHHmmss’。

  • 各字段的含义(对应的字段)如下:product_id|customer_id|gen_order|order_sn|modified_time。

我们假设这些数据直接发往了Kafka ods_mall_log主题,则我们需要从该主题中获取这些格式的数据,并过滤出其中表前缀为product_browse的数据,然后将数据分别分发至kafka的log_product_browse主题中。

子任务2实现

  1. 另创建一个Flink流处理程序Task02Job.java,编辑代码如下:

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import java.util.Properties;

public class Task02Job {
  // 实体类
  static class ProductBrowse{
      String log_id;
      String product_id;
      long customer_id;
      long gen_order;
      String order_sn;
      String modified_time;

      ProductBrowse(){}

      public ProductBrowse(String log_id, String product_id, long customer_id, long gen_order, String order_sn, String modified_time) {
          this.log_id = log_id;
          this.product_id = product_id;
          this.customer_id = customer_id;
          this.gen_order = gen_order;
          this.order_sn = order_sn;
          this.modified_time = modified_time;
      }
  }

  public static void main(String[] args) throws Exception {
      // 获得流执行环境
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 源和sink名称
      String source_topic = "ods_mall_log";
      String sink_topic = "log_product_browse";

      // kafka source
      KafkaSource<String> source = KafkaSource.<String>builder()
              .setBootstrapServers("192.168.190.139:9092") // 如果是集群,则为"host1:9092,host2:9092,host3:9092"
              .setTopics(source_topic)
              .setGroupId("group-test")
              .setStartingOffsets(OffsetsInitializer.earliest())
              .setValueOnlyDeserializer(new SimpleStringSchema())
              .build();

      // kafka sink
      Properties properties = new Properties();
      properties.setProperty("transaction.timeout.ms", "7200000"); // 2 hours

      // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。
      KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
              .setBootstrapServers("192.168.190.139:9092") // 如果是集群,则为"host1:9092,host2:9092,host3:9092"
              .setKafkaProducerConfig(properties)
              .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                      .setTopic(sink_topic)
                      .setValueSerializationSchema(new SimpleStringSchema())
                      .build()
              ).build();

      // 流处理管道
      env
              // 指定Kafka数据源
              .fromSource(source, WatermarkStrategy.<String>noWatermarks(), "Kafka Source")
              // 只分发product_browse日志数据
              .filter(new FilterFunction<String>() {
                  @Override
                  public boolean filter(String line) throws Exception {
                      return line.contains("product_browse");
                  }
              })
              /* 解析数据行,构造为json格式发往下游:
              上游数据:product_browse:(2730|18456|1|2023042739287771|'20230419230057')
                      各字段含义: product_id|customer_id|gen_order|order_sn|modified_time
              下游数据:log_id long       自增长id 可以使用随机数(0-9)+MMddHHmmssSSS 代替
                      product_id         string
                      customer_id         int
                      gen_order           int
                      order_sn           string
                      modified_time       timestamp
                这个题的log_id有点问题,如果log_id的字段是long类型,那么随机数0怎么办?
                所以这里我们假设log_id是String类型
                */
              .map(new MapFunction() {
                  @Override
                  public String map(String value) throws Exception {
                      // 解析数据
                      String data = value.split(":")[1]; // (2730|18456|1|2023042739287771|'20230419230057')
                      String content = data.substring(1,data.length()-1); // 2730|18456|1|2023042739287771|'20230419230057'
                      String[] arr = content.split("\\|");

                      // 构造log_id
                      SimpleDateFormat formatter= new SimpleDateFormat("MMddHHmmssSSS");
                      String log_id = (new Random()).nextInt(10) + "+" + formatter.format(new Date(System.currentTimeMillis()));

                      // 构造json字符串
                      TestGson.ProductBrowse productBrowse = new TestGson.ProductBrowse(log_id,arr[0],Long.parseLong(arr[1]),Long.parseLong(arr[2]),arr[3],arr[4].substring(1, arr[4].length()-1));
                      Gson gson = new Gson();
                      // System.out.println(gson.toJson(productBrowse));
                      return gson.toJson(productBrowse);
                  }
              })
//               .print();
              .sinkTo(kafkaSink);

      // execute program
      env.execute("Flink Streaming Task02");
  }
}

使用mvn clean package命令(或任何自己熟悉的方式)将项目打包。

执行步骤

请首先确保已经启动了大数据运行环境(执行startall.sh脚本),然后按以下步骤执行。

1)创建Kafka主题Topic:

# cd /opt/module/kafka_2.12-2.4.1/

# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic log_product_browse

2)查看主题:

# ./bin/kafka-topics.sh --list --zookeeper localhost:2181

3)执行Kafka自带的消费者脚本,监听log_product_browse Topic:

# cd /opt/module/kafka_2.12-2.4.1/

# ./bin/kafka-console-consumer.sh --bootstrap-server xueai:9092 --topic log_product_browse --from-beginning

4)将Flink流处理jar包部署到YARN集群上运行:

# cd /opt/module/flink-1.14.0

# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task02Job xxx.jar

请将上述命令中的xxx.jar替换为你部署的jar包名称。

5)查看消费者脚本运行的窗口,观察输出内容。

子任务3

子任务3描述

3、在任务1 、2 进行的同时, 需要将order_master 、order_detail 、product_browse 备份至HBase 中(若Int 类型长度不够,可使用BigInt 或Long 类型代替),使用HBase Shell 查看ods:order_master 表的任意2 条数据,查看字段为row_key 与shipping_user、查看ods:order_detail 表的任意2 条数据, 查看字段为row_key 与product_name 、查看ods:product_browse 表的任意2 条数据,查看字段为row_key 与order_sn。 将结果分别截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下(截图中不能有乱码)。

三个HBase 中的数据结构描述如下。

ods:order_master 数据结构如下:

字段 类型 中文含义(和MySQL中相同) 备注
rowkey string rowkey 可以使用随机数(0-9)+yyyyMMddHHmmssSSS(date 的格式)代替
Info   列族名  
order_id int    
order_sn string    
customer_id int    
shipping_user string    
province string    
city string    
address string    
order_source int    
payment_method int    
order_money double    
district_money double    
shipping_money double    
payment_money double    
shipping_comp_name string    
shipping_sn string    
create_time string    
shipping_time string    
pay_time string    
receive_time string    
order_status string    
order_point int    
invoice_title string    
modified_time string    

ods:order_detail 数据结构如下:

字段 类型 中文含义(和MySQL中相同) 备注
rowkey string rowkey 可以使用随机数(0-9)+yyyyMMddHHmmssSSS(date 的格式)代替
Info   列族名  
order_detail_id int    
order_sn string    
product_id int    
product_name string    
product_cnt int    
product_price double    
average_cost double    
weight double    
fee_money double    
w_id int    
create_time string    
modified_time string    

ods:product_browse 数据结构如下:

字段 类型 中文含义(和MySQL中相同) 备注
rowkey string rowkey 该字段使用logid 进行拆分,将log_id 拆分为随机数和MMddHHmmssSSS 两块,在其中插入yyyy(date 的格式) 最终格式为:随机数(0-9)+yyyy+MMddHHmmssSSS
Info   列族名  
log_id int   该字段缺失,使用随机数(0-9)+MMddHHmmssSSS(date 的格式)
order_sn string    
product_id string    
customer_id string    
gen_order int    
modified_time double    

注意,ods:product_browse 表的数据结构有以下几个问题:

(1)字段顺序与原始数据顺序不一致。

(2)多个字段的数据类型与原始数据类型不一致,如log_id,customer_id,modified_time(特别不理解,怎么就变成double类型的了呢?)等。

(3)无法理解备注中rowkey和log_id这两个字段间的关系。一方面说log_id字段缺失,要求使用随机数(0-9)+ MMddHHmmssSSS(date 的格式)– 个人理解为填充,但数据类型又规定是int;另一方面,rowkey字段使用logid进行拆分,吧啦吧啦 — 实在无法理解出题人表达的是什么意思。

子任务3分析

根据题意,编写Flink程序:

    1. 读取Kafka fact_order_master主题,写入HBase的ods:order_master表中;

    1. 读取Kafka fact_order_detail主题,写入HBase的ods:order_detail表中;

    1. 读取Kafka log_product_browse主题,写入HBase的ods:product_browse表中。

注意,HBase中使用了命名空间ods,三个hbase表都位于该ods表空间中。

要完成这样的表写入,使用Flink Table API更加简单和优雅 。因此本任务我们使用Flink Table API/Flink SQL来实现。

子任务3实现

1)首先,在HBase中,创建命名空间ods(HBase有两个预定义命名空间: hbase和default)。打开hbase shell,执行语句如下:

hbase> create_namespace 'ods'

其他与命名空间有关的操作命令如下:

  • 查看命名空间ods: describe_namespace ‘ods’

  • 查看当前有哪命名空间 list_namespace

  • 显示指定命名空间中所有的表 list_namespace_tables ‘ods’

  • 在ods命名空间中创建tb1表 create ‘ods:tb1’, ‘cf1’

  • 删除namespace(只有空的namespace才可以删除) drop_namespace ‘ods’

2)然后,在HBase中,创建要写入的三张表。在hbase shell中,执行语句如下:

hbase> create 'ods:order_master','info'
hbase> create 'ods:order_detail','info'
hbase> create 'ods:product_browse','info'
  1. 另创建一个Flink流处理程序Task03Job_1.java,编辑代码如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.*;

public class Task03Job_1 {
  public static void main(String[] args) throws Exception {
      // 获得流执行环境
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // 表执行环境
      final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

      // 创建源表
      String source_kafka_table_create =
              "CREATE TEMPORARY TABLE order_master_source ( " +
                      "   order_id BIGINT," +
                      "   order_sn STRING," +
                      "   customer_id BIGINT," +
                      "   shipping_user STRING," +
                      "   province STRING," +
                      "   city STRING," +
                      "   address STRING," +
                      "   order_source INTEGER," +
                      "   payment_method INTEGER," +
                      "   order_money DOUBLE," +
                      "   district_money DOUBLE," +
                      "   shipping_money DOUBLE," +
                      "   payment_money DOUBLE," +
                      "   shipping_comp_name STRING," +
                      "   shipping_sn STRING," +
                      "   create_time STRING," +
                      "   shipping_time STRING," +
                      "   pay_time STRING," +
                      "   receive_time STRING," +
                      "   order_status STRING," +
                      "   order_point INTEGER," +
                      "   invoice_title STRING," +
                      "   modified_time STRING" +
                      ") WITH ('connector'='kafka'," +
                      "       'topic'='fact_order_master'," +
                      "       'properties.bootstrap.servers'='192.168.190.139:9092'," +
                      "       'properties.group.id'='group-test'," +
                      "       'format'='json'," +
                      "       'scan.startup.mode'='latest-offset'" +
                      ")";

      tableEnvironment.executeSql("DROP TABLE IF EXISTS order_master_source");
      tableEnvironment.executeSql(source_kafka_table_create);

      // 创建Sink表
      String sink_hbase_table_create =
              "CREATE TEMPORARY TABLE order_master_sink (" +
                      "   row_key STRING," +
                      "   info ROW<order_id BIGINT," +
                      "   order_sn STRING," +
                      "   customer_id BIGINT," +
                      "   shipping_user STRING," +
                      "   province STRING," +
                      "   city STRING," +
                      "   address STRING," +
                      "   order_source INTEGER," +
                      "   payment_method INTEGER," +
                      "   order_money DOUBLE," +
                      "   district_money DOUBLE," +
                      "   shipping_money DOUBLE," +
                      "   payment_money DOUBLE," +
                      "   shipping_comp_name STRING," +
                      "   shipping_sn STRING," +
                      "   create_time STRING," +
                      "   shipping_time STRING," +
                      "   pay_time STRING," +
                      "   receive_time STRING," +
                      "   order_status STRING," +
                      "   order_point INTEGER," +
                      "   invoice_title STRING," +
                      "   modified_time STRING>," +
                      "   PRIMARY KEY (row_key) NOT ENFORCED" +
                      ") WITH ('connector' = 'hbase-2.2'," +
                      "       'table-name' = 'ods:order_master'," +
                      "       'zookeeper.quorum' = '192.168.190.139:2181'" +
                      " )";
      tableEnvironment.executeSql("DROP TABLE IF EXISTS order_master_sink");
      tableEnvironment.executeSql(sink_hbase_table_create);

      // ETL管道
      tableEnvironment
              // 加载源表到Table中
              .from("order_master_source")
              // 增加row_key列(rowkey使用随机数(0-9)+yyyyMMddHHmmssSSS(date 的格式))
              .select(concat(randInteger(10).cast(DataTypes.STRING()),"+", dateFormat(currentTimestamp(),"yyyyMMddHHmmssSSS")).as("row_key"),
                      row($("order_id"),$("order_sn"),$("customer_id"),$("shipping_user")
                              ,$("province"),$("city"),$("address"), $("order_source")
                              ,$("payment_method"),$("order_money"),$("district_money")
                              ,$("shipping_money"),$("payment_money"),$("shipping_comp_name")
                              ,$("shipping_sn"),$("create_time"),$("shipping_time")
                              ,$("pay_time"),$("receive_time"),$("order_status")
                              ,$("order_point"),$("invoice_title"),$("modified_time")).as("info"))
              // 将表写入HBase表
              .executeInsert("order_master_sink");

      // TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
      // env.execute("Flink Streaming Task03")
  }
}
  1. 另创建一个Flink流处理程序Task03Job_2.java,编辑代码如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.*;

public class Task03Job_2 {
  public static void main(String[] args) throws Exception {
      // 获得流执行环境
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // 表执行环境
      final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

      // 创建源表
      String source_kafka_table_create =
              "CREATE TEMPORARY TABLE order_detail_source ( " +
                      "   order_detail_id BIGINT," +
                      "   order_sn STRING," +
                      "   product_id BIGINT," +
                      "   product_name STRING," +
                      "   product_cnt INTEGER," +
                      "   product_price DOUBLE," +
                      "   average_cost DOUBLE," +
                      "   weight DOUBLE," +
                      "   fee_money DOUBLE," +
                      "   w_id INTEGER," +
                      "   create_time STRING," +
                      "   modified_time STRING" +
                      ") WITH ('connector'='kafka'," +
                      "       'topic'='fact_order_detail'," +
                      "       'properties.bootstrap.servers'='192.168.190.139:9092'," +
                      "       'properties.group.id'='group-test'," +
                      "       'format'='json'," +
                      "       'scan.startup.mode'='latest-offset'" +
                      ")";

      tableEnvironment.executeSql("DROP TABLE IF EXISTS order_detail_source");
      tableEnvironment.executeSql(source_kafka_table_create);

      // 创建Sink表
      String sink_hbase_table_create =
              "CREATE TEMPORARY TABLE order_detail_sink (" +
                      "   row_key STRING," +
                      "   info ROW<order_detail_id BIGINT," +
                      "   order_sn STRING," +
                      "   product_id BIGINT," +
                      "   product_name STRING," +
                      "   product_cnt INTEGER," +
                      "   product_price DOUBLE," +
                      "   average_cost DOUBLE," +
                      "   weight DOUBLE," +
                      "   fee_money DOUBLE," +
                      "   w_id INTEGER," +
                      "   create_time STRING," +
                      "   modified_time STRING>," +
                      "   PRIMARY KEY (row_key) NOT ENFORCED" +
                      ") WITH ('connector' = 'hbase-2.2'," +
                      "       'table-name' = 'ods:order_detail'," +
                      "       'zookeeper.quorum' = '192.168.190.139:2181'" +
                      " )";
      tableEnvironment.executeSql("DROP TABLE IF EXISTS order_detail_sink");
      tableEnvironment.executeSql(sink_hbase_table_create);

      // ETL管道
      tableEnvironment
              // 加载源表到Table中
              .from("order_detail_source")
              // 增加row_key列(rowkey使用随机数(0-9)+yyyyMMddHHmmssSSS(date 的格式))
              .select(concat(randInteger(10).cast(DataTypes.STRING()),"+", dateFormat(currentTimestamp(),"yyyyMMddHHmmssSSS")).as("row_key"),
                      row($("order_detail_id"),$("order_sn"),$("product_id")
                              ,$("product_name"),$("product_cnt"),$("product_price")
                              ,$("average_cost"),$("weight"),$("fee_money")
                              ,$("w_id"),$("create_time"),$("modified_time")).as("info"))
              // 将表写入HBase表
              .executeInsert("order_detail_sink");

      // TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
      // env.execute("Flink Streaming Task03")
  }
}
  1. 另创建一个Flink流处理程序Task03Job_3.java,编辑代码如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.*;
import org.apache.flink.table.api.*;

public class Task03Job_3 {
  public static void main(String[] args) throws Exception {
      // 获得流执行环境
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // 表执行环境
      final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

      // 创建源表
      String source_kafka_table_create =
              "CREATE TEMPORARY TABLE product_browse_source ( " +
                      "   log_id BIGINT," +
                      "   order_sn STRING," +
                      "   product_id BIGINT," +
                      "   customer_id STRING," +
                      "   gen_order BIGINT" +
                      "   modified_time STRING" +
                      ") WITH ('connector'='kafka'," +
                      "       'topic'='log_product_browse'," +
                      "       'properties.bootstrap.servers'='192.168.190.139:9092'," +
                      "       'properties.group.id'='group-test'," +
                      "       'format'='json'," +
                      "       'scan.startup.mode'='latest-offset'" +
                      ")";

      tableEnvironment.executeSql("DROP TABLE IF EXISTS product_browse_source");
      tableEnvironment.executeSql(source_kafka_table_create);

      // 创建Sink表
      String sink_hbase_table_create =
              "CREATE TEMPORARY TABLE product_browse_sink (" +
                      "   row_key STRING," +
                      "   info ROW<log_id BIGINT," +
                      "   order_sn STRING," +
                      "   product_id BIGINT," +
                      "   customer_id STRING," +
                      "   gen_order BIGINT" +
                      "   modified_time STRING>," +
                      "   PRIMARY KEY (row_key) NOT ENFORCED" +
                      ") WITH ('connector' = 'hbase-2.2'," +
                      "       'table-name' = 'ods:product_browse'," +
                      "       'zookeeper.quorum' = '192.168.190.139:2181'" +
                      " )";
      tableEnvironment.executeSql("DROP TABLE IF EXISTS product_browse_sink");
      tableEnvironment.executeSql(sink_hbase_table_create);

      // ETL管道
      tableEnvironment
              // 加载源表到Table中
              .from("product_browse_source")
              // 增加row_key列(注意这里row_key的构造规则,它来自于对log_id字段的拆分,而log_id字段来自上一任务中的构造(用+连接随机数和日期))
              // 备注中的+号到底是表意还是表形?出题人你多说明一下会S吗?
              .select(concat(splitIndex($("log_id").toString(),"+",0),"+","2024","+",splitIndex($("log_id").toString(),"+",1)).as("row_key"),
                      row($("log_id"),$("order_sn"),$("product_id"),$("customer_id"),$("gen_order"), $("modified_time")).as("info"))
              // 将表写入HBase表
              .executeInsert("product_browse_sink");

      // TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
      // env.execute("Flink Streaming Task03")
  }
}

执行步骤

请首先确保已经启动了大数据运行环境(执行startall.sh脚本),然后按以下步骤执行。

将Flink流处理jar包部署到YARN集群上运行:

# cd /opt/module/flink-1.14.0

# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task03Job_1 xxx.jar
# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task03Job_2 xxx.jar
# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task03Job_3 xxx.jar

请将上述命令中的xxx.jar替换为你部署的jar包名称。

查询执行结果

根据任务描述,需要使用HBase Shell :

(1) 查看ods:order_master 表的任意2 条数据,查看字段为row_key 与shipping_user。语句如下:

hbase> scan 'ods:order_master', {COLUMNS => 'info:shipping_user', LIMIT => 2, FORMATTER => 'toString'}

(2) 查看ods:order_detail 表的任意2 条数据, 查看字段为row_key 与product_name。语句如下:

hbase> scan 'ods:order_detail', {COLUMNS => 'info:product_name', LIMIT => 2, FORMATTER => 'toString'}

(3) 查看ods:product_browse 表的任意2 条数据,查看字段为row_key 与order_sn。语句如下:

hbase> scan 'ods:product_browse', {COLUMNS => 'info:order_sn', LIMIT => 2, FORMATTER => 'toString'}
© 版权声明
THE END
喜欢就支持一下吧
点赞245赞赏 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容