Detailed records of Apache Druid stand-alone environment construction and basic use

Apache Druid

Apache Druid is a real-time analytical database designed for rapid query analysis ("OLAP" queries) on large data sets. Druid is most often used as a database to support application scenarios of real-time ingestion, high-performance query and high stable operation. At the same time, Druid is also usually used to help the graphical interface of analytical applications, or as a high concurrency back-end API that needs rapid aggregation. Druid is most suitable for event oriented data.

Druid features

Column storage: Druid uses column storage, which means that it only needs to query specific columns in a specific data query, which greatly improves the performance of some column query scenarios. In addition, each column of data is optimized for specific data types to support rapid scanning and aggregation.

As a scalable distributed system, Druid is usually deployed in a cluster of dozens to hundreds of servers, and can provide the receiving rate of millions of records per second, the reserved storage of trillions of records, and the query delay of sub second to several seconds.

Large scale parallel processing, Druid can process queries in parallel in the whole cluster.

Real time or batch ingestion, Druid can ingest data in real time (the data that has been ingested can be used for query immediately) or in batch.

It is self-healing, self balancing and easy to operate. As a cluster operation and maintenance operator, if you want to scale the cluster, you only need to add or delete services, and the cluster will automatically rebalance itself in the background without causing any downtime. If any Druid server fails, the system will automatically bypass the damage. Druid is designed for 7 * 24-hour operation without planned downtime for any reason, including configuration changes and software updates.

Cloud native fault-tolerant architecture that will not lose data. Once Druid ingests data, Replicas are safely stored on deep storage media (usually cloud storage, HDFS or shared file system). Even if a druid service fails, your data can be recovered from deep storage. For limited failures that affect only a few Druid services, replicas ensure that queries can still be made during system recovery.

Index for fast filtering. Druid uses the compact or roaming bitmap index to create an index to support fast filtering and cross column search.

For time-based partitioning, Druid first partitions the data according to time. In addition, Druid can partition according to other fields at the same time. This means that time-based queries will only access partitions that match the query time range, which will greatly improve the performance of time-based data.

Approximate algorithm, Druid applies the algorithms of approximate count distinct, approximate sorting, approximate histogram and quantile calculation. These algorithms take up limited memory usage and are usually much faster than accurate calculations. For scenes where accuracy is more important than speed, Druid also provides accurate count distinct and accurate sorting.

Automatic summary and aggregation during ingestion. Druid supports optional data summary in the data ingestion stage. This summary will partially aggregate your data in advance, which can save a lot of cost and improve performance.

In what scenario should Druid be used

The data insertion frequency is high, but the data is rarely updated

Most query scenarios are aggregate query and group query( GroupBy),At the same time, there must be retrieval and scanning query

Locate the data query delay target between 100 milliseconds and a few seconds

