Spark大数据分析与实战:基于Spark MLlib 实现音乐推荐

Spark大数据分析与实战:基于Spark MLlib 实现音乐推荐Spark大数据分析与实战:基于SparkMLlib实现音乐推荐基于SparkMLlib实现音乐推荐一、实验背景:熟悉Audioscrobbler数据集基于该数据集选择合适的MLlib库算法进行数据处理进行音乐推荐(或用户推荐)二、实验目的:计算AUC评分最高的参数利用AUC评分最高的参数,给用户推荐艺术家对多个用户进行艺术家推荐利用AUC评分最高的参数,给艺术家推荐喜欢他的用户三、实验步骤:安装Hadoop和Spark启动Hadoop与Spark将文件上传到

Spark大数据分析与实战:基于Spark MLlib 实现音乐推荐


基于Spark MLlib 实现音乐推荐

一、实验背景:

熟悉 Audioscrobbler 数据集

基于该数据集选择合适的 MLlib 库算法进行数据处理

进行音乐推荐(或用户推荐)

二、实验目的:

计算AUC评分最高的参数

利用AUC评分最高的参数,给用户推荐艺术家

对多个用户进行艺术家推荐

利用AUC评分最高的参数,给艺术家推荐喜欢他的用户

三、实验步骤:

  1. 安装Hadoop和Spark
  2. 启动Hadoop与Spark
  3. 将文件上传到 HDFS
  4. 实现音乐推荐

四、实验过程:

1、安装Hadoop和Spark

具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:

Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894

提示:如果IDEA未构建Spark项目,可以转接到以下的博客:

IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536

2、启动Hadoop与Spark

查看3个节点的进程

master在这里插入图片描述
slave1
在这里插入图片描述
slave2
在这里插入图片描述

3、将文件上传到 HDFS

Shell命令:

[root@master ~]# cd /opt/data/profiledata_06-May-2005/
[root@master profiledata_06-May-2005]# ls
[root@master profiledata_06-May-2005]# hadoop dfs -put artist_alias.txt artist_data.txt user_artist_data.txt /spark/input

在这里插入图片描述

4、实现音乐推荐

源代码:

