文章出處

Spark Python API 官方文檔中文版》 之 pyspark.sql (二)

2017-11-04 22:13 by 牛仔褲的夏天, 365 閱讀, 0 評論, 收藏編輯

摘要:在Spark開發中,由于需要用Python實現,發現API與Scala的略有不同,而Python API的中文資料相對很少。每次去查英文版API的說明相對比較慢,還是中文版比較容易get到所需,所以利用閑暇之余將官方文檔翻譯為中文版,并親測Demo的代碼。在此記錄一下,希望對那些對Spark感興趣和從事大數據開發的人員提供有價值的中文資料,對PySpark開發人員的工作和學習有所幫助。

官網地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html            

pyspark.sql module

Module Context

Spark SQL和DataFrames重要的類有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 將分布式數據集分組到指定列名的數據框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame數據的行
pyspark.sql.HiveContext 訪問Hive數據的主入口


pyspark.sql.GroupedData 由DataFrame.groupBy()創建的聚合方法集
pyspark.sql.DataFrameNaFunctions 處理丟失數據(空數據)的方法
pyspark.sql.DataFrameStatFunctions 統計功能的方法
pyspark.sql.functions DataFrame可用的內置函數
pyspark.sql.types 可用的數據類型列表
pyspark.sql.Window 用于處理窗口函數

3.class pyspark.sql.DataFrame(jdf, sql_ctx)

分布式的收集數據分組到命名列中。
一個DataFrame相當于在Spark SQL中一個相關的表,可在SQLContext使用各種方法創建,如:

people = sqlContext.read.parquet("...")

一旦創建, 可以使用在DataFrame、Column中定義的不同的DSL方法操作。
從data frame中返回一列使用對應的方法:

ageCol = people.age

一個更具體的例子:

# To create DataFrame using SQLContext
people = sqlContext.read.parquet("...")
department = sqlContext.read.parquet("...")
people.filter(people.age > 30).join(department, people.deptId == department.id)).groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})

3.1 agg(*exprs)

沒有組的情況下聚集整個DataFrame (df.groupBy.agg()的簡寫)。

復制代碼
>>> l=[('jack',5),('john',4),('tom',2)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.agg({"age": "max"}).collect()
[Row(max(age)=5)]
>>> from pyspark.sql import functions as F
>>> df.agg(F.min(df.age)).collect()
[Row(min(age)=2)]
復制代碼

3.2 alias(alias)

返回一個設置別名的新的DataFrame。

復制代碼
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> from pyspark.sql.functions import *
>>> df_as1 = df.alias("df_as1")
>>> df_as2 = df.alias("df_as2")
>>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
>>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect()
[Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)]
復制代碼

3.3 cache()

用默認的存儲級別緩存數據(MEMORY_ONLY_SER).

3.4 coalesce(numPartitions)

返回一個有確切的分區數的分區的新的DataFrame。
與在一個RDD上定義的合并類似, 這個操作產生一個窄依賴。 如果從1000個分區到100個分區,不會有shuffle過程, 而是每100個新分區會需要當前分區的10個。

>>> df.coalesce(1).rdd.getNumPartitions()
1

3.5 collect()

返回所有的記錄數為行的列表。

>>> df.collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]

3.6 columns

返回所有列名的列表。

>>> df.columns
['age', 'name']

3.7 corr(col1, col2, method=None)

計算一個DataFrame相關的兩列為double值。通常只支持皮爾森相關系數。DataFrame.corr()和DataFrameStatFunctions.corr()類似。
參數:●  col1 – 第一列的名稱  
      ●  col2 – 第二列的名稱
           ●  method – 相關方法.當前只支持皮爾森相關系數

3.8 count()

返回DataFrame的行數。

>>> df.count()
2

3.9 cov(col1, col2)

計算由列名指定列的樣本協方差為double值。DataFrame.cov()和DataFrameStatFunctions.cov()類似。
參數:●  col1 – 第一列的名稱
      ●  col2 – 第二列的名稱

3.10 crosstab(col1, col2)

