文章出處
原文轉自:http://www.cnblogs.com/charlotte77/p/5412709.html
文章列表
《Learning Spark》這本書算是Spark入門的必讀書了,中文版是《Spark快速大數據分析》,不過豆瓣書評很有意思的是,英文原版評分7.4,評論都說入門而已深入不足,中文譯版評分8.4,評論一片好評,有點意思。我倒覺得這本書可以作為官方文檔的一個補充,刷完后基本上對Spark的一些基本概念、碼簡單的程序是沒有問題的了。這本書有一個好處是它是用三門語言寫的,Python/Java/Scala,所以適用性很廣,我的觀點是,先精通一門語言,再去學其他語言。由于我工作中比較常用的是Python,所以就用把Python相關的命令總結一下。下一階段再深入學習Java和Scala。這一篇總結第一章-第三章的重點內容。
說到Spark,就不得不提到RDD,RDD,字面意思是彈性分布式數據集,其實就是分布式的元素集合。Python的基本內置的數據類型有整型、字符串、元祖、列表、字典,布爾類型等,而Spark的數據類型只有RDD這一種,在Spark里,對數據的所有操作,基本上就是圍繞RDD來的,譬如創建、轉換、求值等等。所有RDD的轉換都是lazy(惰性求值)的,RDD的轉換操作會生成新的RDD,新的RDD的數據依賴于原來的RDD的數據,每個RDD又包含多個分區。那么一段程序實際上就構造了一個由相互依賴的多個RDD組成的有向無環圖(DAG)。并通過在RDD上執行動作將這個有向無環圖作為一個Job提交給Spark執行。理解RDD后可以避免以后走很多彎路。關于RDD的特點,可以搜到很多資料,其實我們只需要理解兩點就可以了:
1.不可變
2.分布式
有人會覺得很奇怪,如果RDD不可變,那么在進行數據操作的時候,怎么改變它的值,怎么進行計算呢?其實RDD支持兩種操作:
1.Tansformation(轉化操作):返回值還是一個RDD
2.Action(行動操作):返回值不是一個RDD
第一種Transformation是返回一個新的RDD,如map(),filter()等。這種操作是lazy(惰性)的,即從一個RDD轉換生成另一個RDD的操作不是馬上執行,只是記錄下來,只有等到有Action操作是才會真正啟動計算,將生成的新RDD寫到內存或hdfs里,不會對原有的RDD的值進行改變。而Action操作才會實際觸發Spark計算,對RDD計算出一個結果,并把結果返回到內存或hdfs中,如count(),first()等。
通俗點理解的話,就是假設你寫了一堆程序,里面對數據進行了多次轉換,這個時候實際上沒有計算,就只是放著這里。在最后出結果的時候會用到Action操作,這個時候Action會執行與之相關的轉換操作,運算速度會非常快(一是Action不一定需要調用所有的transformation操作,二是只有在最后一步才會計算相關的transformation操作)。如果Transformation沒有lazy性質的話,每轉換一次就要計算一次,最后Action操作的時候還要計算一次,會非常耗內存,也會極大降低計算速度。
還有一種情況,如果我們想多次使用同一個RDD,每次都對RDD進行Action操作的話,會極大的消耗Spark的內存,這種情況下,我們可以使用RDD.persist()把這個RDD緩存下來,在內存不足時,可以存儲到磁盤(disk)里。在Python中,儲存的對象永遠是通過Pickle庫序列化過的,所以社不設置序列化級別不會產生影響。
RDD的性質和操作方式講完了,現在來說說怎么創建RDD,有兩種方式
1.讀取一個外部數據集
2.在內存中對一個集合進行并行化(parallelize)
第二種方式相對來說更簡單,你可以直接在shell里快速創建RDD,舉個例子:
1 A = [1,2,3,4,5] 2 lines = sc.parallelize(A) 3 #另一種方式 4 lines = sc.parallelize([1,2,3,4,5])
但是這種方式并不是很好,因為你需要把你的整個數據集放在內存里,如果數據量比較大,會很占內存。所以,可以在測試的時候用這種方式,簡單快速。
讀取外部數據及時需要用到SparkContext.textFile()
1 lines = sc.textFile("README.md")
RDD的操作命令很多,包括map(),filter()等Transformation操作以及reduce(),fold(),aggregate()等Action操作。
- 常見的Transformation操作:
map( )和flatMap( )的聯系和區別
map( ):接收一個函數,應用到RDD中的每個元素,然后為每一條輸入返回一個對象。 filter( ):接收一個函數,將函數的元素放入新的RDD中返回。 flatMap( ):接收一個函數,應用到RDD中的每個元素,返回一個包含可迭代的類型(如list等)的RDD,可以理解為先Map(),后flat().
用一個圖可以很清楚的理解:

偽集合操作:
1 distinct( )、union( )、intersection( )、subtract( ) 2 distinct( ):去重 3 union( ):兩個RDD的并集 4 intersection( ):兩個RDD的交集 5 subtract( ):兩個RDD的補集 6 cartesian( ):兩個RDD的笛卡爾積(可以應用于計算相似度中,如計算各用戶對各種產品的預期興趣程度)
注:
1.intersection( )的性能比union( )差很多,因為它需要數據混洗來發現共同數據
2.substract( )也需要數據混洗
- 常見的Action操作:
1 reduce( ):接收一個函數作為參數,這個函數要操作兩個相同元素類型的RDD,也返回一個同樣類型的RDD,可以計算RDD中元素的和、個數、以及其他聚合類型的操作。 2 3 fold( ):和reduce一樣,但需要提供初始值。 4 5 aggregate( ):和fold類似,但通常返回不同類型的函數。 6 7 注:
關于fold()和aggregate(),再說點題外話。fold()只能做同構聚合操作,就是說,如果你有一個RDD[X],通過fold,你只能構造出一個X。但是如果你想通過RDD[X]構造一個Y呢?那就得用到aggregate()了,使用aggregate時,需要提供初始值(初始值的類型與最終返回的類型相同),然后通過一個函數把一RDD的元素合并起來放到累加器里,再提供一個函數將累加器兩兩相加。由此可以看出,fold()需要保證滅個partition能夠獨立進行運算,而aggregate()對于不同partition(分區)提交的最終結果專門定義了一個函數來進行處理。
RDD還有很多其他的操作命令,譬如collect(),count(),take(),top(),countByValue(),foreach()等,限于篇幅,就不一一表述了。
最后來講講如何向Spark傳遞函數:
兩種方式:
1.簡單的函數:lambda表達式。
適合比較短的函數,不支持多語句函數和無返回值的語句。
2.def函數
會將整個對象傳遞過去,但是最好不要傳遞一個帶字段引用的函數。如果你傳遞的對象是某個對象的成員,或者在某個函數中引用了一個整個字段,會報錯。舉個例子:
1 class MyClass(object): 2 def __init__(self): 3 self.field = “Hello” 4 5 def doStuff(self, rdd): 6 #報錯:因為在self.field中引用了整個self 7 return rdd.map(lambda s: self.field + x)
解決方法:直接把你需要的字段拿出來放到一個局部變量里,然后傳遞這個局部變量就可以了。
1 class MyClass(object): 2 def __init__(self): 3 self.field = “Hello” 4 5 def doStuff(self, rdd): 6 #將需要的字段提取到局部變量中即可 7 field = self.field 8 return rdd.map(lambda s: field + x)
前面三章講了Spark的基本概念和RDD的特性以及一些簡單的命令,比較簡單。后面三章主要講了鍵值對操作、數據的讀取和保存以及累加器、廣播變量等,下周再更新。
原文轉自:http://www.cnblogs.com/charlotte77/p/5412709.html
文章列表
全站熱搜