Preface
We usually manage and inspect Kafka with the script tools under its bin directory, but sometimes we need to integrate management and inspection features into a system of our own (as Kafka Manager does), which means calling the relevant APIs directly. Before Kafka 0.11.0.0, some cluster management operations could be carried out through the AdminClient and AdminUtils classes in the kafka-core package (the Kafka server code, written in Scala); for example, the earlier articles on topic creation in Kafka (parts 1 and 2) use the AdminUtils class. Since Kafka 0.11.0.0 there is another AdminClient, located in the kafka-clients package. It is an abstract class whose concrete implementation is org.apache.kafka.clients.admin.KafkaAdminClient, and it is the focus of this article.
Introduction to Functions and Principles
AdminClient is described on Kafka's official website as follows: "The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects." Concretely, KafkaAdminClient provides the following functions (based on Kafka 1.0.0); a short usage sketch follows the list:
Create Topic
createTopics(Collection&lt;NewTopic&gt; newTopics)
Delete Topic
deleteTopics(Collection&lt;String&gt; topics)
List all Topics
listTopics()
Query Topic
describeTopics(Collection&lt;String&gt; topicNames)
Query Cluster Information
describeCluster()
Query ACL information
describeAcls(AclBindingFilter filter)
Create ACL information
createAcls(Collection&lt;AclBinding&gt; acls)
Delete ACL information
deleteAcls(Collection&lt;AclBindingFilter&gt; filters)
Query configuration information
describeConfigs(Collection&lt;ConfigResource&gt; resources)
Modify configuration information
alterConfigs(Map<ConfigResource, Config> configs)
Modify the replica log directory
alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
Query the log directory information of the node
describeLogDirs(Collection&lt;Integer&gt; brokers)
Query the log directory information of replicas
describeReplicaLogDirs(Collection&lt;TopicPartitionReplica&gt; replicas)
Add partitions
createPartitions(Map<String, NewPartitions> newPartitions)
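As a quick illustration of a few of the inspection methods above, here is a minimal sketch based on the Kafka 1.0.0 API; the broker address localhost:9092 and the topic name my-topic are placeholders to replace with your own values.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;

public class AdminClientInspectDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient client = AdminClient.create(props)) {
            // describeCluster(): broker nodes and the current controller
            DescribeClusterResult cluster = client.describeCluster();
            System.out.println("Cluster id: " + cluster.clusterId().get());
            System.out.println("Controller: " + cluster.controller().get());
            System.out.println("Nodes:      " + cluster.nodes().get());

            // describeTopics(): partition and replica layout of a topic
            Map<String, TopicDescription> topics =
                    client.describeTopics(Collections.singleton("my-topic")).all().get();
            topics.forEach((name, desc) -> System.out.println(name + " -> " + desc.partitions()));

            // describeConfigs(): effective configuration of a topic (or a broker)
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            Map<ConfigResource, Config> configs =
                    client.describeConfigs(Collections.singleton(resource)).all().get();
            System.out.println(configs.get(resource).get("retention.ms"));
        }
    }
}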
Internally, it works through a set of binary protocols defined by Kafka (see the Kafka protocol guide for details). The main steps are as follows:
1. The client builds the protocol request corresponding to the method called; for example, the createTopics method for creating topics internally sends a CreateTopicsRequest.
2. The client sends the request to the Kafka broker.
3. The Kafka broker processes the request and returns a response; for example, a CreateTopicsResponse answers a CreateTopicsRequest.
4. The client receives the response and parses it.
The request and response classes of the protocol mostly live in the org.apache.kafka.common.requests package; AbstractRequest and AbstractResponse are the two base classes from which they derive.
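This request/response cycle shows up in the API's return types: every call returns immediately with a Result object wrapping KafkaFuture instances, and a future completes only once the broker's response has arrived. Below is a minimal sketch mapping the four steps onto a createTopics call (the broker address and topic name are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicProtocolDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Steps 1-2: createTopics() builds a CreateTopicsRequest and sends it to the broker.
            CreateTopicsResult result = adminClient.createTopics(
                    Collections.singleton(new NewTopic("demo-topic", 3, (short) 1)));
            // Steps 3-4: the KafkaFuture completes only after the CreateTopicsResponse
            // has been received and parsed; get() blocks until then.
            result.all().get();
        }
    }
}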
Sample code
package com.donwait;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

/**
 * Kafka management interface.
 */
public class KafkaAdmin {

    private AdminClient adminClient = null;

    public KafkaAdmin(String brokerList) {
        Properties props = new Properties();
        // Kafka server addresses in IP:PORT form; separate multiple entries with commas
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        // Create the admin client
        adminClient = AdminClient.create(props);
    }

    /**
     * Create a topic.
     */
    public boolean createTopic(String topic, int partitionNum, short replicationFactor) {
        List<NewTopic> topics = new ArrayList<NewTopic>();
        topics.add(new NewTopic(topic, partitionNum, replicationFactor));
        CreateTopicsResult result = adminClient.createTopics(topics);
        boolean success = false;
        try {
            // Block until the broker acknowledges the request
            result.all().get();
            success = true;
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return success;
    }

    /**
     * Delete a topic.
     */
    public boolean deleteTopic(String topic) {
        DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList(topic));
        boolean success = false;
        try {
            result.all().get();
            success = true;
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return success;
    }

    /**
     * List the names of all topics.
     */
    public List<String> findAllTopic() {
        List<String> lst = new ArrayList<String>();
        ListTopicsResult result = adminClient.listTopics();
        try {
            Set<String> names = result.names().get();
            lst.addAll(names);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return lst;
    }

    /**
     * Release the underlying client connection.
     */
    public void close() {
        adminClient.close();
    }
}
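A short driver for the class above might look like this (a sketch; the broker list 192.168.1.100:9092 and the topic name test-topic are placeholders):

package com.donwait;

public class KafkaAdminDemo {
    public static void main(String[] args) {
        // Placeholder broker list; replace with your cluster's addresses
        KafkaAdmin admin = new KafkaAdmin("192.168.1.100:9092");

        if (admin.createTopic("test-topic", 3, (short) 1)) {
            System.out.println("Topic created");
        }
        System.out.println("All topics: " + admin.findAllTopic());

        if (admin.deleteTopic("test-topic")) {
            System.out.println("Topic deleted");
        }
        admin.close();
    }
}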
System configuration
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.easystudy</groupId>
    <artifactId>kafkaClient</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <!-- Project properties: submodules cannot reference the parent project's properties variables -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <!-- Automatically pulls in the scala-library and zookeeper dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/main/test</testSourceDirectory>
        <plugins>
            <!-- maven-compiler-plugin cannot package the project's dependencies into the jar. -->
            <!-- To bundle all dependencies into one jar so it can run standalone, the
                 maven-assembly-plugin must be added; exporting a runnable jar from the IDE
                 alone does not work. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.dondown.WordCountApp</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!-- The following two lines control the name of the generated jar;
                         by default the jar-with-dependencies suffix would be appended -->
                    <finalName>kafkaClient</finalName>
                    <appendAssemblyId>false</appendAssemblyId>
                </configuration>
                <!-- Bind the assembly step to the package phase -->
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>