package com.John.SparkProject

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.recommendation.{ 
   ALS, ALSModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{ 
   DataFrame, Dataset, SparkSession}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random


/** * @author John * @Date 2021/5/25 12:49 */
object project02 { 
   
  def main(args: Array[String]): Unit = { 
   

    /** * 前期环境配置以及数据准备 */
    // 创建一个SparkSession对象
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Project02_RecommenderApp")
      .set("spark.sql.crossJoin.enabled", "true")
    val spark = new SparkSession.Builder()
      .config(conf)
      .getOrCreate()

    import spark.implicits._
    // 导入 artist_data.txt 文件 (每个艺术家的 ID 和对应的名字)
    // 字段名分别是: artistid artist_name
    val rawArtistData = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/artist_data.txt")
    val artistIdDF = transformArtistData(rawArtistData)
    val artistIdDFtest = transformArtistData1(rawArtistData)

    // 导入 artist_alias.txt 文件 (将拼写错误的艺术家 ID 或ID 变体对应到该艺术家的规范 ID)
    // 字段名分别是: badid goodid
    val rawAliasData = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/artist_alias.txt")
    val artistAlias = transformAliasData(rawAliasData).collect().toMap

    // 导入 user_artist_data.txt 文件 (用户音乐收听数据)
    // 字段名分别是 userid artistid playcount
    val rawUserArtistData = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/user_artist_data.txt")

    // 整合数据
    val allDF = transformUserArtistData(spark, rawUserArtistData, artistAlias)
    allDF.persist()

    // 拆分训练集和测试集
    val Array(trainDF, testDF) = allDF.randomSplit(Array(0.9, 0.1))
    trainDF.persist()

    // 查看一下指定 user 听过的艺术家
    allDF.join(artistIdDFtest,"artist").select("name").filter("user='2093760'").show(5)

    /** * 根据题目要求参数 * 给用户推荐艺术家 */
    // // 根据题目要求构建模型
    // val als = new ALS()
    // .setSeed(Random.nextLong())
    // .setImplicitPrefs(true)
    // .setRank(10) // 模型潜在因素个数
    // .setRegParam(0.01) // 正则化参数
    // .setAlpha(1.0) // 管理偏好观察值的 基线置信度
    // .setMaxIter(5) // 最大迭代次数
    // .setUserCol("user")
    // .setItemCol("artist")
    // .setRatingCol("count")
    // .setPredictionCol("prediction")
    // // 用训练数据训练模型
    // val model = als.fit(trainDF)
    // // 释放缓存资源
    // trainDF.unpersist()
    // // 开始推荐
    // val userID = 2093760
    // val artistNum = 5
    // val recommendDF = recommend(model, userID, artistNum, artistIdDF)
    //
    // val strings = recommendDF.map(_.mkString("|")).collect()
    // println(strings.toBuffer)

    /** * 计算AUC评分最高的参数 * 原理:循环指定参数暴力计算,根据AUC计算出评分最高的参数进行建模 */
    // // 艺术家id数据,用于AUC评分
    // val allArtistIds = allDF.select("artist").as[Int].distinct().collect()
    // val bAllArtistIds = spark.sparkContext.broadcast(allArtistIds)
    //
    // // 网格搜索
    // val evaluations =
    // // 利用for循环,生成不同的超参数配置
    // for (rank <- Seq(5, 30);
    // regParam <- Seq(4.0, 0.0001);
    // alpha <- Seq(1.0, 40.0))
    // yield { 
   
    // // 构建模型
    // val als = new ALS()
    // .setSeed(Random.nextLong())
    // .setImplicitPrefs(true)
    // .setRank(rank)
    // .setRegParam(regParam)
    // .setAlpha(alpha)
    // .setMaxIter(5)
    // .setUserCol("user")
    // .setItemCol("artist")
    // .setRatingCol("count")
    // .setPredictionCol("prediction")
    // val model = als.fit(trainDF)
    // val auc = areaUnderCurve(testDF, bAllArtistIds, model.transform)
    // // 释放资源
    // model.userFactors.unpersist()
    // model.itemFactors.unpersist()
    // (auc, (rank, regParam, alpha))
    // }
    // // 按评分降序输出各参数信息
    // evaluations.sorted.reverse.foreach(println)
    // (0.9134340440577203,(30,4.0,40.0)) // 最优参数
    // (0.9124295941963009,(30,4.0,1.0))
    // (0.9121292259762062,(30,1.0E-4,40.0))
    // (0.9111586767382363,(5,4.0,40.0))
    // (0.9097682726329872,(5,1.0E-4,40.0))
    // (0.9089218752871897,(5,4.0,1.0))
    // (0.9038315464345514,(5,1.0E-4,1.0))
    // (0.8951870697645603,(30,1.0E-4,1.0))

    /** * 利用AUC评分最高的参数 * 给用户推荐艺术家 */
    // // 利用AUC评分最高的参数进行模型构建
    // val als = new ALS()
    // .setSeed(Random.nextLong())
    // .setImplicitPrefs(true)
    // .setRank(30) // 模型潜在因素个数
    // .setRegParam(4.0) // 正则化参数
    // .setAlpha(40.0) // 管理偏好观察值的 基线置信度
    // .setMaxIter(5) // 最大迭代次数
    // .setUserCol("user")
    // .setItemCol("artist")
    // .setRatingCol("count")
    // .setPredictionCol("prediction")
    // // 训练模型
    // val model = als.fit(trainDF)
    // // 释放缓存资源
    // trainDF.unpersist()
    // // 开始推荐
    // val userID = 2093760
    // val artistNum = 5
    // val recommendDF = recommend(model, userID, artistNum, artistIdDF)
    // recommendDF.show()
    //
    // val strings = recommendDF.map(_.mkString("|")).collect()
    // println(strings.toBuffer)

    /** * 用测试数据对10个用户进行推荐 */
// // 利用AUC评分最高的参数进行模型构建
// val als = new ALS()
// .setSeed(Random.nextLong())
// .setImplicitPrefs(true)
// .setRank(30) // 模型潜在因素个数
// .setRegParam(4.0) // 正则化参数
// .setAlpha(40.0) // 管理偏好观察值的 基线置信度
// .setMaxIter(5) // 最大迭代次数
// .setUserCol("user")
// .setItemCol("artist")
// .setRatingCol("count")
// .setPredictionCol("prediction")
// // 训练模型
// val model = als.fit(trainDF)
// //用测试数据对100个用户进行推荐
// val someUsers = testDF.select("user").as[Int].distinct.take(10)
// // 推荐
// someUsers.map { user =>
// val recommendDF = recommend(model, user, 5, artistIdDF)
// val strings = recommendDF.map(_.mkString("|")).collect()
// (user, strings.toBuffer)
// }.foreach(println)


    /** * 利用AUC评分最高的参数 * 给艺术家推荐喜欢他的用户 */
// //利用AUC评分最高的参数进行模型构建
// val als = new ALS()
// .setSeed(Random.nextLong())
// .setImplicitPrefs(true)
// .setRank(30) // 模型潜在因素个数
// .setRegParam(4.0) // 正则化参数
// .setAlpha(40.0) // 管理偏好观察值的 基线置信度
// .setMaxIter(5) // 最大迭代次数
// .setUserCol("artist")
// .setItemCol("user")
// .setRatingCol("count")
// .setPredictionCol("prediction")
// // 训练模型
// val model = als.fit(trainDF)
// // 释放缓存资源
// trainDF.unpersist()
// // 开始推荐
// val artistID = 930
// val userNum = 100
// val recommendDF = recommendArtist(model, artistID, userNum, allDF)
// val strings = recommendDF.select("user").map(_.mkString("|")).collect()
// println(strings.toBuffer)


    // 关闭spark
    spark.stop()
  }

  /** * 规范艺术家的ID,合并数据,创建一个总的数据集 * * @param spark SparkSession * @param rawUserArtistDS 用户和艺术家的关系数据集 * @param artistAlias 艺术家别名id,用于补全 * @return */
  def transformUserArtistData(spark: SparkSession, rawUserArtistDS: Dataset[String], artistAlias: Map[Int, Int]): DataFrame = { 
   
    import spark.implicits._

    // 广播变量
    // 广播变量主要用于在迭代中一直需要被访问的只读变量
    // 它将此变量缓存在每个 executor 里,以减少集群网络传输消耗
    val bArtistAlias = spark.sparkContext.broadcast(artistAlias)

    rawUserArtistDS.map(line => { 
   
      val Array(userId, artistId, count) = line.split(' ').map(_.toInt) //以空格分隔每一行,并将数据转换为int类型
      val finalArtistId = bArtistAlias.value.getOrElse(artistId, artistId) // 如果有值,那就可以得到这个值,如果没有就会得到一个默认值
      (userId, finalArtistId, count)
    }).toDF("user", "artist", "count").cache() // 设置字段名称, cache() 以指示 Spark 在 RDD 计算好后将其暂时存储在集群的内存里
  }

  /** * 将 artist_data.txt 文件的数据转化为dataframe * * @param rawArtistData Dataset类型的artist_data数据 * @return */
  def transformArtistData(rawArtistData: Dataset[String]): DataFrame = { 
   
    // 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
    import rawArtistData.sparkSession.implicits._

    // 将每一行数据以tap分隔开,最后将数据类型转换为dataframe
    rawArtistData.flatMap(line => { 
   
      val (id, name) = line.span(_ != '\t')
      try { 
   
        if (name.nonEmpty)
          Some(id.toInt, name.trim)
        else
          None
      } catch { 
   
        case _: Exception => None
      }
    }).toDF("id", "name").cache()
  }

  /** * 将 artist_data.txt 文件的数据转化为dataframe * * @param rawArtistData Dataset类型的artist_data数据 * @return */
  def transformArtistData1(rawArtistData: Dataset[String]): DataFrame = { 
   
    // 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
    import rawArtistData.sparkSession.implicits._

    // 将每一行数据以tap分隔开,最后将数据类型转换为dataframe
    rawArtistData.flatMap(line => { 
   
      val (id, name) = line.span(_ != '\t')
      try { 
   
        if (name.nonEmpty)
          Some(id.toInt, name.trim)
        else
          None
      } catch { 
   
        case _: Exception => None
      }
    }).toDF("artist", "name").cache()
  }

  /** * 将 artist_alias.txt 文件的数据转化为dataframe * * @param rawAliasData Dataset类型的artist_alias数据 * @return */
  def transformAliasData(rawAliasData: Dataset[String]): Dataset[(Int, Int)] = { 
   
    // 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
    import rawAliasData.sparkSession.implicits._

    // 将每一行数据以tap分隔开,最后将数据类型转换为dataframe
    rawAliasData.flatMap(line => { 
   
      val Array(artist, alias) = line.split('\t')
      try { 
   
        if (artist.nonEmpty)
          Some(artist.toInt, alias.toInt)
        else
          None
      } catch { 
   
        case _: Exception => None
      }
    })
  }

  /** * 为指定用户推荐艺术家 * * @param model 训练好的ALS模型 * @param userId 用户id * @param howMany 推荐多少个艺术家 * @param artistIdDF 艺术家id和名称的关系映射 * @return */
  def recommend(model: ALSModel, userId: Int, howMany: Int, artistIdDF: DataFrame): DataFrame = { 
   
    import artistIdDF.sparkSession.implicits._

    // 推荐操作
    // 根据 userId 推荐 artistId
    val toRecommend = model.itemFactors
      .select($"id".as("artist")) //
      .withColumn("user", lit(userId)) // 增加一列 userId

    // 根据 prediction 排序,得到输入推荐艺术家的个数
    val topRecommendtions = model.transform(toRecommend)
      .select("artist", "prediction")
      .orderBy($"prediction".desc)
      .limit(howMany)

    // 将artistid转换为int,并将其放入数组
    val recommendedArtistIds = topRecommendtions.select("artist").as[Int].collect()

    // 根据 id 得到指定的那一行数据,并将数据返回
    artistIdDF.filter($"id" isin (recommendedArtistIds: _*))
  }

  /** * 为指定艺术家推荐用户 * * @param model 训练好的ALS模型 * @param artistId 艺术家id * @param howMany 推荐多少个艺术家 * @param userIdDF 用户id和名称的关系映射 * @return */
  def recommendArtist(model: ALSModel, artistId: Int, howMany: Int, userIdDF: DataFrame): DataFrame = { 
   
    import userIdDF.sparkSession.implicits._

    // 推荐操作
    // 根据 artistId 推荐 userId
    val toRecommend = model.itemFactors
      .select($"id".as("user")) //
      .withColumn("artist", lit(artistId))

    // 根据 prediction 排序,得到输入推荐用户的个数
    val topRecommendtions = model.transform(toRecommend)
      .select("user", "prediction")
      .orderBy($"prediction".desc)
      .limit(howMany)

    // 将userid转换为int,并将其放入数组
    val recommendedArtistIds = topRecommendtions.select("user").as[Int].collect()

    // 根据 id 得到指定的那一行数据,并将数据返回
    userIdDF.filter($"user" isin (recommendedArtistIds: _*)).dropDuplicates(Seq("user"))
  }

  /** * 计算AUC评分 * * @param positiveData 测试数据 * @param bAllArtistIDs 所有的艺术家ID * @param predictFunction model.transform * @return 评分 0-1 */
  def areaUnderCurve(positiveData: DataFrame,
                     bAllArtistIDs: Broadcast[Array[Int]],
                     predictFunction: DataFrame => DataFrame): Double = { 
   
    import positiveData.sparkSession.implicits._

    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive".
    // Make predictions for each of them, including a numeric score
    val positivePredictions = predictFunction(positiveData.select("user", "artist")).
      withColumnRenamed("prediction", "positivePrediction")

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other artists, excluding those that are "positive" for the user.
    val negativeData = positiveData.select("user", "artist").as[(Int, Int)].
      groupByKey { 
    case (user, _) => user }.
      flatMapGroups { 
    case (userID, userIDAndPosArtistIDs) =>
        val random = new Random()
        val posItemIDSet = userIDAndPosArtistIDs.map { 
    case (_, artist) => artist }.toSet
        val negative = new ArrayBuffer[Int]()
        val allArtistIDs = bAllArtistIDs.value
        var i = 0
        // Make at most one pass over all artists to avoid an infinite loop.
        // Also stop when number of negative equals positive set size
        while (i < allArtistIDs.length && negative.size < posItemIDSet.size) { 
   
          val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
          // Only add new distinct IDs
          if (!posItemIDSet.contains(artistID)) { 
   
            negative += artistID
          }
          i += 1
        }
        // Return the set with user ID added back
        negative.map(artistID => (userID, artistID))
      }.toDF("user", "artist")

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeData).
      withColumnRenamed("prediction", "negativePrediction")

    // Join positive predictions to negative predictions by user, only.
    // This will result in a row for every possible pairing of positive and negative
    // predictions within each user.
    val joinedPredictions = positivePredictions.join(negativePredictions, "user").
      select("user", "positivePrediction", "negativePrediction").cache()

    // Count the number of pairs per user
    val allCounts = joinedPredictions.
      groupBy("user").agg(count(lit("1")).as("total")).
      select("user", "total")
    // Count the number of correctly ordered pairs per user
    val correctCounts = joinedPredictions.
      filter($"positivePrediction" > $"negativePrediction").
      groupBy("user").agg(count("user").as("correct")).
      select("user", "correct")

    // Combine these, compute their ratio, and average over all users
    val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
      select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
      agg(mean("auc")).
      as[Double].first()

    joinedPredictions.unpersist()

    meanAUC
  }

}




  /** * 规范艺术家的ID,合并数据,创建一个总的数据集 * * @param spark SparkSession * @param rawUserArtistDS 用户和艺术家的关系数据集 * @param artistAlias 艺术家别名id,用于补全 * @return */
  def transformUserArtistData(spark: SparkSession, rawUserArtistDS: Dataset[String], artistAlias: Map[Int, Int]): DataFrame = { 
   
    import spark.implicits._

    // 广播变量
    // 广播变量主要用于在迭代中一直需要被访问的只读变量
    // 它将此变量缓存在每个 executor 里,以减少集群网络传输消耗
    val bArtistAlias = spark.sparkContext.broadcast(artistAlias)

    rawUserArtistDS.map(line => { 
   
      val Array(userId, artistId, count) = line.split(' ').map(_.toInt) //以空格分隔每一行,并将数据转换为int类型
      val finalArtistId = bArtistAlias.value.getOrElse(artistId, artistId) // 如果有值,那就可以得到这个值,如果没有就会得到一个默认值
      (userId, finalArtistId, count)
    }).toDF("user", "artist", "count").cache() // 设置字段名称, cache() 以指示 Spark 在 RDD 计算好后将其暂时存储在集群的内存里
  }

  /** * 将 artist_data.txt 文件的数据转化为dataframe * * @param rawArtistData Dataset类型的artist_data数据 * @return */
  def transformArtistData(rawArtistData: Dataset[String]): DataFrame = { 
   
    // 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
    import rawArtistData.sparkSession.implicits._

    // 将每一行数据以tap分隔开,最后将数据类型转换为dataframe
    rawArtistData.flatMap(line => { 
   
      val (id, name) = line.span(_ != '\t')
      try { 
   
        if (name.nonEmpty)
          Some(id.toInt, name.trim)
        else
          None
      } catch { 
   
        case _: Exception => None
      }
    }).toDF("id", "name").cache()
  }

  /** * 将 artist_data.txt 文件的数据转化为dataframe * * @param rawArtistData Dataset类型的artist_data数据 * @return */
  def transformArtistData1(rawArtistData: Dataset[String]): DataFrame = { 
   
    // 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
    import rawArtistData.sparkSession.implicits._

    // 将每一行数据以tap分隔开,最后将数据类型转换为dataframe
    rawArtistData.flatMap(line => { 
   
      val (id, name) = line.span(_ != '\t')
      try { 
   
        if (name.nonEmpty)
          Some(id.toInt, name.trim)
        else
          None
      } catch { 
   
        case _: Exception => None
      }
    }).toDF("artist", "name").cache()
  }

  /** * 将 artist_alias.txt 文件的数据转化为dataframe * * @param rawAliasData Dataset类型的artist_alias数据 * @return */
  def transformAliasData(rawAliasData: Dataset[String]): Dataset[(Int, Int)] = { 
   
    // 一个隐式implicit的转换方法,转换出正确的类型,完成编译。
    import rawAliasData.sparkSession.implicits._

    // 将每一行数据以tap分隔开,最后将数据类型转换为dataframe
    rawAliasData.flatMap(line => { 
   
      val Array(artist, alias) = line.split('\t')
      try { 
   
        if (artist.nonEmpty)
          Some(artist.toInt, alias.toInt)
        else
          None
      } catch { 
   
        case _: Exception => None
      }
    })
  }

  /** * 为指定用户推荐艺术家 * * @param model 训练好的ALS模型 * @param userId 用户id * @param howMany 推荐多少个艺术家 * @param artistIdDF 艺术家id和名称的关系映射 * @return */
  def recommend(model: ALSModel, userId: Int, howMany: Int, artistIdDF: DataFrame): DataFrame = { 
   
    import artistIdDF.sparkSession.implicits._

    // 推荐操作
    // 查询指定用户ID的推荐艺术家所有信息
    val toRecommend = model.itemFactors
      .select($"id".as("artist")) //
      .withColumn("user", lit(userId)) // 增加一列 userId

    // 根据 prediction 排序,得到输入推荐艺术家的个数
    val topRecommendtions = model.transform(toRecommend)
      .select("artist", "prediction")
      .orderBy($"prediction".desc)
      .limit(howMany)

    // 得到需要推荐艺术家的用户id
    val recommendedArtistIds = topRecommendtions.select("artist").as[Int].collect()

    // 根据 id 得到指定的那一行数据,并将数据返回
    artistIdDF.filter($"id" isin (recommendedArtistIds: _*))
  }

  /** * 为指定艺术家推荐用户 * * @param model 训练好的ALS模型 * @param artistId 艺术家id * @param howMany 推荐多少个艺术家 * @param userIdDF 用户id和名称的关系映射 * @return */
  def recommendArtist(model: ALSModel, artistId: Int, howMany: Int, userIdDF: DataFrame): DataFrame = { 
   
    import userIdDF.sparkSession.implicits._

    // 推荐操作
    // 根据 userId 推荐艺术家
    val toRecommend = model.itemFactors
      .select($"id".as("user")) //
      .withColumn("artist", lit(artistId))

    // 根据 prediction 排序,得到输入推荐艺术家的个数
    val topRecommendtions = model.transform(toRecommend)
      .select("user", "prediction")
      .orderBy($"prediction".desc)
      .limit(howMany)

    // 得到需要推荐的艺术家的id
    val recommendedArtistIds = topRecommendtions.select("user").as[Int].collect()

    // 根据 id 得到指定的那一行数据,并将数据返回
    userIdDF.filter($"user" isin (recommendedArtistIds: _*)).dropDuplicates(Seq("user"))
  }

  /** * 计算AUC评分 * * @param positiveData 测试数据 * @param bAllArtistIDs 所有的艺术家ID * @param predictFunction model.transform * @return 评分 0-1 */
  def areaUnderCurve(positiveData: DataFrame,
                     bAllArtistIDs: Broadcast[Array[Int]],
                     predictFunction: DataFrame => DataFrame): Double = { 
   
    import positiveData.sparkSession.implicits._

    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive".
    // Make predictions for each of them, including a numeric score
    val positivePredictions = predictFunction(positiveData.select("user", "artist")).
      withColumnRenamed("prediction", "positivePrediction")

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other artists, excluding those that are "positive" for the user.
    val negativeData = positiveData.select("user", "artist").as[(Int, Int)].
      groupByKey { 
    case (user, _) => user }.
      flatMapGroups { 
    case (userID, userIDAndPosArtistIDs) =>
        val random = new Random()
        val posItemIDSet = userIDAndPosArtistIDs.map { 
    case (_, artist) => artist }.toSet
        val negative = new ArrayBuffer[Int]()
        val allArtistIDs = bAllArtistIDs.value
        var i = 0
        // Make at most one pass over all artists to avoid an infinite loop.
        // Also stop when number of negative equals positive set size
        while (i < allArtistIDs.length && negative.size < posItemIDSet.size) { 
   
          val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
          // Only add new distinct IDs
          if (!posItemIDSet.contains(artistID)) { 
   
            negative += artistID
          }
          i += 1
        }
        // Return the set with user ID added back
        negative.map(artistID => (userID, artistID))
      }.toDF("user", "artist")

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeData).
      withColumnRenamed("prediction", "negativePrediction")

    // Join positive predictions to negative predictions by user, only.
    // This will result in a row for every possible pairing of positive and negative
    // predictions within each user.
    val joinedPredictions = positivePredictions.join(negativePredictions, "user").
      select("user", "positivePrediction", "negativePrediction").cache()

    // Count the number of pairs per user
    val allCounts = joinedPredictions.
      groupBy("user").agg(count(lit("1")).as("total")).
      select("user", "total")
    // Count the number of correctly ordered pairs per user
    val correctCounts = joinedPredictions.
      filter($"positivePrediction" > $"negativePrediction").
      groupBy("user").agg(count("user").as("correct")).
      select("user", "correct")

    // Combine these, compute their ratio, and average over all users
    val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
      select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
      agg(mean("auc")).
      as[Double].first()

    joinedPredictions.unpersist()

    meanAUC
  }

}

