博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
推荐系统-05-Spark电影推荐、评估与部署
阅读量:5788 次
发布时间:2019-06-18

本文共 7309 字,大约阅读时间需要 24 分钟。

一、新建scala项目

4577-20180516185620848-758954610.png

二、构造程序

4577-20180516185849833-378233054.png

代码如下

package xyz.pl8

import java.io.File

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating, ALS}
import org.apache.spark.rdd.RDD

import scala.annotation.tailrec
import scala.io.StdIn
import scala.util.Random

/**
 * Interactive movie recommender built on Spark MLlib's ALS matrix factorization.
 *
 * The program loads `ratings.dat` and `movies.dat` from the directory given as the
 * single command-line argument, asks the user to rate a few popular movies, trains
 * several ALS models, picks the one with the lowest validation RMSE, compares it
 * with a mean-rating baseline and finally prints personal recommendations.
 */
object MovieLensALS {

  /** One trained model together with its validation RMSE and the hyper-parameters used. */
  private final case class Candidate(
      model: MatrixFactorizationModel,
      rmse: Double,
      rank: Int,
      iterations: Int,
      lambda: Double)

  // 1. Define a rating elicitation function

  /**
   * Asks the user on the console to rate each of the given movies.
   *
   * A score of 0 means "not seen" and produces no rating. Every returned rating is
   * attributed to user id 0, which is the id later used for the recommendations.
   *
   * @param movies (movieId, title) pairs to ask about
   * @return the ratings the user entered (scores 1 to 5 only)
   * @throws RuntimeException     if the user did not rate any movie
   * @throws java.io.EOFException if standard input ends before all movies were answered
   */
  def elicitateRating(movies: Seq[(Int, String)]): Seq[Rating] = {
    val prompt = "Please rate the following movie(1-5(best) or 0 if not seen: )"
    println(prompt)

    // Keeps asking until an integer in [0, 5] is entered. Only malformed numbers are
    // retried: an EOFException (stdin closed) is deliberately NOT caught, because
    // retrying on a closed stream would loop forever.
    @tailrec
    def readScore(title: String): Int = {
      println(title + " :")
      val parsed =
        try Some(StdIn.readInt())
        catch { case _: NumberFormatException => None }
      parsed match {
        case Some(score) if score >= 0 && score <= 5 => score
        case _ =>
          println(prompt)
          readScore(title)
      }
    }

    // flatMap drops the "not seen" answers and keeps only real ratings.
    val ratings = movies.flatMap { case (movieId, title) =>
      val score = readScore(title)
      if (score > 0) Some(Rating(0, movieId, score.toDouble)) else None
    }

    if (ratings.isEmpty) sys.error("No ratings provided!")
    else ratings
  }

  // 2. Define a RMSE computation function

