
Kafka message distribution policy analysis

When we send messages to a Kafka topic that has multiple partitions, Kafka guarantees that the messages in any one partition are consumed by only one consumer within a consumer group, no matter how many consumers there are. In other words, the consumers in the same group automatically share the load across the partitions.
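The "one partition, one consumer per group" rule can be illustrated with a small simulation. This is only a sketch: it uses a simplified round-robin assignment, not Kafka's actual assignor implementations, and the class name `GroupAssignmentSketch` and the consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a simplified round-robin assignment, not Kafka's real assignor.
final class GroupAssignmentSketch {

    // Assign partitions 0..numPartitions-1 to the consumers of one group so that
    // every partition has exactly one owner within that group.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // prints {consumer-a=[0, 2, 4], consumer-b=[1, 3]}
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"), 5));
    }
}
```

Each partition number appears in exactly one list, which is the property that lets a consumer group balance load without two members reading the same partition.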

1, message construction

Here we analyse how a partition is chosen when a message is sent to a topic through the Kafka API, by walking through the relevant source code.

First, look at the constructors of ProducerRecord, the Kafka API class that represents the message body. When constructing a message you can specify the topic, partition, key, value, and other details of the message to be sent.

    /**
     * Creates a record to be sent to a specified topic and partition
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param key The key that will be included in the record
     * @param value The record contents
     * @param headers The headers that will be included in the record
     */
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }

    /**
     * Creates a record to be sent to a specified topic and partition
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }

    /**
     * Create a record to be sent to Kafka
     *
     * @param topic The topic the record will be appended to
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }

2, distribution policy

In practice we usually do not specify a partition when sending a message; we pass only the key and the value, like this:

producer.send(new ProducerRecord<>(topic, key, data));

Kafka hashes the key you pass in and takes the result modulo the number of partitions, so that messages are spread as evenly as possible across the topic's partitions.

Here is Kafka's built-in default distribution policy:
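To make the hashing step concrete, here is a minimal, stdlib-only sketch of key-based partition selection. The class and method names (`KeyHashSketch`, `partitionForKey`) are hypothetical, and `Arrays.hashCode` is a stand-in for Kafka's murmur2 hash, so the partition numbers it produces will not match those of a real producer; only the shape of the computation is the same.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of key-based partition selection. Arrays.hashCode is a stand-in for
// Kafka's murmur2 hash (an assumption made to keep the example stdlib-only).
final class KeyHashSketch {

    // Clear the sign bit so the result is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Hash the serialized key and reduce it modulo the partition count.
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionForKey(key, numPartitions));
        }
    }
}
```

Because the result depends only on the key bytes and the partition count, the two "order-1" messages land on the same partition for as long as the partition count stays the same.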

public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Get the list of partitions for this topic
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // If the key is null
        if (keyBytes == null) {
            // A ConcurrentHashMap keyed by topic holds a counter that is incremented atomically (CAS)
            int nextValue = nextValue(topic);
            // Get the list of available partitions for this topic
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) { // at least one partition is available
                // Take the remainder so that the message lands on an available partition
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // No partition is available, so return one that is unavailable
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // Hash the key to choose the partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        // Get the AtomicInteger counter for this topic
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) { // no counter exists yet
            // Start from a random number
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            // Store it in topicCounterMap
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        // Return the current value, then increment it
        return counter.getAndIncrement();
    }

    public void close() {}
}
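The null-key branch of the listing above amounts to a per-topic round-robin. The following stdlib-only sketch reproduces just that counter logic so it can be run in isolation. `RoundRobinSketch` is a hypothetical name, and the counter is seeded with a caller-supplied value rather than a random one (an assumption made so that the output is reproducible).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the per-topic counter used for records without a key.
final class RoundRobinSketch {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    private final int initialValue;

    // The listing seeds each counter with a random int; a fixed seed is used
    // here (an assumption) so that the sequence is predictable.
    RoundRobinSketch(int initialValue) {
        this.initialValue = initialValue;
    }

    // Atomically fetch the current counter value for the topic and increment it.
    private int nextValue(String topic) {
        AtomicInteger counter =
                topicCounterMap.computeIfAbsent(topic, t -> new AtomicInteger(initialValue));
        return counter.getAndIncrement();
    }

    // Clear the sign bit, then reduce modulo the number of available partitions.
    int partition(String topic, int numAvailablePartitions) {
        return (nextValue(topic) & 0x7fffffff) % numAvailablePartitions;
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch(0);
        List<Integer> chosen = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            chosen.add(sketch.partition("demo-topic", 3));
        }
        System.out.println(chosen); // prints [0, 1, 2, 0, 1, 2, 0]
    }
}
```

Clearing the sign bit matters once the counter overflows past Integer.MAX_VALUE: the raw value becomes negative, but the masked value stays a valid non-negative index.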

3, loading a custom policy

We can also customize the distribution strategy by implementing the Partitioner interface. Here is a concrete example.

A custom implementation of the Partitioner interface:

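import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

/**
 * Custom implementation of the Partitioner interface
 */
public class KeyPartitioner implements Partitioner {

    /**
     * Implements the distribution strategy
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (key == null || key.equals("")) {
            // No usable key: pick one of the currently available partitions at random
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int part = ThreadLocalRandom.current().nextInt(availablePartitions.size());
            return availablePartitions.get(part).partition();
        }
        // Otherwise hash the key over the topic's actual partition count
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return Math.abs(key.toString().hashCode() % numPartitions);
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed
    }

    @Override
    public void close() {
        // Nothing to release
    }
}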
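One detail of the custom partitioner is worth checking in isolation: it applies Math.abs to the remainder, not to the hash itself. The order matters, as this small stdlib-only sketch suggests. The class and method names are hypothetical, and a partition count of 6 is used purely as an example.

```java
// Sketch comparing the two possible orders of Math.abs and the modulo.
final class AbsPitfallSketch {

    // Safe: the remainder lies strictly between -n and n, so Math.abs cannot overflow.
    static int absAfterMod(int hash, int numPartitions) {
        return Math.abs(hash % numPartitions);
    }

    // Unsafe: Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so the
    // result can be negative and is then not a valid partition number.
    static int absBeforeMod(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;
        System.out.println(absAfterMod(hash, 6));  // prints 2
        System.out.println(absBeforeMod(hash, 6)); // prints -2
    }
}
```

Kafka's default partitioner avoids the question altogether by masking off the sign bit with Utils.toPositive before taking the remainder.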

Then, when initializing the Kafka producer, add the custom partitioner to its configuration:

Properties properties = new Properties();
// Register the custom partitioner
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyPartitioner.class);
producer = new KafkaProducer<>(properties);
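For reference, a producer needs a few more settings than the partitioner alone before it can start. The sketch below builds such a Properties object using the plain string config names, so it runs without the Kafka client on the classpath. The broker address `localhost:9092`, the package name `com.example`, and the choice of string serializers are assumptions for illustration, not values from this article.

```java
import java.util.Properties;

// Sketch of a fuller producer configuration using string config names.
final class ProducerConfigSketch {

    static Properties buildProducerProperties(String bootstrapServers, String partitionerClassName) {
        Properties properties = new Properties();
        // Broker address list (assumed value supplied by the caller)
        properties.put("bootstrap.servers", bootstrapServers);
        // Serializers for keys and values (string serializers assumed here)
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Same setting as ProducerConfig.PARTITIONER_CLASS_CONFIG
        properties.put("partitioner.class", partitionerClassName);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties =
                buildProducerProperties("localhost:9092", "com.example.KeyPartitioner");
        System.out.println(properties.getProperty("partitioner.class"));
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in the same way as in the snippet above.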

4, summary

That concludes this analysis of Kafka's message distribution policy and how to extend it with a custom partitioner. I hope it helps when you work with Kafka. If anything here is incomplete or incorrect, corrections are welcome.

 
