文章出處

一、概述

  在實時應用之中,難免會遇到往NoSql數據如HBase中寫入數據的情景。題主在工作中遇到如下情景,需要實時查詢某個設備ID對應的賬號ID數量。踩過的坑也挺多,舉其中之一,如一開始選擇使用NEO4J圖數據庫存儲設備和賬號的關系,當然也有其他的數據,最終構成一個復雜的圖關系,但是這個圖數據庫免費版是單機安裝(集群要收費),在實時寫入和查詢關系的時候,導致我們一臺服務器內存和cpu損耗嚴重,為了保證Hadoop集群的穩定性,只好替換掉這個數據庫,采用流行的HBase。本文就HBase的使用心得做如下記錄。

二、解決方案

  1.rowkey設計:設備id是32位字母、數字組成的串,考慮到HBase長表掃描的查詢最快,所以rowkey的設計方式為,設備ID+賬號ID拼接而成,這樣在掃描某個設備ID時會很快計算出條數。

2.HBase表設計:在創建表的時候采用預分區建表,因為這樣的,如果知道hbase數據表的rowkey的分布情況,就可以在建表的時候對hbase進行region的預分區,這樣做的好處是防止大數據量插入的熱點問題,提高數據插入的效率。rowkey是字母或者數字開頭,所以建表語句如下(數據量再大的時候還可以在細分分區):

create 'T_TEST', 'data', SPLITS => ['0', '1','2', '3','4', '5','6','7','8','9','a', 'b', 'c', 'd', 'e', 'f', 'g']

此處入坑:創建表的時候將HBase表映射到Hive外部表,語句如下。這樣做是為了方便導入歷史數據,但是Hive跑批將歷史數據導入之后,從HBase查詢已經導入的某一數據的時候,無法查詢導數據,也無法通過API寫入到HBase,這個問題很詭異,后來想了下Hive導入的數據編碼和HBase的不同,于是重新將表刪除,不采用映射表,直接使用Spark將歷史數據導入,問題解決。

CREATE external TABLE tmp.H_T_TEST(key string ,num string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,data:num")
TBLPROPERTIES ("hbase.table.name" = "T_TEST");

3.設計好rowkey和表之后,我們就開始寫Spark代碼了。

此處入坑,我把HBase的連接池寫在了和Spark的同一位置,這樣會遇到一個問題,Spark程序運行的時候報HBaseConnection沒有序列化,按照網上的做法,將對象加上 @transient注解,雖然不報錯誤,還是無法將數據寫入到Hba之中。后來經過查找,找到了解決辦法,將HBase的連接放到消息的循環之內,即一個分區建立一個HBase連接,代碼如下。

def main(args: Array[String]): Unit = {
    val sc: SparkContext = SparkUtil.createSparkContext(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
    val messages = SparkUtil.createDStreamFromKafka(
      "T_TEST",
      topicSet,
      ssc)//創建消息接收器

    messages.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {//循環分區
        try {
          val connection = HBaseUtil.getHbaseConn //獲取HBase連接,分區創建一個連接,分區不跨節點,不需要序列化
          partitionRecords.foreach(s => {
            val data = JSON.parseObject(s._2)//將數據轉化成JSON格式
            val tableName = TableName.valueOf("T_TEST")
            val table = connection.getTable(tableName)//獲取表連接

            val put = new Put(Bytes.toBytes(data.getString("id1") + "_" + data.getString("id2")))
            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("num"), Bytes.toBytes("1"))

            Try(table.put(put)).getOrElse(table.close())//將數據寫入HBase,若出錯關閉table
            table.close()//分區數據寫入HBase后關閉連接
          })
        } catch {
          case e: Exception => logger.error("寫入HBase失敗,{}", e.getMessage)
        }
      })
    })
    ssc.start()
    ssc.awaitTermination()

  }

至此問題解決,數據正常,還沒出現過問題,等待時間驗證吧。

4.歷史數據導入,在導入歷史數據的時候,由于數據放在了Hive的兩個不同表之中,一開始想要一次性讀入,使用Spark SQL的dataframe,創建一個hivecontext,寫HiveSQL將兩個表結果執行union all操作,但是Spark程序報rpc錯誤。將兩個表的結果分別查出,使用dataframe 的union all操作,也是不行,也是rpc錯誤,查了很多資料,還是沒解決,莫名其妙的錯誤,后來兩個表分開執行導入歷史數據,問題不再出現,可能Spark還是不夠成熟,總是遇到莫名其妙的問題。

三、總結

  在使用Hbase的時候要預分區。不要為了方便使用Hive外部映射表。HBase的連接池要放在分區循環開始的地方,不然創建很多的連接,會導致HBase垮掉。

 


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()