文章出處
文章列表
此前用自己實現的隨機森林算法,應用在titanic生還者預測的數據集上。事實上,有很多開源的算法包供我們使用。無論是本地的機器學習算法包sklearn 還是分布式的spark mllib,都是非常不錯的選擇。
Spark是目前比較流行的分布式計算解決方案,同時支持集群模式和本地單機模式。由于其通過scala語言開發,原生支持scala,同時由于python在科學計算等領域的廣泛應用,Spark也提供了python的接口。
Spark的常用操作詳見官方文檔:
http://spark.apache.org/docs/latest/programming-guide.html
在終端下面鍵入如下命令,切換到spark的目錄,進入相應的環境:
cd $SPARK_HOME
cd ./bin
./pyspark
可以看到,出現了python 的版本號以及spark的logo
此時,仍然是輸入一句,運行一句并輸出。可以事先編輯好腳本保存為filename然后:
./spark-submit filename
下面給出詳細的代碼:
- import pandas as pd
- import numpy as np
- from pyspark.mllib.regression import LabeledPoint
- from pyspark.mllib.tree import RandomForest
- #將類別數量大于2的類別型變量進行重新編碼,并把數據集變成labeledPoint格式
- #df=pd.read_csv('/home/kim/t.txt',index_col=0)
- #for col in ['Pclass','embrk']:
- # values=df[col].drop_duplicates()
- # for v in values:
- # col_name=col+str(v)
- # df[col_name]=(df[col]==v)
- # df[col_name]=df[col_name].apply(lambda x:int(x))
- #df=df.drop(['Pclass','embrk'],axis=1)
- #df.to_csv('train_data')
- #讀入數據集變成彈性分布式數據集RDD ,由于是有監督學習,需要轉換為模型輸入的格式LabeledPoint
- rdd=pyspark.SparkContext.textFile('/home/kim/train')
- train=rdd.map(lambda x:x.split(',')[1])
- train=train.map(lambda line:LabeledPoint(line[1],line[2:]))
- #模型訓練
- model=RandomForest.trainClassifier\
- (train, numClasses=2, categoricalFeaturesInfo={},numTrees=1000,\
- featureSubsetStrategy="auto",impurity='gini', maxDepth=4, maxBins=32)
- #包含LabeledPoint對象的RDD,應用features方法返回其輸入變量的值,label方法返回其真實類別
- data_p=train.map(lambda lp:lp.features)
- v=train.map(lambda lp:lp.label)
- prediction=model.predict(data_p)
- vp=v.zip(prediction)
- #最后輸出模型在訓練集上的正確率
- MSE=vp.map(lambda x:abs(x[0]-x[1]).sum())/vp.count()
- print("MEAN SQURE ERROR: "+str(MSE))
import pandas as pd import numpy as np from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.tree import RandomForest #將類別數量大于2的類別型變量進行重新編碼,并把數據集變成labeledPoint格式 #df=pd.read_csv('/home/kim/t.txt',index_col=0) #for col in ['Pclass','embrk']: # values=df[col].drop_duplicates() # for v in values: # col_name=col+str(v) # df[col_name]=(df[col]==v) # df[col_name]=df[col_name].apply(lambda x:int(x)) #df=df.drop(['Pclass','embrk'],axis=1) #df.to_csv('train_data') #讀入數據集變成彈性分布式數據集RDD ,由于是有監督學習,需要轉換為模型輸入的格式LabeledPoint rdd=pyspark.SparkContext.textFile('/home/kim/train') train=rdd.map(lambda x:x.split(',')[1]) train=train.map(lambda line:LabeledPoint(line[1],line[2:])) #模型訓練 model=RandomForest.trainClassifier\ (train, numClasses=2, categoricalFeaturesInfo={},numTrees=1000,\ featureSubsetStrategy="auto",impurity='gini', maxDepth=4, maxBins=32) #包含LabeledPoint對象的RDD,應用features方法返回其輸入變量的值,label方法返回其真實類別 data_p=train.map(lambda lp:lp.features) v=train.map(lambda lp:lp.label) prediction=model.predict(data_p) vp=v.zip(prediction) #最后輸出模型在訓練集上的正確率 MSE=vp.map(lambda x:abs(x[0]-x[1]).sum())/vp.count() print("MEAN SQURE ERROR: "+str(MSE))
后面可以多加測試,例如:
使用更大規模的數據集;
將數據集劃分為訓練集測試集,在訓練集上建模在測試集上評估模型性能;
使用mllib里面的其他算法并比較效果,等等
歡迎大家與我交流!
文章列表
全站熱搜