久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Kafka的安裝與使用(轉(zhuǎn))

 印度阿三17 2019-12-21

9.1 Kafka 基礎(chǔ)知識(shí)


?

9.1.1 消息系統(tǒng)


點(diǎn)對點(diǎn)消息系統(tǒng):生產(chǎn)者發(fā)送一條消息到queue,一個(gè)queue可以有很多消費(fèi)者,,但是一個(gè)消息只能被一個(gè)消費(fèi)者接受,,當(dāng)沒有消費(fèi)者可用時(shí),這個(gè)消息會(huì)被保存直到有 一個(gè)可用的消費(fèi)者,所以Queue實(shí)現(xiàn)了一個(gè)可靠的負(fù)載均衡。

發(fā)布訂閱消息系統(tǒng):發(fā)布者發(fā)送到topic的消息,只有訂閱了topic的訂閱者才會(huì)收到消息,。topic實(shí)現(xiàn)了發(fā)布和訂閱,當(dāng)你發(fā)布一個(gè)消息,,所有訂閱這個(gè)topic的服務(wù)都能得到這個(gè)消息,,所以從1到N個(gè)訂閱者都能得到這個(gè)消息的拷貝。

?
9.1.2 kafka術(shù)語


消息由producer產(chǎn)生,,消息按照topic歸類,,并發(fā)送到broker中,broker中保存了一個(gè)或多個(gè)topic的消息,,consumer通過訂閱一組topic的消息,,通過持續(xù)的poll操作從broker獲取消息,并進(jìn)行后續(xù)的消息處理,。

Producer :消息生產(chǎn)者,,就是向broker發(fā)指定topic消息的客戶端。

Consumer :消息消費(fèi)者,,通過訂閱一組topic的消息,,從broker讀取消息的客戶端。

Broker :一個(gè)kafka集群包含一個(gè)或多個(gè)服務(wù)器,,一臺(tái)kafka服務(wù)器就是一個(gè)broker,,用于保存producer發(fā)送的消息。一個(gè)broker可以容納多個(gè)topic,。

Topic :每條發(fā)送到broker的消息都有一個(gè)類別,,可以理解為一個(gè)隊(duì)列或者數(shù)據(jù)庫的一張表,。

Partition:一個(gè)topic的消息由多個(gè)partition隊(duì)列存儲(chǔ)的,,一個(gè)partition隊(duì)列在kafka上稱為一個(gè)分區(qū)。每個(gè)partition是一個(gè)有序的隊(duì)列,,多個(gè)partition間則是無序的,。partition中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。

?Offset:偏移量。kafka為每條在分區(qū)的消息保存一個(gè)偏移量offset,,這也是消費(fèi)者在分區(qū)的位置,。kafka的存儲(chǔ)文件都是按照offset.kafka來命名,位于2049位置的即為2048.kafka的文件,。比如一個(gè)偏移量是5的消費(fèi)者,,表示已經(jīng)消費(fèi)了從0-4偏移量的消息,下一個(gè)要消費(fèi)的消息的偏移量是5,。

Consumer Group (CG):若干個(gè)Consumer組成的集合,。這是kafka用來實(shí)現(xiàn)一個(gè)topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個(gè)consumer)的手段。一個(gè)topic可以有多個(gè)CG,。topic的消息會(huì)復(fù)制(不是真的復(fù)制,,是概念上的)到所有的CG,但每個(gè)CG只會(huì)把消息發(fā)給該CG中的一個(gè)consumer,。如果需要實(shí)現(xiàn)廣播,,只要每個(gè)consumer有一個(gè)獨(dú)立的CG就可以了。要實(shí)現(xiàn)單播只要所有的consumer在同一個(gè)CG,。用CG還可以將consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的topic,。

假如一個(gè)消費(fèi)者組有兩個(gè)消費(fèi)者,訂閱了一個(gè)具有4個(gè)分區(qū)的topic的消息,,那么這個(gè)消費(fèi)者組的每一個(gè)消費(fèi)者都會(huì)消費(fèi)兩個(gè)分區(qū)的消息,。消費(fèi)者組的成員是動(dòng)態(tài)維護(hù)的,如果新增或者減少了消費(fèi)者組中的消費(fèi)者,,那么每個(gè)消費(fèi)者消費(fèi)的分區(qū)的消息也會(huì)動(dòng)態(tài)變化,。比如原來一個(gè)消費(fèi)者組有兩個(gè)消費(fèi)者,其中一個(gè)消費(fèi)者因?yàn)楣收隙荒芾^續(xù)消費(fèi)消息了,,那么剩下一個(gè)消費(fèi)者將會(huì)消費(fèi)全部4個(gè)分區(qū)的消息,。

?
9.1.3 kafka安裝和使用
在Windows安裝運(yùn)行Kafka:https://blog.csdn.net/weixin_38004638/article/details/91893910