Data has a time attribute( Druid (optimized and designed for time)

In the multi table scenario, each query hits only one large distributed table, and the query may hit multiple smaller distributed tables lookup surface

The scene contains high base dimension data columns (for example URL,user ID And need to quickly count and sort them

Need from Kafka,HDFS,Object storage (e.g Amazon S3)Load data in

Druid is usually applied to the following scenarios:

Click stream analysis( Web End and mobile end)
Network monitoring analysis (network performance monitoring)
Service indicator storage
 Supply chain analysis (manufacturing indicators)
Application performance index analysis
 Digital advertising analysis
 Business intelligence / OLAP

architecture design

Druid has a multi process, distributed architecture designed to be cloud friendly and easy to operate. Each Druid process can be configured and expanded independently, providing maximum flexibility on the cluster. This design also provides enhanced fault tolerance: an interruption of one component does not immediately affect other components.

Processes and services

Druid has several different types of processes, which are briefly described as follows:

Coordinator Data availability in process management cluster

Overlord Process control data ingestion load distribution

Broker The process processes query requests from external clients

Router Process is an optional process that routes requests to Brokers,Coordinators and Overlords

Historical The process stores queryable data

MiddleManager The process is responsible for ingesting data

Druid processes can be deployed as you like, but for ease of deployment, it is recommended to organize them into three server types: Master, Query and Data.

Master: function Coordinator and Overlord Process, manage data availability and ingestion

Query: function Broker And optional Router Process to process requests from external clients

Data: function Historical and MiddleManager Process, execute load ingestion and store all queryable data

Storage design

For more overview, please refer to the Chinese website: http://www.apache-druid.cn/ …

Install Jdk

Druid service operation depends on Java 8

https://www.oracle.com/java/technologies/downloads/#java8

Unzip to the appropriate directory

tar -zxvf jdk-8u311-linux-x64.tar.gz -C /usr/local/
cd /usr/local
mv jdk1.8.0_311 jdk1.8

Setting environment variables

export JAVA_HOME=/usr/local/jdk1.8
export PATH=$JAVA_HOME/bin:$PATH

Configuration effective command

source /etc/profile

Verify that the installation was successful

java

javac

java -version

Install Druid

Refer to Chinese website for installation and use: http://www.apache-druid.cn/

# wget https://archive.apache.org/dist/druid/0.17.0/apache-druid-0.17.0-bin.tar.gz
Download 0 from document.17 There is a problem with the version in use. It has been a pit for a long time. Download the latest version 0 from the official website.22.1 No problem has been found yet.

Official website: https://druid.apache.org/

Each version set: https://archive.apache.org/dist/druid/

tar -zxvf apache-druid-0.17.0-bin.tar.gz  

mv apache-druid-0.17.0/ druid

[root@administrator program]# cd druid
[root@administrator druid]# ls
bin  conf  extensions  hadoop-dependencies  lib  LICENSE  licenses  NOTICE  quickstart  README

The following files are in the installation package:

bin  Start stop and other scripts

conf   Sample configuration for single node deployment and cluster deployment

extensions  Druid Core extension

hadoop-dependencies  Druid Hadoop rely on

lib  Druid Core libraries and dependencies

quickstart Configuration files, sample data, and other files in the quick start textbook

Single server reference configuration

Nano-Quickstart: 1 CPU, 4GB Memory
 Start command: bin/start-nano-quickstart
 configure directory: conf/druid/single-server/nano-quickstart

Micro-Quickstart: 4 CPU, 16GB Memory
 Start command: bin/start-micro-quickstart
 configure directory: conf/druid/single-server/micro-quickstart

Small: 8 CPU, 64GB Memory (~i3.2xlarge)
Start command: bin/start-small
 configure directory: conf/druid/single-server/small

Medium: 16 CPU, 128GB Memory (~i3.4xlarge)
Start command: bin/start-medium
 configure directory: conf/druid/single-server/medium

Large: 32 CPU, 256GB Memory (~i3.8xlarge)
Start command: bin/start-large
 configure directory: conf/druid/single-server/large

X-Large: 64 CPU, 512GB Memory (~i3.16xlarge)
Start command: bin/start-xlarge
 configure directory: conf/druid/single-server/xlarge
[root@administrator druid]# ./bin/start-nano-quickstart 
[Fri Dec 24 10:52:06 2021] Running command[zk], logging to[/usr/local/program/druid/var/sv/zk.log]: bin/run-zk conf
[Fri Dec 24 10:52:06 2021] Running command[coordinator-overlord], logging to[/usr/local/program/druid/var/sv/coordinator-overlord.log]: bin/run-druid coordinator-overlord conf/druid/single-server/nano-quickstart
[Fri Dec 24 10:52:06 2021] Running command[broker], logging to[/usr/local/program/druid/var/sv/broker.log]: bin/run-druid broker conf/druid/single-server/nano-quickstart
[Fri Dec 24 10:52:06 2021] Running command[router], logging to[/usr/local/program/druid/var/sv/router.log]: bin/run-druid router conf/druid/single-server/nano-quickstart
[Fri Dec 24 10:52:06 2021] Running command[historical], logging to[/usr/local/program/druid/var/sv/historical.log]: bin/run-druid historical conf/druid/single-server/nano-quickstart
[Fri Dec 24 10:52:06 2021] Running command[middleManager], logging to[/usr/local/program/druid/var/sv/middleManager.log]: bin/run-druid middleManager conf/druid/single-server/nano-quickstart

Access: IP:8888

Data loading

Use the Data Loader to load data

Click Load data to enter the Load data page, select Local disk, and then click Connect data

An example data file is officially provided, which contains the Wikipedia page editing event on September 12, 2015. The sample data is located in QuickStart / tutorial / wikiticker-2015-09-12-sampled. In the root directory of Druid package json. GZ, page editing events are stored as JSON objects in text files.

[root@administrator druid]# ll ./quickstart/tutorial/
Total consumption 2412
-rw-r--r-- 1 501 wheel     295 1 June 22, 2020 compaction-day-granularity.json
-rw-r--r-- 1 501 wheel    1428 1 June 22, 2020 compaction-init-index.json
.........
-rw-r--r-- 1 501 wheel 2366222 1 June 22, 2020 wikiticker-2015-09-12-sampled.json.gz
[root@administrator druid]#

Enter quickstart/tutorial / in the Base directory and select wikipicker-2015-09-12-sampled in the File filter json. GZ or enter the file name and click Apply to make sure the data you see is correct

Click Next:Parse data

The data loader will attempt to automatically determine the correct parser for the data. In this case, it will successfully determine the json. Feel free to use different parser options to preview how Druid parses your data.

Click Next:Parse time to determine the main time column

Druid's architecture requires a primary time column (internally stored as a column named _time). If your data does not have a timestamp, select a Constant Value. In our example, the data loader will determine that the time column in the original data is the only candidate that can be used as the primary time column.

Click Next:Transform to set the use of ingestion time transformation

Click Next:Filter to set the filter

Click Next:Configure schema

Configure which dimensions and indicators will be ingested into Druid, which is what the data will look like after being ingested in Druid. Since the dataset is very small, turn off rollup and confirm the changes.

Click Next:Partition to adjust how the data is divided into segment files

Adjust how data is split into segments in Druid. Since this is a small dataset, no adjustments are required in this step.

Click Next:Tune

Click Next:Publish

Specify the name of the data source in Druid and name the data source wikiticker

Click Next:Edit JSON spec to view the ingestion specification

Get the data intake specification JSON, and finally generate the current JSON data from the parameters set on each previous page.

The JSON is the built specification. In order to see how the specification will be updated, you can go back to the previous steps to make changes. Similarly, you can edit the specification directly and see it in the previous steps.

When you are satisfied with the ingestion specification, click Submit, and then you will create a data ingestion task and jump to the task page

When a task completes successfully, it means that it has established one or more segments that will now be received by the Data server.

After the task is completed, click data sources to enter the data source page, and you can see the wikiticker data source

Wait until the data source (wikipicker) appears. It may take a few seconds to load the segment. Once you see the green (fully available) circle, you can Query the data source. At this time, you can go to the Query view to run SQL Query against the data source.

Click Query to enter the data Query page to Query the data

Loading data using spec (via console)

Druid's installation package is in QuickStart / tutorial / Wikipedia index The JSON file contains an example of a local batch ingestion task specification. The specification has been configured to read QuickStart / tutorial / wikiticker-2015-09-12-sampled json. GZ input file.

The specification will create a data source named "wikipedia"

[root@administrator druid]# cat ./quickstart/tutorial/wikipedia-index.json 
{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "timestampSpec": {
        "column": "time",
        "format": "iso"
      },
      "dimensionsSpec" : {
        "dimensions" : [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "metricsSpec" : [],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-12/2015-09-13"],
        "rollup" : false
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/tutorial/",
        "filter" : "wikiticker-2015-09-12-sampled.json.gz"
      },
      "inputFormat" : {
        "type" : "json"
      },
      "appendToExisting" : false
    },
    "tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000,
      "maxRowsInMemory" : 25000
    }
  }
}
[root@administrator druid]# 