查看一下2093760听过的艺术家

查看结果:

David gray:英国民谣创作才子
The Saw Doctors:雷鬼乐乐队
Jurassic 5:说唱团体
Blackalicious:说唱团体
Xzibit:说唱歌手

在这里插入图片描述
根据题目要求参数,给用户2093760推荐艺术家

推荐结果:

50 Cent:说唱歌手
Dr. Dre:说唱歌手
Ludacris:说唱歌手
2Pac:说唱歌手
The Game:说唱歌手

结论:

我们可以看出给用户2093760推荐的歌手都是说唱歌手,这么绝对的推荐不一定是最优的。
因此我们需要计算AUC评分,用评分最高的参数再次对用户2093760进行推荐,得出相对比较靠谱的推荐结果!

在这里插入图片描述

什么是AUC?

这里问题来了,这个AUC评分究竟是什么东西,为什么它能评估模型的好坏?

简单的概括一下,AUC是一个模型的评价指标,用于分类任务
这个指标简单来说其实就是随机抽出一对样本(一个正样本,一个负样本),然后用训练得到的分类器来对这两个样本进行预测,预测得到正样本的概率大于负样本概率的概率

在这里插入图片描述
如何计算AUC?

方法一:

在有M个正样本,N个负样本的数据集里
一共有MN对样本(一对样本即,一个正样本与一个负样本)
统计这M
N对样本里,正样本的预测概率大于负样本的预测概率的个数

