
Recently the company needed to pick up Kafka, so I built a simple integration with the Storm setup from earlier. I looked at a number of examples online first, but most of them had problems of one kind or another, so I put together my own.

 

    Below is a problem other people have run into online; I am quoting it here so that neither I nor anyone else trips over it again:

For the past few days my work required storm+kafka. The basic scenario: when the application hits an error, it sends a log message to a Kafka topic; Storm subscribes to that topic and does the follow-up processing. The scenario is very simple, but while learning I ran into a strange situation: when KafkaSpout read data from the topic, it never wrote offset data to ZooKeeper, so every restart read the topic from the beginning. After two days of head-scratching I stumbled on the cause: the bolt should extend BaseBasicBolt, not BaseRichBolt.

This post records that situation and then walks through a few simple examples based on the scenario above. For the underlying theory, see the Storm notes on basic Storm concepts, or the Storm introduction.

Basic subscription

Basic scenario: subscribe to a Kafka topic, prepend a custom string to each message read, and write the result back to another Kafka topic.

The spout that reads from Kafka is storm.kafka.KafkaSpout, and the bolt that writes back to Kafka is storm.kafka.bolt.KafkaBolt. The bolt that processes the data in between is defined as TopicMsgBolt. Without further ado, the code:


import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

import java.util.Properties;

public class TopicMsgTopology {
    public static void main(String[] args) throws Exception {
        // ZooKeeper connection string
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // Topic to subscribe to, plus the ZooKeeper node path and id under which offsets are stored
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root", "topicMsgTopology");
        // kafka.broker.properties consumed by KafkaBolt
        Config conf = new Config();
        Properties props = new Properties();
        // Kafka broker address
        props.put("metadata.broker.list", "dev2_55.wfj-search:9092");
        // serializer.class is the message serializer
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        // Topic that KafkaBolt writes to
        conf.put("topic", "msgTopic2");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("msgSentenceBolt", new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout");
        builder.setBolt("msgKafkaBolt", new KafkaBolt<String, Integer>()).shuffleGrouping("msgSentenceBolt");
        if (args.length == 0) {
            String topologyName = "kafkaTopicTopology";
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        } else {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }
}

The argument to the storm.kafka.ZkHosts constructor is a standard ZooKeeper connection string (see the ZooKeeper installation and deployment notes for setting up the environment). zk1, zk2 and zk3 are configured as hosts locally; because the server runs in pseudo-distributed mode, the port numbers are not all the default 2181.
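
If Kafka registers its broker metadata under a ZooKeeper chroot rather than the default /brokers path, ZkHosts also has a two-argument constructor that takes that path. A minimal sketch (the /kafka chroot below is only an assumed example, not part of the setup in this post):

// Hypothetical: only needed when Kafka's broker metadata lives under a chroot,
// e.g. /kafka/brokers instead of the default /brokers.
BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381", "/kafka/brokers");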

The storm.kafka.SpoutConfig constructor takes the storm.kafka.ZkHosts object above as its first argument, the name of the topic to subscribe to as the second, the zkRoot (the ZooKeeper node under which the read offsets for the topic are written) as the third, and the name of the child node under that root as the fourth (some sources describe this fourth argument as the spout's id).
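
The consumed offsets therefore end up in ZooKeeper under zkRoot/id. If you deliberately want the spout to ignore any stored offsets and re-read from the earliest available offset, the SpoutConfig of this storm-kafka generation exposes fields for that; a hedged sketch, since the exact field name depends on the version (forceFromStart in older releases, later renamed ignoreZkOffsets):

// Sketch: start from the earliest offset and ignore what is stored in ZooKeeper.
// Field names vary by storm-kafka version; verify against the version on your classpath.
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
spoutConfig.forceFromStart = true;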

The backtype.storm.Config object holds the basic configuration needed by the Storm topology.

The argument to the backtype.storm.spout.SchemeAsMultiScheme constructor determines how the data read from Kafka is deserialized; MessageScheme here is a custom implementation, shown below:

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class MessageScheme implements Scheme {
    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8");
            logger.info("get one message is {}", msg);
            return new Values(msg);
        } catch (UnsupportedEncodingException ignored) {
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

The getOutputFields method of MessageScheme names the field of the tuples (the smallest unit of data Storm passes around) that KafkaSpout emits downstream. It needs to agree with the name used in the bolt that receives the data (in this example it does not strictly have to, because the bolt simply takes value 0, but in the wordCount example below it must).
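
To make the coupling explicit: the field name declared by the scheme is the name a downstream bolt can read by. A small fragment for illustration (the variable names are made up):

// In MessageScheme, the tuple emitted by KafkaSpout has a single field named "msg".
// Inside a receiving bolt's execute(Tuple input, ...):
String byPosition = (String) input.getValue(0);   // works regardless of the field name
String byName = input.getStringByField("msg");    // only works if the names agree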

TopicMsgBolt is the bolt that receives data from storm.kafka.KafkaSpout, processes it, and passes it on to storm.kafka.bolt.KafkaBolt. The code:

public class TopicMsgBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        String out = "Message got is '" + word + "'!";
        logger.info("out={}", out);
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

Pay special attention here: the bolt must extend backtype.storm.topology.base.BaseBasicBolt; otherwise the offsets will not be recorded in ZooKeeper.
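
The reason is acking: KafkaSpout only commits an offset to ZooKeeper once the tuples it emitted for that offset have been acked, and BaseBasicBolt acks each tuple automatically after execute returns. If you do use BaseRichBolt, you have to anchor and ack by hand. A sketch of what the same bolt would look like in that style (not the version used in this post, shown only to illustrate the difference):

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class TopicMsgRichBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = (String) input.getValue(0);
        // Anchor the outgoing tuple to the input so reliability tracking still works.
        collector.emit(input, new Values("Message got is '" + word + "'!"));
        // Without this explicit ack, the KafkaSpout never advances the offset in ZooKeeper.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}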

That is all the code we need to write. Next, test it against the Storm and Kafka clusters set up earlier:

# Create the topics
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic1
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic2

Next, start a producer for msgTopic1 and a consumer for msgTopic2:

# Start a producer on msgTopic1 to send test data
./bin/kafka-console-producer.sh --broker-list dev2_55.wfj-search:9092 --topic msgTopic1
# Start a consumer on msgTopic2 to watch the processed results
./bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2281,zk3:2381 --topic msgTopic2 --from-beginning

Then upload the packaged jar to the Storm nimbus (either submit it remotely, or copy the jar to the server where the nimbus node runs and execute it locally):

# ./bin/storm jar topology TopicMsgTopology.jar cn.howardliu.demo.storm.kafka.topicMsg.TopicMsgTopology TopicMsgTopology

Once the corresponding workers have started, type data into the msgTopic1 producer terminal and watch the output appear in the msgTopic2 consumer terminal.
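
For example (the prefix comes from TopicMsgBolt above; what you type is up to you):

# typed into the msgTopic1 producer terminal
hello storm

# appears in the msgTopic2 consumer terminal
Message got is 'hello storm'!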

A few things to note:

  1. The two topics msgTopic1 and msgTopic2 must be created before the topology is submitted;
  2. The bolts you define must extend BaseBasicBolt, not BaseRichBolt, otherwise the offsets will not be recorded;
  3. ZooKeeper should run as a distributed (or pseudo-distributed) ensemble of at least three nodes, otherwise odd failures can occur;
  4. Across the Storm cluster, spout and bolt ids must be unique, otherwise exceptions are thrown;
  5. TopicMsgBolt, being the last bolt before storm.kafka.bolt.KafkaBolt, must declare its output field as "message", otherwise KafkaBolt cannot pick up the data (see the sketch after this list for adapting a different field name).
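
On point 5: the KafkaBolt bundled with this storm-kafka version looks for a tuple field literally named "message" (and, optionally, "key"). If an upstream bolt already emits under a different name and you would rather not change it, a tiny adapter bolt can rename the field. A hypothetical sketch, assuming the upstream field is called "report":

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class RenameToMessageBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // "report" is an assumed upstream field name; re-emit its value as "message" for KafkaBolt.
        collector.emit(new Values(input.getStringByField("report")));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}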

wordCount

With the simple in/out example done, here is a slightly more involved scenario: subscribe to messages on a topic, split them on spaces, count the words, and push the running word counts to another topic.

First, plan out the classes we will need:

  1. A backtype.storm.spout.Scheme subclass that receives and deserializes the data from KafkaSpout;
  2. A sentence-splitting bolt: SplitSentenceBolt;
  3. A counting bolt: WordCountBolt;
  4. A reporting bolt: ReportBolt;
  5. The topology definition: WordCountTopology;
  6. And one more bolt that simply echoes the subscribed data as-is: SentenceBolt.

For the backtype.storm.spout.Scheme subclass we can reuse the MessageScheme defined above, so it is not repeated here.

SplitSentenceBolt splits the incoming data, simply using String's split method, and emits each word downstream under the field name "word". The code:

public class SplitSentenceBolt extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getStringByField("msg");
        String[] words = sentence.split(" ");
        Arrays.asList(words).forEach(word -> collector.emit(new Values(word)));
    }
}

SentenceBolt receives data from KafkaSpout and simply logs and re-emits it unchanged. In the topology graph the input fans out: one branch goes into SplitSentenceBolt, the other into SentenceBolt. This kind of structure can be used in a Lambda architecture. The code:

public class SentenceBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(SentenceBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String msg = tuple.getStringByField("msg");
        logger.info("get one message is {}", msg);
        basicOutputCollector.emit(new Values(msg));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}

WordCountBolt aggregates and counts the words it receives, then emits each word ("word") together with its running count ("count") downstream. Because the counts live in each bolt task's own map, the topology later routes tuples with fieldsGrouping on "word" so that the same word always lands on the same task. The code:

public class WordCountBolt extends BaseBasicBolt {
    private Map<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counts = new ConcurrentHashMap<>();
        super.prepare(stormConf, context);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        collector.emit(new Values(word, count));
    }
}

ReportBolt takes each word and count it receives, formats them into a JSON-style string, and passes the result on downstream. The code:

public class ReportBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        String reportMessage = "{'word': '" + word + "', 'count': '" + count + "'}";
        collector.emit(new Values(reportMessage));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("message"));
    }
}

Finally, define the topology, WordCountTopology. The code:

public class WordCountTopology {
    private static final String KAFKA_SPOUT_ID = "kafkaSpout";
    private static final String SENTENCE_BOLT_ID = "sentenceBolt";
    private static final String SPLIT_BOLT_ID = "sentenceSplitBolt";
    private static final String WORD_COUNT_BOLT_ID = "sentenceWordCountBolt";
    private static final String REPORT_BOLT_ID = "reportBolt";
    private static final String KAFKA_BOLT_ID = "kafkabolt";
    private static final String CONSUME_TOPIC = "sentenceTopic";
    private static final String PRODUCT_TOPIC = "wordCountTopic";
    private static final String ZK_ROOT = "/topology/root";
    private static final String ZK_ID = "wordCount";
    private static final String DEFAULT_TOPOLOGY_NAME = "sentenceWordCountKafka";

    public static void main(String[] args) throws Exception {
        // ZooKeeper connection string
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // Topic to subscribe to, plus the ZooKeeper node path and id under which offsets are stored
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONSUME_TOPIC, ZK_ROOT, ZK_ID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConfig));
        builder.setBolt(SENTENCE_BOLT_ID, new SentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(WORD_COUNT_BOLT_ID, new WordCountBolt()).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).shuffleGrouping(WORD_COUNT_BOLT_ID);
        builder.setBolt(KAFKA_BOLT_ID, new KafkaBolt<String, String>()).shuffleGrouping(REPORT_BOLT_ID);

        Config config = new Config();
        Map<String, String> map = new HashMap<>();
        map.put("metadata.broker.list", "dev2_55.wfj-search:9092");// 配置Kafka broker地址
        map.put("serializer.class", "kafka.serializer.StringEncoder");// serializer.class為消息的序列化類
        config.put("kafka.broker.properties", map);// 配置KafkaBolt中的kafka.broker.properties
        config.put("topic", PRODUCT_TOPIC);// 配置KafkaBolt生成的topic

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}

Besides the points already mentioned above, note here that the zkRoot and id defined for storm.kafka.SpoutConfig should differ from those in the first example (at a minimum the id must differ, otherwise the two topologies would record their offsets under the same ZooKeeper node).
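
To sanity-check the whole pipeline, create sentenceTopic and wordCountTopic the same way as before, submit the topology, and feed a sentence through the console producer on sentenceTopic. With the bolts above, the input and output look roughly like this (one report per word occurrence, ordering not guaranteed):

# typed into the sentenceTopic producer terminal
the quick brown fox jumps over the lazy dog

# appears in the wordCountTopic consumer terminal
{'word': 'the', 'count': '1'}
{'word': 'quick', 'count': '1'}
{'word': 'brown', 'count': '1'}
{'word': 'fox', 'count': '1'}
{'word': 'jumps', 'count': '1'}
{'word': 'over', 'count': '1'}
{'word': 'the', 'count': '2'}
{'word': 'lazy', 'count': '1'}
{'word': 'dog', 'count': '1'}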

 