On the "Tasks" page, click Submit task and select Raw JSON task


Enter the data extraction specification in the input box

After submitting the task specification, wait for data loading according to the same specification above, and then query.

Loading data using spec (from the command line)

A batch ingestion help script bin / post index task is provided in Druid's package

The script publishes the data ingestion task to the Druid overload and polls the Druid until the data can be queried.

Run the following command in the Druid root directory:

bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081
[root@administrator druid]# bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081
Beginning indexing data for wikipedia
Task started: index_parallel_wikipedia_hiapdgph_2021-12-24T02:22:25.778Z
Task log:     http://localhost:8081/druid/indexer/v1/task/index_parallel_wikipedia_hiapdgph_2021-12-24T02:22:25.778Z/log
Task status:  http://localhost:8081/druid/indexer/v1/task/index_parallel_wikipedia_hiapdgph_2021-12-24T02:22:25.778Z/status
Task index_parallel_wikipedia_hiapdgph_2021-12-24T02:22:25.778Z still running...
Task index_parallel_wikipedia_hiapdgph_2021-12-24T02:22:25.778Z still running...
Task index_parallel_wikipedia_hiapdgph_2021-12-24T02:22:25.778Z still running...
Task index_parallel_wikipedia_hiapdgph_2021-12-24T02:22:25.778Z still running...
Task index_parallel_wikipedia_hiapdgph_2021-12-24T02:22:25.778Z still running...
Task finished with status: SUCCESS
Completed indexing data for wikipedia. Now loading indexed data onto the cluster...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia loading complete! You may now query your data
[root@administrator druid]# 

