TiDB - synchronizing data to downstream Kafka with TiCDC

1, TiCDC

In the previous article, we introduced how to use TiCDC to synchronize data to MySQL. From that task we saw that TiCDC is much simpler to configure than TiDB Binlog, and its performance is also better. Today we learn how to use TiCDC to synchronize data to downstream Kafka, which makes it possible to synchronize TiDB to Elasticsearch, MongoDB, Redis, and other NoSQL stores.

Previous article:

https://blog.csdn.net/qq_43692950/article/details/121731278

Note: to use TiCDC, the TiDB cluster must be upgraded to v4.0.6 or later.

2, Configuring TiCDC to synchronize data to Kafka

This article continues from the previous one. Let's first take a look at the current cluster status:


It is the same CDC server we scaled out in the previous article.

In the previous article, we created a task that synchronizes data from TiDB to MySQL. Now let's create another task that synchronizes data to Kafka:

./cdc cli changefeed create --pd=http://192.168.40.160:2379 --sink-uri='kafka://192.168.40.1:9092/tidb-cdc?kafka-version=2.6.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=canal-json' --changefeed-id="replication-task-2"

tidb-cdc: the Kafka topic name in the sink URI
kafka-version: the downstream Kafka version (optional; defaults to 2.4.0, and the minimum supported version is 0.11.0.2)
kafka-client-id: the Kafka client ID used by the synchronization task (optional; defaults to TiCDC_sarama_producer_ followed by the changefeed ID)
partition-num: the number of downstream Kafka partitions (optional; must not be greater than the actual number of partitions, and is fetched automatically if omitted)
protocol: the message protocol written to Kafka; the optional values are default, canal, avro, maxwell, and canal-json (defaults to default)
max-message-bytes: the maximum amount of data sent to the Kafka broker in one message (optional; defaults to 64 MB)
replication-factor: the number of replicas kept for the Kafka messages (optional; defaults to 1)
ca: the path to the CA certificate file needed to connect to the downstream Kafka instance (optional)
cert: the path to the certificate file needed to connect to the downstream Kafka instance (optional)
key: the path to the certificate key file needed to connect to the downstream Kafka instance (optional)
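
If you prefer to assemble the sink URI from code or a script rather than typing it by hand, here is a minimal sketch in Java (purely illustrative; the class name and the TLS file paths are made up) that builds the URI used above, including the optional TLS parameters:

// A minimal sketch of assembling the Kafka sink URI from the parameters above.
// The values mirror the example command; the TLS file paths are placeholders.
public class SinkUriExample {
    public static void main(String[] args) {
        String broker = "192.168.40.1:9092";   // downstream Kafka address
        String topic = "tidb-cdc";             // target topic
        String uri = "kafka://" + broker + "/" + topic
                + "?kafka-version=2.6.0"
                + "&partition-num=1"
                + "&max-message-bytes=67108864"
                + "&replication-factor=1"
                + "&protocol=canal-json";
        boolean useTls = false;                // set to true only if the broker requires TLS client auth
        if (useTls) {
            uri += "&ca=/path/to/ca.pem&cert=/path/to/client.pem&key=/path/to/client-key.pem";
        }
        System.out.println(uri);
    }
}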


The changefeed was created successfully.

You can list all tasks with the following command:

./cdc cli changefeed list --pd=http://192.168.40.160:2379

Or query the details of a specific task:

./cdc cli changefeed query --pd=http://192.168.40.160:2379 --changefeed-id=replication-task-2

3, Spring Boot Kafka listener

Add the POM dependency for Spring Kafka (the listener below also uses Lombok and fastjson for logging and JSON parsing, which are assumed to be on the classpath already):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml:

server:
  port: 8081

spring:
  kafka:
    # kafka server address (multiple)
#    bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
    bootstrap-servers: 192.168.40.1:9092
    consumer:
      # Specify a default group name
      group-id: kafkaGroup
      # earliest: when a partition has a committed offset, consume from that offset; when there is none, consume from the beginning
      # latest: when a partition has a committed offset, consume from that offset; when there is none, consume only newly produced data
      # none: when every partition has a committed offset, consume from those offsets; if any partition has no committed offset, throw an exception
      auto-offset-reset: earliest

      # Key/value deserializers; the value is consumed as raw bytes and decoded to a string in the listener
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # Serialization of key/value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Producer batch size in bytes
      batch-size: 65536
      # Producer buffer memory in bytes
      buffer-memory: 524288
      # Number of retries on send failure
      retries: 3
      # server address
#      bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092

The consumer listens for the events:

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Requires Lombok and fastjson (com.alibaba:fastjson) in addition to spring-kafka.
@Slf4j
@Component
public class Jms_Consumer {

    @KafkaListener(topics = "tidb-cdc")
    public void receive4(ConsumerRecord<?, ?> consumer) throws Exception {
        System.out.println("tidb tidb-cdc  Listener >> ");
        // The value arrives as a byte array (see application.yml) and is decoded as a canal-json string.
        JSONObject jsonObject = JSONObject.parseObject(new String((byte[]) consumer.value()));
        String type = jsonObject.getString("type");
        String db = jsonObject.getString("database");
        String table = jsonObject.getString("table");
        String data = jsonObject.getString("data");
        log.info("Operation type: {}", type);
        log.info("Database: {}", db);
        log.info("Table: {}", table);
        log.info("Changed rows: {}", data);
    }

}
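
Building on the listener above, the following is a minimal sketch of how the canal-json events could be fanned out to a downstream store such as Elasticsearch, MongoDB, or Redis. The DownstreamWriter interface and its method names are hypothetical placeholders for your own client code, not part of TiCDC or Spring Kafka:

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class CanalJsonDispatcher {

    // Hypothetical sink abstraction; implement it with your ES/MongoDB/Redis client of choice.
    public interface DownstreamWriter {
        void upsert(String db, String table, JSONObject row);
        void delete(String db, String table, JSONObject row);
    }

    public void dispatch(JSONObject event, DownstreamWriter writer) {
        if (event.getBooleanValue("isDdl")) {
            return; // DDL events carry no row data
        }
        String type = event.getString("type");        // INSERT / UPDATE / DELETE
        String db = event.getString("database");
        String table = event.getString("table");
        JSONArray rows = event.getJSONArray("data");  // the rows after the change
        if (rows == null) {
            return;
        }
        for (int i = 0; i < rows.size(); i++) {
            JSONObject row = rows.getJSONObject(i);
            if ("DELETE".equals(type)) {
                writer.delete(db, table, row);
            } else {
                writer.upsert(db, table, row);        // treat INSERT and UPDATE as upserts
            }
        }
    }
}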

4, Testing data synchronization

Insert data into TiDB:

insert into user(name,age) values('bxc','25');

Kafka receives the following JSON:

{
	"id": 0,
	"database": "testdb",
	"table": "user",
	"pkNames": ["id"],
	"isDdl": false,
	"type": "INSERT",
	"es": 1638698748819,
	"ts": 0,
	"sql": "",
	"sqlType": {
		"age": -5,
		"id": -5,
		"name": 12
	},
	"mysqlType": {
		"age": "int",
		"id": "int",
		"name": "varchar"
	},
	"data": [{
		"age": "25",
		"id": "242219",
		"name": "bxc"
	}],
	"old": [null]
}
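
For reference, here is a small sketch (again with fastjson, purely illustrative) that pulls the primary-key values named in pkNames out of each row in data for an event like the one above, which can be handy when building Redis keys or Elasticsearch document IDs:

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class PrimaryKeyExtractor {

    // Print "id = 242219" style lines for every primary-key column of every changed row.
    public static void printPrimaryKeys(JSONObject event) {
        JSONArray pkNames = event.getJSONArray("pkNames"); // e.g. ["id"]
        JSONArray rows = event.getJSONArray("data");
        if (pkNames == null || rows == null) {
            return;
        }
        for (int i = 0; i < rows.size(); i++) {
            JSONObject row = rows.getJSONObject(i);
            for (int j = 0; j < pkNames.size(); j++) {
                String pk = pkNames.getString(j);
                System.out.println(pk + " = " + row.getString(pk));
            }
        }
    }
}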


Update data:

update user set age=24 where name = 'bxc';

Kafka receives the following JSON:

{
	"id": 0,
	"database": "testdb",
	"table": "user",
	"pkNames": ["id"],
	"isDdl": false,
	"type": "UPDATE",
	"es": 1638699660093,
	"ts": 0,
	"sql": "",
	"sqlType": {
		"age": -5,
		"id": -5,
		"name": 12
	},
	"mysqlType": {
		"age": "int",
		"id": "int",
		"name": "varchar"
	},
	"data": [{
		"age": "24",
		"id": "242216",
		"name": "bxc"
	}],
	"old": [{
		"age": "23",
		"id": "242216",
		"name": "bxc"
	}]
}
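
The UPDATE payload above carries both the new row in data and the previous row in old. Below is a minimal sketch (fastjson again, illustrative only) that diffs the two to find which columns actually changed:

import com.alibaba.fastjson.JSONObject;

import java.util.HashMap;
import java.util.Map;

public class UpdateDiff {

    // Return column -> new value for every column whose value differs between the old and new row.
    public static Map<String, String> changedColumns(JSONObject newRow, JSONObject oldRow) {
        Map<String, String> changed = new HashMap<>();
        for (String column : newRow.keySet()) {
            String newValue = newRow.getString(column);
            String oldValue = oldRow == null ? null : oldRow.getString(column);
            boolean differs = (newValue == null) ? (oldValue != null) : !newValue.equals(oldValue);
            if (differs) {
                changed.put(column, newValue); // e.g. {age=24} for the event above
            }
        }
        return changed;
    }
}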


Delete data:

delete from user where name = 'bxc';

Kafka receives the following JSON:

{
	"id": 0,
	"database": "testdb",
	"table": "user",
	"pkNames": ["id"],
	"isDdl": false,
	"type": "DELETE",
	"es": 1638699773943,
	"ts": 0,
	"sql": "",
	"sqlType": {
		"age": -5,
		"id": -5,
		"name": 12
	},
	"mysqlType": {
		"age": "int",
		"id": "int",
		"name": "varchar"
	},
	"data": [{
		"age": "25",
		"id": "242218",
		"name": "bxc"
	}],
	"old": [{
		"age": "25",
		"id": "242218",
		"name": "bxc"
	}]
}

5, Other changefeed operations

Pause the synchronization task:

./cdc cli changefeed pause --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2

Remove the synchronization task:

./cdc cli changefeed remove --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2


If you found this useful, you can follow my personal WeChat official account for more learning materials.
