久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

消費者與分區(qū)

 怡紅公子0526 2022-11-04 發(fā)布于北京

前言


我們知道,,生產(chǎn)者發(fā)送消息到主題,消費者訂閱主題(以消費者組的名義訂閱),,而主題下是分區(qū),,消息是存儲在分區(qū)中的,所以事實上生產(chǎn)者發(fā)送消息到分區(qū),,消費者則從分區(qū)讀取消息,,那么,這里問題來了,,生產(chǎn)者將消息投遞到哪個分區(qū),?消費者組中的消費者實例之間是怎么分配分區(qū)的呢?接下來,,就圍繞著這兩個問題一探究竟。

主題的分區(qū)數(shù)設(shè)置


在server.properties配置文件中可以指定一個全局的分區(qū)數(shù)設(shè)置,,這是對每個主題下的分區(qū)數(shù)的默認(rèn)設(shè)置,,默認(rèn)是1。

img

當(dāng)然每個主題也可以自己設(shè)置分區(qū)數(shù)量,,如果創(chuàng)建主題的時候沒有指定分區(qū)數(shù)量,,則會使用server.properties中的設(shè)置。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1

在創(chuàng)建主題的時候,,可以使用--partitions選項指定主題的分區(qū)數(shù)量

[root@localhost kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
Topic:abc       PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: abc      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: abc      Partition: 1    Leader: 0       Replicas: 0     Isr: 0

生產(chǎn)者與分區(qū)


首先提出一個問題:生產(chǎn)者將消息投遞到分區(qū)有沒有規(guī)律,?如果有,那么它是如何決定一條消息該投遞到哪個分區(qū)的呢,?

默認(rèn)的分區(qū)策略

The default partitioning strategy:

  • If a partition is specified in the record, use it
  • If no partition is specified but a key is present choose a partition based on a hash of the key
  • If no partition or key is present choose a partition in a round-robin fashion

org.apache.kafka.clients.producer.internals.DefaultPartitioner

默認(rèn)的分區(qū)策略是:

  • 如果在發(fā)消息的時候指定了分區(qū),,則消息投遞到指定的分區(qū)
  • 如果沒有指定分區(qū),但是消息的key不為空,,則基于key的哈希值來選擇一個分區(qū)
  • 如果既沒有指定分區(qū),,且消息的key也是空,則用輪詢的方式選擇一個分區(qū)
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

通過源代碼可以更加作證這一點

分區(qū)與消費者


消費者以組的名義訂閱主題,,主題有多個分區(qū),,消費者組中有多個消費者實例,,那么消費者實例和分區(qū)之前的對應(yīng)關(guān)系是怎樣的呢?

換句話說,,就是組中的每一個消費者負責(zé)那些分區(qū),,這個分配關(guān)系是如何確定的呢?

img

同一時刻,,一條消息只能被組中的一個消費者實例消費

消費者組訂閱這個主題,,意味著主題下的所有分區(qū)都會被組中的消費者消費到,如果按照從屬關(guān)系來說的話就是,,主題下的每個分區(qū)只從屬于組中的一個消費者,,不可能出現(xiàn)組中的兩個消費者負責(zé)同一個分區(qū)。

那么,,問題來了,。如果分區(qū)數(shù)大于或者等于組中的消費者實例數(shù),那自然沒有什么問題,,無非一個消費者會負責(zé)多個分區(qū),,(PS:當(dāng)然,最理想的情況是二者數(shù)量相等,,這樣就相當(dāng)于一個消費者負責(zé)一個分區(qū)),;但是,如果消費者實例的數(shù)量大于分區(qū)數(shù),,那么按照默認(rèn)的策略(PS:之所以強調(diào)默認(rèn)策略是因為你也可以自定義策略),,有一些消費者是多余的,一直接不到消息而處于空閑狀態(tài),。

話又說回來,,假設(shè)多個消費者負責(zé)同一個分區(qū),那么會有什么問題呢,?

我們知道,,Kafka它在設(shè)計的時候就是要保證分區(qū)下消息的順序,也就是說消息在一個分區(qū)中的順序是怎樣的,,那么消費者在消費的時候看到的就是什么樣的順序,,那么要做到這一點就首先要保證消息是由消費者主動拉取的(pull),其次還要保證一個分區(qū)只能由一個消費者負責(zé),。倘若,,兩個消費者負責(zé)同一個分區(qū),那么就意味著兩個消費者同時讀取分區(qū)的消息,,由于消費者自己可以控制讀取消息的offset,,就有可能C1才讀到2,而C1讀到1,,C1還沒處理完,,C2已經(jīng)讀到3了,,則會造成很多浪費,因為這就相當(dāng)于多線程讀取同一個消息,,會造成消息處理的重復(fù),,且不能保證消息的順序,這就跟主動推送(push)無異,。

消費者分區(qū)分配策略

org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

如果是自定義分配策略的話可以繼承AbstractPartitionAssignor這個類,,它默認(rèn)有3個實現(xiàn)

range

range策略對應(yīng)的實現(xiàn)類是org.apache.kafka.clients.consumer.RangeAssignor

這是默認(rèn)的分配策略

可以通過消費者配置中partition.assignment.strategy參數(shù)來指定分配策略,它的值是類的全路徑,,是一個數(shù)組

/**
 * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
 */

range策略是基于每個主題的

對于每個主題,,我們以數(shù)字順序排列可用分區(qū),以字典順序排列消費者,。然后,,將分區(qū)數(shù)量除以消費者總數(shù),以確定分配給每個消費者的分區(qū)數(shù)量,。如果沒有平均劃分(PS:除不盡),,那么最初的幾個消費者將有一個額外的分區(qū)。

簡而言之,,就是,,

1、range分配策略針對的是主題(PS:也就是說,,這里所說的分區(qū)指的某個主題的分區(qū),,消費者值的是訂閱這個主題的消費者組中的消費者實例)

2、首先,,將分區(qū)按數(shù)字順序排行序,,消費者按消費者名稱的字典序排好序

3、然后,,用分區(qū)總數(shù)除以消費者總數(shù),。如果能夠除盡,,則皆大歡喜,,平均分配;若除不盡,,則位于排序前面的消費者將多負責(zé)一個分區(qū)

例如,,假設(shè)有兩個消費者C0和C1,兩個主題t0和t1,,并且每個主題有3個分區(qū),,分區(qū)的情況是這樣的:t0p0,t0p1,,t0p2,,t1p0,,t1p1,t1p2

那么,,基于以上信息,,最終消費者分配分區(qū)的情況是這樣的:

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]