Do not use scripts to load data

curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/wikipedia-index.json http://localhost:8081/druid/indexer/v1/task

Data cleaning

Data cleaning needs to shut down the service and cluster, and reset the service and cluster status by deleting the contents of var directory under druid package

Load data from Kafka

Installing Zookeeper

because Kafka Also required Zookeeper,So will Zookeeper Stand alone deployment and installation
docker run -id --name zk -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper:latest

docker logs -f zk

kafka installation

Pull image

docker pull wurstmeister/kafka

Start container

docker run -id --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=IP:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

Parameter description

-e KAFKA_BROKER_ID=0  stay kafka In the cluster, each kafka There is one BROKER_ID To distinguish yourself

-e KAFKA_ZOOKEEPER_CONNECT=IP:2181 to configure zookeeper Administration kafka Path of

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092 register the address port of kafka with zookeeper

-e KAFKA_LISTENERS=PLAINTEXT://0.0. 0.0:9092 configure the listening port of kafka

-v /etc/localtime:/etc/localtime Container time synchronizes the time of the virtual machine

View container log

docker logs -f kafka

View zookeeper

Enter container

docker exec -it kafka /bin/bash

Enter the bin directory

bash-5.1# cd /opt/kafka_2.13-2.8.1/bin/
bash-5.1# ls
connect-distributed.sh               kafka-consumer-perf-test.sh          kafka-producer-perf-test.sh          kafka-verifiable-producer.sh
connect-mirror-maker.sh              kafka-delegation-tokens.sh           kafka-reassign-partitions.sh         trogdor.sh
connect-standalone.sh                kafka-delete-records.sh              kafka-replica-verification.sh        windows
kafka-acls.sh                        kafka-dump-log.sh                    kafka-run-class.sh                   zookeeper-security-migration.sh
kafka-broker-api-versions.sh         kafka-features.sh                    kafka-server-start.sh                zookeeper-server-start.sh
kafka-cluster.sh                     kafka-leader-election.sh             kafka-server-stop.sh                 zookeeper-server-stop.sh
kafka-configs.sh                     kafka-log-dirs.sh                    kafka-storage.sh                     zookeeper-shell.sh
kafka-console-consumer.sh            kafka-metadata-shell.sh              kafka-streams-application-reset.sh
kafka-console-producer.sh            kafka-mirror-maker.sh                kafka-topics.sh
kafka-consumer-groups.sh             kafka-preferred-replica-election.sh  kafka-verifiable-consumer.sh
bash-5.1# 