計算給定列的分組頻數表,也稱為相關表。每一列的去重值的個數應該小于1e4.最多返回1e6個非零對.每一行的第一列會是col1的去重值,列名稱是col2的去重值。第一列的名稱是$col1_$col2. 沒有出現的配對將以零作為計數。DataFrame.crosstab() and DataFrameStatFunctions.crosstab()類似。
參數:●  col1 – 第一列的名稱. 去重項作為每行的第一項。
      ●  col2 – 第二列的名稱. 去重項作為DataFrame的列名稱。

3.11 cube(*cols)

創建使用指定列的當前DataFrame的多維立方體,這樣可以聚合這些數據。

復制代碼
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.cube('name', df.age).count().show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|   2|    1|
|Alice|null|    1|
|  Bob|   5|    1|
|  Bob|null|    1|
| null|   5|    1|
| null|null|    2|
|Alice|   2|    1|
+-----+----+-----+
復制代碼

3.12 describe(*cols)

計算數值列的統計信息。
包括計數,平均,標準差,最小和最大。如果沒有指定任何列,這個函數計算統計所有數值列。

復制代碼
>>> df.describe().show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|               3.5|
| stddev|2.1213203435596424|
|    min|                 2|
|    max|                 5|
+-------+------------------+
>>> df.describe(['age', 'name']).show()
+-------+------------------+-----+
|summary|               age| name|
+-------+------------------+-----+
|  count|                 2|    2|
|   mean|               3.5| null|
| stddev|2.1213203435596424| null|
|    min|                 2|Alice|
|    max|                 5|  Bob|
+-------+------------------+-----+
復制代碼

3.13 distinct()

返回行去重的新的DataFrame。