在这里插入图片描述
在这里插入图片描述
单纯的看这个公式,估计大家看的不是特别明白,那么我举一个小例子!

假设有4条样本
2个正样本(label=1),2个负样本(label=0),那么M*N=4

在这里插入图片描述
两两进行比较,即总共有4个样本对,分别是:
(D,B)0.8大于0.4 结果为1
(D,A)0.8大于0.1 结果为1
(C,B)0.35小于0.4 结果为0
(C,A)0.35大于0.1 结果为1

在这里插入图片描述
方法二:

另外一个方法就是利用下面的公式

在这里插入图片描述
这是上面每个参数代表的含义:
在这里插入图片描述
同样用刚才的例子来看公式:

在这里插入图片描述
我们再根据每个样本的概率进行排序:

在这里插入图片描述
然后就根据公式进行计算:
在这里插入图片描述
前面的累加就将标签为1的样本他对应的rank加起来,后面的M和N就是正样本和负样本的数量。
在这里插入图片描述
那么如果正样本和负样本的概率相同怎么办?

比如说下面的例子:
在这里插入图片描述
其实很简单,我们只需要将概率相同的正样本和负样本的rank都加起来后求平均值即可!
在这里插入图片描述
这里介绍完AUC后,我们就开始进行计算!

可以看出,评分最高的参数分别是30 , 4.0 , 0 , 40.0
在这里插入图片描述利用AUC评分最高的参数,给用户推荐艺术家

