Flink notes: integrating Flink with Kafka

1. Preparation

1. ZooKeeper cluster installation: if you have not installed ZooKeeper yet, please click the link to learn: Installation of ZooKeeper cluster
2. Kafka cluster installation: if you have not installed Kafka yet, please click the link to learn: Installation of Kafka cluster
Before integrating Flink with Kafka, first check whether the Kafka cluster is installed, and second whether the ZooKeeper cluster is installed, since Kafka depends on ZooKeeper for cluster high availability.
To guarantee exactly-once semantics, Kafka 1.0.0 or later is recommended; the minimum supported version is 0.11.x. Older versions cannot guarantee exactly-once (see the checkpointing sketch at the end of this section).
If you have not installed the ZooKeeper and Kafka clusters, please refer to the links above. Once everything is installed, let's look at how Flink consumes messages from Kafka.
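On the Flink side, exactly-once for this source additionally requires checkpointing, because the Kafka connector tracks offsets through Flink's checkpoints rather than relying on Kafka's committed offsets alone. A minimal sketch (the 5000 ms interval is an arbitrary example value):

//Inside main(), before adding any sources.
//CheckpointingMode is org.apache.flink.streaming.api.CheckpointingMode.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Checkpoint every 5 seconds (example interval); on failure the job rewinds
//to the Kafka offsets stored in the last completed checkpoint
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);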

2. Official documents

Attachment: the official Flink Kafka connector documentation (English version)

3. Flink integrates Kafka

3.1 Add the Maven dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.1</version>
</dependency>
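Note: the _2.11 suffix on the artifact ID is the Scala version the connector was built against; it should match the Scala suffix of your other Flink dependencies, and the connector version (1.9.1 here) should match your Flink version.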

3.2 Write code to consume Kafka messages

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * TODO Reading data from Kafka
 * A parallel Source that can achieve exactly-once semantics
 * @author liuzebiao
 * @Date 2020-2-5 18:29
 */
public class KafkaSource {

    public static void main(String[] args) throws Exception {
        //1. Create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Kafka consumer properties
        Properties properties = new Properties();
        //Specify the Kafka broker addresses
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.210:9092,192.168.204.211:9092,192.168.204.212:9092");
        //Specify the consumer group ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "flinkDemoGroup");
        //If no offset has been committed for this group, start from the earliest record
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_demo", new SimpleStringSchema(), properties);
        //2. Create a Kafka DataStream via addSource()
        DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);
        //3. Sink (no transformation here; print directly to the console)
        kafkaDataStream.print();
        //4. Execute the job
        env.execute("KafkaSource");
    }
}
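A usage note: besides auto.offset.reset, the start position can be set directly on the consumer. A minimal sketch of the FlinkKafkaConsumer start-position methods (Flink 1.9 API), placed before addSource():

//Ignore committed offsets and always start from the earliest record
kafkaSource.setStartFromEarliest();
//Or resume from the offsets committed for the consumer group (the default)
//kafkaSource.setStartFromGroupOffsets();
//Commit offsets back to Kafka when a checkpoint completes, so external
//tools can track consumer lag (Flink itself restores offsets from checkpoints)
kafkaSource.setCommitOffsetsOnCheckpoints(true);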

3.3 Cluster startup

Start the clusters in order: the ZooKeeper cluster first, then the Kafka cluster.
For detailed startup steps, refer to the links above: Installation of ZooKeeper cluster and Installation of Kafka cluster (both links include startup instructions). A typical sequence is sketched below.
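For reference, a typical startup sequence looks like this, assuming the commands are run from each component's installation directory (paths are illustrative):

# 1. On every ZooKeeper node, start ZooKeeper first
bin/zkServer.sh start

# 2. Then, on every Kafka broker, start Kafka as a background daemon
bin/kafka-server-start.sh -daemon config/server.properties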

3.4 Test

3.4.1 Create a Kafka topic

The command to create the topic is as follows (the topic name must match the one the Flink job subscribes to, flink_demo):

bin/kafka-topics.sh --create --zookeeper 192.168.204.201:2181,192.168.204.202:2181,192.168.204.203:2181 --replication-factor 1 --partitions 3 --topic flink_demo

Analysis:
bin/kafka-topics.sh --create -----> Kafka's bundled script; --create means create a topic
--zookeeper xxx.xxx.xxx.xxx:2181 -----> ZooKeeper cluster address
--replication-factor 1 -----> replication factor (1 replica per partition)
--partitions 3 -----> number of partitions (3 partitions)
--topic flink_demo -----> name of the topic to create
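To verify that the topic was created with the expected partitions and replicas, you can describe it (one ZooKeeper address is enough here):

bin/kafka-topics.sh --describe --zookeeper 192.168.204.201:2181 --topic flink_demo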

3.4.2 Write data to Kafka

Here we write some messages to Kafka from the command line. The command is as follows:
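The original command did not survive extraction here; the standard Kafka console producer, with the broker list taken from the consumer code above, would look like this:

bin/kafka-console-producer.sh --broker-list 192.168.204.210:9092,192.168.204.211:9092,192.168.204.212:9092 --topic flink_demo

Each line typed at the producer prompt is sent as one message and should then appear in the running Flink job's console output.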
