5.14 案例:使用Spark统计连续三周登录的用户数

假设有如下一个用户登录日志表:

+------+-------+-------------------+
|log_id|user_id|           log_time|
+------+-------+-------------------+
|     1|   1000|2022-03-10 10:08:13|
|     2|   1000|2022-03-18 10:33:22|
|     3|   1000|2022-03-26 18:59:19|
|     4|   1001|2022-03-03 20:59:13|
|     5|   1001|2022-03-10 05:53:49|
|     6|   1001|2022-02-26 02:27:51|
|     7|   1002|2022-03-01 20:59:13|
|     8|   1002|2022-03-07 05:53:49|
|     9|   1002|2022-02-28 02:27:51|
|   10|   1003|2022-02-27 20:59:13|
|   11|   1003|2022-03-05 05:53:49|
|   12|   1003|2022-03-12 02:27:51|
|   13|   1004|2022-02-28 20:59:13|
|   14|   1004|2022-03-05 05:53:49|
|   15|   1004|2022-03-18 02:27:51|
|   16|   1005|2022-02-25 20:59:13|
|   17|   1005|2022-03-04 05:53:49|
|   18|   1005|2022-03-11 02:27:51|
+------+-------+-------------------+

要求:根据登录日志表求最近连续三周登录的用户数。

任务说明

求 login_time 字段值为 2022-03-10 的最近连续三周登录的用户数,最终使用Spark中的show()方法输出如下字段:

字段名 字段说明 备注
end_date 数据统计日期 2022-03-10
active_total 活跃用户数  
date_range 统计周期 格式:统计开始时间_结束时间

实现思路

分析给定的测试数据,可以看出,满足”最近三周连续登录”条件的有三个用户。如下图所示:

img

想要完成这个需求,我们得先拿到最近三周的开始时间和结束时间,然后筛选范围数据,最后利用相关函数计算其连续性。

(1)确定当前 2022-03-10 是周几,然后求得周日的日期(也就是这三周的最后一天)。

(2)拿到 2022-03-10 这周的周日时间后,获取两周前的开始日期(也就是这三周的第一天)。

(3)筛选范围,计算每位用户是否符合三周的连续性。

创建数据环境

为能动手实现此任务,我们首先在 MySQL 的 test 数据库中创建数据测试表 log_data:

create table if not exists log_data(
  log_id varchar(200) comment '日志id',
  user_id varchar(200) comment '用户id',
  log_time datetime NULL DEFAULT NULL comment '登录时间'
);

然后执行以下SQL语句,插入测试数据:

insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('1', '2022-03-10 10:08:13', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('2', '2022-03-18 10:33:22', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('3', '2022-03-26 18:59:19', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('4', '2022-03-03 20:59:13', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('5', '2022-03-10 05:53:49', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('6', '2022-02-26 02:27:51', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('7', '2022-03-01 20:59:13', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('8', '2022-03-07 05:53:49', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('9', '2022-02-28 02:27:51', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('10', '2022-02-27 20:59:13', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('11', '2022-03-05 05:53:49', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('12', '2022-03-12 02:27:51', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('13', '2022-02-28 20:59:13', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('14', '2022-03-05 05:53:49', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('15', '2022-03-18 02:27:51', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('16', '2022-02-25 20:59:13', '1005');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('17', '2022-03-04 05:53:49', '1005');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('18', '2022-03-11 02:27:51', '1005');

实现过程

请按以下步骤实现。

1)读取数据源

编写Spark代码,读取MySQL中的测试数据到DataFrame中。代码如下:

val userLoginInfo = spark.read.format("jdbc")
    .option("driver","com.mysql.jdbc.Driver")
    .option("url","jdbc:mysql://localhost:3306/test")
    .option("user","root")
    .option("password","admin")
    .option("dbtable","log_data")
    .load()

userLoginInfo.printSchema
userLoginInfo.orderBy("user_id","log_time").show

执行以上代码,输出内容如下:

root
|-- log_id: string (nullable = true)
|-- user_id: string (nullable = true)
|-- log_time: timestamp (nullable = true)

+------+-------+-------------------+
|log_id|user_id|           log_time|
+------+-------+-------------------+
|     1|   1000|2022-03-10 10:08:13|
|     2|   1000|2022-03-18 10:33:22|
|     3|   1000|2022-03-26 18:59:19|
|     6|   1001|2022-02-26 02:27:51|
|     4|   1001|2022-03-03 20:59:13|
|     5|   1001|2022-03-10 05:53:49|
|     9|   1002|2022-02-28 02:27:51|
|     7|   1002|2022-03-01 20:59:13|
|     8|   1002|2022-03-07 05:53:49|
|   10|   1003|2022-02-27 20:59:13|
|   11|   1003|2022-03-05 05:53:49|
|   12|   1003|2022-03-12 02:27:51|
|   13|   1004|2022-02-28 20:59:13|
|   14|   1004|2022-03-05 05:53:49|
|   15|   1004|2022-03-18 02:27:51|
|   16|   1005|2022-02-25 20:59:13|
|   17|   1005|2022-03-04 05:53:49|
|   18|   1005|2022-03-11 02:27:51|
+------+-------+-------------------+

2)使用临时表,获取2022-03-10 这周的周日时间。代码如下:

// 注册为临时表
userLoginInfo.createOrReplaceTempView("log_data")

val df1 = spark.sql(
    """
      |select
      |   user_id,
      |   date(log_time) log_date,
      |   next_day("2022-03-10", "Sunday") date_end
      |from
      |   log_data
      |""".stripMargin)

df1.show

在上面的代码中,我们使用了函数next_day(),来获得指定日期后的第一个周日(Sunday)。请注意,如果登录日本身就是周日,则next_day()函数取值将是下一个周的周日。

执行以上代码,输出内容如下:

+-------+----------+----------+
|user_id| log_date| date_end|
+-------+----------+----------+
|   1000|2022-03-10|2022-03-13|
|   1000|2022-03-18|2022-03-13|
|   1000|2022-03-26|2022-03-13|
|   1001|2022-03-03|2022-03-13|
|   1001|2022-03-10|2022-03-13|
|   1001|2022-02-26|2022-03-13|
|   1002|2022-03-01|2022-03-13|
|   1002|2022-03-07|2022-03-13|
|   1002|2022-02-28|2022-03-13|
|   1003|2022-02-27|2022-03-13|
|   1003|2022-03-05|2022-03-13|
|   1003|2022-03-12|2022-03-13|
|   1004|2022-02-28|2022-03-13|
|   1004|2022-03-05|2022-03-13|
|   1004|2022-03-18|2022-03-13|
|   1005|2022-02-25|2022-03-13|
|   1005|2022-03-04|2022-03-13|
|   1005|2022-03-11|2022-03-13|
+-------+----------+----------+

3)筛选出最近三周的数据。

获取两周前的起始日期,筛选符合要求的数据,并计算每个登录日期所属周的周日日期。代码如下:

// 注册为临时表
df1.createOrReplaceTempView("first_data")

val df2 = spark.sql(
    """
      |select
      |   *
      |from
      |   (select
      |     user_id,
      |     log_date,
      |     if(dayofweek(log_date)=1,log_date,next_day(log_date,"sunday")) log_date_week_end,
      |     date_sub(date_end,20) date_begin,
      |     date_end
      |   from
      |     first_data
      |   )t1
      |where
      |   log_date <= date_end
      |   and
      |   log_date >= date_begin
      |""".stripMargin)

df2.show

在上面的代码中,if(dayofweek(log_date)=1,log_date,next_day(log_date,”sunday”))的含义是:如果登录日本身就是周日,则取自身;否则取周日。

执行以上代码,输出内容如下:

+-------+----------+-----------------+----------+----------+
|user_id| log_date|log_date_week_end|date_begin| date_end|
+-------+----------+-----------------+----------+----------+
|   1000|2022-03-10|       2022-03-13|2022-02-21|2022-03-13|
|   1001|2022-03-03|       2022-03-06|2022-02-21|2022-03-13|
|   1001|2022-03-10|       2022-03-13|2022-02-21|2022-03-13|
|   1001|2022-02-26|       2022-02-27|2022-02-21|2022-03-13|
|   1002|2022-03-01|       2022-03-06|2022-02-21|2022-03-13|
|   1002|2022-03-07|       2022-03-13|2022-02-21|2022-03-13|
|   1002|2022-02-28|       2022-03-06|2022-02-21|2022-03-13|
|   1003|2022-02-27|       2022-02-27|2022-02-21|2022-03-13|
|   1003|2022-03-05|       2022-03-06|2022-02-21|2022-03-13|
|   1003|2022-03-12|       2022-03-13|2022-02-21|2022-03-13|
|   1004|2022-02-28|       2022-03-06|2022-02-21|2022-03-13|
|   1004|2022-03-05|       2022-03-06|2022-02-21|2022-03-13|
|   1005|2022-02-25|       2022-02-27|2022-02-21|2022-03-13|
|   1005|2022-03-04|       2022-03-06|2022-02-21|2022-03-13|
|   1005|2022-03-11|       2022-03-13|2022-02-21|2022-03-13|
+-------+----------+-----------------+----------+----------+

4)计算最近三周连续登录的用户数。

思路:

  • 1)首先,按user_id和log_date_week_end去重,这样可以保证每个用户每周保留一条登录记录;

  • 2)然后,按user_id分组统计:group by … count,得到每个用户最近三周的登录记录;

  • 3)对结果进行过滤,如果每个用户最近三周的登录记录数等于3,则为连续登录用户数。

实现代码如下:

// 对df2按user_id和log_date_week_end去重
df2.dropDuplicates("user_id","log_date_week_end")
  // 注册为临时表
  .createOrReplaceTempView("second_data2")  


/*
// 然后,按user_id分组统计:group by ... count,得到每个用户最近三周的登录记录
spark.sql(
    """
      |select
      |   "2022-03-10" end_date,
      |   user_id,
      |   concat(date_begin,"_",date_end) date_range,
      |   count(*) loging_week_cnt
      |from
      |   second_data2
      |group by
      |   end_date,user_id,date_range
      |""".stripMargin
  ).show(false)  
+----------+-------+---------------------+---------------+
|end_date |user_id|date_range           |loging_week_cnt|
+----------+-------+---------------------+---------------+
|2022-03-10|1004   |2022-02-21_2022-03-13|1             |
|2022-03-10|1002   |2022-02-21_2022-03-13|2             |
|2022-03-10|1005   |2022-02-21_2022-03-13|3             |
|2022-03-10|1000   |2022-02-21_2022-03-13|1             |
|2022-03-10|1003   |2022-02-21_2022-03-13|3             |
|2022-03-10|1001   |2022-02-21_2022-03-13|3             |
+----------+-------+---------------------+---------------+
*/

然后,统计最近三周连续登录3次的用户数。代码如下:

val result = spark.sql(
    """
      |select
      |   end_date,
      |   count(distinct user_id) active_total,
      |   date_range
      |from
      |   (select
      |         "2022-03-10" end_date,
      |         user_id,
      |         concat(date_begin,"_",date_end) date_range,
      |         count(*) loging_week_cnt
      |   from
      |         second_data2
      |   group by
      |         end_date,
      |         user_id,
      |         date_range
      |   ) t1
      |where
      |   loging_week_cnt=3
      |group by
      |   end_date,
      |   date_range
      |""".stripMargin)
       
result.show(false)

执行以上代码,输出内容如下:

+----------+------------+---------------------+
|end_date |active_total|date_range           |
+----------+------------+---------------------+
|2022-03-10|3           |2022-02-21_2022-03-13|
+----------+------------+---------------------+
© 版权声明
THE END
喜欢就支持一下吧
点赞103赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

夸夸
夸夸
还有吗!没看够!
取消
昵称表情代码图片

    暂无评论内容