3-2. National Big Data Competition, Set 2, Task C, Sub-task 2: Recommendation System

Task Requirement 1

1.1 Approach

1.2 Connecting to Spark

1.3 Implementation 1

1.4 Implementation 2


Task Requirement 1

Based on the result of sub-task 1, compute the top 10 user ids who have purchased the most product kinds in common with user id 6708 (count only how many distinct products the two users share, not how many times each product was bought). Then, using the relevant tables in the Hudi database dwd_ds_hudi or the MySQL database shtd_store, retrieve the products those 10 users have purchased and remove the products user 6708 has already purchased. For each remaining product, compute its cosine similarity against the products in user 6708's purchase set, accumulate the similarities, and take the mean; output the 5 product ids with the highest mean as the recommendation. Paste a screenshot of the execution result into 【Release任务C提交结果.docx】 on the client desktop under the corresponding task number.

The required output format:

————————推荐Top5结果如下————————

相似度top1(商品id:1,平均相似度:0.983456)

相似度top2(商品id:71,平均相似度:0.782672)

相似度top3(商品id:22,平均相似度:0.7635246)

相似度top4(商品id:351,平均相似度:0.7335748)

相似度top5(商品id:14,平均相似度:0.522356)

1.1 Approach

(Flowchart omitted from the original.) In outline: find the 10 users whose purchased product sets overlap most with the target user's; collect those users' products and remove the target user's own purchases; standardize price and weight and one-hot encode spu_id, tm_id, and category3_id from dim_sku_info; compute the cosine similarity between each candidate product and each product the target user bought; aggregate per candidate product; and output the top 5 product ids.

1.2 Connecting to Spark

Run the following command in a Linux terminal to start a Spark shell with Hudi support:

spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
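
The task alternatively allows sourcing the order tables from the MySQL database shtd_store instead of Hudi. A minimal sketch of that read, assuming the MySQL connector jar is on the classpath, placeholder host and credentials, and the usual shtd_store table names order_info and order_detail (the SQL in the later steps would then query these views in place of the dwd_ds_hudi fact tables):

    // Hypothetical JDBC details -- substitute your environment's host, port, user, and password
    val jdbcUrl = "jdbc:mysql://mysql-host:3306/shtd_store?useSSL=false"
    val props = new java.util.Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "password")
    props.setProperty("driver", "com.mysql.jdbc.Driver") // com.mysql.cj.jdbc.Driver for MySQL 8

    // Register the MySQL tables as temp views so later SQL can query them directly
    spark.read.jdbc(jdbcUrl, "order_info", props).createOrReplaceTempView("order_info")
    spark.read.jdbc(jdbcUrl, "order_detail", props).createOrReplaceTempView("order_detail")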

1.3 Implementation 1

Query the products bought by user 38

Query the distinct products bought by user 38 from the order and order-detail fact tables. (The walkthrough below uses user id 38; substitute 6708 to match the task statement exactly.)

    spark.sql(
     """
       |select distinct `user_id`, `sku_id`
       |from `dwd_ds_hudi`.`fact_order_info` a
       |left join `dwd_ds_hudi`.`fact_order_detail` b
       | on a.id=b.order_id
       | where `user_id`='38'
       |""".stripMargin).createOrReplaceTempView("tmp1")

Rank users by purchase overlap

From the order and order-detail fact tables, count for each other user how many distinct products they share with user 38, and rank users by that count with row_number:

    spark.sql(
     """select c.`user_id`,row_number() over(order by ct desc ,`user_id`) rn from
       |(
       |select `user_id`, count( distinct `sku_id`) as ct from `dwd_ds_hudi`.`fact_order_info` a left join `dwd_ds_hudi`.`fact_order_detail` b on a.id=b.order_id
       | where `sku_id` in(select `sku_id` from tmp1) and `user_id`<>'38' group by `user_id`
       | ) c
       |""".stripMargin).createOrReplaceTempView("tmp2")
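
Optionally, preview the ten most-overlapping users before pivoting them in the next step:

    // Optional sanity check: the ten most-overlapping users, in rank order
    spark.sql("select user_id, rn from tmp2 where rn <= 10").show()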

Pivot the top 10 user ids into one row

Collapse the ids of the top 10 ranked users into a single comma-separated string (one row, one column). Implementation 2 in section 1.4 achieves the same result with a simple in (... limit 10) filter:

    spark.sql(
     """
       |select concat(max(n1),',',max(n2),',',max(n3),',',max(n4),',',max(n5),',',max(n6),',',max(n7),',',max(n8),',',max(n9),',',max(n10)) as output from
       |(select user_id as n1,'' as n2,'' as n3,'' as n4,'' as n5,'' as n6,'' as n7,'' as n8,'' as n9, '' as n10 from tmp2 where rn=1
       |union all
       |select '' as n1,user_id as n2,'' as n3,'' as n4,'' as n5,'' as n6,'' as n7,'' as n8,'' as n9, '' as n10 from tmp2 where rn=2
       |union all
       |select '' as n1,'' as n2,user_id as n3,'' as n4,'' as n5,'' as n6,'' as n7,'' as n8,'' as n9, '' as n10 from tmp2 where rn=3
       |union all
       |select '' as n1,'' as n2,'' as n3,user_id as n4,'' as n5,'' as n6,'' as n7,'' as n8,'' as n9, '' as n10 from tmp2 where rn=4
       |union all
       |select '' as n1,'' as n2,'' as n3,'' as n4,user_id as n5,'' as n6,'' as n7,'' as n8,'' as n9, '' as n10 from tmp2 where rn=5
       |union all
       |select '' as n1,'' as n2,'' as n3,'' as n4,'' as n5,user_id as n6,'' as n7,'' as n8,'' as n9, '' as n10 from tmp2 where rn=6
       |union all
       |select '' as n1,'' as n2,'' as n3,'' as n4,'' as n5,'' as n6,user_id as n7,'' as n8,'' as n9, '' as n10 from tmp2 where rn=7
       |union all
       |select '' as n1,'' as n2,'' as n3,'' as n4,'' as n5,'' as n6,'' as n7,user_id as n8,'' as n9, '' as n10 from tmp2 where rn=8
       |union all
       |select '' as n1,'' as n2,'' as n3,'' as n4,'' as n5,'' as n6,'' as n7,'' as n8,user_id as n9, '' as n10 from tmp2 where rn=9
       |union all
       |select '' as n1,'' as n2,'' as n3,'' as n4,'' as n5,'' as n6,'' as n7,'' as n8,'' as n9, user_id as n10 from tmp2 where rn=10) a
       |""".stripMargin).createOrReplaceTempView("tmp3")
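
Optionally, inspect the single concatenated row:

    // Optional: tmp3 holds one row whose output column joins the 10 user ids with commas
    spark.sql("select output from tmp3").show(false)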

Query the distinct products bought by the top 10 users

Query the distinct products those 10 users bought, excluding any product user 38 already bought:

    spark.sql(
     """
       |select distinct `sku_id`
       |from `dwd_ds_hudi`.`fact_order_info` a
       |left join `dwd_ds_hudi`.`fact_order_detail` b
       | on a.id=b.order_id
       | where
       | (`user_id` =(select split(output,",")[0] from tmp3)
       | or
       | `user_id` =(select split(output,",")[1] from tmp3)
       | or
       | `user_id` =(select split(output,",")[2] from tmp3)
       | or
       | `user_id` =(select split(output,",")[3] from tmp3)
       | or
       | `user_id` =(select split(output,",")[4] from tmp3)
       | or
       | `user_id` =(select split(output,",")[5] from tmp3)
       | or
       | `user_id` =(select split(output,",")[6] from tmp3)
       | or
       | `user_id` =(select split(output,",")[7] from tmp3)
       | or
       | `user_id` =(select split(output,",")[8] from tmp3)
       | or
       | `user_id` =(select split(output,",")[9] from tmp3))
       | and `sku_id` not in (select sku_id from tmp1)
       |
       |""".stripMargin).createOrReplaceTempView("tmp4")

Query the columns to standardize and one-hot encode

Select id (the sku id), spu_id, price, weight, tm_id, and category3_id from the dim_sku_info table:

    import spark.implicits._
    // Imports needed for the feature-engineering code below
    import org.apache.spark.ml.feature.{OneHotEncoder, StandardScaler}
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.sql.Row
   spark.sql(
     """
       |select
       |   `id` ,
       | `spu_id` ,
       | `price` ,
       | `weight` ,
       | `tm_id` ,
       | `category3_id`
       |from `dwd_ds_hudi`.`dim_sku_info` order by id
       |""".stripMargin).createOrReplaceTempView("tmp5")

Standardize price

First convert price and weight into single-element vector columns, since StandardScaler operates on Vector columns:

    val stdDF= spark.sql("select id,cast(price as double) price,cast(weight as double) weight from tmp5 order by id").rdd.map(row => (row.getAs[Integer]("id"),
     Vectors.dense(row.getAs[Double]("price")),
     Vectors.dense(row.getAs[Double]("weight"))
  )).toDF("id", "price","weight")
   stdDF.show(false)

Then standardize price. With setWithStd(true) and setWithMean(false), StandardScaler divides each value by the column's sample standard deviation without centering:

    // Standardize the price column (divide by its standard deviation)
   val scaler = new StandardScaler()
    .setInputCol("price")
    .setOutputCol("stdprice")
    .setWithStd(true)
    .setWithMean(false)
   val scalerModel = scaler.fit(stdDF)
   val scaledData = scalerModel.transform(stdDF)
   scaledData.show(false)

Standardize weight

Standardize weight the same way:

    // Standardize the weight column
   val scaler2 = new StandardScaler()
    .setInputCol("weight")
    .setOutputCol("stdweight")
    .setWithStd(true)
    .setWithMean(false)
   val scalerModel2 = scaler2.fit(scaledData)
   val scaledData2 = scalerModel2.transform(scaledData)
   scaledData2.show(false)
   scaledData2.createOrReplaceTempView("tmp6")

One-hot encode spu_id

Select the distinct id and spu_id pairs into a dataframe:

    // One-hot encode spu_id
   val spu_one_hotDF=spark.sql(
     """
       |select distinct id,spu_id from tmp5 order by id
       |""".stripMargin)

Encode spu_id. setDropLast(false) keeps the full one-hot vector instead of dropping the last category slot:

    val spu_encoder = new OneHotEncoder()
    .setInputCol("spu_id")
    .setOutputCol("spu_id_new")
    .setDropLast(false)
   val spu_encoderModel= spu_encoder.fit(spu_one_hotDF)
   val spu_encoderData = spu_encoderModel.transform(spu_one_hotDF)
   spu_encoderData.show(false)
   spu_encoderData.createOrReplaceTempView("tmp7")

One-hot encode tm_id

Select the distinct id and tm_id pairs into a dataframe:

    // One-hot encode tm_id
   val tm_one_hotDF=spark.sql(
     """
       |select distinct id,tm_id from tmp5 order by id
       |""".stripMargin)

Encode tm_id the same way:

   val tm_encoder = new OneHotEncoder()
    .setInputCol("tm_id")
    .setOutputCol("tm_id_new")
    .setDropLast(false)
   val tm_encoderModel= tm_encoder.fit(tm_one_hotDF)
   val tm_encoderData = tm_encoderModel.transform(tm_one_hotDF)
   tm_encoderData.show(false)
   tm_encoderData.createOrReplaceTempView("tmp8")

One-hot encode category3_id

Select the distinct id and category3_id pairs into a dataframe:

    // One-hot encode category3_id
   val category3_one_hotDF=spark.sql(
     """
       |select distinct id,category3_id from tmp5 order by id
       |""".stripMargin)

Encode category3_id the same way:

    val category3_encoder = new OneHotEncoder()
    .setInputCol("category3_id")
    .setOutputCol("category3_id_new")
    .setDropLast(false)
   val category3_encoderModel= category3_encoder.fit(category3_one_hotDF)
   val category3_encoderData = category3_encoderModel.transform(category3_one_hotDF)
   category3_encoderData.show(false)
   category3_encoderData.createOrReplaceTempView("tmp9")

Merge the standardized and one-hot encoded data

Join the standardized and encoded columns into one new view keyed on id:

    spark.sql(
     """
       |select a.id,
       |b.`stdprice` as `price`,
       |b.`stdweight` as `weight`,
       |c.`spu_id_new` as `spu_id`,
       |d.`tm_id_new` as `tm_id`,
       |e.`category3_id_new` as `category3_id`
       |from
       |tmp5 a
       |left join tmp6 b
       |on a.id = b.id
       |left join tmp7 c
       |on a.id = c.id
       |left join tmp8 d
       |on a.id = d.id
       |left join tmp9 e
       |on a.id = e.id
       |order by a.id
       |""".stripMargin).createOrReplaceTempView("tmp10")


   spark.sql("select * from tmp10").show(false)

Get the feature vectors of the purchased products

Get the feature-vector dataframe for the products user 38 bought:

    val featuresDF = spark.sql("select * from tmp10 where `id` in (select `sku_id` from tmp1)").toDF
   featuresDF.show(false)

Get the feature-vector dataframe for the top 10 users' products (with user 38's products removed):

    val featuresDF2 = spark.sql("select * from tmp10 where `id` in (select `sku_id` from tmp4)").toDF
   featuresDF2.show(false)

Convert the feature columns to RDDs

Convert user 38's product-feature dataframe to an RDD. The slice(1, ...) calls drop the first slot of each one-hot vector, presumably the always-zero index-0 slot, since the raw ids start from 1:

    val featuresRDD = featuresDF.rdd.map {
     case Row(sku_id: Int, stdprice: Vector, stdweight: Vector, spuIdEncoded: Vector, tmIdEncoded: Vector, category3IdEncoded: Vector) =>
       val spuIdArr = spuIdEncoded.toArray
       val spuIdVector = Vectors.dense(spuIdArr.slice(1, spuIdArr.length))
       val tmIdArr = tmIdEncoded.toArray
       val tmIdVector = Vectors.dense(tmIdArr.slice(1, tmIdArr.length))
       val category3IdArr = category3IdEncoded.toArray
       val category3IdVector = Vectors.dense(category3IdArr.slice(1, category3IdArr.length))
      (sku_id, stdprice, stdweight, spuIdVector, tmIdVector, category3IdVector)
  }
   featuresRDD.collect().foreach(println)

Convert the top 10 users' product-feature dataframe (user 38's products removed) to an RDD:

    val featuresRDD2 = featuresDF2.rdd.map {
     case Row(sku_id: Int, stdprice: Vector, stdweight: Vector, spuIdEncoded: Vector, tmIdEncoded: Vector, category3IdEncoded: Vector) =>
       val spuIdArr = spuIdEncoded.toArray
       val spuIdVector = Vectors.dense(spuIdArr.slice(1, spuIdArr.length))
       val tmIdArr = tmIdEncoded.toArray
       val tmIdVector = Vectors.dense(tmIdArr.slice(1, tmIdArr.length))
       val category3IdArr = category3IdEncoded.toArray
       val category3IdVector = Vectors.dense(category3IdArr.slice(1, category3IdArr.length))
      (sku_id, stdprice, stdweight, spuIdVector, tmIdVector, category3IdVector)
  }
   featuresRDD2.collect().foreach(println)

Write the cosine similarity function

Cosine similarity is the dot product of two vectors divided by the product of their Euclidean norms:

  // Cosine similarity: the dot product of two vectors divided by the product of their Euclidean norms
 def cosineSimilarity(v1: Vector, v2: Vector): Double = {
   // dot computes the inner product of two equal-length vectors:
   // dot(v1, v2) = v1[0]*v2[0] + v1[1]*v2[1] + ... + v1[n-1]*v2[n-1]
   // Vectors.norm(v, 2.0) is the Euclidean (2-)norm:
   // norm(v, 2.0) = sqrt(v[0]^2 + v[1]^2 + ... + v[n-1]^2)
   v1.dot(v2) / (Vectors.norm(v1, 2.0) * Vectors.norm(v2, 2.0))
}
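
A quick sanity check of the function with made-up values (parallel vectors should score 1.0, orthogonal ones 0.0):

    // Hypothetical vectors: b is 2*a, so the cosine similarity is exactly 1.0
    val a = Vectors.dense(1.0, 2.0, 0.0)
    val b = Vectors.dense(2.0, 4.0, 0.0)
    println(cosineSimilarity(a, b)) // 1.0 (up to floating-point error)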

Compute the averaged similarity for all pairs

Pair every product user 38 bought with every candidate product via cartesian and, for each pair, average the cosine similarities of the five feature blocks (price, weight, spu_id, tm_id, category3_id):

    // Compute the cosine similarity for every (user-38 product, candidate product) pair
   val similarities = featuresRDD.cartesian(featuresRDD2).map {
     case ((sku_id1, stdprice1, stdweight1, spuIdEncoded1, tmIdEncoded1, category3IdEncoded1),
    (sku_id2, stdprice2, stdweight2, spuIdEncoded2, tmIdEncoded2, category3IdEncoded2)) =>
       val similarity = (cosineSimilarity(stdprice1, stdprice2) +
         cosineSimilarity(stdweight1, stdweight2) +
         cosineSimilarity(spuIdEncoded1, spuIdEncoded2) +
         cosineSimilarity(tmIdEncoded1, tmIdEncoded2) +
         cosineSimilarity(category3IdEncoded1, category3IdEncoded2))/5
      (sku_id2, similarity)
  }

   // Print the pairwise similarities
   similarities.collect().foreach(println)

Output the result in the required format

Recommend the 5 products with the highest similarity and print them in the required format. Note that the task statement asks for the mean of the accumulated similarities per product, while this code keeps the maximum per product; a mean variant is sketched after the block:

    // Keep the max similarity per sku, sort by similarity (then id) descending, take the top 5
   val result = similarities.reduceByKey((x, y) => if (x > y) x else y).sortBy(i => (-i._2, -i._1)).collect().take(5)
   result.foreach(println)
   var index = 1
   result.foreach( i => {
       if(index == 1){
         println("------------------------推荐Top5结果如下------------------------")
      }
       println(f"相似度top${index}(商品id:${i._1},平均相似度:${i._2})")
       index += 1
    }
  )
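
A minimal sketch of the mean variant, reusing the similarities RDD from above:

    // Average the pairwise similarities per candidate sku instead of taking the max:
    // accumulate (sum, count) per key, then divide
    val meanResult = similarities
      .aggregateByKey((0.0, 0L))(
        (acc, s) => (acc._1 + s, acc._2 + 1L),
        (a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, n) => sum / n }
      .sortBy(i => (-i._2, -i._1))
      .collect()
      .take(5)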

1.4 Implementation 2

Query the products bought by user 38

    // Imports needed for the pipeline-based feature engineering below
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.{OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler}
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.sql.Row

    // Query the products bought by user 38
   spark.sql(
     """
       |select distinct `user_id`, `sku_id`
       |from `dwd_ds_hudi`.`fact_order_info` a
       |left join `dwd_ds_hudi`.`fact_order_detail` b
       | on a.id=b.order_id
       | where `user_id`='38'
       |""".stripMargin).repartition(20).createOrReplaceTempView("tmp1")

Query user ids ranked by shared product count:

    spark.sql(
     """select c.`user_id`,row_number() over(order by ct desc ,`user_id`) rn from
       |(
       |select `user_id`, count( distinct `sku_id`) as ct from `dwd_ds_hudi`.`fact_order_info` a left join `dwd_ds_hudi`.`fact_order_detail` b on a.id=b.order_id
       | where `sku_id` in(select `sku_id` from tmp1) and `user_id`<>'38' group by `user_id`
       | ) c
       |""".stripMargin).repartition(20).createOrReplaceTempView("tmp2")

Query the top 10 users' products

Query the distinct products bought by the 10 most-overlapping users, excluding products user 38 already bought:

    spark.sql(
     """
       |select distinct `sku_id`
       |from `dwd_ds_hudi`.`fact_order_info` a
       |left join `dwd_ds_hudi`.`fact_order_detail` b
       | on a.id=b.order_id
       | where
       | (a.`user_id` in (select user_id from tmp2 order by rn limit 10))
       | and b.`sku_id` not in (select sku_id from tmp1)
       |""".stripMargin).repartition(20).createOrReplaceTempView("tmp3")
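
Optionally, confirm the candidate set is non-empty:

    // Optional sanity check on the candidate product set
    spark.sql("select count(*) as candidates from tmp3").show()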

Query the columns to standardize and one-hot encode

Select id (the sku id), spu_id, price, weight, tm_id, and category3_id from the dim_sku_info table:

    val selectedDF = spark.sql("select id,spu_id,price,weight,tm_id,category3_id from dwd_ds_hudi.dim_sku_info order by id").toDF()

Define the standardization stages for price and weight

Define assembler and scaler stages that turn price and weight into vector columns and standardize them:

    val priceAssembler = new VectorAssembler().setInputCols(Array("price")).setOutputCol("price_features")
   val priceScaler = new StandardScaler().setInputCol("price_features").setOutputCol("scaled_price_features")
   val weightAssembler = new VectorAssembler().setInputCols(Array("weight")).setOutputCol("weight_features")
   val weightScaler = new StandardScaler().setInputCol("weight_features").setOutputCol("scaled_weight_features")

Define the one-hot encoding stages

Index spu_id, tm_id, and category3_id as categorical strings, then define a single one-hot encoder over the three index columns:

    val spuIdIndexer = new StringIndexer().setInputCol("spu_id").setOutputCol("spu_id_index").setHandleInvalid("skip")
   val tmIdIndexer = new StringIndexer().setInputCol("tm_id").setOutputCol("tm_id_index").setHandleInvalid("skip")
   val category3IdIndexer = new StringIndexer().setInputCol("category3_id").setOutputCol("category3_id_index").setHandleInvalid("skip")

   val oneHotEncoder = new OneHotEncoder().setInputCols(Array("spu_id_index", "tm_id_index", "category3_id_index")).setOutputCols(Array("spu_id_encoded", "tm_id_encoded", "category3_id_encoded")).setDropLast(false)

Build the pipeline, fit it, and transform the data

A Pipeline chains the data-processing and feature-engineering steps into one workflow: setStages takes an array of stage instances and runs them in order, fit produces a fitted PipelineModel, and the model's transform applies all stages to the data:

    val pipeline = new Pipeline().setStages(Array(priceAssembler, priceScaler, weightAssembler, weightScaler, spuIdIndexer, tmIdIndexer, category3IdIndexer, oneHotEncoder))
   val model = pipeline.fit(selectedDF)
   val transformedDF = model.transform(selectedDF)
   transformedDF.show(false)
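
Optionally, print the schema to confirm the names of the feature columns the pipeline appended before selecting them below:

    // Optional: the appended columns should include scaled_price_features,
    // scaled_weight_features, spu_id_encoded, tm_id_encoded, category3_id_encoded
    transformedDF.printSchema()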

Get the feature vectors of the purchased products

Get the feature-vector dataframe for the products user 38 bought:

    val featuresDF = transformedDF.select("id", "scaled_price_features", "scaled_weight_features", "spu_id_encoded", "tm_id_encoded", "category3_id_encoded").filter("id in (select sku_id from tmp1 order by sku_id)")
   featuresDF.show(false)

Get the feature-vector dataframe for the top 10 users' products (with user 38's products removed):

    val featuresDF2 = transformedDF.select("id", "scaled_price_features", "scaled_weight_features", "spu_id_encoded", "tm_id_encoded", "category3_id_encoded").filter("id in (select sku_id from tmp3 order by sku_id)")
   featuresDF2.show(false)

Convert the feature columns to RDDs

Convert user 38's product-feature dataframe to an RDD:

    val featuresRDD = featuresDF.rdd.map {
   case Row(sku_id: Int, scaledPriceFeatures: Vector, scaledWeightFeatures: Vector, spuIdEncoded: Vector, tmIdEncoded: Vector, category3IdEncoded: Vector) =>
      (sku_id, scaledPriceFeatures, scaledWeightFeatures,  spuIdEncoded, tmIdEncoded, category3IdEncoded)
  }
   featuresRDD.collect().foreach(println)

Convert the top 10 users' product-feature dataframe (user 38's products removed) to an RDD:

    val featuresRDD2 = featuresDF2.rdd.map {
     case Row(sku_id: Int, scaledPriceFeatures: Vector, scaledWeightFeatures: Vector, spuIdEncoded: Vector, tmIdEncoded: Vector, category3IdEncoded: Vector) =>
      (sku_id, scaledPriceFeatures, scaledWeightFeatures,  spuIdEncoded, tmIdEncoded, category3IdEncoded)
  }
   featuresRDD2.collect().foreach(println)

Write the cosine similarity function

Cosine similarity is the dot product of two vectors divided by the product of their Euclidean norms:

  // Cosine similarity: the dot product of two vectors divided by the product of their Euclidean norms
 def cosineSimilarity(v1: Vector, v2: Vector): Double = {
   // dot computes the inner product of two equal-length vectors:
   // dot(v1, v2) = v1[0]*v2[0] + v1[1]*v2[1] + ... + v1[n-1]*v2[n-1]
   // Vectors.norm(v, 2.0) is the Euclidean (2-)norm:
   // norm(v, 2.0) = sqrt(v[0]^2 + v[1]^2 + ... + v[n-1]^2)
   v1.dot(v2) / (Vectors.norm(v1, 2.0) * Vectors.norm(v2, 2.0))
}

Compute the averaged similarity for all pairs

Pair every product user 38 bought with every candidate product and, for each pair, average the cosine similarities of the five feature blocks:

    // Compute the cosine similarity for every (user-38 product, candidate product) pair
   val similarities = featuresRDD.cartesian(featuresRDD2).map {
   case ((sku_id1, scaledPriceFeatures1, scaledWeightFeatures1, spuIdEncoded1, tmIdEncoded1, category3IdEncoded1),
          (sku_id2, scaledPriceFeatures2, scaledWeightFeatures2, spuIdEncoded2, tmIdEncoded2, category3IdEncoded2)) =>
       val similarity = (cosineSimilarity(scaledPriceFeatures1, scaledPriceFeatures2) +
                       cosineSimilarity(scaledWeightFeatures1, scaledWeightFeatures2) +
                       cosineSimilarity(spuIdEncoded1, spuIdEncoded2) +
                       cosineSimilarity(tmIdEncoded1, tmIdEncoded2) +
                       cosineSimilarity(category3IdEncoded1, category3IdEncoded2))/5
      (sku_id2, similarity)
  }

   // Print the pairwise similarities
   similarities.collect().foreach(println)

Output the result in the required format

Recommend the 5 products with the highest similarity and print them in the required format (as in Implementation 1, the mean variant sketched there applies here too if you follow the task statement literally):

    // Keep the max similarity per sku, sort by similarity (then id) descending, take the top 5
   val result = similarities.reduceByKey((x, y) => if (x > y) x else y).sortBy(i => (-i._2, -i._1)).collect().take(5)
   var index = 1
   result.foreach( i => {
       if(index == 1){
         println("------------------------推荐Top5结果如下------------------------")
      }
       println(f"相似度top${index}(商品id:${i._1},平均相似度:${i._2})")
       index += 1
    }
  )