>>> l=[('Alice',2),('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.distinct().count()
2

3.14 drop(col)

返回刪除指定列的新的DataFrame。
參數:●  col – 要刪除列的字符串類型名稱,或者要刪除的列。

>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')] 
>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
復制代碼
>>> l1=[('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> l2=[('Bob',85)]
>>> df2 = sqlContext.createDataFrame(l2,['name','height'])
>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]
>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]
復制代碼

3.15 dropDuplicates(subset=None)

返回去掉重復行的一個新的DataFrame,通常只考慮某幾列。
drop_duplicates()和dropDuplicates()類似。

復制代碼
>>> from pyspark.sql import Row
>>> df = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+
復制代碼
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

3.16 drop_duplicates(subset=None)

與以上相同。

3.17 dropna(how='any', thresh=None, subset=None)

返回一個刪除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()類似。
參數:●  how – 'any'或者'all'。如果'any',刪除包含任何空值的行。如果'all',刪除所有值為null的行。
   ●  thresh – int,默認為None,如果指定這個值,刪除小于閾值的非空值的行。這個會重寫'how'參數。
   ●  subset – 選擇的列名稱列表。

復制代碼
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> dfnew = df.cube('name', df.age).count()
>>> dfnew.show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|   2|    1|
|Alice|null|    1|
|  Bob|   5|    1|
|  Bob|null|    1|
| null|   5|    1|
| null|null|    2|
|Alice|   2|    1|
+-----+----+-----+
>>> dfnew.na.drop().show()
+-----+---+-----+
| name|age|count|
+-----+---+-----+
|  Bob|  5|    1|
|Alice|  2|    1|
+-----+---+-----+
復制代碼

3.18 dtypes

返回所有列名及類型的列表。

>>> df.dtypes
[('age', 'int'), ('name', 'string')]

3.19 explain(extended=False)

將(邏輯和物理)計劃打印到控制臺以進行調試。
參數:●  extended – boolean類型,默認為False。如果為False,只打印物理計劃。

>>> df.explain()
== Physical Plan ==
Scan ExistingRDD[age#0,name#1]
復制代碼
>>> df.explain(True)
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
復制代碼

3.20 fillna(value, subset=None)

替換空值,和na.fill()類似,DataFrame.fillna()和dataframenafunctions.fill()類似。
參數:●  value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset參數將被忽略。值必須是要替換的列的映射,替換值必須是int,long,float或者string.
      ●  subset - 要替換的列名列表。在subset指定的列,沒有對應數據類型的會被忽略。例如,如果值是字符串,subset包含一個非字符串的列,這個非字符串的值會被忽略。

復制代碼
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> dfnew = df.cube('name', df.age).count()
>>> dfnew.show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|   2|    1|
|Alice|null|    1|
|  Bob|   5|    1|
|  Bob|null|    1|
| null|   5|    1|
| null|null|    2|
|Alice|   2|    1|
+-----+----+-----+
>>> dfnew.na.fill(50).show()
+-----+---+-----+
| name|age|count|
+-----+---+-----+
| null|  2|    1|
|Alice| 50|    1|
|  Bob|  5|    1|
|  Bob| 50|    1|
| null|  5|    1|
| null| 50|    2|
|Alice|  2|    1|
+-----+---+-----+
>>> dfnew.na.fill({'age': 50, 'name': 'unknown'}).show()
+-------+---+-----+
|   name|age|count|
+-------+---+-----+
|unknown|  2|    1|
|  Alice| 50|    1|
|    Bob|  5|    1|
|    Bob| 50|    1|
|unknown|  5|    1|
|unknown| 50|    2|
|  Alice|  2|    1|
+-------+---+-----+
復制代碼

3.21 filter(condition)

用給定的條件過濾行。
where()和filter()類似。
參數:●  條件 - 一個列的bool類型或字符串的SQL表達式。

復制代碼
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]
>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
復制代碼

3.22 first()

返回第一行。

>>> df.first()
Row(age=2, name=u'Alice')

3.23 flatMap(f)

返回在每行應用F函數后的新的RDD,然后將結果壓扁。
是df.rdd.flatMap()的簡寫。

>>> df.flatMap(lambda p: p.name).collect()
[u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']

3.24 foreach(f)

應用f函數到DataFrame的所有行。
是df.rdd.foreach()的簡寫。

>>> def f(person):
...     print(person.name)
>>> df.foreach(f)
Alice
Bob

3.25 foreachPartition(f)

應用f函數到DataFrame的每一個分區。
是 df.rdd.foreachPartition()的縮寫。

復制代碼
>>> def f(people):
...     for person in people:
...         print(person.name)
>>> df.foreachPartition(f)
Alice
Bob
復制代碼

3.26 freqItems(cols, support=None)

參數:●  cols – 要計算重復項的列名,為字符串類型的列表或者元祖。
      ●  support – 要計算頻率項的頻率值。默認是1%。參數必須大于1e-4.

3.27 groupBy(*cols)

使用指定的列分組DataFrame,這樣可以聚合計算。可以從GroupedData查看所有可用的聚合方法。 
groupby()和groupBy()類似。
參數:●  cols – 分組依據的列。每一項應該是一個字符串的列名或者列的表達式。

復制代碼
>>> df.groupBy().avg().collect()
[Row(avg(age)=3.5)]
>>> df.groupBy('name').agg({'age': 'mean'}).collect()
[Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> df.groupBy(df.name).avg().collect()
[Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> df.groupBy(['name', df.age]).count().collect()
[Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)]
復制代碼

3.28 groupby(*cols)

和以上一致

3.29 head(n=None)

返回前n行
參數:●  n – int類型,默認為1,要返回的行數。
返回值: 如果n大于1,返回行列表,如果n為1,返回單獨的一行。

>>> df.head()
Row(age=2, name=u'Alice')
>>> df.head(1)
[Row(age=2, name=u'Alice')]

3.30 insertInto(tableName, overwrite=False)

插入DataFrame內容到指定表。
注:在1.4中已過時,使用DataFrameWriter.insertInto()代替。

3.31 intersect(other)

返回新的DataFrame,包含僅同時在當前框和另一個框的行。
相當于SQL中的交集。

3.32 intersect(other)

如果collect()和take()方法可以運行在本地(不需要Spark executors)那么返回True

3.33 join(other, on=None, how=None)

使用給定的關聯表達式,關聯另一個DataFrame。
以下執行df1和df2之間完整的外連接。
參數:● other – 連接的右側
   ● on – 一個連接的列名稱字符串, 列名稱列表,一個連接表達式(列)或者列的列表。如果on參數是一個字符串或者字符串列表,表示連接列的名稱,這些名稱必須同時存在join的兩個表中, 這樣執行的是一個等價連接。
   ● how – 字符串,默認'inner'。inner,outer,left_outer,right_outer,leftsemi之一。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> l2=[('Tom',80),('Bob',85)]
>>> df2 = sqlContext.createDataFrame(l2,['name','height'])
>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)]
>>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
[Row(name=u'Tom', height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)]
>>> l3=[('Alice',2,60),('Bob',5,80)]
>>> df3 = sqlContext.createDataFrame(l3,['name','age','height'])
>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
>>> df.join(df2, 'name').select(df.name, df2.height).collect()
[Row(name=u'Bob', height=85)]
>>> l4=[('Alice',1),('Bob',5)]
>>> df4 = sqlContext.createDataFrame(l4,['name','age'])
>>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
[Row(name=u'Bob', age=5)]

3.34 limit(num)

將結果計數限制為指定的數字。

>>> df.limit(1).collect()
[Row(age=2, name=u'Alice')]
>>> df.limit(0).collect()
[]

3.35 map(f)

通過每行應用f函數返回新的RDD。
是 df.rdd.map()的縮寫。

>>> df.map(lambda p: p.name).collect()
[u'Alice', u'Bob']

3.36 mapPartitions(f, preservesPartitioning=False)

通過每個分區應用f函數返回新的RDD
是df.rdd.mapPartitions()的縮寫。

>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(iterator): yield 1
...
>>> rdd.mapPartitions(f).sum()
4

3.37 na

返回DataFrameNaFunctions用于處理缺失值。

3.38 orderBy(*cols, **kwargs)

返回按照指定列排序的新的DataFrame。
參數:● cols – 用來排序的列或列名稱的列表。
      ● ascending – 布爾值或布爾值列表(默認 True). 升序排序與降序排序。指定多個排序順序的列表。如果指定列表, 列表的長度必須等于列的長度。

復制代碼
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.sort(df.age.desc()).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
>>> df.sort("age", ascending=False).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
>>> df.orderBy(df.age.desc()).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.orderBy(desc("age"), "name").collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
復制代碼

3.39 persist(storageLevel=StorageLevel(False, True, False, False, 1))

設置存儲級別以在第一次操作運行完成后保存其值。這只能用來分配新的存儲級別,如果RDD沒有設置存儲級別的話。如果沒有指定存儲級別,默認為(memory_only_ser)。

3.40 printSchema()

打印schema以樹的格式

>>> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

3.41 randomSplit(weights, seed=None)

按照提供的權重隨機的劃分DataFrame。
參數:● weights – doubles類型的列表做為權重來劃分DataFrame。權重會被恢復如果總值不到1.0。
       seed – random的隨機數。

復制代碼
>>> l4=[('Alice',1),('Bob',5),('Jack',8),('Tom',10)]
>>> df4 = sqlContext.createDataFrame(l4,['name','age'])
>>> splits = df4.randomSplit([1.0, 2.0],24)
>>> splits[0].count()
1
>>> splits[1].count()
3
復制代碼

3.42 rdd

返回內容為行的RDD。

3.43 registerAsTable(name)

注:在1.4中已過時,使用registerTempTable()代替。

3.44 registerTempTable(name)

使用給定的名字注冊該RDD為臨時表
這個臨時表的有效期與用來創建這個DataFrame的SQLContext相關

>>> df.registerTempTable("people")
>>> df2 = sqlContext.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True

3.45 repartition(numPartitions, *cols)

按照給定的分區表達式分區,返回新的DataFrame。產生的DataFrame是哈希分區。
numPartitions參數可以是一個整數來指定分區數,或者是一個列。如果是一個列,這個列會作為第一個分區列。如果沒有指定,將使用默認的分區數。
1.6版本修改: 添加可選參數可以指定分區列。如果分區列指定的話,numPartitions也是可選的。

復制代碼
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.repartition(10).rdd.getNumPartitions()
10
>>> data = df.unionAll(df).repartition("age")
>>> data.show()
+-----+---+
| name|age|
+-----+---+
|Alice|  2|
|Alice|  2|
|  Bob|  5|
|  Bob|  5|
+-----+---+
>>> data = data.repartition(7, "age")
>>> data.show()
+-----+---+
| name|age|
+-----+---+
|  Bob|  5|
|  Bob|  5|
|Alice|  2|
|Alice|  2|
+-----+---+
>>> data.rdd.getNumPartitions()
7
>>> data = data.repartition("name", "age")
>>> data.show()
+-----+---+
| name|age|
+-----+---+
|  Bob|  5|
|  Bob|  5|
|Alice|  2|
|Alice|  2|
+-----+---+
復制代碼

3.46 replace(to_replace, value, subset=None)

返回用另外一個值替換了一個值的新的DataFrame。DataFrame.replace() 和 DataFrameNaFunctions.replace() 類似。
參數:● to_replace – 整形,長整形,浮點型,字符串,或者列表。要替換的值。如果值是字典,那么值會被忽略,to_replace必須是一個從列名(字符串)到要替換的值的映射。要替換的值必須是一個整形,長整形,浮點型,或者字符串。
           ● value – 整形,長整形,浮點型,字符串或者列表。要替換為的值。要替換為的值必須是一個整形,長整形,浮點型,或者字符串。如果值是列表或者元組,值應該和to_replace有相同的長度。
           ● subset – 要考慮替換的列名的可選列表。在subset指定的列如果沒有匹配的數據類型那么將被忽略。例如,如果值是字符串,并且subset參數包含一個非字符串的列,那么非字符串的列被忽略。

復制代碼
>>> l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)]
>>> df4 = sqlContext.createDataFrame(l4,['name','age','height'])
>>> df4.na.replace(10, 20).show()
+-----+----+------+
| name| age|height|
+-----+----+------+
|Alice|  20|    80|
|  Bob|   5|  null|
|  Tom|null|  null|
| null|null|  null|
+-----+----+------+
>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+----+------+
|name| age|height|
+----+----+------+
|   A|  10|    80|
|   B|   5|  null|
| Tom|null|  null|
|null|null|  null|
+----+----+------+
復制代碼

