Luckily, Kafka MirrorMaker was used to synchronize data between both locations. What follows are my personal notes (unfinished).
Open JMX
Monitoring Kafka's internal metrics via JMX requires opening the JMX port, which can be done by setting an environment variable. My own practice is to add the following configuration to the kafka-server-start.sh script:
```shell
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
    # Added: open the JMX port
    export JMX_PORT="9999"
fi
...
```
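To verify that the port is actually open after restarting the broker, one hedged check (JmxTool ships with Kafka; the MBean name below is a standard broker metric, and localhost:9999 is an assumption taken from the config above):

```shell
# Confirm something is listening on the JMX port
ss -lnt | grep 9999

# Read a broker metric over JMX using Kafka's bundled JmxTool
bin/kafka-run-class.sh kafka.tools.JmxTool \
    --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
    --object-name 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
```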
Consumer Configuration
```
# Source cluster that the consumer reads from
bootstrap.servers=<kafka1host_from>:<port>,<kafka2host_from>:<port>,<kafka3host_from>:<port>
# Consumer group ID
group.id=mm-test-consumer-group
# Where mirroring starts: latest mirrors only data produced after MirrorMaker
# starts; earliest also mirrors data that existed before it started
auto.offset.reset=earliest
# Consumer heartbeat interval, default 3000; set to 30 s here because this is remote mirroring
heartbeat.interval.ms=30000
# Consumer session timeout, default 10000; set to 100 s here because this is remote mirroring
session.timeout.ms=100000
# Change the partition assignment strategy; the default (range) can be unfair,
# especially when mirroring many topics and partitions
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
# Maximum number of records returned by a single poll(), default 500
max.poll.records=20000
# TCP receive buffer size used when reading data, default 65536 (64 KiB)
receive.buffer.bytes=4194304
# Maximum amount of data fetched per partition, default 1048576
max.partition.fetch.bytes=10485760
```
Producer Configuration
```
# Target cluster that the producer writes to
bootstrap.servers=<kafka1host_to>:<port>,<kafka2host_to>:<port>,<kafka3host_to>:<port>
# Enable compression
compression.type=snappy
```
Execute Command
```shell
nohup bin/kafka-mirror-maker.sh \
    --consumer.config config/consumer.properties \
    --num.streams 2 \
    --producer.config config/producer.properties \
    --whitelist '.*\.test|.*\.temp' \
    >/var/log/mirrormaker/mirrormaker.log 2>&1 &
```
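Note that the whitelist is a Java-style regular expression, so the glob form `*.test` would be rejected (a leading `*` is a dangling metacharacter); `.*\.test` is the regex form. A quick way to sanity-check the pattern against sample topic names (grep's extended regexes are close enough to Java's for this case):

```shell
# Topics we expect to mirror vs. skip: the first two match, orders.prod does not
printf '%s\n' orders.test metrics.temp orders.prod \
  | grep -E '^(.*\.test|.*\.temp)$'
# → prints orders.test and metrics.temp, but not orders.prod
```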
Explanation:
- --num.streams: each stream is a consumer; all consumers share a single producer.
- --whitelist: the topics that need to be synchronized; multiple topics can be joined with '|', or a Java-style regular expression can be used. There is a corresponding --blacklist option.
- For log files, it is recommended to place them under the /var/log/ folder and manage them periodically with logrotate, so the logs stay searchable and do not fill the entire disk.
- MirrorMaker is generally deployed alongside the destination cluster: a reliable producer (writing data into the destination cluster) matters more, and a consumer that cannot reach its cluster is much safer than a producer that cannot reach its cluster.
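For the logrotate recommendation above, a minimal sketch of a rotation policy (the file path and schedule are assumptions, adjust to taste):

```
# /etc/logrotate.d/mirrormaker  (hypothetical file name)
/var/log/mirrormaker/*.log {
    daily
    rotate 7
    compress
    missingok
    notifempty
    copytruncate
}
```

`copytruncate` matters here: the nohup shell redirection keeps the log file descriptor open, so renaming the file without truncating it in place would leave MirrorMaker writing to the rotated file forever.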
Configuration to keep data intact
For consumer
```
enable.auto.commit=false
```
For producer
```
max.in.flight.requests.per.connection=1
retries=Integer.MAX_VALUE
max.block.ms=Long.MAX_VALUE
```
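Integer.MAX_VALUE and Long.MAX_VALUE above are the Java constants; in an actual producer.properties file the numeric values must be spelled out. A combined no-data-loss producer.properties might look like the sketch below (acks=all is my own addition, commonly paired with these settings, and is not from the notes above):

```
bootstrap.servers=<kafka1host_to>:<port>,<kafka2host_to>:<port>,<kafka3host_to>:<port>
compression.type=snappy
# Wait for all in-sync replicas to acknowledge (assumption, not in the notes above)
acks=all
max.in.flight.requests.per.connection=1
# Integer.MAX_VALUE spelled out
retries=2147483647
# Long.MAX_VALUE spelled out
max.block.ms=9223372036854775807
```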
For MirrorMaker
Add the parameter

```
--abort.on.send.failure
```
Effect on MirrorMaker
- Mirror maker will only send one request to a broker at any given point.
- If any exception is caught in mirror maker thread, mirror maker will try to commit the acked offsets then exit immediately.
- For a RetriableException in the producer, the producer will retry indefinitely. If retries do not succeed, the entire mirror maker will eventually block on a full producer buffer.
- For a non-retriable exception, if --abort.on.send.failure is specified, the mirror maker stops. Otherwise the producer callback records the message that was not successfully sent but lets the mirror maker move on; in this case, that message is lost in the target cluster.
As the last point states, if any error occurs the MirrorMaker process will be killed, so it is recommended to run it under a watchdog process such as supervisord that restarts the killed MirrorMaker process.
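A minimal supervisord sketch for this watchdog setup (all paths, including the Kafka install directory, are assumptions; adjust to your layout):

```
; /etc/supervisor/conf.d/mirrormaker.conf  (hypothetical path)
[program:mirrormaker]
command=/opt/kafka/bin/kafka-mirror-maker.sh --consumer.config /opt/kafka/config/consumer.properties --producer.config /opt/kafka/config/producer.properties --num.streams 2 --whitelist '.*\.test|.*\.temp'
directory=/opt/kafka
; restart automatically whenever the process dies
autorestart=true
; require 10 s of uptime before a start counts as successful
startsecs=10
stdout_logfile=/var/log/mirrormaker/mirrormaker.log
redirect_stderr=true
```

With supervisord managing the process, the nohup/background invocation is no longer needed; supervisord owns the lifecycle and the log redirection.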
Summary of issues
Java Version Switching
When multiple versions of Java are installed, debugging sometimes requires switching between them. This can be done with the update-alternatives command:
```shell
# update-alternatives --config java
...
# update-alternatives --config javac
```
Then choose the one you want from the listed Java versions.
Kafka cannot consume data
After running Kafka MirrorMaker, data in the destination cluster cannot be consumed (except by Kafka's own kafka-console-consumer).
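The notes end here; one hedged way to start debugging this is to check whether the data actually arrived and whether the consumer group's offsets look sane (tool names are from the standard Kafka distribution; the topic and group names are placeholders):

```shell
# Check whether the mirrored data actually reached the target cluster
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list <kafka1host_to>:<port> --topic <mirrored-topic>

# Inspect the consumer group on the target cluster for offset or lag problems
bin/kafka-consumer-groups.sh \
    --bootstrap-server <kafka1host_to>:<port> \
    --describe --group <your-group>
```

If GetOffsetShell shows non-zero end offsets but the application's group shows no progress, the problem is on the consuming side (group configuration, offsets, or assignment) rather than in MirrorMaker itself.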