文章出處

此前用自己實現的隨機森林算法,應用在titanic生還者預測的數據集上。事實上,有很多開源的算法包供我們使用。無論是本地的機器學習算法包sklearn 還是分布式的spark mllib,都是非常不錯的選擇。
  Spark是目前比較流行的分布式計算解決方案,同時支持集群模式和本地單機模式。由于其通過scala語言開發,原生支持scala,同時由于python在科學計算等領域的廣泛應用,Spark也提供了python的接口。

Spark的常用操作詳見官方文檔:
http://spark.apache.org/docs/latest/programming-guide.html


在終端下面鍵入如下命令,切換到spark的目錄,進入相應的環境:
cd $SPARK_HOME

cd ./bin

./pyspark

可以看到,出現了python 的版本號以及spark的logo

 

 

 

此時,仍然是輸入一句,運行一句并輸出。可以事先編輯好腳本保存為filename然后:

./spark-submit filename

 

下面給出詳細的代碼:

 

 

[python] view plain copy
 
print?
  1. import pandas as pd  
  2. import numpy as np  
  3. from pyspark.mllib.regression import LabeledPoint  
  4. from pyspark.mllib.tree import RandomForest  
  5.   
  6.   
  7. #將類別數量大于2的類別型變量進行重新編碼,并把數據集變成labeledPoint格式  
  8. #df=pd.read_csv('/home/kim/t.txt',index_col=0)  
  9. #for col in ['Pclass','embrk']:  
  10. #    values=df[col].drop_duplicates()  
  11. #    for v in values:      
  12. #        col_name=col+str(v)  
  13. #        df[col_name]=(df[col]==v)  
  14. #        df[col_name]=df[col_name].apply(lambda x:int(x))  
  15. #df=df.drop(['Pclass','embrk'],axis=1)  
  16. #df.to_csv('train_data')  
  17.   
  18. #讀入數據集變成彈性分布式數據集RDD ,由于是有監督學習,需要轉換為模型輸入的格式LabeledPoint  
  19. rdd=pyspark.SparkContext.textFile('/home/kim/train')  
  20. train=rdd.map(lambda x:x.split(',')[1])  
  21. train=train.map(lambda line:LabeledPoint(line[1],line[2:]))  
  22.   
  23. #模型訓練  
  24. model=RandomForest.trainClassifier\  
  25. (train, numClasses=2, categoricalFeaturesInfo={},numTrees=1000,\  
  26. featureSubsetStrategy="auto",impurity='gini', maxDepth=4, maxBins=32)  
  27.   
  28. #包含LabeledPoint對象的RDD,應用features方法返回其輸入變量的值,label方法返回其真實類別  
  29. data_p=train.map(lambda lp:lp.features)  
  30. v=train.map(lambda lp:lp.label)  
  31. prediction=model.predict(data_p)  
  32. vp=v.zip(prediction)  
  33.   
  34. #最后輸出模型在訓練集上的正確率  
  35. MSE=vp.map(lambda x:abs(x[0]-x[1]).sum())/vp.count()  
  36. print("MEAN SQURE ERROR: "+str(MSE))  
import pandas as pd
import numpy as np
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest


#將類別數量大于2的類別型變量進行重新編碼,并把數據集變成labeledPoint格式
#df=pd.read_csv('/home/kim/t.txt',index_col=0)
#for col in ['Pclass','embrk']:
#    values=df[col].drop_duplicates()
#    for v in values:    
#        col_name=col+str(v)
#        df[col_name]=(df[col]==v)
#        df[col_name]=df[col_name].apply(lambda x:int(x))
#df=df.drop(['Pclass','embrk'],axis=1)
#df.to_csv('train_data')

#讀入數據集變成彈性分布式數據集RDD ,由于是有監督學習,需要轉換為模型輸入的格式LabeledPoint
rdd=pyspark.SparkContext.textFile('/home/kim/train')
train=rdd.map(lambda x:x.split(',')[1])
train=train.map(lambda line:LabeledPoint(line[1],line[2:]))

#模型訓練
model=RandomForest.trainClassifier\
(train, numClasses=2, categoricalFeaturesInfo={},numTrees=1000,\
featureSubsetStrategy="auto",impurity='gini', maxDepth=4, maxBins=32)

#包含LabeledPoint對象的RDD,應用features方法返回其輸入變量的值,label方法返回其真實類別
data_p=train.map(lambda lp:lp.features)
v=train.map(lambda lp:lp.label)
prediction=model.predict(data_p)
vp=v.zip(prediction)

#最后輸出模型在訓練集上的正確率
MSE=vp.map(lambda x:abs(x[0]-x[1]).sum())/vp.count()
print("MEAN SQURE ERROR: "+str(MSE))

 

 

后面可以多加測試,例如:

使用更大規模的數據集;

將數據集劃分為訓練集測試集,在訓練集上建模在測試集上評估模型性能;

使用mllib里面的其他算法并比較效果,等等

 

歡迎大家與我交流!


文章列表




Avast logo

Avast 防毒軟體已檢查此封電子郵件的病毒。
www.avast.com


arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()