文章出處
文章列表
在spark-streming 中調用spark-sql時過程遇到的問題
使用版本:spark-2.1.0
JDK1.8
1. spark-sql中對limit 的查詢結果使用sum() 聚合操作不生效
如下sql會報出 top10_sts 存在異常。
SELECT
SUM(mtime_show_times) AS top10_sts
FROM
tb_movie_bo_pt_params
ORDER BY mtime_persion_times DESC
LIMIT 10
改成如下sql邏輯正常執行
SELECT
SUM(mtime_show_times) AS top10_sts
FROM
(SELECT
*
FROM
tb_moive_bo_pt_params
ORDER BY mtime_persion_times DESC
LIMIT 10) a
2. spark-sql中使用union 連接兩個表;再將union結果進行過濾 != 操作不生效問題。
SELECT
'ALL_MOVIE' AS movie_id,
SUM(no_sale) AS persion_times,
COUNT(1) AS show_times
FROM
tb_bo_real_time
WHERE biz_date = '#{var_date}'
UNION
SELECT
'ALL_MOVIE2' AS movie_id,
'2017-12-31' AS persion_times,
'1123' AS show_times
基于union 的結果生成的臨時表 temp_tb;
執行如下操作得不到預期的結果:
SELECT
*
FROM
temp_tb
WHERE movie_id != 'ALL_MOVIE'
這 可能是spark的bug,經過調試后發現,使用 union關鍵字之后就會出現該問題。
測試發現有2種解決辦法:
將uinon的兩部分分別使用sql計算,之后在使用RDD的union操作,將兩個數據集合合并起來。
val movie_summary_realtime_Df = sparkSession.sql(config.getProperty("test_union_sql").replace("#{var_date}", biz_date)) movie_summary_realtime_Df.collect().foreach(println) println("-----------movie_summary_realtime_all_Df------------------") val movie_summary_realtime_all_Df = sparkSession.sql(config.getProperty("test_union_sql_all").replace("#{var_date}", biz_date)) movie_summary_realtime_all_Df.collect().foreach(println) println("-----------union_Df------------------") val unioDf = movie_summary_realtime_Df.union(movie_summary_realtime_all_Df) unioDf.collect().foreach(println) unioDf.createOrReplaceTempView("tb_bo_movie_summary_realtime") println("-----------test filter------------------") val test_DF = sparkSession.sql("SELECT movie_id FROM tb_bo_movie_summary_realtime WHERE movie_id != 'ALL_MOVIE'") test_DF.collect().foreach(println)
將相關依賴表cache后,再進行sql操作。
val movie_summary_realtime_Df = sparkSession.sql(config.getProperty("tb_bo_movie_summary_realtime").replace("#{var_date}", biz_date))
movie_summary_realtime_Df.cache()
3. spark 內存快照的更新
def updateSeatMapState(moviesKey: String, seatMap: Option[JSONObject], state: State[JSONObject]) = {
var newValue:JSONObject = seatMap match {
case None => { val temp =state.get();temp;}
case _ => { state.update(seatMap.get);seatMap.get; }
}
val output = (moviesKey, newValue)
output
}
- 當前值Option[JSONObject] 有可能為none,state.update(none) 會有空指針異常,造成程序退出。
- 當前值Option[JSONObject] 為none時,有兩種情況。一種是業務確實為空;另一種是當前key已經過期了。
不管那種情況,都不需要更新state的值。 - 如果一個state的可以過期了,再調用state.update()就會報出一個更新過期Key的異常,后程序退出。
- 狀態值如果不設置過期,就會一直存在,系統長時間運行性能會越來越差,并出現內存溢出,而異常退出。
- 更新方法需要返回值。
java.lang.IllegalArgumentException: requirement failed: Cannot update the state that is timing out
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.StateImpl.update(State.scala:156)
at com.mtime.bigdata.bo.RealTimeBoxOfficeCluster$.updateSeatMapState(RealTimeBoxOfficeCluster.scala:110)
at com.mtime.bigdata.bo.RealTimeBoxOfficeCluster$$anonfun$6.apply(RealTimeBoxOfficeCluster.scala:72)
at com.mtime.bigdata.bo.RealTimeBoxOfficeCluster$$anonfun$6.apply(RealTimeBoxOfficeCluster.scala:72)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at
文章列表
全站熱搜