// ALS algorithm implementation — movie recommendation based on ALS (Java version)
package spark;
import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;
public class SparkALSDemo {
public static void main(String ... args) throws Exception {
Logger logger = Logger(SparkALSDemo.class);
// 设置⽇志的等级 并关闭jetty容器的⽇志
// 设置运⾏环境,并创建SparkContext
SparkConf sparkConf = new SparkConf().setAppName("MovieLensALS");
sparkConf.setMaster("local[4]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 装载样本评分数据,并按照Timestamp模10的分为10份
String movielensHomeDir = "F:/ml-1m";
JavaRDD> ratings = File(movielensHomeDir + "/ratings.dat").map(
line -> {
String[] fields = line.split("::");
return new Tuple2(Long.parseLong(fields[3]) % 10, new Rating(Integer.parseInt(fields[0]),
Integer.parseInt(fields[1]), Double.parseDouble(fields[2])));
});
// 装载⽤户评分,该评分由评分器⽣成(即⽣成⽂件)
JavaRDD data = File("F:/");
JavaRDD myRatingsRDD = data.map(s -> {
String[] sarray = s.split("::");
return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), Double.parseDouble(sarray[2]));
});
// 统计样本数据中的评分概要
logger.info("Got " + unt() + " ratings from " + ratings.map(tupe -> tupe._2.user()).distinct().count() + " users " + ratings.map(tupe -> tupe._2.product()).distinct().count() + " movies");
// ⽤于训练是rating中key=[0-5]的数据
JavaRDD training = ratings.filter(x -> x._1 < 6).map(tupe2 -> tupe2._2).union(myRatingsRDD)
.
repartition(4).persist(StorageLevel.MEMORY_ONLY());
// ⽤于校验是rating中key=[6-7]的数据
JavaRDD validation = ratings.filter(x -> x._1 >= 6 && x._1 < 8).map(tupe2 -> tupe2._2).repartition(4)
.persist(StorageLevel.MEMORY_ONLY());
// ⽤于测试的是rating中key=[8-9]的数据
JavaRDD test = ratings.filter(x -> x._1 >= 8).map(tupe2 -> tupe2._2).persist(StorageLevel.MEMORY_ONLY());
logger.info("Training: " + unt() + " validation: " + unt() + " test: " + unt());
// 定义不同的参数。计算均⽅根误差值,到均⽅根误差值最⼩的模型。即:最优模型
List ranks = (List)Arrays.asList(8, 10, 12);
List lambdas = (List)Arrays.asList(0.1, 2.5, 5.0);
List numIters = (List)Arrays.asList(10, 15, 20);
MatrixFactorizationModel bestModel = null;
double bestValidationRmse = Double.MAX_VALUE;
int bestRank = 0;
double bestLambda = -1.0;
int bestNumIter = -1;
for (int i = 0; i < ranks.size(); i++) {
MatrixFactorizationModel model = RDD(training), (i), (i), (i));
double validationRmse = SparkALSDemoputeRMSEAverage(model, validation, unt());
if (validationRmse < bestValidationRmse) {
bestModel = model;
bestValidationRmse = validationRmse;
bestRank = (i);
bestLambda = (i);
bestNumIter = (i);
}
}
double testRmse = SparkALSDemoputeRMSEAverage(bestModel, test, unt());
logger.info("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".");
// 创建⼀个基准数据集,该数据集是训练数据集[training]与校验数据集[validation]的交集.最优模型就是从这个基础数据集计算得来的
JavaRDD rdd = training.union(validation).map(d -> d.rating());
double meanRating = duce((a, b) -> a + b) / unt();
double baselineRmse = Math.sqrt(test.map(x -> (meanRating - x.rating()) * (meanRating - x.rating())).reduce((a1, a2) -> a1 + a2)/ unt());
double improvement = (baselineRmse - testRmse) / baselineRmse * 100;
logger.info("The best model improves the baseline by " + String.format("%1.2f", improvement) + "%.");
// 加载电影数据
JavaRDD> movies = File(movielensHomeDir + "/movies.dat").map(line -> {
String[] fields = line.split("::");
return new Tuple2(Integer.parseInt(fields[0]), fields[1]);
});
//将⽤户已经评过分的数据滤掉
List myRatedMovieIds = myRatingsRDD.map(d -> d.product()).collect();
JavaRDD candidates = movies.map(s -> s._1).filter(m -> !ains(m));
//预测⽤户100最喜欢的10部电影
JavaRDD rr = bestModel.predict(JavaPairRDD.fromJavaRDD(candidates.map(d -> new Tuple2(100, d)))).sortBy(f->f.rating(), false, 4);
logger.info("Movies recommended for you:");
rr.take(10).forEach(a -> logger.info("⽤户" + a.user() + "-[ " + a.product() + "]-[" + a.rating() + "]"));
//jsc.stop();
}
/** * 根据模型model计算data的平均均⽅根误差 * * @param model * @param data * @param n * @return */
public static double computeRMSEAverage(MatrixFactorizationModel model, JavaRDD data, long n)
{
JavaRDD jddRat = model.predict(JavaPairRDD.fromJavaRDD(data.map(d -> new Tuple2(d.user(), d
.product()))));
JavaPairRDD pre = JavaPairRDD.fromJavaRDD(jddRat.map(f -> new Tuple2(f.user() + "_"
+ f.product(), f.rating())));
JavaPairRDD rea = JavaPairRDD.fromJavaRDD(data.map(f -> new Tuple2(f.user() + "_"
+ f.product(), f.rating())));
// 相当于SQl中的内联
JavaRDD> d = pre.join(rea).values();
return d.map(f -> Math.pow(f._1 - f._2, 2)).reduce((a, b) -> a + b) / n; }
}
// Provenance note (translated from the scraping site's footer): content was
// collected from the internet and is for demonstration purposes only; not for
// commercial or other unlawful use.