久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Kafka實(shí)戰(zhàn)-簡單示例

 Baruch 2017-11-16

1.概述

  上一篇博客《Kafka實(shí)戰(zhàn)-Kafka Cluster》中,,為大家介紹了Kafka集群的安裝部署,以及對Kafka集群Producer/Consumer,、HA等做了相關(guān)測試,,今天我們來開發(fā)一個(gè)Kafka示例,練習(xí)如何在Kafka中進(jìn)行編程,,下面是今天的分享的目錄結(jié)構(gòu):

  • 開發(fā)環(huán)境
  • ConfigureAPI
  • Consumer
  • Producer
  • 截圖預(yù)覽

  下面開始今天的內(nèi)容分享,。

2.開發(fā)環(huán)境

  在開發(fā)Kafka相關(guān)應(yīng)用之前,我們得將Kafka得開發(fā)環(huán)境搭建完成,,這里我所使用得開發(fā)環(huán)境如下所示:

基礎(chǔ)軟件 工具名稱
IDE JBoss Studio 8
JDK 1.7

  關(guān)于基礎(chǔ)軟件的下載及相關(guān)配置,,大家可參考我寫的《高可用Hadoop平臺-啟航》一文的相關(guān)贅述,這里就不多做介紹了,。在安裝好相關(guān)基礎(chǔ)軟件后,,我們開始項(xiàng)目工程的創(chuàng)建,這里我們所使用的工程結(jié)構(gòu)是Maven,,關(guān)于Maven環(huán)境的相關(guān)配置信息,,可參考我在《Hadoop2源碼分析-準(zhǔn)備篇》一文對Maven環(huán)境配置的贅述。

  在準(zhǔn)備完成相關(guān)基礎(chǔ)軟件以及Maven環(huán)境后,,我們大家創(chuàng)建的工程,,在pom.xml文件中,添加Kafka的依賴包,,添加代碼如下所示:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>        

  下面開始編寫今天的代碼示例,。

3.ConfigureAPI

  首先是一個(gè)配置結(jié)構(gòu)類文件,配置Kafka的相關(guān)參數(shù),,代碼如下所示:

復(fù)制代碼
package cn.hadoop.hdfs.conf;

/**
 * @Date Apr 28, 2015
 *
 * @Author dengjie
 *
 * @Note Set param path
 */
public class ConfigureAPI {

    public interface KafkaProperties {
        public final static String ZK = "10.211.55.15:2181,10.211.55.17:2181,10.211.55.18:2181";
        public final static String GROUP_ID = "test_group1";
        public final static String TOPIC = "test2";
        public final static String BROKER_LIST = "10.211.55.15:9092,10.211.55.17:9092,10.211.55.18:9092";
        public final static int BUFFER_SIZE = 64 * 1024;
        public final static int TIMEOUT = 20000;
        public final static int INTERVAL = 10000;
    }

}
復(fù)制代碼

4.Consumer

  然后是一個(gè)消費(fèi)程序,,用于消費(fèi)Kafka的消息,代碼如下所示:

  • JConsumer

復(fù)制代碼
package cn.hadoop.hdfs.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * @Date May 22, 2015
 *
 * @Author dengjie
 *
 * @Note Kafka Consumer
 */
public class JConsumer extends Thread {

    private ConsumerConnector consumer;
    private String topic;
    private final int SLEEP = 1000 * 3;

    public JConsumer(String topic) {
        consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
        this.topic = topic;
    }

    private ConsumerConfig consumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Receive->[" + new String(it.next().message()) + "]");
            try {
                sleep(SLEEP);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

}
復(fù)制代碼

5.Producer

  接著是Kafka的生產(chǎn)消息程序,,用于產(chǎn)生Kafka的消息供Consumer去消費(fèi),,具體代碼如下所示:

  • JProducer

復(fù)制代碼
package cn.hadoop.hdfs.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * @Date May 22, 2015
 *
 * @Author dengjie
 *
 * @Note Kafka JProducer
 */
public class JProducer extends Thread {

    private Producer<Integer, String> producer;
    private String topic;
    private Properties props = new Properties();
    private final int SLEEP = 1000 * 3;

    public JProducer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.211.55.18:9092");
        producer = new Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int offsetNo = 1;
        while (true) {
            String msg = new String("Message_" + offsetNo);
            System.out.println("Send->[" + msg + "]");
            producer.send(new KeyedMessage<Integer, String>(topic, msg));
            offsetNo++;
            try {
                sleep(SLEEP);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

}
復(fù)制代碼

6.截圖預(yù)覽

  在開發(fā)完Consumer和Producer的代碼后,我們來測試相關(guān)應(yīng)用,,下面給大家編寫了一個(gè)Client去測試Consumer和Producer,,具體代碼如下所示:

  • KafkaClient

復(fù)制代碼
package cn.hadoop.hdfs.kafka.client;

import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
import cn.hadoop.hdfs.kafka.JConsumer;
import cn.hadoop.hdfs.kafka.JProducer;

/**
 * @Date May 22, 2015
 *
 * @Author dengjie
 *
 * @Note To run Kafka Code
 */
public class KafkaClient {

    public static void main(String[] args) {
        JProducer pro = new JProducer(KafkaProperties.TOPIC);
        pro.start();

        JConsumer con = new JConsumer(KafkaProperties.TOPIC);
        con.start();
    }

}
復(fù)制代碼

  運(yùn)行截圖如下所示:

7.總結(jié)

  大家在開發(fā)Kafka的應(yīng)用時(shí),需要注意相關(guān)事項(xiàng),。若是使用Maven項(xiàng)目工程,,在添加相關(guān)Kafka依賴JAR包時(shí),有可能依賴JAR會(huì)下載失敗,,若出現(xiàn)這種情況,,可手動(dòng)將Kafka的依賴JAR包添加到Maven倉庫即可,,在編寫Consumer和Producer程序,這里只是給出一個(gè)示例讓大家先熟悉Kafka的代碼如何去編寫,,后面會(huì)給大家更加詳細(xì)復(fù)雜的代碼模塊案例,。

8.結(jié)束語

  這篇博客就和大家分享到這里,如果大家在研究學(xué)習(xí)的過程當(dāng)中有什么問題,,可以加群進(jìn)行討論或發(fā)送郵件給我,,我會(huì)盡我所能為您解答,與君共勉,!

 

  

 

    本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲空間,,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn),。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多