How to create a Spring Cloud Stream Binder from scratch

You will learn how to develop a custom Spring Cloud Stream Binder from scratch

Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration. It is designed for building event-driven microservices that communicate through one or more shared messaging systems.

The core component of Spring Cloud Stream is the binder, a key abstraction that already has implementations for the most common messaging systems (such as Apache Kafka, Kafka Streams, Google PubSub, RabbitMQ, Azure EventHub and Azure ServiceBus).

In this article, you will learn how to develop a custom Spring Cloud Stream binder from scratch.


Introduction

The official Spring Cloud Stream documentation already provides very basic instructions on how to implement your own Spring Cloud Stream binder.

The following is a short excerpt about the Binder Service Provider Interface (SPI), which must be implemented to create a custom binder:

The Binder SPI consists of a number of interfaces, out-of-the-box utility classes, and discovery strategies that provide a pluggable mechanism for connecting to external middleware. The key point of the SPI is the Binder interface, which is a strategy for connecting inputs and outputs to external middleware. The following listing shows the definition of the Binder interface:

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
  Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
  Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
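
To make the idea of a binder as a connection strategy more concrete, here is a deliberately simplified, framework-free sketch. The MiniBinder interface and the InMemoryBinder class are hypothetical names invented for this illustration. They are not part of Spring Cloud Stream, and the real Binder additionally deals with consumer groups, binding properties and Binding lifecycle objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Small driver: binds one consumer and one producer to the same destination.
final class MiniBinderDemo {
    public static void main(String[] args) {
        MiniBinder<String> binder = new InMemoryBinder<>();
        List<String> received = new ArrayList<>();
        binder.bindConsumer("greetings", received::add);
        binder.bindProducer("greetings").accept("hello");
        System.out.println(received);
    }
}

// Hypothetical, simplified stand-in for the binder idea (NOT the Spring Cloud Stream API).
interface MiniBinder<T> {
    // Connects an application-side consumer to a named destination.
    void bindConsumer(String name, Consumer<T> inboundTarget);

    // Returns a handle the application can use to publish to a named destination.
    Consumer<T> bindProducer(String name);
}

// Toy "middleware": a destination is just a list of subscribers held in memory.
final class InMemoryBinder<T> implements MiniBinder<T> {
    private final Map<String, List<Consumer<T>>> subscribers = new HashMap<>();

    @Override
    public void bindConsumer(String name, Consumer<T> inboundTarget) {
        subscribers.computeIfAbsent(name, key -> new ArrayList<>()).add(inboundTarget);
    }

    @Override
    public Consumer<T> bindProducer(String name) {
        // Publishing delivers the message to every consumer bound to the same destination.
        return message -> subscribers
                .getOrDefault(name, Collections.emptyList())
                .forEach(subscriber -> subscriber.accept(message));
    }
}
```

The application code only ever talks to MiniBinder, so swapping the in-memory implementation for one backed by files or a message broker would not change the calling code. That is the property the real Binder SPI is built around.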


 

Here is another excerpt from the documentation, essentially a mini-tutorial on developing a Spring Cloud Stream binder:

A typical binder implementation consists of the following: a class that implements the Binder interface; a Spring @Configuration class that creates a bean of type Binder along with the middleware connection infrastructure; and a META-INF/spring.binders file found on the classpath, which contains one or more binder definitions, as shown in the following example:

kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration


 

Although the documentation above is very helpful for getting started, more detailed guidance and a practical example make the task considerably easier.

Developing a custom binder

Let's develop a custom Spring Cloud Stream binder that consumes events by reading files and produces events by writing to files!

Create a new Maven project with a pom.xml file similar to the following, including dependencies on Spring Cloud Stream:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>spring-cloud-stream-custom-binder</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-stream-custom-binder</name>
    <description>A demo custom Spring Cloud Stream Binder</description>
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring-cloud.version>Hoxton.RC1</spring-cloud.version>
        <spring-boot.version>2.2.0.RELEASE</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>
</project>


 

Technically, we only need to provide an org.springframework.cloud.stream.binder.Binder implementation. In practice, however, the binder depends on two other components that we need to provide first: the ProvisioningProvider and the MessageProducer.

The ProvisioningProvider is responsible for provisioning consumer and producer destinations. It is particularly useful for converting the logical destinations declared in the application.yml or application.properties file into physical destination references (you could look up Spring beans by destination name or, as in the following code snippet, simply trim the destination name):

package com.example.springcloudstreamcustombinder.provisioners;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {
    @Override
    public ProducerDestination provisionProducerDestination(
            final String name,
            final ProducerProperties properties) {
        return new FileMessageDestination(name);
    }
    @Override
    public ConsumerDestination provisionConsumerDestination(
            final String name,
            final String group,
            final ConsumerProperties properties) {
        return new FileMessageDestination(name);
    }
    private class FileMessageDestination implements ProducerDestination, ConsumerDestination {
        private final String destination;
        private FileMessageDestination(final String destination) {
            this.destination = destination;
        }
        @Override
        public String getName() {
            return destination.trim();
        }
        @Override
        public String getNameForPartition(int partition) {
            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
        }
    }
}

 

The MessageProducer, contrary to what its name suggests, is responsible for consuming events and handing them over as messages to the client application that is configured to consume such events.

Here is an example MessageProducer implementation that polls a file whose name matches the trimmed destination name and that is located in the project directory. It archives each message it reads and discards consecutive identical messages:

package com.example.springcloudstreamcustombinder.producers;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import static java.nio.file.StandardOpenOption.*;
import static java.util.concurrent.TimeUnit.*;
public class FileMessageProducer extends MessageProducerSupport {
    public static final String ARCHIVE = "archive.txt";
    private final ConsumerDestination destination;
    private String previousPayload;
    public FileMessageProducer(ConsumerDestination destination) {
        this.destination = destination;
    }
    @Override
    public void doStart() {
        receive();
    }
    private void receive() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleWithFixedDelay(() -> {
            String payload = getPayload();
            if(payload != null) {
                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
                archiveMessage(payload);
                sendMessage(receivedMessage);
            }
        }, 0, 50, MILLISECONDS);
    }
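    private String getPayload() {
        try {
            List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
            // An empty file has no last line yet, so there is nothing to emit
            if(allLines.isEmpty()) {
                return null;
            }
            String currentPayload = allLines.get(allLines.size() - 1);
            // Only emit the last line when it differs from the previously emitted one
            if(!currentPayload.equals(previousPayload)) {
                previousPayload = currentPayload;
                return currentPayload;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }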
    private void archiveMessage(String payload) {
        try {
            Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
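
The heart of the consumer side is the "read the last line and emit it only if it changed" check. The following framework-free sketch isolates that logic so it can be tried without Spring. LastLinePoller and its poll and demo methods are hypothetical names used only for this illustration. The sketch returns an empty result for a missing or empty file instead of throwing, and it decodes the file as UTF-8, which is an assumption about how the file was written.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: returns the last line of a file, but only when it differs
// from the line returned by the previous successful poll.
final class LastLinePoller {
    private final Path file;
    private String previousPayload;

    LastLinePoller(Path file) {
        this.file = file;
    }

    Optional<String> poll() {
        try {
            if (!Files.exists(file)) {
                return Optional.empty();
            }
            List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
            if (lines.isEmpty()) {
                return Optional.empty();
            }
            String current = lines.get(lines.size() - 1);
            if (current.equals(previousPayload)) {
                return Optional.empty();
            }
            previousPayload = current;
            return Optional.of(current);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Walks through a few polls against a temporary file and collects what was emitted.
    static List<String> demo() {
        try {
            Path input = Files.createTempFile("input", ".txt");
            LastLinePoller poller = new LastLinePoller(input);
            List<String> emitted = new ArrayList<>();
            poller.poll().ifPresent(emitted::add); // empty file: nothing emitted
            Files.write(input, "first\n".getBytes(StandardCharsets.UTF_8));
            poller.poll().ifPresent(emitted::add); // new last line: emitted
            poller.poll().ifPresent(emitted::add); // unchanged: nothing emitted
            Files.write(input, "second\n".getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.APPEND);
            poller.poll().ifPresent(emitted::add); // new last line: emitted
            Files.delete(input);
            return emitted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because only the last line is inspected, two lines appended between polls yield a single message (the second one), and a message that repeats the previous one verbatim is dropped. That trade-off is acceptable for a demo binder, but it is worth keeping in mind when adapting the idea.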


Finally, with all the necessary components, we can implement our own Binder by extending the AbstractMessageChannelBinder class, providing the required constructors and overriding the inherited abstract methods:

package com.example.springcloudstreamcustombinder;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import com.example.springcloudstreamcustombinder.producers.FileMessageProducer;
import com.example.springcloudstreamcustombinder.provisioners.FileMessageBinderProvisioner;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import static java.nio.file.StandardOpenOption.*;
public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {
    public FileMessageBinder(
            String[] headersToEmbed,
            FileMessageBinderProvisioner provisioningProvider) {
        super(headersToEmbed, provisioningProvider);
    }
    @Override
    protected MessageHandler createProducerMessageHandler(
            final ProducerDestination destination,
            final ProducerProperties producerProperties,
            final MessageChannel errorChannel) throws Exception {
        return message -> {
            String fileName = destination.getName();
            String payload = new String((byte[])message.getPayload()) + "\n";
            try {
                Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }
    @Override
    protected MessageProducer createConsumerEndpoint(
            final ConsumerDestination destination,
            final String group,
            final ConsumerProperties properties) throws Exception {
        return new FileMessageProducer(destination);
    }
}
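
The producer side boils down to decoding the byte[] payload and appending it as one line to the destination file. Here is a minimal, Spring-free sketch of that step. LineAppender and its methods are hypothetical names, and the explicit UTF-8 charset is an assumption made for this sketch (the handler above uses new String(bytes) and getBytes(), which rely on the platform default charset).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper mirroring what the producer message handler does per message.
final class LineAppender {

    // Appends the payload as a single line, creating the file on first use.
    static void append(Path file, byte[] payload) {
        String line = new String(payload, StandardCharsets.UTF_8) + "\n";
        try {
            Files.write(file, line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes two messages to a temporary "output" file and reads them back.
    static List<String> demo() {
        try {
            Path directory = Files.createTempDirectory("binder-demo");
            Path output = directory.resolve("output");
            append(output, "Received: one".getBytes(StandardCharsets.UTF_8));
            append(output, "Received: two".getBytes(StandardCharsets.UTF_8));
            List<String> lines = Files.readAllLines(output, StandardCharsets.UTF_8);
            Files.delete(output);
            Files.delete(directory);
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Using CREATE together with APPEND means the first message creates the file and every later message is added at the end, so earlier output is never overwritten.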


 

 

Last but not least, we need to provide the Spring configuration for the binder as follows:

package com.example.springcloudstreamcustombinder.config;
import com.example.springcloudstreamcustombinder.FileMessageBinder;
import com.example.springcloudstreamcustombinder.provisioners.FileMessageBinderProvisioner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FileMessageBinderConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
        return new FileMessageBinderProvisioner();
    }
    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
        return new FileMessageBinder(null, fileMessageBinderProvisioner);
    }
}


 

We also need the corresponding src/main/resources/META-INF/spring.binders file, which contains the binder name followed by the fully qualified name of the binder's Spring configuration class:

myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration
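
The trailing backslash is the line-continuation syntax of Java properties files, so the two lines above form a single "binder name: configuration class" entry. Assuming the file is read with standard java.util.Properties semantics (which, as far as we can tell, is how Spring Cloud Stream loads it, although the sketch below does not use Spring at all), the entry parses as follows. SpringBindersSyntaxDemo is a hypothetical name used only for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class SpringBindersSyntaxDemo {
    // Same content as the spring.binders file above, inlined as a string.
    static final String SPRING_BINDERS =
            "myFileBinder:\\\n"
            + "com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration\n";

    // Parses the content with plain java.util.Properties rules.
    static Properties parse(String content) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties binders = parse(SPRING_BINDERS);
        // Prints one "name -> configuration class" line per binder definition.
        binders.forEach((name, configurationClass) ->
                System.out.println(name + " -> " + configurationClass));
    }
}
```

The key (myFileBinder) is the name you would reference when an application has to choose between several binders, and the value is the configuration class that creates the binder bean.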


 

Congratulations! Your custom binder implementation is now complete and can be installed in the local Maven repository by running mvn clean install.

Testing the custom binder

Go to start.spring.io and generate a Spring Boot 2.2.0 project with Cloud Stream as the only required dependency.

In the dependencies section, add the custom binder dependency to the pom.xml file:


<dependency>
    <groupId>com.example</groupId>
    <artifactId>spring-cloud-stream-custom-binder</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>


 

Replace the src/main/resources/application.properties file with the following application.yml file, which enables debug logging for all messages handled by Spring Cloud Stream:


logging:
    level:
        org.springframework.cloud.stream.messaging.DirectWithAttributesChannel: DEBUG


 

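Replace the contents of the application's main class under src/main/java (SpringCloudStreamCustomBinderDemoApplication.java, to match the class name below) with the following, keeping the package declaration generated for your project: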

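import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.handler.annotation.SendTo;
@SpringBootApplication
@EnableBinding({Sink.class, Source.class})
public class SpringCloudStreamCustomBinderDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamCustomBinderDemoApplication.class, args);
    }
    // Consumes each message from the "input" binding and sends the result to "output"
    @StreamListener(Sink.INPUT)
    @SendTo(Source.OUTPUT)
    public String handle(String message) {
        return String.format("Received: %s", message);
    }
}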


 

Finally, add a file called "input" to the main project directory, and write something to it.

With this configuration in place, you can now launch the application by running mvn spring-boot:run. Relying on the custom Spring Cloud Stream binder we just implemented, the application will: keep consuming events from the input file you just created; write the processing results to an output file named "output"; and keep track of all previously read messages in the archive.txt file.

Summary

The official Spring Cloud Stream reference documentation is quite helpful when it comes to implementing your own binder, but it is definitely not enough on its own.

That said, creating your own binder implementation is almost effortless, even though it may look like a daunting task at first. Knowing how to build a custom Spring Cloud Stream binder is also a fairly niche yet useful skill to have as a developer!

Moreover, understanding how Spring Cloud Stream binders work makes it easy to customize existing binders, not just to build new ones (which, to be honest, is probably the less common use case).

 


Keywords: Java Spring Maven Apache

Added by geroido on Mon, 23 Dec 2019 20:38:26 +0200