1. Implementing Kafka transactions with Spring's @Transactional annotation
1. Add the following configuration to the configuration class from the previous article
/**
 * Producer factory with transactions enabled.
 * Setting a transaction ID prefix is what switches the factory into transactional mode;
 * the prefix is used to generate the transactional.id of each producer.
 */
@Bean
public ProducerFactory<String, String> producerFactory() {
    DefaultKafkaProducerFactory<String, String> factory =
            new DefaultKafkaProducerFactory<>(senderProps());
    // transactionCapable() only reports whether transactions are enabled,
    // so it does not need to be called here.
    factory.setTransactionIdPrefix("trans");
    return factory;
}

/**
 * Kafka transaction manager, the transaction management class provided by
 * Spring for Apache Kafka. It is created from the producer factory.
 */
@Bean
public KafkaTransactionManager<String, String> transactionManager(
        ProducerFactory<String, String> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}

/**
 * Kafka producer properties.
 */
@Bean
public Map<String, Object> senderProps() {
    Map<String, Object> props = new HashMap<>();
    // Addresses of the Kafka brokers to connect to
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.135.128.39:9092,10.135.128.39:9093");
    // Keys and values are both sent as strings, matching KafkaTemplate<String, String>
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}

/**
 * KafkaTemplate must also be rebuilt on top of the transactional producer factory.
 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> factory) {
    return new KafkaTemplate<>(factory);
}
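A transactional producer has to be idempotent and has to wait for acknowledgement from all in-sync replicas. Kafka normally derives both settings once a transactional.id is present, so spelling them out is optional. The sketch below shows how the producer properties could make them explicit, using the plain Kafka configuration key names so that it compiles without any Kafka dependency. The class name TransactionalProducerProps and the method build are hypothetical and not part of the original project.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; the class and method names are illustrative only. */
final class TransactionalProducerProps {

    private TransactionalProducerProps() {
    }

    /** Builds producer properties using the plain Kafka configuration key names. */
    static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Transactions require an idempotent producer ...
        props.put("enable.idempotence", true);
        // ... and acknowledgement from all in-sync replicas.
        props.put("acks", "all");
        return props;
    }
}
```

A map built this way could be passed to DefaultKafkaProducerFactory in place of senderProps(); the transaction ID prefix is still set on the factory itself.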
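2. Add a new method to KafkaSender.java
/**
 * Sends a message inside a Kafka transaction started by the @Transactional annotation.
 * The exception thrown after the send forces the transaction to roll back.
 */
@Transactional
public void TransactionSend() {
    kafkaTemplate.send("mytopic", "test transactional annotation");
    throw new RuntimeException("fail");
}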
3. Test from the Spring Boot startup class
ConfigurableApplicationContext context =
        SpringApplication.run(SpringBootForKafkaApplication.class, args);
KafkaSender sender = context.getBean(KafkaSender.class);
sender.TransactionSend();
4. Test results
The console shows that the exception was thrown.
Check with a Kafka client tool whether the message was delivered.
No new message has arrived in the topic, so the send was rolled back together with the transaction.
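When checking the topic with your own consumer instead of a tool, the isolation level matters: with the default read_uncommitted, a consumer may still return records from an aborted transaction if they reached the broker. The sketch below shows consumer properties that hide such records. It uses plain configuration key names so that it compiles without a Kafka dependency; the class name ReadCommittedConsumerProps, the method build, and the group ID passed in are hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper for a verification consumer; names are illustrative only. */
final class ReadCommittedConsumerProps {

    private ReadCommittedConsumerProps() {
    }

    /** Builds consumer properties using the plain Kafka configuration key names. */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Hide records that belong to open or aborted transactions
        // (the default isolation level is read_uncommitted).
        props.setProperty("isolation.level", "read_committed");
        return props;
    }
}
```

These properties could be handed to a KafkaConsumer subscribed to mytopic; after the failed send, polling should then return nothing for that transaction.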
2. Open a transaction using KafkaTemplate.executeInTransaction
1. Add a new method to KafkaSender.java
/**
 * Opens a transaction using KafkaTemplate.executeInTransaction.
 * Everything sent inside the callback belongs to one local transaction.
 */
public void transactionSend2() {
    kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
        @Override
        public Object doInOperations(KafkaOperations kafkaOperations) {
            kafkaOperations.send("mytopic", "test executeInTransaction");
            // Throwing here aborts the transaction; return a value instead to commit it.
            throw new RuntimeException("fail");
            // return true;
        }
    });
}
2. Test from the startup class
ConfigurableApplicationContext context =
        SpringApplication.run(SpringBootForKafkaApplication.class, args);
KafkaSender sender = context.getBean(KafkaSender.class);
sender.transactionSend2();
3. Test results
The console again reports the exception.
The tool still shows no new message in the topic.
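Both tests behave the same way because a transactional send follows a begin, commit-or-abort pattern: sends made inside the transaction only become visible if the surrounding code finishes without an exception. The sketch below models that control flow with an in-memory list standing in for the topic. It is not Spring or Kafka code; the class TransactionFlowSketch and its methods are hypothetical and only illustrate why the message never shows up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** In-memory model of commit/abort semantics; NOT part of Kafka or Spring. */
final class TransactionFlowSketch {

    // Messages that a reader of the "topic" is allowed to see.
    private final List<String> committed = new ArrayList<>();

    /** Runs the callback in a "transaction": commit on success, discard on exception. */
    void executeInTransaction(Consumer<List<String>> callback) {
        List<String> pending = new ArrayList<>(); // begin: sends go to a private buffer
        callback.accept(pending);                 // an exception here skips the commit
        committed.addAll(pending);                // commit: make the sends visible
    }

    /** Returns a copy of the messages that have been committed so far. */
    List<String> committedMessages() {
        return new ArrayList<>(committed);
    }
}
```

Calling executeInTransaction with a callback that adds a message and then throws leaves committedMessages() empty, which mirrors the two test results above; a callback that returns normally makes its message visible.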