- 例子1:使用 RDD API(spark.mllib)训练、评估并保存随机森林模型,然后加载模型对新数据进行预测
object RunRF {
  /** Default input: comma-separated rows, feature columns first and the label in the last column. */
  private val DefaultDataPath = "hdfs://192.168.1.64:8020/test/mllib/v3.csv"
  /** Default location the trained model is saved to and loaded from. */
  private val DefaultModelPath = "hdfs://192.168.1.64:8020/tmp/RFModel"
  /** Number of label classes the forest is trained for (binary classification). */
  private val NumClasses = 2

  /**
   * Trains a random-forest classifier, prints its metrics on the cross-validation
   * split and saves the model.
   *
   * @param args optional overrides: args(0) = input data path, args(1) = model output path;
   *             the HDFS defaults above are used when they are absent
   */
  def main(args: Array[String]): Unit = {
    val dataPath = args.headOption.getOrElse(DefaultDataPath)
    val modelPath = args.drop(1).headOption.getOrElse(DefaultModelPath)
    val sparkConf = new SparkConf().setAppName("rf")
    val sc = new SparkContext(sparkConf)
    try {
      val data = sc.textFile(dataPath).map(parseLabeledPoint)
      // Training / cross-validation / test splits of 80% / 10% / 10%.
      // The CV split is for choosing hyper-parameters (see getBestParam);
      // the test split is meant for a final evaluation of the chosen parameters.
      val Array(trainData, cvData, testData) = data.randomSplit(Array(0.8, 0.1, 0.1))
      trainData.cache()
      cvData.cache()
      testData.cache()
      // Arguments: numClasses, categoricalFeaturesInfo (none), numTrees = 20,
      // featureSubsetStrategy = "auto", impurity = "gini", maxDepth = 4, maxBins = 32.
      val model = RandomForest.trainClassifier(trainData, NumClasses, Map[Int, Int](), 20, "auto", "gini", 4, 32)
      val metrics = getMetrics(model, cvData)
      // Confusion matrix and overall precision.
      println("-----------------------------------------confusionMatrix-----------------------------------------------------")
      println(metrics.confusionMatrix)
      println("---------------------------------------------precision-------------------------------------------------")
      println(metrics.precision)
      println("-----------------------------------------(precision,recall)---------------------------------------------------")
      // Per-class (precision, recall).
      (0 until NumClasses).map(target => (metrics.precision(target), metrics.recall(target))).foreach(println)
      model.save(sc, modelPath)
    } finally {
      // Release cluster resources even if training or saving fails.
      sc.stop()
    }
  }

  /**
   * Scores a random-forest model on a labelled dataset.
   *
   * @param model random-forest model to evaluate
   * @param data  labelled dataset used for evaluation (the cross-validation split in main)
   * @return MulticlassMetrics built from (prediction, true label) pairs
   */
  def getMetrics(model: RandomForestModel, data: RDD[LabeledPoint]): MulticlassMetrics = {
    // Pair each sample's prediction with its true label.
    val predictionsAndLabels = data.map(d => (model.predict(d.features), d.label))
    new MulticlassMetrics(predictionsAndLabels)
  }

  /**
   * Grid-searches impurity, maxDepth and maxBins. Trains one forest per combination on
   * trainData, scores it on cvData, and prints every combination with its overall
   * precision, best first.
   *
   * @param trainData training split
   * @param cvData    cross-validation split used for scoring
   */
  def getBestParam(trainData: RDD[LabeledPoint], cvData: RDD[LabeledPoint]): Unit = {
    val evaluations =
      for {
        impurity <- Array("gini", "entropy")
        depth <- Array(1, 20)
        bins <- Array(10, 300)
      } yield {
        // numClasses, no categorical features, numTrees = 20, featureSubsetStrategy = "auto"
        val model = RandomForest.trainClassifier(trainData, NumClasses, Map[Int, Int](), 20, "auto", impurity, depth, bins)
        val metrics = getMetrics(model, cvData)
        ((impurity, depth, bins), metrics.precision)
      }
    evaluations.sortBy(_._2).reverse.foreach(println)
  }

  /**
   * Example of scoring new data with a saved model. It demonstrates two approaches
   * and prints the first 10 results of each.
   * Input rows are assumed to have the same CSV layout as the training data; the last
   * column is dropped before prediction.
   *
   * @param sc        active SparkContext
   * @param dataPath  CSV file to score
   * @param modelPath location of a model saved by main
   */
  def predictNewData(sc: SparkContext,
                     dataPath: String = DefaultDataPath,
                     modelPath: String = DefaultModelPath): Unit = {
    val rawData = sc.textFile(dataPath)
    val rfModel = RandomForestModel.load(sc, modelPath)

    // Approach 1: batch-predict an RDD of feature vectors.
    val pdata = rawData.map(line => parseLabeledPoint(line).features)
    val preLabel = rfModel.predict(pdata)
    preLabel.take(10).foreach(println)

    // Approach 2: predict row by row and append the prediction to the original line,
    // so the input columns stay next to the predicted label (e.g. for saveAsTextFile).
    val dataAndPreLabel = rawData.map { line =>
      line + "," + rfModel.predict(parseLabeledPoint(line).features)
    }
    dataAndPreLabel.take(10).foreach(println)
  }

  /**
   * Parses one CSV line: every column except the last becomes a dense feature vector,
   * and the last column is the label. All columns must parse as Double.
   */
  private def parseLabeledPoint(line: String): LabeledPoint = {
    val values = line.split(",").map(_.toDouble)
    LabeledPoint(values.last, Vectors.dense(values.init))
  }
}
- 例子2:处理Hive数据(使用 DataFrame API 与 spark.ml Pipeline 训练随机森林)
// Build a HiveContext on the existing SparkContext `sc` (e.g. the one spark-shell provides).
val hc = new HiveContext(sc)
import hc.implicits._
// Load the samples. Per the notes, the first column is the label (0 or 1); other columns may be
// identifiers such as name or phone number, alongside the feature columns actually used for training.
val data = hc.sql("select * from database1.traindata_userprofile")
// Feature column names: drop(2) removes the first two columns. This assumes only those two are non-features — verify against the table.
val schema = data.columns.drop(2)
// VectorAssembler is a Transformer that merges several (non-string) columns into one vector column "userFea".
// NA values in numeric columns are replaced with -1e9 before assembling, because the assembler cannot handle nulls.
val assembler = new VectorAssembler().setInputCols(schema).setOutputCol("userFea")
val userProfile = assembler.transform(data.na.fill(-1e9)).select("label","userFea")
// Index the labels and find categorical features (<= 4 distinct values). Both are fitted on the full dataset so every label value and category is indexed.
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(userProfile)
val featureIndexer = new VectorIndexer().setInputCol("userFea").setOutputCol("indexedFeatures").setMaxCategories(4).fit(userProfile)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = userProfile.randomSplit(Array(0.7, 0.3))
// Random-forest classifier on the indexed label/features.
val rf = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
rf.setMaxBins(32).setMaxDepth(6).setNumTrees(90).setMinInstancesPerNode(4).setImpurity("gini")
// Convert indexed predictions back to the original label values.
val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
// Train the model; this also runs the indexers.
val model = pipeline.fit(trainingData)
println("training finished!!!!")
// Predict on the held-out test set.
val predictions = model.transform(testData)
// Show a few example rows.
predictions.select("predictedLabel", "indexedLabel", "indexedFeatures").show(5)
// Accuracy on the test set; report its complement as the test error.
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))