本任务共有两个子任务组成:
子任务1
子任务1描述
1、在主节点使用Flume 采集实时数据生成器25001 端口的socket 数据(实时数据生成器脚本为主节点/data_log 目录下的gen_ds_data_to_socket 脚本,该脚本为Master 节点本地部署且使用socket 传输),将数据存入到Kafka的Topic 中(Topic 名称为ods_mall_log,分区数为2,ZK 关于Kafka 的信息在其/kafka 节点),使用Kafka 自带的消费者消费ods_mall_log(Topic)中的数据,查看Topic 中的前1 条数据的结果,将查看命令与结果完整的截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下;
注:需先启动已配置好的Flume再启动脚本,否则脚本将无法成功启动,启动方式为进入/data_log目录执行./gen_ds_data_to_socket (如果没有权限,请执行授权命令chmod 777 /data_log/gen_ds_data_to_socket)
分析: 这里需要对出题方提出表扬,因为本次任务描述中,纠正了以往该任务描述中随意而错误的表述语句,细化了一些环境说明,这说明出题方还是听进了老师们频繁反馈的意见,并做出了改进。 不过提醒大家注意的是,出题方这个实时数据生成器脚本似乎会对python版本产生依赖,有反映在运行赛方的这个脚本时会出现与python有关的错误信息,导致后续无法采集数据。
实现原理
。
技术参考2:
测试环境:本案例演示使用的PBCP-2023(个人大数据竞赛平台),内置了模拟实时数据生成脚本gen_ds_data_to_socket。
Flume配置
在$FLUME_HOME/conf/目录下,创建一个flume配置文件ss2024.conf,编辑内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 25001
#c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#配置kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.brokerList = xueai:9092
a1.sinks.k1.topic = ods_mall_log
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2
在上面的配置文件中,我们配置了一个netcat类型的源,跑在25001端口上。另外配置了通道(channel)c1,以及Sink为kafka的ods_mall_log主题。
执行实时数据采集
请按以下步骤执行。
1)启动zookeeper集群。在终端窗口中,执行如下命令,并保持运行:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/zookeeper-server-start.sh config/zookeeper.properties
-
启动Kafka集群。在另一个终端窗口中,执行如下命令,并保持运行:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-server-start.sh config/server.properties
-
创建Kafka主题ods_mall_log和ods_mall_data(下一任务使用,这里一并创建了),指定使用2个分区。在另一个终端窗口中,执行如下命令:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic ods_mall_log
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic ods_mall_data
然后使用下面的命令查看主题是否创建成功:
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
-
另开一个窗口,启动Kafka自带的消费者脚本,订阅order主题,以便观察Kafka接收到的数据。保持运行:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-console-consumer.sh --bootstrap-server xueai:9092 --topic ods_mall_log --from-beginning
-
启动flume-ng。切换到Flume的安装目录下,执行如下命令:
# cd /opt/module/flume-1.9.0/
# ./bin/flume-ng agent -n a1 -c ./conf -f ./conf/ss2024.conf -Dflume.root.logger=INFO,console
-
最后一步,运行数据生成器脚本,将生成的用户登录日志发往Socket 25001端口。
# cd /data_log
# ./gen_ds_data_to_socket
-
回到kafka消费者脚本运行窗口,如果正常的话,会看到收到的用户登录日志信息:
1,12132,2022-03-16 23:09:48,146.23.254.255,0
2,13449,2022-03-16 22:19:48,8.148.174.180,0
3,7203,2022-03-16 22:23:48,84.35.22.35,0
4,4639,2022-03-16 19:28:48,52.194.3.131,0
5,9023,2022-03-16 21:34:49,201.111.203.250,0
6,12663,2022-03-16 20:49:48,101.106.188.22,1
7,9097,2022-03-16 21:36:48,36.78.211.45,1
8,18577,2022-03-16 22:24:48,145.56.82.105,1
9,12345,2022-03-16 19:53:48,43.107.106.144,1
......
-
关闭时,从后往前依次关闭(Ctrl + C)。
子任务2
子任务2描述
2、实时脚本启动后,在主节点进入到maxwell-1.29.0 的解压后目录下(在/opt/module 下),配置相关文件并启动,读取主节点MySQL 数据的binlog日志(MySQL 的binlog 相关配置已完毕,只需要关注ds_realtime_db 数据库的表)到Kafka 的Topic 中(Topic 名称为ods_mall_data,分区数为2,ZK 关于Kafka 的信息在其/kafka 节点)。使用Kafka 自带的消费者消费ods_mall_data(Topic)中的数据,查看Topic 中的前1 条数据的结果,将查看命令与结果完整的截图粘贴至客户端桌面【Release\模块B 提交结果.docx】中对应的任务序号下。
子任务2分析
Maxwell 是一个能实时读取 MySQL 二进制日志 binlog ,并生成 JSON 格式的消息,作为生产者发送给 Kafka、 Kinesis、 RabbitMQ、 Redis、 Google CloudPub/Sub、文件或其它平台的应用程序。Maxwell。
配置Maxwell
打开Maxwell安装目录(/opt/module/maxwell-1.29.0)下的config.properties(没有的话,复制一份,去掉后缀名),编辑内容如下:
# mysql login info
host=localhost
user=root
password=admin
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
kafka.bootstrap.servers=xueai:9092
kafka_topic=ods_mall_data
在上配置了Maxwell登录MySQL的信息,以及将数据发到Kafka的相关信息。
注意:PBCP-2023只配置了对MySQL中的ds_realtime_db数据库开启binlog日志。如果要采集其他数据库的binlog日志,请自行配置开启。
执行步骤
1)执行Kafka自带的消费者脚本:
# cd /opt/module/kafka_2.12-2.4.1/
# ./bin/kafka-console-consumer.sh \
--bootstrap-server xueai:9092 \
--topic ods_mall_data \
--from-beginning
2)启动maxwell:
# cd /opt/module/maxwell-1.29.0
# ./bin/maxwell --config ./config.properties --daemon
3)登录mysql服务器,执行如下SQL语句(模拟对数据库的操作):
mysql> use ds_realtime_db;
mysql> insert into customer_login_log values(1,12132,"2022-03-16 23:09:48","146.23.254.255",0);
4)回到Kafka消费者脚本运行窗口,查看,会看到收到的消息如下:
{"database":"ds_realtime_db","table":"customer_login_log","type":"insert","ts":1669086387,"xid":617,"commit":true,"data":{"login_id":1,"customer_id":12132,"login_time":"2022-03-16 23:09:48","login_ip":"146.23.254.255","login_type":0}}
5)执行如下SQL语句(模拟对fact_order_master表的批量insert操作):
mysql> create database ds_realtime_db;
mysql> use ds_realtime_db;
mysql> create table fact_order_master as select * from order_master;
6)回到Kafka消费者脚本运行窗口,查看,会看到收到的消息(fact_order_master表的JSON数据)如下图所示:
fact_order_master表的JSON数据格式为:
{
"database":"ds_realtime_db",
"table":"fact_order_master",
"type":"insert",
"ts":1669108369,
"xid":4061,
"commit":true,
"data":{
"order_id":6097,
"order_sn":"2022111417659492",
"customer_id":583,
"shipping_user":"龚桂芳",
"province":"上海市",
"city":"上海市",
"address":"上海市上海市真光路12889755号3层",
"order_source":1,
"payment_method":3,
"order_money":4125.78,
"district_money":0.00,
"shipping_money":39.06,
"payment_money":4164.84,
"shipping_comp_name":"韵达",
"shipping_sn":"9846757521358",
"create_time":"20221110023759",
"shipping_time":"20221110132159",
"pay_time":"20221110032759",
"receive_time":"20221112203359",
"order_status":"已签收",
"order_point":416,
"invoice_title":"雨林木风计算机传媒有限公司",
"modified_time":"2022-11-12 12:33:59"
}
}
7)登录mysql服务器,执行如下SQL语句(模拟对fact_order_detail表的批量insert操作):
mysql> create table fact_order_detail as select * from order_detail;
8)回到Kafka消费者脚本运行窗口,查看,会看到收到的消息(fact_order_detail 表的JSON数据)如下图所示:
fact_order_detail表的JSON数据格式为:
{
"database":"ds_realtime_db",
"table":"fact_order_detail",
"type":"insert",
"ts":1669108716,
"xid":4840,
"xoffset":0,
"data":{
"order_detail_id":1,
"order_sn":"2022111496083548",
"product_id":5380,
"product_name":"无线5.0蓝牙耳机双耳入耳式迷你跑步运动",
"product_cnt":3,
"product_price":702.53,
"average_cost":0.00,
"weight":4.89577,
"fee_money":25.80,
"w_id":1291,
"create_time":"20221109153354",
"modified_time":"2022-11-10 04:55:54"
}
}
1 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
2 本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
3 本站资源大多存储在云盘,如发现链接失效,请联系我们我们会第一时间更新。
暂无评论内容