电影推荐系统-整体总结(五)实时推荐
电影推荐系统-整体总结(五)实时推荐
⼀、Scala代码实现
1.⾃定义数据类--Model.scala
package streamingRecommender
/**
* @Author : ASUS and xinrong
* @Version : 2020/9/4
*  数据格式转换类
*  ---------------电影表------------------------
*  1
*  Toy Story (1995)
*
*  81 minutes
java影视app源码
*  March 20, 2001
*  1995
*  English
*  Adventure|Animation|Children|Comedy|Fantasy
*  Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn
*  John Lasseter
*/
case class Movie(val mid:Int,val name:String,val descri:String,
val timelong:String,val cal_issue:String,val shoot:String,
val language:String,val genres :String,val actors:String,val directors:String)
/**
* -----⽤户对电影的评分数据集--------
* 1,31,2.5,1260759144
*/
case class MovieRating(val uid:Int,val mid:Int,val score:Double,val timastamp:Int)
/**
* --------⽤户对电影的标签数据集--------
* 15,339,sandra 'boring' bullock,1138537770
*/
case class Tag(val uid:Int,val mid:Int,val tag:String,val timestamp:Int)
/**
*
* MongoDB配置对象
* @param uri
* @param db
*/
case class MongoConfig(val uri:String,val db:String)
/**
* ES配置对象
* @param httpHosts
* @param transportHosts:保存的是所有ES节点的信息
* @param clusterName
*/
case class EsConfig(val httpHosts:String,val transportHosts:String,val index:String,val clusterName:String)
/**
* recs的⼆次封装数据类
* @param mid
* @param res
*/
case class Recommendation(mid:Int,res:Double)
/**
* Key-Value封装数据类
* @param genres
* @param recs
*/
case class GenresRecommendation(genres:String,recs:Seq[Recommendation])
//注:Seq-Sequence是⼀个特质,可以理解成⼀个列表;Recommendation是⼀个实现类
case class UserRecs(uid:Int,recs:Seq[Recommendation])
/**
* 定义电影相似度
* @param mid
* @param recs
* 注:Seq-Sequence是⼀个特质,可以理解成⼀个列表;Recommendation是⼀个⾃定义实现类
*/
case class MoviesRecs(mid:Int,recs:Seq[Recommendation])
2.StreamingRecommender类
package streamingRecommender
db.casbah
db.casbah.MongoClient
db.casbahmons.MongoDBObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafkamon.serialization.StringDeserializer
import redis.clients.jedis.Jedis
llection.JavaConversions._
llection.mutable
llection.mutable.ArrayBuffer
/**
* @Author : ASUS and xinrong
* @Version : 2020/9/25
*        实时推荐部分
*/
object ConnHelper{
lazy val jedis=new Jedis("192.168.212.21")
lazy val mongoClient=MongoClient(casbah.MongoClientURI("mongodb://192.168.212.21:27017/recom"))
}
object StreamingRecommender {
//声明
val MAX_USER_RATINGS_NUM=20 //从Redis中获取多少个⽤户的评分
val MAX_SIM_MOVIES_NUM=20 //相似电影候选表中取多少个电影
val MONGODB_MOVIE_RECES_COLLECTION="MovieRecs"
val MONGODB_RATING_COLLECTION="Rating"
val MONGODB_STREAM_RECS_COLLECTION="StreamRecs" //实时推荐写⼊哪张表
def main(args: Array[String]): Unit = {
//⼀、声明Spark的环境、Kafka和MongoDB的相关信息--------------------------------------------------------------------    val config = Map(
"" -> "local[3]",
"pic" -> "recom",
"mongo.uri" -> "mongodb://192.168.212.21:27017/recom",
"mongo.db" -> "recom"
)
val sparkConf = new SparkConf().setAppName("StreamingRecommender").setMaster(config(""))
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = sparkSession.sparkContext
//使⽤SparkStreaming将连续的数据转化成不连续的RDD
//指定采样时间:2秒
val ssc = new StreamingContext(sc, Seconds(2))
//定义隐式参数⽤于连接MongoDB
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
import sparkSession.implicits._
//制作共享变量--这个变量保存了含有所有电影的相似度矩阵
val simMovieMatrix = sparkSession
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_RECES_COLLECTION)
.format("db.spark.sql")
.load() //load()之后,数据由DataFrameReader变成DataFrame
.as[MoviesRecs]
.rdd //数据变成RDD[MoviesRecs]
.map { //改变数据格式
items =>
(items.mid, s.map(x => (x.mid, x.res)).toMap)
}.collectAsMap() //将整体变成Map(映射的形式)--Map[Int,Map[Int,Double]]
/
/将共享变量(⼴播变量)共享出去
val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)
//val abc=sc.makeRDD(1 to 2)
// abc.map(x=>(1)).count()
//**************************** Kafka的配置信息 ***********************************
//存放Kafka的配置信息(基本上不太会改动)
val kafkaconfig = Map(
"bootstrap.servers" -> "192.168.212.21:9092", //Kafa的IP
"key.deserializer" -> classOf[StringDeserializer], //编码解码⼯具
"value.deserializer" -> classOf[StringDeserializer], //编码解码⼯具
"group.id" -> "recomgroup" // 消费者组--注意:需要在Kafka的配置⽂件-server.properties⾥进⾏配置
)
//⼆、连接Kafka将数据获取进来--------------------------------------------------------------------------------------
//1.连接Kafka(直连的⽅式)
//1)LocationStrategies--从...取数据,⼀般⽤:PreferConsistent(⼀般都是固定的)
//查看其原理啥的参考:blog.csdn/Dax1n/article/details/61917718
//2)ConsumerStrategies:和消费有关的
//Subscribe后⾯最好加上:[String,String](不加也可)
val kafkaStream = ateDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String] (Array(config("pic")), kafkaconfig))
//三、接收评分流(Kafka中-实时数据)--UID | MID | Score | TIMESTAMP--------------------------------------------------------------------
//注意:Scala和java ⾥⾯符号"|"都需要⽤转义符进⾏转义
//将数据重新组织了⼀下--⽤户ID,电影ID,评分,时间戳
//kafkaStream是DStream
val ratingStream = kafkaStream.map {
items =>
val strings = items.value().split("\\|")
(strings(0).toInt, strings(1).toInt, strings(2).toDouble, strings(3).toInt)
}
//操作每⼀个RDD
ratingStream.foreachRDD {
rdd =>
rdd.map {
case (uid, mid, score, timestamp) =>
/
/>>>>>>####  实时计算逻辑的实现  >>>>>>###
//1)从redis中获取最近这段时间的M次评分(这⾥M=20)--Redis中-历史数据
//getUserRecentlyRating(评分次数,⽤户ID,定义的lazy变量)
val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis)
//2)获取最近浏览电影r最相似的M个电影--⽤共享变量的⽅式(电影之间的相似度矩阵已经在离线部分求过)
//getTopSimMovies(评分次数,电影ID,⽤户ID,共享变量--这个变量保存了含有所有电影的相似度矩阵)
//simMovieMatrixBroadCast---BroadCast[Map[Int,Map[Int,Double]]]
//simMovieMatrixBroadCast.value---Map[Int,Map[Int,Double]]
val simTopMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, uid, mid, simMovieMatrixBroadCast.value)
//3)计算待选电影的推荐优先级(就是实现那个数学分析公式)
//computMovieScores(共享变量-保存了含有所有电影的相似度矩阵,
// 从redis中获取当前最近的M次评分(这⾥M=20),获取电影P最相似的K个电影)
val streamRecs = computMovieScores(simMovieMatrixBroadCast.value, userRecentlyRatings, simTopMovies)
//4)将数据保存到MongoDB中
saveRecsToMongoDB(uid, streamRecs)
}.count() //触发计算才有显⽰,对count()的结果我们不感兴趣,只是⽤他触发计算
//运⾏Spark Streaming 启动流式计算
ssc.start()
//不会停的,等待⼿动停⽌
ssc.awaitTermination()
}
/**
* 从Redis 取数据--当前最近的(新加⼊的-实时的)num次评分
* 注:从Kafka中抽取的实时数据⾥⾯的评分-score并没有⽤上,⽤的只是⾥⾯的uid、mid,这⾥就是根据kafka中最近观看电影的⽤户名在Redis的数据中进⾏匹配,从⽽到对应的评分记录
* @param num  评分个数
* @param uid  谁评的分数
* @param jedis 连接Redis的⼯具(客户端)
* @return
*/
def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis) = {
//Redis中保存的数据格式:
//lpush uid:1 1129:2.0 1172:4.0 1263:2.0 1287:2.0 1293:2.0 1339:3.5 1343:2.0 1371:2.5
//.lrange()是因为获取Redis中的数据时,是这样的命令⾏,EG:192.168.212.21:6379> lrange uid:1 0 5
//其中,start这⾥是0,stop这⾥是5
//注意:jedis是Java⾥⾯的,.map()是Scala⾥⾯的东西,如果要将java ⾥⾯的东西⽤到Scala⾥⾯,需要引⼊:
// llection.JavaConversions._
jedis.lrange("uid" + String, 0, num - 1).map {
items =>
val strings = items.split("\\:")
(strings(0).Int, strings(1).Double)
}.toArray
}
/**
* 取出和当前电影r相似的前num个的相似电影
*
* @param num
* @param mid
* @param uid
* @param simMovies
* @param mongoConfig
*/
def getTopSimMovies(num: Int, uid: Int, mid: Int, simMovies: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = {
//1.从共享变量的电影相似度矩阵中获取和当前电影的所有相似电影
//.get(mid)之后的allSimMovies的数据类型是:Option[Map[Int,Double]]
//.get之后allSimMovies的数据类型变成:Map[Int,Double]
val allSimMovies = (mid).Array
//再.toArray之后allSimMovies的数据类型是:Array[(Int,Double)]
//2.获取⽤户已经观看过的电影
//从Ratings表⾥⾯取出
val usedMovies = Client(mongoConfig.db)(MONGODB_RATING_COLLECTION)
.find(MongoDBObject("uid" -> uid)).toArray.map {
items =>
<("mid").Int
}
//3.过滤掉已经评分的电影、排序输出(降序排序)
allSimMovies.filter(x => !ains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
}
/**
* 计算待选电影的推荐分数
*
* @param simMovies          电影的相似度矩阵(历史数据)
* @param userRecentlyRatings ⽤户最近对电影的评分-20个
* @param topSimMovies        当前电影最相近的电影-20个
*/
def computMovieScores(simMovies: collection.Map[Int, Map[Int, Double]], userRecentlyRatings: Array[(Int, Double)], topSimMovies: Array[Int]) = {      //保存每⼀个待选电影和最近评分的每个电影的权重得分
val score = ArrayBuffer[(Int, Double)]()
//保存每⼀个电影的增强因⼦数
//Key-mid ,Value-有多少个
val increMap = mutable.HashMap[Int, Int]()
//保存每⼀个电影的减弱因⼦数
//Key-mid ,Value-有多少个
val decreMap = mutable.HashMap[Int, Int]()
//⽤户最近的评分和当前电影最相近的电影要做双重for循环
//topSimMovies对应图中的A(B),userRecentlyRatings的对应图中的X Y Z
for (topSimMovie <- topSimMovies; userRecentlyRating <- userRecentlyRatings) {
//userRecentlyRating._1 就相当于电影r ; topSimMovie 就相当于电影q
val simScore = getMoviesSimScore(simMovies, userRecentlyRating._1, topSimMovie)
if (simScore > 0.6) {
//score也就是公式中除号上⾯和号右侧的部分
score += ((userRecentlyRating._1, simScore * userRecentlyRating._2))
if (userRecentlyRating._2 >= 3) {
//增强因⼦起作⽤
increMap(topSimMovie) = OrDefault(topSimMovie, 0) + 1
} else {
//否则减弱因⼦起作⽤
decreMap(topSimMovie) = OrDefault(topSimMovie, 0) + 1
}
}
}
case (mid, sims) =>
(mid, sims.map(_._2).sum / sims.length + log(increMap(mid)) - log(decreMap(mid)))
}.toArray //变成⼀个数组保存
}
/
**
* ⾃写log()--实现lg()
*
* @param m
* @return
*/
def log(m: Int) = {
//loh(2)、log(10) 都是可以的,感觉按照公式的话应该写成log(10)
math.log(m) / math.log(10)
}
/**
* 获取电影之间的相似度
*
* @param simMovies      和之前电影p最相似的电影集合
* @param userRatingMovie ⽤户最近给最近电影的评分中的⼀个
* @param topSimMovie    和当前电影最相似的电影集合中的⼀个
*/
def getMoviesSimScore(simMovies: collection.Map[Int, Map[Int, Double]], userRatingMovie: Int, topSimMovie: Int) = {
//模式匹配
//注意写Some,这样最后得出结果的数据类型才会和simMovies⾥⾯的相同,都是Double

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。

发表评论