Apache Flume

1. General

Flume is a highly available, reliable, distributed system for collecting, aggregating and transmitting massive amounts of log data, originally developed by Cloudera.

At its core, Flume collects data from a data source and sends it to a specified destination (sink). To guarantee delivery, Flume buffers the data in a channel before sending it to the sink, and only deletes the buffered data after it has actually reached the destination.

Flume supports custom data senders (sources) for collecting many kinds of data, and custom data receivers (sinks) for writing the data to its final store. Common collection requirements can be met with simple configuration, and for special scenarios Flume offers good extension points for user-defined components, so it can handle most day-to-day data collection scenarios.

There are two major versions of Flume. The 0.9.x releases are collectively called Flume OG (original generation), and the 1.x releases are collectively called Flume NG (next generation). Because the core components, core configuration and code architecture were rewritten, Flume NG differs greatly from Flume OG, so take note of the differences when using it. The rename also reflects Flume joining Apache: Cloudera Flume became Apache Flume.

2. Operation mechanism

The core role in a Flume system is the agent. An agent is a Java process that usually runs on the log collection node.

Agent: in a Flume cluster, every node is an agent. A single-node Flume process accepts, encapsulates, carries and delivers events to their destination, and it consists of three parts: source, channel and sink.

Each agent acts as a data forwarder and has three internal components:

Source: the collection endpoint; it connects to the data source and obtains data;

Sink: the destination of the collected data; it passes data on to the next-level agent or to the final storage system;

Channel: the agent's internal transfer channel; it moves data from the source to the sink.

Throughout the transfer, what flows is the event, the most basic unit of data inside Flume. An event encapsulates the data being transferred; for a text file this is usually one line of a record. The event is also the basic unit of a transaction: it flows from source to channel to sink, its body is a byte array, and it can carry header information. An event represents the smallest complete unit of data on its way from the external source to the external destination.

A complete event includes event headers and an event body; the body carries the actual content, i.e. the log records collected by Flume.
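
As a concrete illustration, the JSON representation accepted by Flume's HTTP source (used in the hands-on example below) shows the two parts explicitly; the header key and body text here are only placeholders:

[{"headers": {"tester": "tony"}, "body": "hello http flume"}]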

2.1. Simple structure

Single agent collects data

2.2. Complex structure

Multiple agents cascaded in series

4. Flume installation and deployment

Upload the installation package to the node where the data source is located

Then unzip it: tar -zxvf apache-flume-1.9.0-bin.tar.gz

Then enter the Flume directory and edit conf/flume-env.sh, configuring JAVA_HOME in it.
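
A minimal sketch of these steps, assuming the package was uploaded to /opt/servers and the JDK sits under /opt/servers/jdk1.8.0 (both paths are assumptions; the rename matches the /opt/servers/flume-1.9.0 path used later):

cd /opt/servers
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume-1.9.0
cd flume-1.9.0/conf
cp flume-env.sh.template flume-env.sh
# point Flume at the local JDK
echo 'export JAVA_HOME=/opt/servers/jdk1.8.0' >> flume-env.sh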

5. A first Flume example

Enter the conf directory and create a configuration file:

vim http_logger.properties

The configuration is as follows:

a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1
 
a1.sources.r1.type  =  http
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222
 
a1.sinks.k1.type  =  logger
 
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100
 
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1

Start service

Execute in /opt/servers/flume-1.9.0/:

bin/flume-ng agent -c conf -f conf/http_logger.properties -n a1 -Dflume.root.logger=INFO,console

Note:

-c conf specifies the directory containing Flume's own configuration files;

-f conf/http_logger.properties specifies the collection configuration we wrote;

-n a1 specifies the name of our agent.

Flume occupies the current window after it starts, so open a new terminal window and execute the following from any directory:

curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://hadoop01:22222

Detailed description of the configuration file

#a1 is the user-defined agent name; it corresponds to the -n option in the startup command

a1.sources  =  r1     #Define the agent's sources; there can be more than one
a1.sinks  =  k1       #Define the agent's sinks; there can be more than one
a1.channels  =  c1    #Define the agent's channels; generally one channel per sink

a1.sources.r1.type  =  http     #Specify the source type: http
a1.sources.r1.bind  =  0.0.0.0  #Bind address of the source; 0.0.0.0 listens on all local interfaces (passive reception)
a1.sources.r1.port  =  22222    #Specify the port to listen on

a1.sinks.k1.type  =  logger     #Print events to the console/log

a1.channels.c1.type  =  memory       #Specify the channel type: memory
a1.channels.c1.capacity  =  1000     #Maximum number of events the channel can hold, so it does not grab memory needed by other processes
a1.channels.c1.transactionCapacity  =  100  #Maximum number of events per transaction

a1.sources.r1.channels  =  c1    #Bind the source to the channel
a1.sinks.k1.channel  =  c1       #Bind the sink to the channel

Practice cases

Source exercises

1. Avro

vim avro_logger.conf
a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1
 
a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222
 
a1.sinks.k1.type  =  logger
 
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100
 
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1

1. Create the file log.txt under /opt/data/flumedata and add some data (a minimal sketch follows after this list)

2. From the Flume installation directory, execute the command to start the agent

bin/flume-ng agent -c conf -f conf/avro_logger.conf -n a1 -Dflume.root.logger=INFO,console

3. Simulate an Avro client sending the file (also from the Flume installation directory):

bin/flume-ng avro-client -c conf -H hadoop01 -p 22222 -F /opt/data/flumedata/log.txt
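
A minimal sketch for step 1, with placeholder file content:

mkdir -p /opt/data/flumedata
echo "hello avro flume" >> /opt/data/flumedata/log.txt
echo "second test line" >> /opt/data/flumedata/log.txt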

2. Spooldir

spooldir: a source type used to monitor a file directory.

Note:

1) spooldir is not suitable for files that are still being written to.

2) If a file with the same name as an already-processed file is placed into the monitored directory again, the source reports an error and stops monitoring.

a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1
 
a1.sources.r1.type  =  spooldir 
a1.sources.r1.spoolDir = /opt/data/spooldir
 
a1.sinks.k1.type  =  logger
 
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100
 
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1

  1. Create the folder spooldir under /opt/data (matching spoolDir in the configuration above)
  2. Start the agent
  3. Create and save a file in spooldir with vim; the new content is printed in the Flume log (a test sketch follows after this list).
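
A minimal sketch of these steps, assuming the configuration above was saved as conf/spooldir_logger.conf (the file name is an assumption):

mkdir -p /opt/data/spooldir
bin/flume-ng agent -c conf -f conf/spooldir_logger.conf -n a1 -Dflume.root.logger=INFO,console
# in another terminal, drop a file into the monitored directory
echo "spooldir test line" > /opt/data/spooldir/a.txt
# once consumed, the file is renamed with a .COMPLETED suffix by default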

3. Collect a directory to HDFS

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
##Note: do not place files with the same name into the monitored directory more than once
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/data/spooldir
#a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
#Get time
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#The generated file type is SequenceFile by default; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
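
A sketch of how this case might be run, assuming HDFS is up and the configuration above was saved as conf/spooldir_hdfs.conf (the file name is an assumption):

bin/flume-ng agent -c conf -f conf/spooldir_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
# in another terminal, drop a file into the monitored directory and then check HDFS
echo "hdfs sink test" > /opt/data/spooldir/b.txt
hdfs dfs -ls -R /flume/events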

Parameter explanation:

· rollInterval

Default: 30

How often (in seconds) the HDFS sink rolls the temporary file into the final target file;

If set to 0, files are not rolled based on time;

Note: "rolling" means the HDFS sink renames the temporary file to the final target file and opens a new temporary file for writing;

· rollSize

Default: 1024

When the temporary file reaches this size (in bytes), it is rolled into the target file;

If set to 0, files are not rolled based on size;

· rollCount

Default: 10

When this many events have been written, the temporary file is rolled into the target file;

If set to 0, files are not rolled based on the number of events;

· round

Default: false

Whether to round down the event timestamp used when generating the HDFS directory path; "rounding down" here is similar to truncating the time to a boundary;

· roundValue

Default: 1

The amount (in units of roundUnit) by which the timestamp is rounded down;

· roundUnit

Default: seconds

The unit used for rounding down: second, minute or hour.
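
For example, with the rounding settings used above, the time escape sequences in the path are rounded down before the directory name is generated (the timestamp below is only illustrative):

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# an event stamped 2022-01-11 10:37 lands under /flume/events/22-01-11/1030/
# because the minute is rounded down to the nearest multiple of 10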

4. Collect files to HDFS

An exec source can monitor only one file, but it works for files that are continuously written to.

# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/exec/test.log

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /opt/data/exec/test1.log

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /opt/data/exec/test2.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/exec/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#The generated file type is SequenceFile by default; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

Develop a shell script that appends content to the file at regular intervals:
mkdir -p /opt/servers/shells/

cd  /opt/servers/shells/

vim exec.sh
#!/bin/bash
while true
do
  date >> /opt/data/exec/test.log
  sleep 0.5
done

Run the script:

sh /opt/servers/shells/exec.sh
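
To see the data arrive in HDFS, the agent must also be running in a separate terminal; a sketch, assuming the configuration above was saved as conf/exec_hdfs.conf (the file name is an assumption):

bin/flume-ng agent -c conf -f conf/exec_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
hdfs dfs -ls -R /flume/exec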

5. Using taildir

Monitors multiple continuously written files at the same time.

Static files: 1.txt, 2.txt

Continuously written files: test.log, 3.txt

source:taildir

sink:hdfs

channel:memory

# Define core components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# taildir  source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/data/taildir/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/data/taildir/a/.*txt.*
a1.sources.r1.filegroups.f2 = /opt/data/taildir/b/test.log
a1.sources.r1.maxBatchCount = 1000

# hdfs  sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /taildir/events/%y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream


# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Define the connection relationship between source, sink and channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
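
A quick way to exercise this setup (a sketch; the configuration file name conf/taildir_hdfs.conf is an assumption):

mkdir -p /opt/data/taildir/a /opt/data/taildir/b
bin/flume-ng agent -c conf -f conf/taildir_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
# in another terminal, append to the monitored files
echo "static line" >> /opt/data/taildir/a/1.txt
echo "appended log line" >> /opt/data/taildir/b/test.log
# taildir records its read offsets in the position file, so restarts resume where they left off
cat /opt/data/taildir/taildir_position.json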


Cluster deployment

Hadoop01: JDK,Hadoop,Flume

Hadoop02: JDK,Flume

Hadoop03: JDK,Flume

Simply copy the Flume directory installed on hadoop01 to the corresponding location on hadoop02 and hadoop03.
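
A sketch using scp, assuming the installation path used earlier (/opt/servers/flume-1.9.0):

scp -r /opt/servers/flume-1.9.0 hadoop02:/opt/servers/
scp -r /opt/servers/flume-1.9.0 hadoop03:/opt/servers/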

Case exercises

Multistage

Hadoop01

a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1
 
a1.sources.r1.type  =  http
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222
 
a1.sinks.k1.type  =  avro
a1.sinks.k1.hostname  =  hadoop02
a1.sinks.k1.port  =  22222
 
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100
 
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1

hadoop02

a1.sources  =  r1
a1.sinks  =  k1 
a1.channels  =  c1
 
a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.sinks.k1.type  =  avro
a1.sinks.k1.hostname  =  hadoop03
a1.sinks.k1.port  =  22222

 
a1.channels.c1.type  =  memory 
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

 
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1

hadoop03

a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.sinks.k1.type  =  logger

a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100
 
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1

Start the agents in sequence, beginning with hadoop03 (downstream first); a sketch follows below.
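
A sketch of the startup and a test, assuming each node's configuration was saved as conf/multi_hop.conf (the file name is an assumption):

# run on hadoop03 first, then hadoop02, then hadoop01:
bin/flume-ng agent -c conf -f conf/multi_hop.conf -n a1 -Dflume.root.logger=INFO,console
# from any machine, post a test event to the first hop;
# it should show up on hadoop03's logger sink
curl -X POST -d '[{"headers":{"hop":"test"},"body":"hello multi-hop"}]' http://hadoop01:22222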

Fan in

Hadoop01

a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1
 

a1.sources.r1.type  =  http
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222
  
a1.sinks.k1.type  =  avro
a1.sinks.k1.hostname  =  hadoop03
a1.sinks.k1.port  =  22222

a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1

Hadoop02

a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

a1.sources.r1.type  =  http
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.sinks.k1.type  =  avro
a1.sinks.k1.hostname  =  hadoop03
a1.sinks.k1.port  =  22222

a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1

Hadoop03

a1.sources  =  r1 
a1.sinks  =  k1 
a1.channels  =  c1
 

a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222
 
a1.sinks.k1.type  =  logger
 
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100
 
 
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1
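
To test the fan-in, start hadoop03 first, then hadoop01 and hadoop02, and post to either upstream node; both streams converge on hadoop03's logger sink (a sketch):

curl -X POST -d '[{"headers":{"from":"hadoop01"},"body":"fan-in via 01"}]' http://hadoop01:22222
curl -X POST -d '[{"headers":{"from":"hadoop02"},"body":"fan-in via 02"}]' http://hadoop02:22222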

Fan out

Hadoop01

a1.sources  =  r1
a1.sinks  =  k1 k2
a1.channels  =  c1 c2
 
a1.sources.r1.type  =  http
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.sinks.k1.type  =  avro
a1.sinks.k1.hostname  =  hadoop02
a1.sinks.k1.port  =  22222
 
a1.sinks.k2.type  =  avro
a1.sinks.k2.hostname  =  hadoop03
a1.sinks.k2.port  =  22222
 
 
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100
 
a1.channels.c2.type  =  memory
a1.channels.c2.capacity  =  1000
a1.channels.c2.transactionCapacity  =  100
 
 
 
a1.sources.r1.channels  =  c1 c2
a1.sinks.k1.channel  =  c1
a1.sinks.k2.channel  =  c2

Hadoop02

a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.sinks.k1.type  =  logger

a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1

Hadoop03

a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.sinks.k1.type  =  logger

a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1
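
To test the fan-out, start the logger agents on hadoop02 and hadoop03 first, then the agent on hadoop01; a single event posted to hadoop01 should be printed by both downstream loggers (a sketch):

curl -X POST -d '[{"headers":{"case":"fanout"},"body":"hello fan-out"}]' http://hadoop01:22222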