推荐结果:

[unknown]:未知的原创歌手
Maroon 5:美国摇滚乐队
Bob Marley:雷鬼乐歌手
Eminem:说唱歌手
Green Day:美国朋克乐队

结论:

我们就可以看出,现在的结果就比较多样性。因为我们的听歌爱好其实不仅仅与歌的类型有关系,还可能与国家地区、文化信仰甚至国情民势都有关系。所以大家在做这一方面的研究时,考虑的方面得全面!
在这里插入图片描述
用测试数据对10个用户进行推荐

推荐结果:

(1000190,ArrayBuffer(1361|Trans Am, 1233384|The Plastic Constellations, 1103971|Thunderbirds Are Now!, 1238030|The Advantage, 1014265|764-HERO))
(1001043,ArrayBuffer(1034635|[unknown], 1001348|ATB, 1238013|DJ Tiësto, 1043348|O-Zone, 1002065|Paul Oakenfold))
(1001129,ArrayBuffer(1003105|Frank Black and the Catholics, 1023028|Lo Fidelity Allstars, 4629|Unknown, 1005510|Eddie Izzard, 1004021|Porno for Pyros))
(1001139,ArrayBuffer(3941|Gotan Project, 1181|David Holmes, 6685231|DJ BC, 1293697|The Kleptones, 1003296|Mr. Scruff))
(1002431,ArrayBuffer(1034635|[unknown], 1233196|The Postal Service, 1000130|Bright Eyes, 1270639|The Killers, 3327|Weezer))
(1002605,ArrayBuffer(1010658|Kitaro, 5925|Carl Orff, 1015982|Antonio Vivaldi, 6684730|石井妥師, 1003552|Eric Serra))
(1004666,ArrayBuffer(1073131|Ludovico Einaudi, 1025666|Amiel, 1098360|Missy Higgins, 1006124|Hal, 1256628|王菲))
(1005158,ArrayBuffer(1034635|[unknown], 930|Eminem, 1854|Linkin Park, 4267|Green Day, 1001412|blink-182))
(1005439,ArrayBuffer(1034635|[unknown], 2439|HiM, 1000481|Slipknot, 4468|System of a Down, 1000024|Metallica))
(1005853,ArrayBuffer(1001907|Wilco, 1001531|Belle and Sebastian, 1233342|The Decemberists, 1003352|The Magnetic Fields, 1006094|Fiona Apple))

