文章出處

        kafka是LinkedIn開發并開源的一個分布式MQ系統,現在是Apache的一個孵化項目。在它的主頁描述kafka為一個高吞吐量的分布式(能 將消息分散到不同的節點上)MQ。在這片博文中,作者簡單提到了開發kafka而不選擇已有MQ系統的原因。兩個原因:性能和擴展性。Kafka僅僅由 7000行Scala編寫,據了解,Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。

 

安裝準備

版本

Kafka版本:kafka_2.10-0.8.2.0

Zookeeper版本:3.4.6

Zookeeper 集群:hadoop104,hadoop107,hadoop108

Zookeeper集群的搭建參見:在CentOS上安裝ZooKeeper集群

物理環境

安裝兩臺物理機:

192.168.40.104  hadoop104(運行3個Broker)

192.148.40.105  hadoop105(運行2個Broker)

該集群的創建主要分為三步,單節點單Broker,單節點多Broker,多節點多Broker

 

單節點單Broker

本節以hadoop104上創建一個Broker為例

下載kafka

 

 

下載路徑:http://kafka.apache.org/downloads.html

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #tar -xvf kafka_2.10-0.8.2.0.tgz  
  2. # cd kafka_2.10-0.8.2.0  

配置

修改config/server.properties

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. broker.id=1    
  2. port=9092  
  3. host.name=hadoop104    
  4. socket.send.buffer.bytes=1048576    
  5. socket.receive.buffer.bytes=1048576    
  6. socket.request.max.bytes=104857600    
  7. log.dir=./kafka1-logs  
  8. num.partitions=10  
  9. zookeeper.connect=hadoop107:2181,hadoop104:2181,hadoop108:2181    


啟動Kafka服務

 

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #bin/kafka-server-start.sh config/server.properties  

 

 

創建Topic

 

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #bin/kafka-topics.sh --create --zookeeper hadoop107:2181,hadoop104:2181,hadoop108:2181 --replication-factor 1 --partitions 1 --topic test  

 

查看Topic

 

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #bin/kafka-topics.sh --list --zookeeper hadoop107:2181,hadoop104:2181,hadoop108:2181  

 

輸出:

 

producer發送消息

 

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test   

 


 

consumer接收消息

 

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #bin/kafka-console-consumer.sh --zookeeper hadoop107:2181,hadoop104:2181,hadoop108:2181 --topic test --from-beginning  

 

 

如果要最新的數據,可以不帶--from-beginning參數即可。

/bin/kafka-console-consumer.sh --zookeeper  hadoop107:2181,hadoop104:2181,hadoop108:2181  --topic test

 

單節點多個Broker

 

配置

將上個章節中的文件夾再復制兩份分別為kafka_2,kafka_3

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #cp -r kafka_2.10-0.8.2.0 kafka_2  
  2.   
  3. #cp -r kafka_2.10-0.8.2.0 kafka_3  

 

分別修改kafka_2/config/server.properties以及kafka_3/config/server.properties 文件中的broker.id,以及port屬性,確保唯一性

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. kafka_2/config/server.properties  
  2. broker.id=2  
  3. port=9093  
  4. kafka_3/config/server.properties  
  5. broker.id=3  
  6. port=9094  

 

啟動

啟動另外兩個Broker
[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #cd kafka_2  
  2. # bin/kafka-server-start.sh config/server.properties &  
  3. #cd ../kafka_3  
  4. # bin/kafka-server-start.sh config/server.properties &  

創建一個replication factor為3的topic

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #bin/kafka-topics.sh --create --zookeeper hadoop107:2181,hadoop104:2181,hadoop108:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic  

查看Topic的狀態

[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. bin/kafka-topics.sh --describe --zookeeper  hadoop107:2181,hadoop104:2181,hadoop108:2181  --topic my-replicated-topic  
從上面的內容可以看出,該topic包含1個part,replicationfactor為3,且Node3 是leador
解釋如下:
  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader. 
 
再來看一下之前創建的test topic, 從下圖可以看出沒有進行replication
 

多個節點的多個Broker

 
在hadoop105上分別把下載的文件解壓縮到kafka_4,kafka_5兩個文件夾中,再將hadoop104上的server.properties配置文件拷貝到這連個文件夾中
[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #scp -r config/ root@hadoop105:/root/hadoop/kafka_4/  
  2. #scp -r config/ root@hadoop105:/root/hadoop/kafka_5/  

配置

并分別修改內容如下:
[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. kafka_4  
  2.     brokerid=4  
  3.     port=9095  
  4.     host.name=hadoop105  
  5. kafka_5  
  6.     brokerid=5  
  7.     port=9096  
  8.     host.name=hadoop105  
啟動服務
[html] view plaincopyprint?在CODE上查看代碼片派生到我的代碼片
 
  1. #cd kafka_4  
  2. # bin/kafka-server-start.sh config/server.properties &  
  3. #cd ../kafka_5  
  4. # bin/kafka-server-start.sh config/server.properties &  
 
到目前為止,兩臺物理機上的5個Broker已經啟動完畢

 

總結

在kafka的核心思路中,不需要在內存里緩存數據,因為操作系統的文件緩存已經足夠完善和強大,只要不做隨機寫,順序讀寫的性能是非常高效的。 kafka的數據只會順序append,數據的刪除策略是累積到一定程度或者超過一定時間再刪除。Kafka另一個獨特的地方是將消費者信息保存在客戶端 而不是MQ服務器,這樣服務器就不用記錄消息的投遞過程,每個客戶端都自己知道自己下一次應該從什么地方什么位置讀取消息,消息的投遞過程也是采用客戶端 主動pull的模型,這樣大大減輕了服務器的負擔。Kafka還強調減少數據的序列化和拷貝開銷,它會將一些消息組織成Message Set做批量存儲和發送,并且客戶端在pull數據的時候,盡量以zero-copy的方式傳輸,利用sendfile(對應java里的 FileChannel.transferTo/transferFrom)這樣的高級IO函數來減少拷貝開銷。可見,kafka是一個精心設計,特定于 某些應用的MQ系統,這種偏向特定領域的MQ系統我估計會越來越多,垂直化的產品策略值的考慮。

只要磁盤沒有限制并且不出現損失,kafka可以存儲相當長時間的消息(一周)。

 

轉:http://www.centoscn.com/CentosServer/cluster/2015/0312/4863.html


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()