假设有如下一个用户登录日志表:
+------+-------+-------------------+
|log_id|user_id| log_time|
+------+-------+-------------------+
| 1| 1000|2022-03-10 10:08:13|
| 2| 1000|2022-03-18 10:33:22|
| 3| 1000|2022-03-26 18:59:19|
| 4| 1001|2022-03-03 20:59:13|
| 5| 1001|2022-03-10 05:53:49|
| 6| 1001|2022-02-26 02:27:51|
| 7| 1002|2022-03-01 20:59:13|
| 8| 1002|2022-03-07 05:53:49|
| 9| 1002|2022-02-28 02:27:51|
| 10| 1003|2022-02-27 20:59:13|
| 11| 1003|2022-03-05 05:53:49|
| 12| 1003|2022-03-12 02:27:51|
| 13| 1004|2022-02-28 20:59:13|
| 14| 1004|2022-03-05 05:53:49|
| 15| 1004|2022-03-18 02:27:51|
| 16| 1005|2022-02-25 20:59:13|
| 17| 1005|2022-03-04 05:53:49|
| 18| 1005|2022-03-11 02:27:51|
+------+-------+-------------------+
要求:根据登录日志表求最近连续三周登录的用户数。
任务说明
求 login_time 字段值为 2022-03-10 的最近连续三周登录的用户数,最终使用Spark中的show()方法输出如下字段:
字段名 | 字段说明 | 备注 |
---|---|---|
end_date | 数据统计日期 | 2022-03-10 |
active_total | 活跃用户数 | |
date_range | 统计周期 | 格式:统计开始时间_结束时间 |
实现思路
分析给定的测试数据,可以看出,满足”最近三周连续登录”条件的有三个用户。如下图所示:
想要完成这个需求,我们得先拿到最近三周的开始时间和结束时间,然后筛选范围数据,最后利用相关函数计算其连续性。
(1)确定当前 2022-03-10 是周几,然后求得周日的日期(也就是这三周的最后一天)。
(2)拿到 2022-03-10 这周的周日时间后,获取两周前的开始日期(也就是这三周的第一天)。
(3)筛选范围,计算每位用户是否符合三周的连续性。
创建数据环境
为能动手实现此任务,我们首先在 MySQL 的 test 数据库中创建数据测试表 log_data:
create table if not exists log_data(
log_id varchar(200) comment '日志id',
user_id varchar(200) comment '用户id',
log_time datetime NULL DEFAULT NULL comment '登录时间'
);
然后执行以下SQL语句,插入测试数据:
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('1', '2022-03-10 10:08:13', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('2', '2022-03-18 10:33:22', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('3', '2022-03-26 18:59:19', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('4', '2022-03-03 20:59:13', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('5', '2022-03-10 05:53:49', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('6', '2022-02-26 02:27:51', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('7', '2022-03-01 20:59:13', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('8', '2022-03-07 05:53:49', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('9', '2022-02-28 02:27:51', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('10', '2022-02-27 20:59:13', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('11', '2022-03-05 05:53:49', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('12', '2022-03-12 02:27:51', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('13', '2022-02-28 20:59:13', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('14', '2022-03-05 05:53:49', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('15', '2022-03-18 02:27:51', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('16', '2022-02-25 20:59:13', '1005');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('17', '2022-03-04 05:53:49', '1005');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('18', '2022-03-11 02:27:51', '1005');
实现过程
请按以下步骤实现。
1)读取数据源
编写Spark代码,读取MySQL中的测试数据到DataFrame中。代码如下:
val userLoginInfo = spark.read.format("jdbc")
.option("driver","com.mysql.jdbc.Driver")
.option("url","jdbc:mysql://localhost:3306/test")
.option("user","root")
.option("password","admin")
.option("dbtable","log_data")
.load()
userLoginInfo.printSchema
userLoginInfo.orderBy("user_id","log_time").show
执行以上代码,输出内容如下:
root
|-- log_id: string (nullable = true)
|-- user_id: string (nullable = true)
|-- log_time: timestamp (nullable = true)
+------+-------+-------------------+
|log_id|user_id| log_time|
+------+-------+-------------------+
| 1| 1000|2022-03-10 10:08:13|
| 2| 1000|2022-03-18 10:33:22|
| 3| 1000|2022-03-26 18:59:19|
| 6| 1001|2022-02-26 02:27:51|
| 4| 1001|2022-03-03 20:59:13|
| 5| 1001|2022-03-10 05:53:49|
| 9| 1002|2022-02-28 02:27:51|
| 7| 1002|2022-03-01 20:59:13|
| 8| 1002|2022-03-07 05:53:49|
| 10| 1003|2022-02-27 20:59:13|
| 11| 1003|2022-03-05 05:53:49|
| 12| 1003|2022-03-12 02:27:51|
| 13| 1004|2022-02-28 20:59:13|
| 14| 1004|2022-03-05 05:53:49|
| 15| 1004|2022-03-18 02:27:51|
| 16| 1005|2022-02-25 20:59:13|
| 17| 1005|2022-03-04 05:53:49|
| 18| 1005|2022-03-11 02:27:51|
+------+-------+-------------------+
2)使用临时表,获取2022-03-10 这周的周日时间。代码如下:
// 注册为临时表
userLoginInfo.createOrReplaceTempView("log_data")
val df1 = spark.sql(
"""
|select
| user_id,
| date(log_time) log_date,
| next_day("2022-03-10", "Sunday") date_end
|from
| log_data
|""".stripMargin)
df1.show
在上面的代码中,我们使用了函数next_day(),来获得指定日期后的第一个周日(Sunday)。请注意,如果登录日本身就是周日,则next_day()函数取值将是下一个周的周日。
执行以上代码,输出内容如下:
+-------+----------+----------+
|user_id| log_date| date_end|
+-------+----------+----------+
| 1000|2022-03-10|2022-03-13|
| 1000|2022-03-18|2022-03-13|
| 1000|2022-03-26|2022-03-13|
| 1001|2022-03-03|2022-03-13|
| 1001|2022-03-10|2022-03-13|
| 1001|2022-02-26|2022-03-13|
| 1002|2022-03-01|2022-03-13|
| 1002|2022-03-07|2022-03-13|
| 1002|2022-02-28|2022-03-13|
| 1003|2022-02-27|2022-03-13|
| 1003|2022-03-05|2022-03-13|
| 1003|2022-03-12|2022-03-13|
| 1004|2022-02-28|2022-03-13|
| 1004|2022-03-05|2022-03-13|
| 1004|2022-03-18|2022-03-13|
| 1005|2022-02-25|2022-03-13|
| 1005|2022-03-04|2022-03-13|
| 1005|2022-03-11|2022-03-13|
+-------+----------+----------+
3)筛选出最近三周的数据。
获取两周前的起始日期,筛选符合要求的数据,并计算每个登录日期所属周的周日日期。代码如下:
// 注册为临时表
df1.createOrReplaceTempView("first_data")
val df2 = spark.sql(
"""
|select
| *
|from
| (select
| user_id,
| log_date,
| if(dayofweek(log_date)=1,log_date,next_day(log_date,"sunday")) log_date_week_end,
| date_sub(date_end,20) date_begin,
| date_end
| from
| first_data
| )t1
|where
| log_date <= date_end
| and
| log_date >= date_begin
|""".stripMargin)
df2.show
在上面的代码中,if(dayofweek(log_date)=1,log_date,next_day(log_date,”sunday”))的含义是:如果登录日本身就是周日,则取自身;否则取周日。
执行以上代码,输出内容如下:
+-------+----------+-----------------+----------+----------+
|user_id| log_date|log_date_week_end|date_begin| date_end|
+-------+----------+-----------------+----------+----------+
| 1000|2022-03-10| 2022-03-13|2022-02-21|2022-03-13|
| 1001|2022-03-03| 2022-03-06|2022-02-21|2022-03-13|
| 1001|2022-03-10| 2022-03-13|2022-02-21|2022-03-13|
| 1001|2022-02-26| 2022-02-27|2022-02-21|2022-03-13|
| 1002|2022-03-01| 2022-03-06|2022-02-21|2022-03-13|
| 1002|2022-03-07| 2022-03-13|2022-02-21|2022-03-13|
| 1002|2022-02-28| 2022-03-06|2022-02-21|2022-03-13|
| 1003|2022-02-27| 2022-02-27|2022-02-21|2022-03-13|
| 1003|2022-03-05| 2022-03-06|2022-02-21|2022-03-13|
| 1003|2022-03-12| 2022-03-13|2022-02-21|2022-03-13|
| 1004|2022-02-28| 2022-03-06|2022-02-21|2022-03-13|
| 1004|2022-03-05| 2022-03-06|2022-02-21|2022-03-13|
| 1005|2022-02-25| 2022-02-27|2022-02-21|2022-03-13|
| 1005|2022-03-04| 2022-03-06|2022-02-21|2022-03-13|
| 1005|2022-03-11| 2022-03-13|2022-02-21|2022-03-13|
+-------+----------+-----------------+----------+----------+
4)计算最近三周连续登录的用户数。
思路:
-
1)首先,按user_id和log_date_week_end去重,这样可以保证每个用户每周保留一条登录记录;
-
2)然后,按user_id分组统计:group by … count,得到每个用户最近三周的登录记录;
-
3)对结果进行过滤,如果每个用户最近三周的登录记录数等于3,则为连续登录用户数。
实现代码如下:
// 对df2按user_id和log_date_week_end去重
df2.dropDuplicates("user_id","log_date_week_end")
// 注册为临时表
.createOrReplaceTempView("second_data2")
/*
// 然后,按user_id分组统计:group by ... count,得到每个用户最近三周的登录记录
spark.sql(
"""
|select
| "2022-03-10" end_date,
| user_id,
| concat(date_begin,"_",date_end) date_range,
| count(*) loging_week_cnt
|from
| second_data2
|group by
| end_date,user_id,date_range
|""".stripMargin
).show(false)
+----------+-------+---------------------+---------------+
|end_date |user_id|date_range |loging_week_cnt|
+----------+-------+---------------------+---------------+
|2022-03-10|1004 |2022-02-21_2022-03-13|1 |
|2022-03-10|1002 |2022-02-21_2022-03-13|2 |
|2022-03-10|1005 |2022-02-21_2022-03-13|3 |
|2022-03-10|1000 |2022-02-21_2022-03-13|1 |
|2022-03-10|1003 |2022-02-21_2022-03-13|3 |
|2022-03-10|1001 |2022-02-21_2022-03-13|3 |
+----------+-------+---------------------+---------------+
*/
然后,统计最近三周连续登录3次的用户数。代码如下:
val result = spark.sql(
"""
|select
| end_date,
| count(distinct user_id) active_total,
| date_range
|from
| (select
| "2022-03-10" end_date,
| user_id,
| concat(date_begin,"_",date_end) date_range,
| count(*) loging_week_cnt
| from
| second_data2
| group by
| end_date,
| user_id,
| date_range
| ) t1
|where
| loging_week_cnt=3
|group by
| end_date,
| date_range
|""".stripMargin)
result.show(false)
执行以上代码,输出内容如下:
+----------+------------+---------------------+
|end_date |active_total|date_range |
+----------+------------+---------------------+
|2022-03-10|3 |2022-02-21_2022-03-13|
+----------+------------+---------------------+
1 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
2 本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
3 本站资源大多存储在云盘,如发现链接失效,请联系我们我们会第一时间更新。
暂无评论内容