?

9.1.4 kafka運(yùn)行

?


一次寫入,支持多個(gè)應(yīng)用讀取,,讀取信息是相同的

?

kafka-study.pom

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.24</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
Producer生產(chǎn)者?

發(fā)送消息的方式,,只管發(fā)送,不管結(jié)果:只調(diào)用接口發(fā)送消息到 Kafka 服務(wù)器,,但不管成功寫入與否,。由于 Kafka 是高可用的,因此大部分情況下消息都會(huì)寫入,,但在異常情況下會(huì)丟消息
同步發(fā)送:調(diào)用 send() 方法返回一個(gè) Future 對象,,我們可以使用它的 get() 方法來判斷消息發(fā)送成功與否
異步發(fā)送:調(diào)用 send() 時(shí)提供一個(gè)回調(diào)方法,當(dāng)接收到 broker 結(jié)果后回調(diào)此方法

public class MyProducer {
private static KafkaProducer<String, String> producer;
//初始化
static {
Properties properties = new Properties();
//kafka啟動(dòng),,生產(chǎn)者建立連接broker的地址
properties.put("bootstrap.servers", "127.0.0.1:9092");
//kafka序列化方式
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//自定義分區(qū)分配器
properties.put("partitioner.class", "com.imooc.kafka.CustomPartitioner");
producer = new KafkaProducer<>(properties);
}

/**
* 創(chuàng)建topic:.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
* --replication-factor 1 --partitions 1 --topic kafka-study
* 創(chuàng)建消費(fèi)者:.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
* --topic imooc-kafka-study --from-beginning
*/
//發(fā)送消息,,發(fā)送完后不做處理
private static void sendMessageForgetResult() {
ProducerRecord<String, String> record = new ProducerRecord<>("kafka-study", "name", "ForgetResult");
producer.send(record);
producer.close();
}
//發(fā)送同步消息,,獲取發(fā)送的消息
private static void sendMessageSync() throws Exception {
ProducerRecord<String, String> record = new ProducerRecord<>("kafka-study", "name", "sync");
RecordMetadata result = producer.send(record).get();
System.out.println(result.topic());//imooc-kafka-study
System.out.println(result.partition());//分區(qū)為0
System.out.println(result.offset());//已發(fā)送一條消息,此時(shí)偏移量 1
producer.close();
}
/**
* 創(chuàng)建topic:.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
* --replication-factor 1 --partitions 3 --topic kafka-study-x
* 創(chuàng)建消費(fèi)者:.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
* --topic kafka-study-x --from-beginning
*/
private static void sendMessageCallback() {
ProducerRecord<String, String> record = new ProducerRecord<>("kafka-study-x", "name", "callback");
producer.send(record, new MyProducerCallback());
//發(fā)送多條消息
record = new ProducerRecord<>("kafka-study-x", "name-x", "callback");
producer.send(record, new MyProducerCallback());
producer.close();
}
//發(fā)送異步消息
//場景:每條消息發(fā)送有延遲,,多條消息發(fā)送,,無需同步等待,可以執(zhí)行其他操作,,程序會(huì)自動(dòng)異步調(diào)用
private static class MyProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
return;
}
System.out.println("*** MyProducerCallback ***");
System.out.println(recordMetadata.topic());
System.out.println(recordMetadata.partition());
System.out.println(recordMetadata.offset());
}
}
public static void main(String[] args) throws Exception {
//sendMessageForgetResult();
//sendMessageSync();
sendMessageCallback();
}
}
自定義分區(qū)分配器:決定消息存放在哪個(gè)分區(qū).,。默認(rèn)分配器使用輪詢存放,輪到已滿分區(qū)將會(huì)寫入失敗,。

public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
//獲取topic所有分區(qū)
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int numPartitions = partitionInfos.size();
//消息必須有key
if (null == keyBytes || !(key instanceof String)) {
throw new InvalidRecordException("kafka message must have key");
}
//如果只有一個(gè)分區(qū),,即0號(hào)分區(qū)
if (numPartitions == 1) {return 0;}
//如果key為name,發(fā)送至最后一個(gè)分區(qū)
if (key.equals("name")) {return numPartitions - 1;}
return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> map) {}
}
啟動(dòng)生產(chǎn)者發(fā)送消息,,通過自定義分區(qū)分配器分配,,查詢到topic信息的value、partitioner

?

Kafka消費(fèi)者(組)

* 自動(dòng)提交位移 * 手動(dòng)同步提交當(dāng)前位移 * 手動(dòng)異步提交當(dāng)前位移 * 手動(dòng)異步提交當(dāng)前位移帶回調(diào) * 混合同步與異步提交位移