在这里插入图片描述

利用AUC评分最高的参数,给艺术家推荐喜欢他的用户

这里我是给我最喜欢的歌手Eminem进行用户推荐!

推荐结果:(由于用户的信息只有ID,因此我将所有ID都放在了一个ArrayBuffer里面)

ArrayBuffer(2173774, 2178787, 2261299, 2281019, 2217207, 2055010, 2179548, 1064023, 2109458, 2252030, 2280737, 2327614, 2174422, 2121210, 2120078, 2352459, 2043538, 2304494, 2262888, 2204489, 2140441, 2081641, 2065066, 1065216, 2004509, 2415376, 2341868, 2378796, 2017852, 2217164, 2295296, 2006262, 2149670, 2101834, 2291193, 2341056, 2286247, 2197383, 2053133, 2202543, 2344527, 2037378, 2013279, 2319286, 2248139, 1001869, 1006203, 2090112, 2214080, 2262555, 2322726, 2114528, 2006236, 2171978, 2137321, 1053089, 2344343, 1008973, 2047916, 2242446, 2245246, 2034889, 2178977, 2287332, 2107214, 2306499, 2052064, 2025515, 1050254, 2203614, 2111793, 2074526, 2052728, 2286554, 2170336, 2297403, 2151593, 2078903, 2155198, 2034470, 2212481, 2286158, 2334813, 2061479, 2336984, 2423743, 2358114, 2160481, 2352812, 1058112, 1042355, 2132878, 1052569, 1036805, 1065511, 2379741, 2074254, 2203461, 2320407, 2037357)
在这里插入图片描述

到此,咱们的音乐推荐就完成啦!

五、实验结论:

实验要点:

实验主要考察了大家对Dataframe的转换,也让大家熟悉基于 Mllib 的数据分析流程以及了解机器学习算法超参数的选择。

实验心得:

此实验的推荐我们可以广泛利用到其他的推荐系统,比如说电影推荐系统,购物推荐系统等等。通过这些推荐,我们可以更加明白用户需要什么、喜欢什么,从而进行针对性地推荐,因此来增加我们商品的销售以及其他利益!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/38457.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注