Create a Kafka topic / queue named "wikipedia" for sending data. This queue has one copy and one partition

bash-5.1# kafka-topics.sh --create --zookeeper IP:2181 --replication-factor 1 --partitions 1 --topic wikipedia
Created topic wikipedia.
bash-5.1# 

View created queues

bash-5.1# kafka-topics.sh -list -zookeeper IP:2181
wikipedia
bash-5.1# 

Test whether message sending and receiving are normal

# Start the consumer and listen to the wikipedia queue
bash-5.1# kafka-console-consumer.sh --bootstrap-server IP:9092 --topic wikipedia --from-beginning

hello kafka

# Open a new command window, start the producer and send messages to the wikipedia queue
bash-5.1# kafka-console-producer.sh --broker-list IP:9092 --topic wikipedia
>hello kafka
>

Modify Druid

 Due to independent use Zookeeper,So it needs to be closed Druid Associated Zookeeper to configure

Note Zookeeper configuration

[root@administrator druid]# cat  conf/supervise//single-server/nano-quickstart.conf 
:verify bin/verify-java
:verify bin/verify-default-ports
:kill-timeout 10

# notes! p10 zk bin/run-zk conf
# !p10 zk bin/run-zk conf
coordinator-overlord bin/run-druid coordinator-overlord conf/druid/single-server/nano-quickstart
broker bin/run-druid broker conf/druid/single-server/nano-quickstart
router bin/run-druid router conf/druid/single-server/nano-quickstart
historical bin/run-druid historical conf/druid/single-server/nano-quickstart
!p90 middleManager bin/run-druid middleManager conf/druid/single-server/nano-quickstart

Remove the detection of 2181 port

[root@administrator druid]# cat bin/verify-default-ports 
#!/usr/bin/env perl

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

use strict;
use warnings;
use Socket;

sub try_bind {
  my ($port, $addr) = @_;

  socket(my $sock, PF_INET, SOCK_STREAM, Socket::IPPROTO_TCP) or die "socket: $!";
  setsockopt($sock, SOL_SOCKET, SO_REUSEADDR, pack("l", 1)) or die "setsockopt: $!";
  if (!bind($sock, sockaddr_in($port, $addr))) {
    print STDERR <<"EOT";
Cannot start up because port $port is already in use.

If you need to change your ports away from the defaults, check out the
configuration documentation:

  https://druid.apache.org/docs/latest/configuration/index.html

If you believe this check is in error, or if you have changed your ports away
from the defaults, you can skip this check using an environment variable:

  export DRUID_SKIP_PORT_CHECK=1

EOT
    exit 1;
  }
  shutdown($sock, 2);
}

my $skip_var = $ENV{'DRUID_SKIP_PORT_CHECK'};
if ($skip_var && $skip_var ne "0" && $skip_var ne "false" && $skip_var ne "f") {
  exit 0;
}

my @ports = @ARGV;
if (!@ports) {
# Port monitoring
#  @ports = (1527, 2181, 8081, 8082, 8083, 8090, 8091, 8100, 8200, 8888);
  @ports = (1527, 8081, 8082, 8083, 8090, 8091, 8100, 8200, 8888);
}

for my $port (@ports) {
  try_bind($port, INADDR_ANY);
  try_bind($port, inet_aton("127.0.0.1"));
}
[root@administrator druid]# 

Modify public configuration

[root@administrator druid]# cat conf/druid/single-server/nano-quickstart/_common/common.runtime.properties 
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Extensions specified in the load list will be loaded by Druid
# We are using local fs for deep storage - not recommended for production - use S3, HDFS, or NFS instead
# We are using local derby for the metadata store - not recommended for production - use MySQL or Postgres instead