3.47 rollup(*cols)

使用指定的列為當前的DataFrame創建一個多維匯總, 這樣可以聚合這些數據。

復制代碼
>>> l=[('Alice',2,80),('Bob',5,None)]
>>> df = sqlContext.createDataFrame(l,['name','age','height'])
>>> df.rollup('name', df.age).count().show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
|Alice|null|    1|
|  Bob|   5|    1|
|  Bob|null|    1|
| null|null|    2|
|Alice|   2|    1|
+-----+----+-----+
復制代碼

3.48 sample(withReplacement, fraction, seed=None)

返回DataFrame的子集采樣。

>>> df.sample(False, 0.5, 42).count()
2

3.49 sampleBy(col, fractions, seed=None)

根據每個層次上給出的分數,返回沒有替換的分層樣本。
返回沒有替換的分層抽樣 基于每層給定的一小部分 在給定的每層的片段
參數:● col – 定義層的列
           ● fractions – 每層的抽樣數。如果沒有指定層, 將其數目視為0.
           ● seed – 隨機數
返回值: 返回代表分層樣本的新的DataFrame

復制代碼
>>> from pyspark.sql.functions import col
>>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
>>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
>>> sampled.groupBy("key").count().orderBy("key").show()
+---+-----+
|key|count|
+---+-----+
|  0|    5|
|  1|    9|
+---+-----+
復制代碼

