Integrating Apache Druid with Kafka


In the previous article, we covered how to set up Apache Druid and how to quickly import external data sources into it for analysis.

In this article, we will walk through how Apache Druid can be used in a real project, based on a simple application scenario.

Business scenario

As shown below, this is a very common data analysis pipeline. Generally speaking, real-time or near-real-time data (understood here as external data sources) needs to pass through Kafka first, that is, it is sent to a Kafka topic.

Apache Druid provides the ability to ingest external data sources: it can consume data from a specified Kafka topic and then make it available for analysis. Once the Kafka data has been imported into Apache Druid, a backend application can read it and, depending on the actual business needs, apply its own processing logic.

Finally, after the application has processed the data, it writes the results to a database or outputs them for display on a dashboard.

On this basis, the same process applies to many related scenarios, for example when the source data is the output of a big data engine, or the result of a Python crawler.

Let's walk through this process end to end, from console operations to code.

Prerequisites

  • ZooKeeper and Kafka running in Docker or on Linux, with a topic created in advance
  • The Apache Druid service started

Run a quick Kafka send/receive test to verify that the topic can produce and consume messages normally.

1. Connect Kafka from the Apache Druid console

In Load data, select Kafka.

Fill in Kafka's connection information.

Then click Next through the parsing steps. After parsing completes, use the Query tab at the top to check whether the custom datasource name appears on the left.
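For reference, what the wizard ultimately submits to Druid is a Kafka ingestion (supervisor) spec. Below is a minimal sketch of what such a spec might look like for this example; it is an assumption, not the console's exact output: the topic and field names follow the demo data used later, the bootstrap address is a placeholder, and since the demo records carry no timestamp column, a constant `missingValue` is assumed for the timestamp.

```json
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "study1",
      "timestampSpec": {
        "column": "timestamp",
        "format": "auto",
        "missingValue": "2021-09-22T00:00:00Z"
      },
      "dimensionsSpec": {
        "dimensions": ["name", "city", {"type": "long", "name": "age"}]
      },
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "rollup": false
      }
    },
    "ioConfig": {
      "type": "kafka",
      "topic": "study1",
      "consumerProperties": {"bootstrap.servers": "IP:9092"},
      "inputFormat": {"type": "json"},
      "useEarliestOffset": true
    },
    "tuningConfig": {"type": "kafka"}
  }
}
```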

If it appears, the data in the Kafka topic is being parsed into an Apache Druid datasource, where the imported data can be managed and analyzed.

We can query it with SQL and see that the data we just sent is displayed.
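The same query can also be issued programmatically against Druid's HTTP SQL endpoint (a POST to `/druid/v2/sql` on the router, port 8888 in this setup), which returns rows as JSON. A sketch of the request body (the `resultFormat` value is optional):

```json
{
  "query": "SELECT * FROM \"study1\" LIMIT 10",
  "resultFormat": "object"
}
```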

2. Write a program that pushes messages to Kafka on a schedule

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class KafkaTest {

    public static void main(String[] args) {
        AtomicLong atomicLong = new AtomicLong(1);
        Runnable runnable = new Runnable() {
            public void run() {
                // Push a message to Kafka on each tick
                long l = atomicLong.incrementAndGet();
                pushMessage(l);
            }
        };
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        // The second parameter is the initial delay before the first run,
        // and the third parameter is the interval between scheduled runs
        service.scheduleAtFixedRate(runnable, 10, 1, TimeUnit.SECONDS);
    }

    public static void pushMessage(long num) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "IP:9092");
        properties.put("acks", "all");
        properties.put("retries", "3");
        properties.put("batch.size", "16384");
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        // Serializers for key and value
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Construct the producer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, Object> map = new HashMap<>();
        map.put("name", "gaoliang:" + num);
        map.put("age", 19);
        map.put("city", "Shenzhen");
        String val = null;
        try {
            val = objectMapper.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        producer.send(new ProducerRecord<>("study1", "congge", val));
        // Close the connection resources
        producer.close();
    }
}

3. Read Apache Druid data from a program

Here the approach is very flexible; what you do with the data you read depends on the specific business needs. You might return the latest data directly to a page through an API, store it after some logical processing, or hand it to other services for further use. Generally speaking, the most common scenarios after reading are writing to a database and feeding dashboards.
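As one concrete illustration of the "logical processing" step, here is a minimal sketch that maps a queried row into a simple domain object before handing it to storage or another service. The `UserRow` type and `fromRow` helper are hypothetical (not from the original article), and the row is simulated as a `Map`, since a real `ResultSet` needs a live Druid connection; the field names match the demo data (`name`, `age`, `city`).

```java
import java.util.HashMap;
import java.util.Map;

public class RowMapperSketch {

    // Hypothetical domain type for one row of the "study1" datasource
    static final class UserRow {
        final String name;
        final int age;
        final String city;

        UserRow(String name, int age, String city) {
            this.name = name;
            this.age = age;
            this.city = city;
        }

        @Override
        public String toString() {
            return name + " (" + age + ", " + city + ")";
        }
    }

    // Map one row (simulated here as a Map, as if copied out of the ResultSet)
    // into the domain object; the casts follow the demo data's field types
    static UserRow fromRow(Map<String, Object> row) {
        return new UserRow(
                (String) row.get("name"),
                ((Number) row.get("age")).intValue(),
                (String) row.get("city"));
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("name", "gaoliang:1");
        row.put("age", 19);
        row.put("city", "Shenzhen");
        System.out.println(fromRow(row)); // prints: gaoliang:1 (19, Shenzhen)
    }
}
```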

Let's demonstrate how to read Apache Druid data from a program, which is surely what everyone cares about most.

Add the following dependency to the pom file:
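The dependency snippet was lost here. As a sketch, assuming the Avatica JDBC client that the code below relies on, it would typically look like this (the version is an assumption; pick the one matching your Druid release):

```xml
<dependency>
    <groupId>org.apache.calcite.avatica</groupId>
    <artifactId>avatica-core</artifactId>
    <version>1.17.0</version>
</dependency>
```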


Apache Druid officially provides a JDBC connection method for querying data. Here is the code:

import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;

import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

public class DruidTest {

    private static final String DRUID_URL = "jdbc:avatica:remote:url=http://IP:8888/druid/v2/sql/avatica/";

    private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();

    /**
     * Open a connection
     * @return
     * @throws SQLException
     */
    public static AvaticaConnection connection() throws SQLException {
        Properties properties = new Properties();
        AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
        threadLocal.set(connection);
        return connection;
    }

    /**
     * Close the connection
     * @throws SQLException
     */
    public static void closeConnection() throws SQLException {
        System.out.println("Close thread:" + threadLocal.get());
        AvaticaConnection conn = threadLocal.get();
        if (conn != null) {
            conn.close();
            threadLocal.remove();
        }
    }

    /**
     * Query results by SQL
     * @param sql
     * @return
     * @throws SQLException
     */
    public static ResultSet executeQuery(String sql) throws SQLException {
        AvaticaStatement statement = connection().createStatement();
        ResultSet resultSet = statement.executeQuery(sql);
        return resultSet;
    }

    public static void main(String[] args) {
        try {
            String sql = "SELECT * FROM \"study1\" LIMIT 10";
            for (int i = 0; i < 5; i++) {
                ResultSet resultSet = executeQuery(sql);
                System.out.println("Start connection" + i + ";   Connection thread:" + threadLocal.get());
                while (resultSet.next()) {
                    String name = resultSet.getString("name");
                    System.out.println(name);
                }
                closeConnection();
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }
}

At this point, push another message to Kafka's study1 topic.

Query in the console, and you can see the new data has arrived.

Run the program again, and it is read successfully as well.

Above, we walked through a simple business scenario of connecting Kafka and Apache Druid from a Java program, using both the console and code. This article is deliberately simple and does not cover deeper feature-level integration; it is mainly groundwork for more in-depth use of Apache Druid. I hope it is useful!

Added by 486974 on Wed, 22 Sep 2021 02:11:30 +0300