  /**
   * Computes the root mean squared error of `model` on `data`.
   *
   * @param model the trained matrix factorization model
   * @param data  ratings with known true values
   * @return the RMSE between predicted and actual ratings
   */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
    val predictions = model
      .predict(data.map(r => (r.user, r.product)))
      .map(r => ((r.user, r.product), r.rating))
    val actuals = data.map(r => ((r.user, r.product), r.rating))
    // Joining on (user, product) yields (prediction, observation) pairs.
    val predictedAndActual = predictions.join(actuals).values
    new RegressionMetrics(predictedAndActual).rootMeanSquaredError
  }

  /**
   * Joins a data directory and a file name with a forward slash.
   *
   * java.io.File is intentionally not used here: it collapses the "//" of URIs such
   * as "hdfs://host/dir" and uses "\" on Windows, both of which break paths that are
   * handed to `SparkContext.textFile`.
   */
  private def dataPath(dir: String, fileName: String): String =
    if (dir.endsWith("/")) dir + fileName else dir + "/" + fileName

  // 3. Main

  /**
   * Entry point.
   *
   * @param args exactly one element: the directory containing ratings.dat and movies.dat
   */
  def main(args: Array[String]): Unit = {
    // 3.1 Setup env
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    if (args.length != 1) {
      println("Usage: movieLensHomeDir")
      sys.exit(1)
    }
    val conf = new SparkConf()
      .setAppName("MovieLensALS")
      .set("spark.executor.memory", "500m")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even when a step below fails.
    try run(sc, args(0))
    finally sc.stop()
  }

  /** Runs the whole load / elicit / train / evaluate / recommend pipeline. */
  private def run(sc: SparkContext, movieLensHomeDir: String): Unit = {
    // 3.2 Load ratings data and know your data
    // ratings.dat format: UserID::MovieID::Rating::Timestamp
    val ratings: RDD[(Long, Rating)] =
      sc.textFile(dataPath(movieLensHomeDir, "ratings.dat")).map { line =>
        val fields = line.split("::")
        // Key is timestamp modulo 10, which buckets the ratings into 10 groups
        // used below for the train / validation / test split.
        (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
      }

    // movies.dat format: MovieID::Title::Genres
    val movies: scala.collection.Map[Int, String] =
      sc.textFile(dataPath(movieLensHomeDir, "movies.dat")).map { line =>
        val fields = line.split("::")
        // (movieId, movieName)
        (fields(0).toInt, fields(1))
      }.collectAsMap()

    val numRatings = ratings.count()
    val numUser = ratings.map(_._2.user).distinct().count()
    val numMovie = ratings.map(_._2.product).distinct().count()
    println("Got " + numRatings + " ratings from " + numUser + " users on " + numMovie + " movies.")

    // 3.3 Elicitate personal rating
    // The 50 most-rated movie ids, most popular first.
    val topMovies = ratings
      .map(_._2.product)
      .countByValue()
      .toSeq
      .sortBy(-_._2)
      .take(50)
      .map(_._1)
    // Fixed seed, so the same ~20% sample of popular movies is asked on every run.
    val random = new Random(0)
    val selectMovies = topMovies
      .filter(_ => random.nextDouble() < 0.2)
      .map(id => (id, movies(id)))
    val myRatings = elicitateRating(selectMovies)
    val myRatingsRDD = sc.parallelize(myRatings, 1)

    // 3.4 Split data into train(60%), validation(20%) and test(20%)
    val numPartitions = 10
    // Buckets 0-5 (i.e. 60%) plus the ratings entered by hand.
    val trainSet = ratings
      .filter(_._1 < 6)
      .map(_._2)
      .union(myRatingsRDD)
      .repartition(numPartitions)
      .persist()
    val validationSet = ratings.filter(x => x._1 >= 6 && x._1 < 8).map(_._2).persist()
    val testSet = ratings.filter(_._1 >= 8).map(_._2).persist()
    val numTrain = trainSet.count()
    val numValidation = validationSet.count()
    val numTest = testSet.count()
    println("Training data: " + numTrain + " Validation data: " + numValidation + " Test data: " + numTest)

    // 3.5 Train model and optimize model with validation set
    val numRanks = List(8, 12)
    val numIters = List(10, 20)
    val numLambdas = List(0.1, 10.0)
    val paramGrid =
      for (rank <- numRanks; iter <- numIters; lambda <- numLambdas) yield (rank, iter, lambda)

    // Search the grid, keeping only the best candidate seen so far.
    val bestOpt = paramGrid.foldLeft(Option.empty[Candidate]) { case (bestSoFar, (rank, iter, lambda)) =>
      val model = ALS.train(trainSet, rank, iter, lambda)
      val validationRmse = computeRmse(model, validationSet)
      println("RMSE(validation) = " + validationRmse + " with ranks=" + rank + ", iter=" + iter + ", Lambda=" + lambda)
      // Strict "<" against Double.MaxValue also rejects NaN results.
      if (validationRmse < bestSoFar.fold(Double.MaxValue)(_.rmse))
        Some(Candidate(model, validationRmse, rank, iter, lambda))
      else bestSoFar
    }
    val best = bestOpt.getOrElse(sys.error("No model produced a valid validation RMSE!"))

    // 3.6 Evaluate model on test set
    val testRmse = computeRmse(best.model, testSet)
    println("The best model was trained with rank=" + best.rank + ", Iter=" + best.iterations +
      ", Lambda=" + best.lambda + " and compute RMSE on test is " + testRmse)

    // 3.7 Create a baseline and compare it with best model
    // Baseline predictor: always predict the mean rating of train + validation data.
    val meanRating = trainSet.union(validationSet).map(_.rating).mean()
    // RMSE of the baseline on the test set, as (prediction, observation) pairs.
    val bestlineRmse =
      new RegressionMetrics(testSet.map(r => (meanRating, r.rating))).rootMeanSquaredError
    // testRmse should be the smaller (better) of the two.
    val improvement = (bestlineRmse - testRmse) / bestlineRmse * 100
    println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

    // 3.8 Make a personal recommendation
    // Recommend only movies the user has not rated already.
    val ratedMovieIds = myRatings.map(_.product).toSet
    val candidates = sc.parallelize(movies.keys.filter(id => !ratedMovieIds.contains(id)).toSeq)
    val recommendations = best.model
      .predict(candidates.map(id => (0, id)))
      .sortBy(-_.rating)
      .take(50)

    println("Movies recommended for you:")
    recommendations.zipWithIndex.foreach { case (rec, idx) =>
      println("%2d".format(idx) + " :" + movies(rec.product))
    }
  }
}