3.50 save(path=None, source=None, mode='error', **options)

保存DataFrame的數據到數據源。
注:在1.4中已過時,使用DataFrameWriter.save()代替。

3.51 saveAsParquetFile(path)

保存內容為一個Parquet文件,代表這個schema。
注:在1.4中已過時,使用DataFrameWriter.parquet() 代替。

3.52 saveAsTable(tableName, source=None, mode='error', **options)

將此DataFrame的內容作為表保存到數據源。
注:在1.4中已過時,使用DataFrameWriter.saveAsTable() 代替。

3.53  schema

返回DataFrame的schema為types.StructType。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.schema
StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))

3.54  select(*cols)

提供一組表達式并返回一個新的DataFrame。
參數:● cols – 列名(字符串)或表達式(列)列表。 如果其中一列的名稱為“*”,那么該列將被擴展為包括當前DataFrame中的所有列。

復制代碼
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.select('*').collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.select('name', 'age').collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.select(df.name, (df.age + 10).alias('age')).collect()
[Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
復制代碼

3.55 selectExpr(*expr)

投射一組SQL表達式并返回一個新的DataFrame。
這是接受SQL表達式的select()的變體。

>>> df.selectExpr("age * 2", "abs(age)").collect()
[Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]

3.56 show(n=20, truncate=True) 

將前n行打印到控制臺。
參數:● n – 要顯示的行數。
           ● truncate – 是否截斷長字符串并對齊單元格。

復制代碼
>>> df
DataFrame[name: string, age: bigint]
>>> df.show()
+-----+---+
| name|age|
+-----+---+
|Alice|  2|
|  Bob|  5|
+-----+---+
復制代碼

3.57 sort(*cols, **kwargs)

返回按指定列排序的新DataFrame。
參數:● cols – 要排序的列或列名稱列表。
           ● ascending – 布爾值或布爾值列表(默認為True)。 排序升序降序。 指定多個排序順序的列表。 如果指定了列表,列表的長度必須等于列的長度。

復制代碼
>>> df.sort(df.age.desc()).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
>>> df.sort("age", ascending=False).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
>>> df.orderBy(df.age.desc()).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.orderBy(desc("age"), "name").collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]
復制代碼

