Kafka: Topic concept and API introduction

Topic

Events are organized and durably stored in Topics. A Topic is similar to a folder in a file system, and the events are the files in that folder. Topics in Kafka are always multi-producer and multi-subscriber: a Topic can have zero, one, or many producers writing events to it, and zero, one, or many consumers subscribing to those events. Events in a Topic can be read as often as needed. Unlike in traditional messaging systems, events are not deleted after they are consumed. Instead, each Topic's configuration defines how long Kafka retains its events, after which old events are discarded. Kafka's performance is effectively constant with respect to data size, so storing data for a long time is perfectly fine.
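Retention is set per Topic through its configuration. The sketch below, assuming kafka-clients 3.0.0, only builds a NewTopic carrying a `retention.ms` override; the topic name `events-7d`, the partition count and the seven-day value are hypothetical. Nothing is sent to a broker until the object is passed to `createTopics`, which the API section below covers.

```java
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class RetentionTopicExample {

    // Hypothetical topic name and retention period, chosen only for illustration
    static NewTopic buildTopic() {
        long sevenDaysMs = TimeUnit.DAYS.toMillis(7);
        return new NewTopic("events-7d", 3, (short) 1)
                // retention.ms: how long Kafka keeps events before old data may be discarded
                .configs(Collections.singletonMap("retention.ms", String.valueOf(sevenDaysMs)));
    }

    public static void main(String[] args) {
        NewTopic topic = buildTopic();
        System.out.println(topic.name() + " -> " + topic.configs());
    }
}
```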

Partition

Topics are partitioned, which means a Topic is spread across multiple Kafka nodes. This distributed placement of data is important for scalability because it allows client applications to read and write data from and to many Kafka nodes at the same time. When a new event is published to a Topic, it is actually appended to one of the Topic's Partitions. Events with the same event key (for example, the same customer ID) are written to the same Partition, and Kafka guarantees that any consumer of a given Partition will always read that Partition's events in exactly the same order as they were written.
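Why the same key always lands in the same Partition can be seen from how the producer picks a Partition for a keyed record. The following is a sketch assuming the default partitioning of kafka-clients 3.0 for records with a non-null String key serialized as UTF-8: a murmur2 hash of the key bytes, made non-negative, modulo the partition count. It uses `org.apache.kafka.common.utils.Utils`, which is an internal helper class rather than a public API, so treat it as illustration only. Note that the mapping changes if the number of Partitions changes.

```java
import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class KeyPartitionExample {

    // Mirrors the keyed branch of the producer's default partitioning:
    // murmur2 hash of the serialized key, made non-negative, modulo the partition count.
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // what StringSerializer produces by default
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // "user-1" appears twice and is mapped to the same partition both times
        for (String key : new String[]{"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + partitionForKey(key, numPartitions));
        }
    }
}
```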

Replication

To make data fault-tolerant and highly available, every Topic can be replicated, so that multiple Kafka nodes always hold a copy of the data in case something goes wrong. A common production setting is a replicationFactor of 3, meaning there are always three copies of the data. Replication is performed at the level of Topic Partitions.

Kafka replicates each Topic's Partitions across a configurable number of servers (set by replicationFactor). This allows automatic failover when some servers in the cluster fail, so the service remains available. The unit of Replication is the Topic Partition. Under non-failure conditions, each Partition has one leader and zero or more followers; replicationFactor is the total number of replicas, including the leader. By default, all reads and writes go to the Partition's leader. Typically there are many more Partitions than Kafka nodes, and the Partition leaders are distributed evenly among the nodes. The data on a follower must stay in sync with the leader's: all replicas have the same offsets and the same order of messages (although at any given moment the leader may have a few messages at the end of its log that have not been replicated yet). Followers consume messages from the leader just like an ordinary Kafka consumer would and apply them to their own log.

As shown in the figure below, there are two Topics (topic 0 and topic 1) on three Kafka nodes. Topic 0 has two Partitions with a replicationFactor of 3 (the red Partition is the leader). Topic 1 has three Partitions, also with a replicationFactor of 3 (the red Partition is the leader).
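The layout of topic 0 can be expressed with kafka-clients 3.0's `NewTopic(String, Map<Integer, List<Integer>>)` constructor, which assigns replicas explicitly instead of letting the cluster choose (the simpler `new NewTopic(name, 2, (short) 3)` lets Kafka decide). This is a sketch: the topic name `topic0` and broker IDs 0, 1 and 2 are assumptions. The first broker in each replica list is the preferred leader, so rotating the lists spreads leadership across nodes as described above.

```java
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplicaAssignmentExample {

    // Hypothetical layout for "topic 0": 2 partitions, 3 replicas each, on brokers 0, 1 and 2.
    // The first broker in each list is that partition's preferred leader.
    static NewTopic buildTopic() {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        assignment.put(0, Arrays.asList(0, 1, 2)); // partition 0: preferred leader is broker 0
        assignment.put(1, Arrays.asList(1, 2, 0)); // partition 1: preferred leader is broker 1
        return new NewTopic("topic0", assignment);
    }

    public static void main(String[] args) {
        buildTopic().replicasAssignments().forEach((partition, replicas) ->
                System.out.println("partition " + partition + " replicas " + replicas
                        + " (preferred leader " + replicas.get(0) + ")"));
    }
}
```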

API

Add dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>

The kafka-clients version used here (3.0.0) matches the version of the Kafka cluster the blogger deployed earlier.

client

The client used to operate on Topics is created through the abstract class AdminClient, whose source code is as follows:

package org.apache.kafka.clients.admin;

import java.util.Map;
import java.util.Properties;

public abstract class AdminClient implements Admin {

    /**
     * Create a new Admin with the given configuration.
     *
     * @param props The configuration.
     * @return The new KafkaAdminClient instance.
     */
    public static AdminClient create(Properties props) {
        return (AdminClient) Admin.create(props);
    }

    /**
     * Overload of create(Properties) that takes the configuration as a Map.
     *
     * @param conf The configuration.
     * @return The new KafkaAdminClient instance.
     */
    public static AdminClient create(Map<String, Object> conf) {
        return (AdminClient) Admin.create(conf);
    }
}

Both create methods actually return a KafkaAdminClient instance (KafkaAdminClient is a subclass of the abstract AdminClient class). KafkaAdminClient has many methods; its private methods support the public ones, which are the services offered to users.

The public methods of KafkaAdminClient are its implementation of the Admin interface.
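Since `AdminClient.create` simply delegates to `Admin.create` (see the source above), one can also program against the `Admin` interface directly. A minimal sketch, assuming kafka-clients 3.0.0 and the same broker address used in this post; because `Admin` extends `AutoCloseable`, try-with-resources closes the client's network thread and connections when the block ends. The helper name `adminProps` is hypothetical.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class AdminInterfaceExample {

    // Hypothetical helper: the only required setting is the bootstrap server list
    static Properties adminProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // try-with-resources calls admin.close() even if listTopics() fails
        try (Admin admin = Admin.create(adminProps("192.168.31.240:9092"))) {
            Set<String> names = admin.listTopics().names().get();
            names.forEach(System.out::println);
        }
    }
}
```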

create

Create a new Topic.

package com.kaven.kafka.admin;

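import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
// Not covered by the admin.* wildcard; needed by the describeConfigs and incrementalAlterConfigs methods
import org.apache.kafka.common.config.ConfigResource;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class Admin {

    // Note: inside this file, this class name shadows org.apache.kafka.clients.admin.Admin
    private static final AdminClient adminClient = Admin.getAdminClient();

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        Admin admin = new Admin();
        try {
            // createTopic() waits on its latch, so no Thread.sleep is needed to see the callback output
            admin.createTopic();
        } finally {
            // Release the client's network thread and connections
            adminClient.close();
        }
    }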

    public void createTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CreateTopicsResult topics = adminClient.createTopics(
                Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
        );
        Map<String, KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }

    public static AdminClient getAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        return AdminClient.create(properties);
    }
}

Create the AdminClient (only BOOTSTRAP_SERVERS_CONFIG needs to be configured):

    public static AdminClient getAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        return AdminClient.create(properties);
    }

Create a Topic (pass in a NewTopic instance configured with the name, numPartitions and replicationFactor):

    public void createTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CreateTopicsResult topics = adminClient.createTopics(
                Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
        );
        Map<String, KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }

Most of the methods Admin provides are asynchronous: they return their results wrapped in KafkaFuture objects. The basics of asynchronous programming are not covered here. The output is shown in the following figure:
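When a callback is not needed, the same creation can be done synchronously by blocking on `CreateTopicsResult.all()`. This is a sketch assuming kafka-clients 3.0.0; the helper name `createTopicBlocking` is hypothetical. `get()` wraps broker-side errors in an `ExecutionException`, so the cause is unwrapped to treat `TopicExistsException` (for example, when the example is run twice) as a non-fatal case.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class BlockingCreateExample {

    // Hypothetical helper: blocks until the broker answers; returns false if the Topic already existed
    static boolean createTopicBlocking(AdminClient adminClient, NewTopic topic) throws InterruptedException {
        try {
            // all() completes when every Topic in the request has been created (or failed)
            adminClient.createTopics(Collections.singleton(topic)).all().get();
            return true;
        } catch (ExecutionException e) {
            // get() wraps the broker-side error; unwrap it to detect "already exists"
            if (e.getCause() instanceof TopicExistsException) {
                return false;
            }
            throw new RuntimeException(e.getCause());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            boolean created = createTopicBlocking(adminClient, new NewTopic("new-topic-kaven", 1, (short) 1));
            System.out.println(created ? "created" : "already exists");
        }
    }
}
```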

list

Get a list of topics.

    public void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient
                .listTopics(new ListTopicsOptions().listInternal(true));
        Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);
    }

The get method blocks until the future completes and then returns its result. The output is shown in the following figure:

Kafka's internal Topics (such as __consumer_offsets) are included only when the following option is set:

new ListTopicsOptions().listInternal(true)

By default, Kafka's internal Topics are not listed:

    public void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);
    }
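`ListTopicsResult` also exposes `listings()`, whose `TopicListing` elements report whether each Topic is internal. A sketch assuming kafka-clients 3.0.0; the helper `describeListing` is hypothetical and only formats one listing, marking internal Topics such as __consumer_offsets.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicListing;

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ListingsExample {

    // Hypothetical formatter: appends "(internal)" to Kafka's internal Topics
    static String describeListing(TopicListing listing) {
        return listing.name() + (listing.isInternal() ? " (internal)" : "");
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // listInternal(true) so internal Topics are returned and can be flagged
            Collection<TopicListing> listings = adminClient
                    .listTopics(new ListTopicsOptions().listInternal(true))
                    .listings()
                    .get();
            listings.forEach(l -> System.out.println(describeListing(l)));
        }
    }
}
```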

delete

Delete Topics.

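    public void deleteTopic() throws InterruptedException {
        List<String> topicNames = Arrays.asList("java-client4", "java-client2");
        // One count per Topic, so await() returns only after every deletion callback has run
        CountDownLatch latch = new CountDownLatch(topicNames.size());
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicNames);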
        deleteTopicsResult.topicNameValues().forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }

The output is shown in the following figure:

Listing the Topics again gives the output shown in the following figure (the deleted Topics no longer appear):

describe

Get the description of a Topic.

    public void describeTopic() throws InterruptedException {
        Map<String, KafkaFuture<TopicDescription>> values =
                adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values();
        // Wait for every callback so the output appears before the method returns
        CountDownLatch latch = new CountDownLatch(values.size());
        values.forEach((name, future) -> future.whenComplete((describe, throwable) -> {
            System.out.println(name);
            if (throwable != null) {
                System.out.println(throwable.getMessage());
            } else {
                System.out.println(describe);
            }
            latch.countDown();
        }));
        latch.await();
    }

The output is shown in the following figure:

The output is as expected (one Partition with a single replica), because the Topic was created with:

new NewTopic("new-topic-kaven", 1, (short) 1)
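The TopicDescription contains, for each Partition, the leader, replicas and in-sync replicas (ISR) discussed in the Replication section. A sketch assuming kafka-clients 3.0.0 that prints only that layout and blocks with `get()` instead of relying on a callback; the helper `ids` is hypothetical. For new-topic-kaven, created with one Partition and a replicationFactor of 1, one line would be expected, naming the same broker as leader, replica and ISR.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class PartitionLayoutExample {

    // Hypothetical helper: reduces a list of broker nodes to their IDs
    static List<Integer> ids(List<Node> nodes) {
        return nodes.stream().map(Node::id).collect(Collectors.toList());
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // get() blocks until the description arrives
            TopicDescription description = adminClient
                    .describeTopics(Collections.singleton("new-topic-kaven"))
                    .values().get("new-topic-kaven").get();
            for (TopicPartitionInfo p : description.partitions()) {
                // leader() may be null if the Partition currently has no leader
                Node leader = p.leader();
                System.out.printf("partition %d leader=%s replicas=%s isr=%s%n",
                        p.partition(),
                        leader == null ? "none" : String.valueOf(leader.id()),
                        ids(p.replicas()), ids(p.isr()));
            }
        }
    }
}
```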

config

Get the configuration of a Topic.

    public void describeTopicConfig() throws ExecutionException, InterruptedException {
        DescribeConfigsResult describeConfigsResult = adminClient
                .describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven")));
        describeConfigsResult.all().get().forEach((configResource, config) -> {
            System.out.println(configResource);
            System.out.println(config);
        });
    }

The output is shown in the following figure:

The describeConfigs method can also fetch the configuration of other kinds of resources, selected by the resource type:

    public enum Type {
        BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
        ...
    }
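For example, a broker's configuration is requested with `ConfigResource.Type.BROKER`, where the resource name is the broker ID as a string. This is a sketch assuming kafka-clients 3.0.0 and a broker whose `broker.id` is 0 (an assumption; substitute your own); the helper `brokerResource` is hypothetical. It looks up a single entry with `Config.get` instead of printing the whole configuration.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class BrokerConfigExample {

    // Hypothetical helper: for BROKER resources the resource name is the broker ID
    static ConfigResource brokerResource(int brokerId) {
        return new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            ConfigResource broker = brokerResource(0); // assumes broker.id=0
            Config config = adminClient.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
            ConfigEntry entry = config.get("log.retention.hours");
            if (entry == null) {
                System.out.println("log.retention.hours not returned");
            } else {
                System.out.println(entry.name() + " = " + entry.value() + " (source: " + entry.source() + ")");
            }
        }
    }
}
```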

alter

Incrementally update the configuration of a Topic.

    public void incrementalAlterConfig() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
        alter.put(
                new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
                Collections.singletonList(
                        new AlterConfigOp(
                                new ConfigEntry("compression.type", "gzip"),
                                AlterConfigOp.OpType.SET
                        )
                )
        );

        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter);
        alterConfigsResult.values().forEach((configResource, voidKafkaFuture) -> {
            voidKafkaFuture.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(configResource);
                latch.countDown();
            });
        });
        latch.await();
    }

The output is shown in the following figure:

The incrementalAlterConfigs method can likewise incrementally update the configuration of other kinds of resources (selected by the resource type).

ConfigResource identifies the resource whose configuration should be modified, and Collection<AlterConfigOp> defines the specific configuration changes to apply to that resource.

Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();

In the AlterConfigOp constructor, configEntry specifies the configuration entry to modify, and operationType specifies the type of modification.

    public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
        this.configEntry = configEntry;
        this.opType =  operationType;
    }

The supported operation types are:

    public enum OpType {
        /**
         * Set the value of the configuration entry
         */
        SET((byte) 0),
        /**
         * Restore the configuration entry to the default value (possibly empty)
         */
        DELETE((byte) 1),
        /**
         * Applies only to list type configuration entries
         * Adds the specified value to the current value of the configuration entry
         * If the configuration value has not been set, it is added to the default value
         */
        APPEND((byte) 2),
        /**
         * Applies only to list type configuration entries
         * Deletes the specified value from the current value of the configuration entry
         * It is legal to delete values that are not currently in the configuration entry
         * Deleting all entries from the current configuration value will leave an empty list and will not revert to the default value of the entry
         */
        SUBTRACT((byte) 3);
        ...
    }
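The example in this post only uses SET. The sketch below, assuming kafka-clients 3.0.0, builds a request that uses APPEND and DELETE instead: `cleanup.policy` is a list-type Topic configuration, so APPEND adds `compact` to its current value, and DELETE reverts `compression.type` to its default. The value of a DELETE entry is not used, so an empty string is passed. The helper `buildOps` is hypothetical, and applying the request really changes the Topic's cleanup behaviour, so a scratch Topic is the safer target.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class AppendDeleteExample {

    // Hypothetical helper: one resource, two operations of different types
    static Map<ConfigResource, Collection<AlterConfigOp>> buildOps(String topic) {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Collection<AlterConfigOp> ops = Arrays.asList(
                // List-type entry: APPEND adds "compact" to the current cleanup.policy value
                new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.APPEND),
                // DELETE restores the default compression.type; the value is not used
                new AlterConfigOp(new ConfigEntry("compression.type", ""), AlterConfigOp.OpType.DELETE)
        );
        return Collections.singletonMap(resource, ops);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // all() completes once every resource in the request has been updated (or failed)
            adminClient.incrementalAlterConfigs(buildOps("new-topic-kaven")).all().get();
            System.out.println("updated");
        }
    }
}
```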

ConfigEntry represents a single configuration entry of a resource, including the configuration name, value, and so on.

public class ConfigEntry {

    private final String name;
    private final String value;
    private final ConfigSource source;
    private final boolean isSensitive;
    private final boolean isReadOnly;
    private final List<ConfigSynonym> synonyms;
    private final ConfigType type;
    private final String documentation;
    ...
}

These fields can also be seen in the Topic configuration output shown earlier.

Here, the compression.type configuration entry (the compression type) of the Topic named new-topic-kaven is modified:

        alter.put(
                new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
                Collections.singletonList(
                        new AlterConfigOp(
                                new ConfigEntry("compression.type", "gzip"),
                                AlterConfigOp.OpType.SET
                        )
                )
        );

The default value of compression.type is producer, which means the original compression codec set by the producer is retained; this also matches the figure above. The blogger changed this entry to gzip.

Then fetch the Topic's configuration again, as shown in the following figure (the configuration has clearly been modified successfully):

That's all for this introduction to Kafka's Topic concept and API. If the blogger has made a mistake or you see things differently, comments and additions are welcome.


Added by maseeha on Mon, 03 Jan 2022 11:25:07 +0200