# If you specify `druid.extensions.loadList=[]`, Druid won't load any extension from file system.
# If you don't specify `druid.extensions.loadList`, Druid will load all the extensions under root extension directory.
# More info: https://druid.apache.org/docs/latest/operations/including-extensions.html
druid.extensions.loadList=["druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches"]

# If you have a different version of Hadoop, place your Hadoop client jar files in your hadoop-dependencies directory
# and uncomment the line below to point to your directory.
#druid.extensions.hadoopDependenciesDir=/my/dir/hadoop-dependencies


#
# Hostname
#
# It's not enough to use IP here
druid.host=localhost

#
# Logging
#

# Log all runtime properties on startup. Disable to avoid logging properties on startup:
druid.startup.logging.logProperties=true

#
# Zookeeper
#

# druid.zk.service.host=localhost
# Fill in the IP address of the independently deployed zookeeper
druid.zk.service.host=IP
druid.zk.paths.base=/druid

#
# Metadata storage
#

# For Derby server on your Druid Coordinator (only viable in a cluster with a single Coordinator, no fail-over):
druid.metadata.storage.type=derby
druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true
druid.metadata.storage.connector.host=localhost
druid.metadata.storage.connector.port=1527

# For MySQL (make sure to include the MySQL JDBC driver on the classpath):
#druid.metadata.storage.type=mysql
#druid.metadata.storage.connector.connectURI=jdbc:mysql://db.example.com:3306/druid
#druid.metadata.storage.connector.user=...
#druid.metadata.storage.connector.password=...

# For PostgreSQL:
#druid.metadata.storage.type=postgresql
#druid.metadata.storage.connector.connectURI=jdbc:postgresql://db.example.com:5432/druid
#druid.metadata.storage.connector.user=...
#druid.metadata.storage.connector.password=...

#
# Deep storage
#

# For local disk (only viable in a cluster if this is a network mount):
druid.storage.type=local
druid.storage.storageDirectory=var/druid/segments

# For HDFS:
#druid.storage.type=hdfs
#druid.storage.storageDirectory=/druid/segments

# For S3:
#druid.storage.type=s3
#druid.storage.bucket=your-bucket
#druid.storage.baseKey=druid/segments
#druid.s3.accessKey=...
#druid.s3.secretKey=...

#
# Indexing service logs
#

# For local disk (only viable in a cluster if this is a network mount):
druid.indexer.logs.type=file
druid.indexer.logs.directory=var/druid/indexing-logs

# For HDFS:
#druid.indexer.logs.type=hdfs
#druid.indexer.logs.directory=/druid/indexing-logs

# For S3:
#druid.indexer.logs.type=s3
#druid.indexer.logs.s3Bucket=your-bucket
#druid.indexer.logs.s3Prefix=druid/indexing-logs

#
# Service discovery
#

druid.selectors.indexing.serviceName=druid/overlord
druid.selectors.coordinator.serviceName=druid/coordinator

#
# Monitoring
#

druid.monitoring.monitors=["org.apache.druid.java.util.metrics.JvmMonitor"]
druid.emitter=noop
druid.emitter.logging.logLevel=info

# Storage type of double columns
# ommiting this will lead to index double as float at the storage layer

druid.indexing.doubleStorage=double

#
# Security
#
druid.server.hiddenProperties=["druid.s3.accessKey","druid.s3.secretKey","druid.metadata.storage.connector.password"]


#
# SQL
#
druid.sql.enable=true

#
# Lookups
#
druid.lookup.enableLookupSyncOnStartup=false
[root@administrator druid]# 

Restart the project to view Zookeeper

Send data to Kafka

cd quickstart/tutorial

gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json

docker cp ./wikiticker-2015-09-12-sampled.json kafka:/opt/kafka_2.13-2.8.1/bin