3.58 sortWithinPartitions(*cols, **kwargs)

返回一個新的DataFrame,每個分區按照指定的列排序。
參數:● cols – 要排序的列或列名稱列表。
            ascending – 布爾值或布爾值列表(默認為True)。 排序升序降序。 指定多個排序順序的列表。 如果指定了列表,列表的長度必須等于列的長度。

>>> df.sortWithinPartitions("age", ascending=False).show()
+-----+---+
| name|age|
+-----+---+
|Alice|  2|
|  Bob|  5|
+-----+---+

3.59 stat 

返回統計功能的DataFrameStatFunctions。

3.60 subtract(other)

返回一個新的DataFrame,這個DataFrame中包含的行不在另一個DataFrame中。
這相當于SQL中的EXCEPT。

3.61 take(num)

返回前num行的行列表

>>> df.take(2)
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]

3.62 toDF(*cols)

返回一個新類:具有新的指定列名稱的DataFrame。
參數:● cols – 新列名列表(字符串)。

>>> df.toDF('f1', 'f2').collect()
[Row(f1=u'Alice', f2=2), Row(f1=u'Bob', f2=5)]

3.63 toJSON(use_unicode=True)

將DataFrame轉換為字符串的RDD。
每行都將轉換為JSON格式作為返回的RDD中的一個元素。

>>> df.toJSON().first()
u'{"name":"Alice","age":2}'

3.64 toPandas()

將此DataFrame的內容返回為Pandas pandas.DataFrame。
這只有在pandas安裝和可用的情況下才可用。

>>> df.toPandas()  
   age   name
0    2  Alice
1    5    Bob

3.65 unionAll(other)

返回包含在這個frame和另一個frame的行的聯合的新DataFrame。
這相當于SQL中的UNION ALL。

3.66 unpersist(blocking=True)

將DataFrame標記為非持久性,并從內存和磁盤中刪除所有的塊。

3.67 where(condition)

使用給定表達式過濾行。
where()是filter()的別名。
參數:● condition – 一個布爾類型的列或一個SQL表達式的字符串。

復制代碼
>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.filter(df.age > 3).collect()
[Row(name=u'Bob', age=5)]
>>> df.where(df.age == 2).collect()
[Row(name=u'Alice', age=2)]

>>> df.filter("age > 3").collect()
[Row(name=u'Bob', age=5)]
>>> df.where("age = 2").collect()
[Row(name=u'Alice', age=2)]
復制代碼

3.68 withColumn(colName, col)

通過添加列或替換具有相同名稱的現有列來返回新的DataFrame。
參數:● colName – 字符串,新列的名稱
           ● col – 新列的列表達式

>>> df.withColumn('age2', df.age + 2).collect()
[Row(name=u'Alice', age=2, age2=4), Row(name=u'Bob', age=5, age2=7)]

3.69 withColumnRenamed(existing, new)

通過重命名現有列來返回新的DataFrame。
參數:● existing – 字符串,要重命名的現有列的名稱
           ● col – 字符串,列的新名稱

>>> df.withColumnRenamed('age', 'age2').collect()
[Row(name=u'Alice', age2=2), Row(name=u'Bob', age2=5)]

3.70 write

用于將DataFrame的內容保存到外部存儲的接口。
返回:DataFrameWriter

轉自:http://www.cnblogs.com/wonglu/p/7784825.html



文章列表




Avast logo

Avast 防毒軟體已檢查此封電子郵件的病毒。
www.avast.com


arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()