导入引用库

4577-20180516191959516-36087936.png

三、打包部署

程序运行时,需要指定输入数据路径。数据包含ratings.dat和movies.dat,它们都包含在同一个数据包中,请先下载该数据包,然后解压。

配置运行参数

  • 点击edit configuration,在左侧点击该项目,然后在右侧VM options中输入“-Dspark.master=local”,指示本程序以本地单线程方式运行
  • 在Program arguments中指定上面解压的路径。

    然后,在IDEA上选择MovieLensALS右键选择运行,即可运行了。
    按照引导,输入自己的评价后,最后输出形式如下:

    The best model was trained with rank=12, Iter=20, Lambda=0.1 and compute RMSE on test is 0.868464888081759

    The best model improves the baseline by 22.01%.
    Movies recommended for you:
    0 :Julien Donkey-Boy (1999)
    1 :Love Serenade (1996)
    2 :Catwalk (1995)

四、HADOOP集群部署

导出jar包设置

4577-20180516194855213-280074370.png

选对main类后,点击OK确定。这个时候配置已经完成了,我们就可以编译jar文件了:选择菜单Build->Build Artifacts...,生成的文件路径为/out/artifacts/MovieLensALS_jar/MovieLensALS.jar

准备HADOOP环境

假设我们的HADOOP环境已经搭建完成。接下来我们要把需要计算的数据文件上传到hadoop。首先,在hadoop上面创建文件夹,命令如下:

hdfs dfs -mkdir -p /recommendation/data

上传数据文件命令如下:

hdfs dfs -put *.dat /recommendation/data

这时候我们可以通过命令查看上传是否成功:

hdfs dfs -cat /recommendation/data/users.dat

运行

4577-20180516210348275-2132994026.png

在上面红框中,指定了生成的jar文件名、所在路径以及MainClass。下面就是通过spark执行:

/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --master local  --class "xyz.pl8.MovieLensALS" /home/hartifacts/movielensals_jar/movielensals.jar /recommendation/data

转载于:https://www.cnblogs.com/freebird92/p/9047575.html

你可能感兴趣的文章
机器学习实战_一个完整的程序(一)
查看>>
Web框架的常用架构模式(JavaScript语言)
查看>>
如何用UPA优化性能?先读懂这份报告!
查看>>
这些Java面试题必须会-----鲁迅
查看>>
Linux 常用命令
查看>>
CSS盒模型
查看>>
ng2路由延时加载模块
查看>>
使用GitHub的十个最佳实践
查看>>
脱离“体验”和“安全”谈盈利的游戏运营 都是耍流氓
查看>>
慎用!BLEU评价NLP文本输出质量存在严重问题
查看>>
JAVA的优势就是劣势啊!
查看>>
ELK实战之logstash部署及基本语法
查看>>
帧中继环境下ospf的使用(点到点模式)
查看>>
BeanShell变量和方法的作用域
查看>>
LINUX下防恶意扫描软件PortSentry
查看>>
由数据库对sql的执行说JDBC的Statement和PreparedStatement
查看>>
springmvc+swagger2
查看>>
软件评测-信息安全-应用安全-资源控制-用户登录限制(上)
查看>>
我的友情链接
查看>>
Java Web Application 自架构 一 注解化配置
查看>>