1. Create a Scala project
2. Write the program
The code is as follows:

package xyz.pl8

import java.io.File

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

import scala.util.Random

object MovieLensALS {

  //1. Define a rating elicitation function
  def elicitateRating(movies: Seq[(Int, String)]): Seq[Rating] = {
    val prompt = "Please rate the following movie (1-5 (best), or 0 if not seen):"
    println(prompt)
    val ratings = movies.flatMap { x =>
      var rating: Option[Rating] = None // Rating(user: Int, product: Int, rating: Double)
      var valid = false
      while (!valid) {
        println(x._2 + " :")
        try {
          val r = scala.io.StdIn.readInt()
          if (r > 5 || r < 0) {
            println(prompt)
          } else {
            valid = true
            if (r > 0) {
              rating = Some(Rating(0, x._1, r))
            }
          }
        } catch {
          case e: Exception => println(prompt)
        }
      }
      // flatMap flattens the per-movie Option into zero or one Rating
      rating match {
        case Some(r) => Iterator(r)
        case None => Iterator.empty
      }
    }
    if (ratings.isEmpty) {
      sys.error("No ratings provided!")
    } else {
      ratings
    }
  }

  //2. Define an RMSE computation function
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]) = {
    val prediction = model.predict(data.map(x => (x.user, x.product)))
    // join predictions with observations on (user, product); values are (prediction, observation)
    val predDataJoined = prediction.map(x => ((x.user, x.product), x.rating))
      .join(data.map(x => ((x.user, x.product), x.rating)))
      .values
    new RegressionMetrics(predDataJoined).rootMeanSquaredError
  }

  //3. Main
  def main(args: Array[String]) {
    //3.1 Set up the environment
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length != 1) {
      println("Usage: MovieLensALS movieLensHomeDir")
      sys.exit(1)
    }
    val conf = new SparkConf().setAppName("MovieLensALS")
      .set("spark.executor.memory", "500m")
    val sc = new SparkContext(conf)

    //3.2 Load the ratings data and get to know it
    // ratings.dat format: UserID::MovieID::Rating::Timestamp
    val movieLensHomeDir = args(0)
    // RDD[(Long, Rating)]
    val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =>
      val fields = line.split("::")
      // key on (timestamp mod 10) to bucket the ratings into 10 groups
      (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
    }
    // movies.dat format: MovieID::Title::Genres
    // Map[Int, String]: movieId -> title
    val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =>
      val fields = line.split("::")
      (fields(0).toInt, fields(1))
    }.collectAsMap()

    val numRatings = ratings.count()
    val numUser = ratings.map(x => x._2.user).distinct().count()
    val numMovie = ratings.map(_._2.product).distinct().count()
    println("Got " + numRatings + " ratings from " + numUser + " users on " + numMovie + " movies.")

    //3.3 Elicit personal ratings: take the 50 most-rated movies and sample ~20% of them
    val topMovies = ratings.map(_._2.product).countByValue().toSeq.sortBy(-_._2).take(50).map(_._1)
    val random = new Random(0)
    // Seq[(Int, String)]
    val selectMovies = topMovies.filter(x => random.nextDouble() < 0.2).map(x => (x, movies(x)))

    val myRatings = elicitateRating(selectMovies)
    val myRatingsRDD = sc.parallelize(myRatings, 1)

    //3.4 Split data into train (60%), validation (20%) and test (20%) sets
    val numPartitions = 10
    // buckets 0-5 (i.e. 60%), plus the manually entered ratings
    val trainSet = ratings.filter(x => x._1 < 6).map(_._2).union(myRatingsRDD).repartition(numPartitions).persist()
    val validationSet = ratings.filter(x => x._1 >= 6 && x._1 < 8).map(_._2).persist()
    val testSet = ratings.filter(x => x._1 >= 8).map(_._2).persist()

    val numTrain = trainSet.count()
    val numValidation = validationSet.count()
    val numTest = testSet.count()
    println("Training data: " + numTrain + " Validation data: " + numValidation + " Test data: " + numTest)

    //3.5 Train the model and tune it on the validation set
    val numRanks = List(8, 12)
    val numIters = List(10, 20)
    val numLambdas = List(0.1, 10.0)
    var bestRmse = Double.MaxValue
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestRanks = -1
    var bestIters = 0
    var bestLambdas = -1.0
    // grid search for the best parameter combination
    for (rank <- numRanks; iter <- numIters; lambda <- numLambdas) {
      val model = ALS.train(trainSet, rank, iter, lambda)
      val validationRmse = computeRmse(model, validationSet)
      println("RMSE(validation) = " + validationRmse + " with ranks=" + rank + ", iter=" + iter + ", Lambda=" + lambda)
      if (validationRmse < bestRmse) {
        bestModel = Some(model)
        bestRmse = validationRmse
        bestIters = iter
        bestLambdas = lambda
        bestRanks = rank
      }
    }

    //3.6 Evaluate the best model on the test set
    val testRmse = computeRmse(bestModel.get, testSet)
    println("The best model was trained with rank=" + bestRanks + ", Iter=" + bestIters + ", Lambda=" + bestLambdas +
      " and compute RMSE on test is " + testRmse)

    //3.7 Create a naive baseline and compare the best model against it
    // mean rating over the union of the train and validation sets
    val meanRating = trainSet.union(validationSet).map(_.rating).mean()
    // baseline RMSE: always predict the mean rating; testRmse should beat this (i.e. be smaller)
    val baselineRmse = new RegressionMetrics(testSet.map(x => (x.rating, meanRating))).rootMeanSquaredError
    val improvement = (baselineRmse - testRmse) / baselineRmse * 100
    println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

    //3.8 Make personal recommendations, excluding movies already rated
    val moviesId = myRatings.map(_.product)
    val candidates = sc.parallelize(movies.keys.filter(!moviesId.contains(_)).toSeq)
    val recommendations = bestModel.get
      .predict(candidates.map(x => (0, x)))
      .sortBy(-_.rating)
      .take(50)

    var i = 0
    println("Movies recommended for you:")
    recommendations.foreach { line =>
      println("%2d".format(i) + " :" + movies(line.product))
      i += 1
    }

    sc.stop()
  }
}
Import the library dependencies.
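In an sbt project the dependencies would look roughly like the build.sbt sketch below. The version numbers are assumptions matched to the spark-2.3.0-bin-hadoop2.7 distribution used later in this article; adjust them to your environment.

// build.sbt -- a minimal sketch; versions are assumptions, match them to your cluster
name := "MovieLensALS"

version := "1.0"

scalaVersion := "2.11.12"  // Spark 2.3.x is built against Scala 2.11

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.3.0",
  "org.apache.spark" %% "spark-mllib" % "2.3.0"
)

For cluster deployment you would typically mark both dependencies as "provided", since spark-submit supplies them at run time and they then stay out of the jar.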
3. Package and deploy
At run time the program needs the path to the input data, which consists of ratings.dat and movies.dat. Both files ship in a single archive (the MovieLens dataset); download it and unpack it first.
Configure the run parameters: click Edit Configurations and select this project on the left. On the right, enter "-Dspark.master=local" in VM options, which makes the program run locally in a single thread.
In Program arguments, specify the path where the data was unpacked above.
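As an alternative to the VM option during local debugging, the master can be set directly on the SparkConf. A sketch; note that a master set in code overrides any --master flag later passed to spark-submit, so it should be removed before cluster deployment:

val conf = new SparkConf()
  .setAppName("MovieLensALS")
  .setMaster("local[*]")  // use all local cores; plain "local" is single-threaded
  .set("spark.executor.memory", "500m")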
Then right-click MovieLensALS in IDEA and choose Run. Follow the prompts to enter your own ratings; the final output looks like this:

The best model was trained with rank=12, Iter=20, Lambda=0.1 and compute RMSE on test is 0.868464888081759
The best model improves the baseline by 22.01%.
Movies recommended for you:
 0 :Julien Donkey-Boy (1999)
 1 :Love Serenade (1996)
 2 :Catwalk (1995)
4. Deploying to a Hadoop cluster
Set up the jar export.
After selecting the main class, click OK; the configuration is now complete and we can build the jar. Choose Build -> Build Artifacts... from the menu; the generated file is at /out/artifacts/MovieLensALS_jar/MovieLensALS.jar.

Prepare the Hadoop environment
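To confirm the main class actually made it into the jar, you can list its contents from the project root:

jar tf out/artifacts/MovieLensALS_jar/MovieLensALS.jar | grep MovieLensALS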
Assuming the Hadoop environment is already set up, we next upload the data files to HDFS. First, create a directory on HDFS:
hdfs dfs -mkdir -p /recommendation/data
Then upload the data files:
hdfs dfs -put *.dat /recommendation/data
Now we can check whether the upload succeeded:
hdfs dfs -cat /recommendation/data/users.dat
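On the cluster the program reads from HDFS without code changes: a scheme-less path such as /recommendation/data is resolved against fs.defaultFS, which points at HDFS there. A fully qualified URI also works; for illustration (the namenode host and port here are hypothetical):

// Scheme-less path: resolved against fs.defaultFS (HDFS when running on the cluster)
val ratings = sc.textFile("/recommendation/data/ratings.dat")
// Fully qualified URI (hypothetical namenode address)
val ratingsExplicit = sc.textFile("hdfs://namenode:9000/recommendation/data/ratings.dat")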
Run
The artifact settings above specified the generated jar file name, its location, and the main class. Now submit the job through Spark:

/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --master local --class "xyz.pl8.MovieLensALS" /home/hartifacts/movielensals_jar/movielensals.jar /recommendation/data
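Note that --master local still runs the computation in a single local process; only the data comes from HDFS. To run on the cluster itself you would change the master, for example with YARN (assuming YARN is configured; client deploy mode keeps the driver, and thus the interactive rating prompt, on your terminal):

/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --master yarn --deploy-mode client --class "xyz.pl8.MovieLensALS" /home/hartifacts/movielensals_jar/movielensals.jar /recommendation/data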