為什么是這樣的結(jié)果呢?

因為,,對于主題t0,,分配的結(jié)果是C0負責(zé)P0和P1,C1負責(zé)P2,;對于主題t2,,也是如此,綜合起來就是這個結(jié)果

上面的過程用圖形表示的話大概是這樣的:

img

閱讀代碼,,更有助于理解:

public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
    //    主題與消費者的映射                                                            
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();    //    主題
        List<String> consumersForTopic = topicEntry.getValue();    //    消費者列表

        //    partitionsPerTopic表示主題和分區(qū)數(shù)的映射
        //    獲取主題下有多少個分區(qū)
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

        //    消費者按字典序排序
        Collections.sort(consumersForTopic);

        //    分區(qū)數(shù)量除以消費者數(shù)量
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        //    取模,,余數(shù)就是額外的分區(qū)
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            //    分配分區(qū)
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

roundrobin(輪詢)

roundronbin分配策略的具體實現(xiàn)是org.apache.kafka.clients.consumer.RoundRobinAssignor

/**
 * The round robin assignor lays out all the available partitions and all the available consumers. It
 * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
 * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
 * will be within a delta of exactly one across all consumers.)
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 *
 * When subscriptions differ across consumer instances, the assignment process still considers each
 * consumer instance in round robin fashion but skips over an instance if it is not subscribed to
 * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
 * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,
 * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
 * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
 *
 * Tha assignment will be:
 * C0: [t0p0]
 * C1: [t1p0]
 * C2: [t1p1, t2p0, t2p1, t2p2]
 */

輪詢分配策略是基于所有可用的消費者和所有可用的分區(qū)的

與前面的range策略最大的不同就是它不再局限于某個主題

如果所有的消費者實例的訂閱都是相同的,那么這樣最好了,,可用統(tǒng)一分配,,均衡分配

例如,假設(shè)有兩個消費者C0和C1,,兩個主題t0和t1,,每個主題有3個分區(qū),分別是t0p0,,t0p1,,t0p2,t1p0,,t1p1,,t1p2

那么,最終分配的結(jié)果是這樣的:

C0: [t0p0, t0p2, t1p1]

C1: [t0p1, t1p0, t1p2]

用圖形表示大概是這樣的:

img

假設(shè),,組中每個消費者訂閱的主題不一樣,,分配過程仍然以輪詢的方式考慮每個消費者實例,但是如果沒有訂閱主題,,則跳過實例,。當(dāng)然,這樣的話分配肯定不均衡,。

什么意思呢,?也就是說,消費者組是一個邏輯概念,,同組意味著同一時刻分區(qū)只能被一個消費者實例消費,,換句話說,同組意味著一個分區(qū)只能分配給組中的一個消費者,。事實上,,同組也可以不同訂閱,,這就是說雖然屬于同一個組,但是它們訂閱的主題可以是不一樣的,。

例如,,假設(shè)有3個主題t0,t1,,t2,;其中,t0有1個分區(qū)p0,,t1有2個分區(qū)p0和p1,,t2有3個分區(qū)p0,p1和p2,;有3個消費者C0,,C1和C2;C0訂閱t0,,C1訂閱t0和t1,,C2訂閱t0,t1和t2,。那么,,按照輪詢分配的話,C0應(yīng)該負責(zé)

首先,,肯定是輪詢的方式,,其次,比如說有主題t0,,t1,,t2,它們分別有1,,2,,3個分區(qū),也就是t0有1個分區(qū),,t1有2個分區(qū),,t2有3個分區(qū);有3個消費者分別從屬于3個組,,C0訂閱t0,,C1訂閱t0和t1,,C2訂閱t0,,t1,t2,;那么,,按照輪詢分配的話,,C0應(yīng)該負責(zé)t0p0,C1應(yīng)該負責(zé)t1p0,,其余均由C2負責(zé),。

上述過程用圖形表示大概是這樣的:

img

為什么最后的結(jié)果是

C0: [t0p0]

C1: [t1p0]

C2: [t1p1, t2p0, t2p1, t2p2]

這是因為,按照輪詢t0p1由C0負責(zé),,t1p0由C1負責(zé),,由于同組,C2只能負責(zé)t1p1,,由于只有C2訂閱了t2,,所以t2所有分區(qū)由C2負責(zé),綜合起來就是這個結(jié)果

細想一下可以發(fā)現(xiàn),,這種情況下跟range分配的結(jié)果是一樣的

測試代碼

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven./POM/4.0.0 http://maven./xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cjs.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-demo</name>
    <description></description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
package com.cjs.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class HelloProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("abc", Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        e.printStackTrace();
                    }else {
                        System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();

    }
}
package com.cjs.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class HelloConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
//        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("foo", "bar", "abc"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點,。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多