public class MyConsumer {
private static KafkaConsumer<String, String> consumer;
private static Properties properties;
//初始化
static {
properties = new Properties();
//建立連接broker的地址
properties.put("bootstrap.servers", "127.0.0.1:9092");
//kafka反序列化
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//指定消費(fèi)者組
properties.put("group.id", "KafkaStudy");
}

//自動(dòng)提交位移:由consume自動(dòng)管理提交
private static void generalConsumeMessageAutoCommit() {
//配置
properties.put("enable.auto.commit", true);
consumer = new KafkaConsumer<>(properties);
//指定topic
consumer.subscribe(Collections.singleton("kafka-study-x"));
try {
while (true) {
boolean flag = true;
//拉取信息,,超時(shí)時(shí)間100ms
ConsumerRecords<String, String> records = consumer.poll(100);
//遍歷打印消息
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format(
"topic = %s, partition = %s, key = %s, value = %s",
record.topic(), record.partition(), record.key(), record.value()
));
//消息發(fā)送完成
if (record.value().equals("done")) { flag = false; }
}
if (!flag) { break; }
}
} finally {
consumer.close();
}
}

//手動(dòng)同步提交當(dāng)前位移,,根據(jù)需求提交,但容易發(fā)送阻塞,,提交失敗會(huì)進(jìn)行重試直到拋出異常
private static void generalConsumeMessageSyncCommit() {
properties.put("auto.commit.offset", false);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("kafka-study-x"));
while (true) {
boolean flag = true;
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format(
"topic = %s, partition = %s, key = %s, value = %s",
record.topic(), record.partition(), record.key(), record.value()
));
if (record.value().equals("done")) { flag = false; }
}
try {
//手動(dòng)同步提交
consumer.commitSync();
} catch (CommitFailedException ex) {
System.out.println("commit failed error: " ex.getMessage());
}
if (!flag) { break; }
}
}

//手動(dòng)異步提交當(dāng)前位移,,提交速度快,但失敗不會(huì)記錄
private static void generalConsumeMessageAsyncCommit() {
properties.put("auto.commit.offset", false);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("kafka-study-x"));
while (true) {
boolean flag = true;
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format(
"topic = %s, partition = %s, key = %s, value = %s",
record.topic(), record.partition(), record.key(), record.value()
));
if (record.value().equals("done")) { flag = false; }
}
//手動(dòng)異步提交
consumer.commitAsync();
if (!flag) { break; }
}
}

//手動(dòng)異步提交當(dāng)前位移帶回調(diào)
private static void generalConsumeMessageAsyncCommitWithCallback() {
properties.put("auto.commit.offset", false);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("kafka-study-x"));
while (true) {
boolean flag = true;
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format(
"topic = %s, partition = %s, key = %s, value = %s",
record.topic(), record.partition(), record.key(), record.value()
));
if (record.value().equals("done")) { flag = false; }
}
//使用java8函數(shù)式編程
consumer.commitAsync((map, e) -> {
if (e != null) {
System.out.println("commit failed for offsets: " e.getMessage());
}
});
if (!flag) { break; }
}
}

//混合同步與異步提交位移
@SuppressWarnings("all")
private static void mixSyncAndAsyncCommit() {
properties.put("auto.commit.offset", false);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("kafka-study-x"));
try {
while (true) {
//boolean flag = true;
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format(
"topic = %s, partition = %s, key = %s, " "value = %s",
record.topic(), record.partition(),
record.key(), record.value()
));
//if (record.value().equals("done")) { flag = false; }
}
//手動(dòng)異步提交,,保證性能
consumer.commitAsync();
//if (!flag) { break; }
}
} catch (Exception ex) {
System.out.println("commit async error: " ex.getMessage());
} finally {
try {
//異步提交失敗,,再嘗試手動(dòng)同步提交
consumer.commitSync();
} finally {
consumer.close();
}
}
}

public static void main(String[] args) {
//自動(dòng)提交位移
generalConsumeMessageAutoCommit();
//手動(dòng)同步提交當(dāng)前位移
//generalConsumeMessageSyncCommit();
//手動(dòng)異步提交當(dāng)前位移
//generalConsumeMessageAsyncCommit();
//手動(dòng)異步提交當(dāng)前位移帶回調(diào)
//generalConsumeMessageAsyncCommitWithCallback()
//混合同步與異步提交位移
//mixSyncAndAsyncCommit();
}
}
先啟動(dòng)消費(fèi)者等待接收消息,再啟動(dòng)生產(chǎn)者發(fā)送消息,,進(jìn)行消費(fèi)消息

?

?


————————————————
版權(quán)聲明:本文為CSDN博主「陳晨辰~」的原創(chuàng)文章,,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明,。
原文鏈接:https://blog.csdn.net/weixin_38004638/article/details/91975123

來源:https://www./content-4-601051.html

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn),。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請遵守用戶 評(píng)論公約

    類似文章 更多