Kafka Initial Learning--Kafka Transaction Support

1. Implementing kafka transaction by spring annotation

1. Add Configuration to the Configuration Class in the previous article

/**
 * The following three methods are used for management
 * Configure Kafka Transaction Manager, which is the transaction management class provided by Kafka to us. We need to use the producer factory to create this transaction management class.
 * Note that we need to turn on transaction functionality in producerFactory and set Transaction IdPrefix.
 * TransactionIdPrefix It is a prefix used to generate Transactional.id.
 * @return
 */
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    @SuppressWarnings("rawtypes")
	DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(senderProps());
    factory.transactionCapable();
    factory.setTransactionIdPrefix("trans");
    return factory;
}

@Bean
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
    KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
    return manager;
}

/**
 * Configure the Kafka producer attribute
 * @return
 */
@Bean
public Map<String, Object> senderProps() {
	Map<String, Object> props = new HashMap<>();
     //Configuration of Connected Address for Kafka Instances
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.135.128.39:9092,10.135.128.39:9093");
	props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
	props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

	return props;
}

/**
 * KafkaTemplate  It also needs to be reconfigured
 * @param factory
 * @return
 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory factory) {
	return new KafkaTemplate<String, String>(factory);
}

2. New methods in kafkaSender.java

    /**
     * kafka  Annotation for Transaction
     */
    @Transactional
    public void TransactionSend(){
    	kafkaTemplate.send("mytopic", "test transactional annotation");
        throw new RuntimeException("fail");
    }

3. SpringBoot Start Class Testing

ConfigurableApplicationContext context = SpringApplication.run(SpringBootForKafkaApplication.class, args);
		KafkaSender sender = context.getBean(KafkaSender.class);
		
		sender.TransactionSend();

4. Test results
The console reported the message that the exception was thrown:

See if the tool message has succeeded:

You can see that there is no news at the current time.

2. Open a transaction using KafkaTemplate.executeInTransaction

1. New methods in kafkaSender.java

    /**
     * Open a transaction using KafkaTemplate.executeInTransaction
     */
    public void transactionSend2(){
    	 kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
             @Override
             public Object doInOperations(KafkaOperations kafkaOperations) {
                 kafkaOperations.send("mytopic", "test executeInTransaction");
                 throw new RuntimeException("fail");
                 //return true;
             }
         });
    }

2. Start Class Testing

ConfigurableApplicationContext context = SpringApplication.run(SpringBootForKafkaApplication.class, args);
		KafkaSender sender = context.getBean(KafkaSender.class);
		
		sender.transactionSend2();

3. Test results
The console reported an exception:

Tool messages are still missing.

Thirdly, there is a producer transaction, which is not described in detail here.

Keywords: kafka Apache Java Spring

Added by jaimitoc30 on Fri, 04 Oct 2019 03:06:45 +0300