一、概述
在實時應用之中,難免會遇到往NoSql數據如HBase中寫入數據的情景。題主在工作中遇到如下情景,需要實時查詢某個設備ID對應的賬號ID數量。踩過的坑也挺多,舉其中之一,如一開始選擇使用NEO4J圖數據庫存儲設備和賬號的關系,當然也有其他的數據,最終構成一個復雜的圖關系,但是這個圖數據庫免費版是單機安裝(集群要收費),在實時寫入和查詢關系的時候,導致我們一臺服務器內存和cpu損耗嚴重,為了保證Hadoop集群的穩定性,只好替換掉這個數據庫,采用流行的HBase。本文就HBase的使用心得做如下記錄。
二、解決方案
1.rowkey設計:設備id是32位字母、數字組成的串,考慮到HBase長表掃描的查詢最快,所以rowkey的設計方式為,設備ID+賬號ID拼接而成,這樣在掃描某個設備ID時會很快計算出條數。
2.HBase表設計:在創建表的時候采用預分區建表,因為這樣的,如果知道hbase數據表的rowkey的分布情況,就可以在建表的時候對hbase進行region的預分區,這樣做的好處是防止大數據量插入的熱點問題,提高數據插入的效率。rowkey是字母或者數字開頭,所以建表語句如下(數據量再大的時候還可以在細分分區):
create 'T_TEST', 'data', SPLITS => ['0', '1','2', '3','4', '5','6','7','8','9','a', 'b', 'c', 'd', 'e', 'f', 'g']
此處入坑:創建表的時候將HBase表映射到Hive外部表,語句如下。這樣做是為了方便導入歷史數據,但是Hive跑批將歷史數據導入之后,從HBase查詢已經導入的某一數據的時候,無法查詢導數據,也無法通過API寫入到HBase,這個問題很詭異,后來想了下Hive導入的數據編碼和HBase的不同,于是重新將表刪除,不采用映射表,直接使用Spark將歷史數據導入,問題解決。
CREATE external TABLE tmp.H_T_TEST(key string ,num string) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,data:num") TBLPROPERTIES ("hbase.table.name" = "T_TEST");
3.設計好rowkey和表之后,我們就開始寫Spark代碼了。
此處入坑,我把HBase的連接池寫在了和Spark的同一位置,這樣會遇到一個問題,Spark程序運行的時候報HBaseConnection沒有序列化,按照網上的做法,將對象加上 @transient注解,雖然不報錯誤,還是無法將數據寫入到Hba之中。后來經過查找,找到了解決辦法,將HBase的連接放到消息的循環之內,即一個分區建立一個HBase連接,代碼如下。
def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.createSparkContext(this.getClass.getSimpleName) val ssc: StreamingContext = new StreamingContext(sc, Seconds(10)) val messages = SparkUtil.createDStreamFromKafka( "T_TEST", topicSet, ssc)//創建消息接收器 messages.foreachRDD(rdd => { rdd.foreachPartition(partitionRecords => {//循環分區 try { val connection = HBaseUtil.getHbaseConn //獲取HBase連接,分區創建一個連接,分區不跨節點,不需要序列化 partitionRecords.foreach(s => { val data = JSON.parseObject(s._2)//將數據轉化成JSON格式 val tableName = TableName.valueOf("T_TEST") val table = connection.getTable(tableName)//獲取表連接 val put = new Put(Bytes.toBytes(data.getString("id1") + "_" + data.getString("id2"))) put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("num"), Bytes.toBytes("1")) Try(table.put(put)).getOrElse(table.close())//將數據寫入HBase,若出錯關閉table table.close()//分區數據寫入HBase后關閉連接 }) } catch { case e: Exception => logger.error("寫入HBase失敗,{}", e.getMessage) } }) }) ssc.start() ssc.awaitTermination() }
至此問題解決,數據正常,還沒出現過問題,等待時間驗證吧。
4.歷史數據導入,在導入歷史數據的時候,由于數據放在了Hive的兩個不同表之中,一開始想要一次性讀入,使用Spark SQL的dataframe,創建一個hivecontext,寫HiveSQL將兩個表結果執行union all操作,但是Spark程序報rpc錯誤。將兩個表的結果分別查出,使用dataframe 的union all操作,也是不行,也是rpc錯誤,查了很多資料,還是沒解決,莫名其妙的錯誤,后來兩個表分開執行導入歷史數據,問題不再出現,可能Spark還是不夠成熟,總是遇到莫名其妙的問題。
三、總結
在使用Hbase的時候要預分區。不要為了方便使用Hive外部映射表。HBase的連接池要放在分區循環開始的地方,不然創建很多的連接,會導致HBase垮掉。
文章列表