- Tool Class Meaning Description
- kafka is one of the most typical and commonly used MQ technologies. It has gradually developed into a middleware for decoupling components of producers and consumers.
- code implementation
- Producer Tools
package com.tl.job002.kafka; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; importorg.apache.kafka.common.serialization.StringSerializer; /** * kafka Test Tool Class * @author tianliang */ public class KafkaProducerUtil { // Producer abstract object public KafkaProducer<String, String> producer; // Pass in brokerList, as hostname:port, separated by numbers public KafkaProducerUtil(String brokerList) { Properties props = new Properties(); // Server ip: port number, cluster separated by commas props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); // key serializes the specified class props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value serializes the specified class props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Producer Object producer = new KafkaProducer<String, String>(props); } public void close(){ this.producer.close(); } public static void main(String[] args) { // Initialize broker list String brokerList = "sc-slave7:6667,sc-slave8:6667"; String topic="TestKafka"; // Initialize the producer tool class KafkaProducerUtil kafkaProducerUtil = new KafkaProducerUtil(brokerList); // Send hello, kafka to test_topic kafkaProducerUtil.producer.send(new ProducerRecord<String, String>( topic, "hello,Dawn Education!")); kafkaProducerUtil.close(); System.out.println("done!"); } }
- Consumer Tools
package com.tl.job002.kafka; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; importorg.apache.kafka.common.serialization.StringDeserializer; /** * Kafka Consumer Tools * * @author tianliang */ public class KafkaConsumerUtil { // Consumer Target public KafkaConsumer<String, String> kafkaConsumer; public KafkaConsumerUtil(String brokerList, String topic) { Properties props = new Properties(); // Server ip: port number, cluster separated by commas props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "sc-slave1:2181"); // Consumers can specify groups with arbitrary names. Note that consumers in the same consumer group can only consume once for the same partition. props.put(ConsumerConfig.GROUP_ID_CONFIG, "TestTL"); // Whether autocommit is enabled, default true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // Automatic submission interval 1s props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // key deserializes the specified class props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // value deserializes the specified class, paying attention to the consistency between producer and consumer, or resolving the problem props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Consumer Target kafkaConsumer = new KafkaConsumer<>(props); kafkaConsumer.subscribe(Arrays.asList(topic)); } public void close() { kafkaConsumer.close(); } public static void main(String[] args) { // Initialize broker list String brokerList = "sc-slave7:6667,sc-slave8:6667"; String topic = "TestKafka"; // Initialize the producer tool class KafkaConsumerUtil kafkaConsumerUtil = new KafkaConsumerUtil(brokerList, topic); boolean runnable=true; while (runnable) { ConsumerRecords<String, String> records = kafkaConsumerUtil.kafkaConsumer .poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } } kafkaConsumerUtil.close(); System.out.println("done!"); } }
Tianliang Education is a comprehensive Internet technology company engaged in large data cloud computing, artificial intelligence, education and training, product development, consulting services, talent selection.
The company was founded by a group of BAT and other Internet IT elites.
With the vision of "working happily, living earnestly and building a banner of IT vocational skill education", and with the mission of "making the world free of hard-to-find jobs",
Adhere to the value of "customer first, honesty, passion, embracing change".
Enabling students to improve their abilities wholeheartedly and practicing the original intention of technology to change their destiny.
More learning and discussion, please join us
Official-Dawn Big Data Exchange QQ Group-366784928
Group two-dimensional codes:
Official-Dawn web Front-end Exchange QQ Group-972788995
Group two-dimensional codes:
Welcome to pay attention to Dawn Education Public Number, Big Data Technology Information and Courses, Recruitment and Employment Dynamics, Education Information Dynamics, Business Process Sharing One-Stop Sharing, Official Wechat Public Number Two-Dimensional Number:
Reptilian and nlp qq q group 320349384
Hadoop & Spark & hive Technology Group 297585251
Education and Training Official Website: http://myhope365.com
Project R&D business Shangyun Science and Technology Official Website: http://shangyuninfo.com/
Dawn Education Open Course - From Little White to Big Man Chengji - Video Address Series: http://myhope365.com/news/index?id=66
Tianliangyun Classroom Small Program Edition, the following pictures of Wechat Scanning Code can be directly into learning!!!