Kafka MirrorMaker Setup Notes

Kafka MirrorMaker was used to synchronize data between the two locations. These are my personal notes (unfinished).

Open JMX

Monitoring Kafka via JMX requires opening the JMX port, which can be done by setting an environment variable. My own practice is to add the following configuration to the kafka-server-start.sh script:

...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
    # Add the JMX_PORT configuration
    export JMX_PORT="9999"
fi
...
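
After restarting the broker, a quick check that the port is actually open (9999 as set above; ss and jconsole are assumed to be available on the host):

# confirm the broker process is listening on the JMX port
ss -ltnp | grep 9999
# or attach interactively with JConsole
jconsole <broker_host>:9999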

Consumer Configuration

# Source cluster (the cluster to consume from)
bootstrap.servers=<kafka1host_from>:<port>,<kafka2host_from>:<port>,<kafka3host_from>:<port>
# Consumer group ID
group.id=mm-test-consumer-group
# Where to start mirroring: latest mirrors only data produced after MirrorMaker starts; earliest also mirrors data that already existed before MirrorMaker started
auto.offset.reset=earliest
# Consumer heartbeat interval, default 3000 ms; set to 30 seconds here because of remote mirroring
heartbeat.interval.ms=30000
# Consumer session timeout, default 10000 ms; set to 100 seconds here because of remote mirroring
session.timeout.ms=100000
# Change the partition assignment strategy; the default (range) can assign partitions unevenly, especially when mirroring many topics and partitions
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
# Maximum number of records returned by a single poll(), default 500
max.poll.records=20000
# TCP receive buffer size used when reading data, default 65536 (64 KiB)
receive.buffer.bytes=4194304
# Maximum amount of data fetched per partition, default 1048576 (1 MiB)
max.partition.fetch.bytes=10485760
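
Before starting MirrorMaker, a quick sanity check that the source cluster is reachable at all, for example with the console consumer (the topic name is a placeholder):

bin/kafka-console-consumer.sh --bootstrap-server <kafka1host_from>:<port> --topic <some_topic> --from-beginning --max-messages 10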

Producer Configuration

# Destination cluster (the cluster to produce to)
bootstrap.servers=<kafka1host_to>:<port>,<kafka2host_to>:<port>,<kafka3host_to>:<port>
# Enable compression
compression.type=snappy
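
Similarly, a quick way to confirm the destination cluster accepts writes with this producer configuration is the console producer (the topic name is a placeholder; type a line and check that it arrives):

bin/kafka-console-producer.sh --broker-list <kafka1host_to>:<port> --topic <test_topic> --producer.config config/producer.properties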

Execute Command

nohup bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --num.streams 2 --producer.config config/producer.properties --whitelist '.*\.test|.*\.temp' > /var/log/mirrormaker/mirrormaker.log 2>&1 &
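
To confirm the process came up and is mirroring, check the process list and tail the log:

ps -ef | grep [M]irrorMaker
tail -f /var/log/mirrormaker/mirrormaker.log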

Explanation:

  1. --num.streams: each stream is a consumer thread, and all consumer threads share a single producer.
  2. --whitelist: the set of topics to synchronize, given as a Java-style regular expression, so multiple topics can be joined with '|'. There is a corresponding blacklist option.
  3. It is recommended to place the log file under /var/log/ and manage it periodically with logrotate, so the logs stay searchable and do not fill the entire disk; see the logrotate sketch after this list.
  4. MirrorMaker is generally deployed alongside the destination cluster, because a reliable producer (writing data into the destination cluster) matters more: a consumer that cannot reach its cluster is much safer than a producer that cannot reach its cluster.
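
A minimal logrotate sketch for point 3, assuming the log path used in the command above (the rotation frequency and retention count are assumptions to adjust):

/var/log/mirrormaker/*.log {
    daily
    rotate 7
    compress
    missingok
    notifempty
    # the log file is a shell redirection target, so truncate it in place instead of recreating it
    copytruncate
}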

Configuration to keep data intact

For consumer

enable.auto.commit=false

For producer

max.in.flight.requests.per.connection=1
# Integer.MAX_VALUE; a properties file needs the literal number
retries=2147483647
# Long.MAX_VALUE; a properties file needs the literal number
max.block.ms=9223372036854775807
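
The usual no data loss guidance also pins the producer acknowledgement level; assuming the destination brokers' default is not already equivalent, add:

# wait for all in-sync replicas to acknowledge each write
acks=all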

For MirrorMaker

Add the parameter

--abort.on.send.failure true

Effect on MirrorMaker

  • MirrorMaker will only send one request to a broker at any given point.
  • If any exception is caught in a MirrorMaker thread, MirrorMaker will try to commit the acked offsets and then exit immediately.
  • For a RetriableException in the producer, the producer will retry indefinitely. If the retries do not succeed, eventually the entire MirrorMaker will block on a full producer buffer.
  • For a non-retriable exception, if --abort.on.send.failure is specified, MirrorMaker stops. Otherwise the producer callback records the message that was not successfully sent but lets MirrorMaker move on; in that case, the message is lost in the target cluster.

As the points above state, if an error occurs the MirrorMaker process may exit. Users are therefore recommended to run it under a watchdog process such as supervisord, which restarts a killed MirrorMaker process automatically; a sketch follows.
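
A minimal supervisord program stanza as a sketch, assuming Kafka is installed under /opt/kafka (the paths are assumptions to adjust). Note that nohup, the trailing & and the shell redirection from the command above are not needed here, because supervisord manages the process and captures its output:

[program:mirrormaker]
; installation path below is an assumption - adjust to your environment
directory=/opt/kafka
command=/opt/kafka/bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --num.streams 2 --producer.config config/producer.properties --whitelist '.*\.test|.*\.temp'
autostart=true
autorestart=true
startretries=10
stdout_logfile=/var/log/mirrormaker/mirrormaker.log
redirect_stderr=true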

Summary of issues

Java Version Switching

When multiple versions of Java are installed, debugging sometimes requires switching between them. This can be done with the update-alternatives command:

# update-alternatives --config java
...
# update-alternatives --config javac

Then choose the Java version you want from the list.
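
To confirm the switch took effect:

# java -version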

Kafka cannot consume data

After MirrorMaker has been running, consumers against the destination cluster cannot consume the mirrored data (except Kafka's own kafka-console-consumer).
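
A first check worth running against the destination cluster is to describe the failing consumer group and compare its current offsets with the log end offsets (the group name is a placeholder):

bin/kafka-consumer-groups.sh --bootstrap-server <kafka1host_to>:<port> --describe --group <consumer-group>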