bash-5.1# kafka-console-producer.sh --broker-list IP:9092 --topic wikipedia < ./wikiticker-2015-09-12-sampled.json
bash-5.1# 

Console using data loader

Enter IP:9092 in Bootstrap servers and wikipedia in Topic


In the Tune step, it is important to set use early offset to True because you need to consume data from the beginning of the flow.

Name the data source kafkadata

Submit supervisor via console

Click the Tasks button to enter the task page

Click Submit after pasting the specification, which will start the supervisor, which will then generate some tasks, which will start listening to the incoming data.

Submit supervisor directly

In order to start the service directly, we can run the following command in the root directory of druid to submit a supervisor specification to Druid Overlord

curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor

Java client operation druid

        <dependency>
            <groupId>org.apache.calcite.avatica</groupId>
            <artifactId>avatica-core</artifactId>
            <version>1.19.0</version>
        </dependency>
	@Test
    public void test throws Exception{
        Class.forName("org.apache.calcite.avatica.remote.Driver");
        Connection connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://IP:8888/druid/v2/sql/avatica/");
       
        Statement st = null;
        ResultSet rs = null;
        try {
            st = connection.createStatement();
            rs = st.executeQuery("select * from wikipedia");
            ResultSetMetaData rsmd = rs.getMetaData();
            List<Map> resultList = new ArrayList();
            while (rs.next()) {
                Map map = new HashMap();
                for (int i = 0; i < rsmd.getColumnCount(); i++) {
                    String columnName = rsmd.getColumnName(i + 1);
                    map.put(columnName, rs.getObject(columnName));
                }
                resultList.add(map);
            }
            System.out.println("resultList = " + resultList.size());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (SQLException e) {
            }
        }
    }

Kafka sends data to Druid

        <dependency>
            <groupId>org.apache.calcite.avatica</groupId>
            <artifactId>avatica-core</artifactId>
            <version>1.19.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.0</version>
        </dependency>
spring:
  kafka:
    # kafka address
    bootstrap-servers: IP:9092
    # Specifies the number of threads in the listener container to increase concurrency
    listener:
      concurrency: 5
    producer:
      # retry count
      retries: 3
      # Number of messages sent per batch
      batch-size: 1000
      # Buffer size
      buffer-memory: 33554432
      # Specifies the encoding and decoding methods of the message key and message body, and the serialization and deserialization classes provided by Kafka
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Specify the default consumer group id
      group-id: kafka-test
      # Specifies the encoding and decoding method of the message key and message body
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
@Component
@Slf4j
public class KafkaSender {
    public final static String MSG_TOPIC = "my_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send message to kafka queue
     *
     * @param topic
     * @param message
     * @return
     */
    public boolean send(String topic, String message) {
        try {
            kafkaTemplate.send(topic, message);
            log.info("Message sent successfully:{} , {}", topic, message);
        } catch (Exception e) {
            log.error("Message sending failed:{} , {}", topic, message, e);
            return false;
        }
        return true;
    }
}
@RestController
@Slf4j
public class KafkaController {
    
    @Autowired
    private  KafkaSender kafkaSender;
    
    @PostMapping(value = "/send")
    public Object send(@RequestBody JSONObject jsonObject) {
        kafkaSender.send(KafkaSender.MSG_TOPIC, jsonObject.toJSONString());
        return "success";
    }
}
INFO 73032 --- [nio-8888-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 1000
	bootstrap.servers = [IP:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

  INFO 73032 --- [nio-8888-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
  INFO 73032 --- [nio-8888-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
  INFO 73032 --- [nio-8888-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1640330631065
  INFO 73032 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: DCSWcLrOTLuv6M_hwSCSmg
  INFO 73032 --- [nio-8888-exec-1] cn.ybzy.demo.druid.KafkaSender           : Message sent successfully: my_topic , {"businessId":"123456","content":"kafka test"}

View Druid

Keywords: Database Big Data Apache Cloud Native

Added by Brandon Jaeger on Sat, 25 Dec 2021 15:34:58 +0200