代码语言
.
CSharp
.
JS
Java
Asp.Net
C
MSSQL
PHP
Css
PLSQL
Python
Shell
EBS
ASP
Perl
ObjC
VB.Net
VBS
MYSQL
GO
Delphi
AS
DB2
Domino
Rails
ActionScript
Scala
代码分类
文件
系统
字符串
数据库
网络相关
图形/GUI
多媒体
算法
游戏
Jquery
Extjs
Android
HTML5
菜单
网页交互
WinForm
控件
企业应用
安全与加密
脚本/批处理
开放平台
其它
【
Spark
】
Spark MLlib之协同过滤
作者:
/ 发布于
2017/5/22
/
359
Spark MLlib之协同过滤
import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaDoubleRDD; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.recommendation.ALS; import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; import org.apache.spark.mllib.recommendation.Rating; import scala.Tuple2; public class SparkMLlibColbFilter { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example"); JavaSparkContext sc = new JavaSparkContext(conf); // Load and parse the data String path = "file:///data/hadoop/spark-2.0.0-bin-hadoop2.7/data/mllib/als/test.data"; JavaRDD<String> data = sc.textFile(path); JavaRDD<Rating> ratings = data.map(new Function<String, Rating>() { @Override public Rating call(String s) throws Exception { String[] sarray = s.split(","); return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]), Double.parseDouble(sarray[2])); } }); // Build the recommendation model using ALS int rank = 10; int numIterations = 10; MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01); JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(new Function<Rating, Tuple2<Object, Object>>() { @Override public Tuple2<Object, Object> call(Rating r) throws Exception { return new Tuple2<Object, Object>(r.user(), r.product()); } }); JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD( model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map( new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() { @Override public Tuple2<Tuple2<Integer, Integer>, Double> call( Rating r) throws Exception { return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()); } })); JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(ratings.map( new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() { @Override public Tuple2<Tuple2<Integer, Integer>, Double> call( Rating r) throws Exception { return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()); } })).join(predictions).values(); double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(new Function<Tuple2<Double, Double>, Object>() { @Override public Object call(Tuple2<Double, Double> pair) throws Exception { return Math.pow((pair._1() - pair._2()),2); } }).rdd()).mean(); System.out.println("Mean Squared Error = " + MSE); // Save and load model model.save(sc.sc(), "target/tmp/myCollaborativeFilter"); MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(sc.sc(), "target/tmp/myCollaborativeFilter"); //为每个用户进行推荐,推荐的结果可以以用户id为key,结果为value存入redis或者hbase中 List<String> users = data.map(new Function<String, String>() { @Override public String call(String s) throws Exception { String[] sarray = s.split(","); return sarray[0]; } }).distinct().collect(); for (String user : users) { Rating[] rs = model.recommendProducts(Integer.parseInt(user), numIterations); String value = ""; int key = 0; for (Rating r : rs) { key = r.user(); value = value + r.product() + ":" + r.rating() + "," ; } System.out.println(key + " " + value); } } } 协同过滤ALS算法推荐过程如下: 加载数据到 ratings RDD,每行记录包括:user, product, rate 从 ratings 得到用户商品的数据集:(user, product) 使用ALS对 ratings 进行训练 通过 model 对用户商品进行预测评分:((user, product), rate) 从 ratings 得到用户商品的实际评分:((user, product), rate) 合并预测评分和实际评分的两个数据集,并求均方差
试试其它关键字
同语言下
.
spark多路输出
.
Spark MLlib之KMeans
.
Spark MLlib之协同过滤
可能有用的
.
spark多路输出
.
Spark MLlib之KMeans
.
Spark MLlib之协同过滤
贡献的其它代码
Label
Copyright © 2004 - 2024 dezai.cn. All Rights Reserved
站长博客
粤ICP备13059550号-3