Apache Flume
1. General
Flume is a highly available, highly reliable, distributed system for massive log collection, aggregation and transmission, originally developed by Cloudera.
The core job of Flume is to collect data from a data source and deliver it to a specified destination (sink). To guarantee delivery, Flume buffers the data in a channel before sending it to the sink; only after the data has actually reached the sink does Flume delete the buffered copy.
Flume supports custom data senders (sources) for collecting various types of data, and custom data receivers (sinks) for the final storage of that data. Typical collection requirements can be met with simple configuration, and Flume also offers good extension points for special scenarios, so it fits most day-to-day data collection needs.
There are currently two generations of Flume. The 0.9.x releases are collectively referred to as Flume OG (original generation), and the 1.x releases as Flume NG (next generation). Flume NG differs greatly from Flume OG because its core components, core configuration and code architecture were rewritten, so pay attention to the differences when using Flume NG. The rename also reflects Flume's move into Apache: Cloudera Flume became Apache Flume.
2. Operation mechanism
The core role in a Flume system is the agent. An agent is itself a Java process that generally runs on the log collection node.
Agent: in a Flume cluster every node is an agent. A single Flume node accepts, encapsulates, carries and transmits events to the destination, and this process is made up of three parts (source, channel and sink).
Each agent is essentially a data forwarder with three internal components:
Source: the collection source, which connects to the data source to obtain data;
Sink: the destination of the collected data, which passes the data to the next-level agent or to the final storage system;
Channel: the agent's internal data transfer channel, which moves data from the source to the sink.
Throughout data transmission, what flows is the event, the most basic unit of data transfer inside Flume. An event encapsulates the transferred data; for a text file it is usually one line of the record. The event is also the basic unit of a transaction. An event flows from source to channel to sink; it is a byte array and can carry header information. An event represents the smallest complete unit of data, travelling from the external data source to the external destination.
A complete event consists of event headers and an event body, where the body carries the log record collected by Flume.
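These three components and their wiring map directly onto the agent configuration files used throughout this guide. A minimal skeleton (a1, r1, c1 and k1 are arbitrary names; the component-specific settings are omitted here):

# declare the agent's components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# ...component-specific settings (type, bind address, port, capacity, ...) go here...

# wire them together: a source can feed one or more channels, a sink reads from exactly one channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1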
2.1. Simple structure
Single agent collects data
[figure: a single agent collecting data]
2.2. Complex structure
Cascading between multi-level agents
[figure: multi-level agent cascade]
4. Flume installation and deployment
Upload the installation package to the node where the data source is located
Then unzip it: tar -zxvf apache-flume-1.9.0-bin.tar.gz
Then enter the Flume directory and edit flume-env.sh under conf, configuring JAVA_HOME in it.
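Put together, the installation might look like the following shell session; the /opt/servers location matches the start command used later, while the JDK path is only a placeholder to adjust for your machine:

cd /opt/servers
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume-1.9.0          # rename to match the path used below (optional)
cd flume-1.9.0/conf
cp flume-env.sh.template flume-env.sh
vim flume-env.sh                               # add: export JAVA_HOME=/opt/servers/jdk1.8.0  (placeholder path)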
5. Flume initial experience
Enter the conf directory,
vim http_logger.properties
The configuration is as follows:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start service
Execute in /opt/servers/flume-1.9.0/
bin/flume-ng agent -c conf -f conf/http_logger.properties -n a1 -Dflume.root.logger=INFO,console
Note:
-c conf specifies the directory of Flume's own configuration files
-f conf/http_logger.properties specifies the collection scheme we described
-n a1 specifies the name of our agent
After Flume starts, it occupies the current window, so open a new terminal window and run the following from any directory:
curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://hadoop01:22222
Detailed description of configuration file
# a1 is the user-defined agent name; it corresponds to the -n option in the start command
a1.sources = r1      # define the agent's sources; there can be more than one
a1.sinks = k1        # define the agent's sinks; there can be more than one
a1.channels = c1     # define the agent's channels; generally as many channels as there are sinks

a1.sources.r1.type = http      # the type of the source: http
a1.sources.r1.bind = 0.0.0.0   # the address the source listens on; usually local, passive reception
a1.sources.r1.port = 22222     # the port the source listens on

a1.sinks.k1.type = logger      # print events to the console log

a1.channels.c1.type = memory               # the channel type is memory
a1.channels.c1.capacity = 1000             # storage capacity, so the channel does not grab memory needed by other processes
a1.channels.c1.transactionCapacity = 100   # transaction capacity

a1.sources.r1.channels = c1    # bind the source to the channel
a1.sinks.k1.channel = c1       # bind the sink to the channel
Practice case
Source exercise
1. Avro
vim avro_logger.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1. Create the file log.txt under /opt/data/flumedata and edit it to add some data
2. From the Flume installation directory, run the following command to start the agent
bin/flume-ng agent -c conf -f conf/avro_logger.conf -n a1 -Dflume.root.logger=INFO,console
3. Simulate an Avro client sending the file, again from the Flume installation directory:
bin/flume-ng avro-client -c conf -H hadoop01 -p 22222 -F /opt/data/flumedata/log.txt
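Step 1 has no command spelled out above; a minimal sketch (the sample lines are arbitrary):

# step 1: create the file and add a few lines of data
mkdir -p /opt/data/flumedata
echo "hello avro source" >> /opt/data/flumedata/log.txt
echo "a second line"     >> /opt/data/flumedata/log.txt
# steps 2 and 3 then use the two flume-ng commands shown above; each avro-client run
# sends the file as a batch of events, one event per line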
2. Spooldir
spooldir: a source type used to monitor a file directory
Note:
1) spooldir is not suitable for files that are continuously written to;
2) if a file with the same name is placed into the monitored directory again, the service reports an error and stops monitoring.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/data/spooldir

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Create the folder spooldir under /opt/data (the path configured as spoolDir above)
- Start the agent
- Create a file in the spooldir directory with vim and save it; its contents are printed in the Flume log (see the sketch below).
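A sketch of the whole test, assuming the configuration above is saved as conf/spooldir_logger.conf (the file name is only an example):

mkdir -p /opt/data/spooldir
bin/flume-ng agent -c conf -f conf/spooldir_logger.conf -n a1 -Dflume.root.logger=INFO,console
# in another window: drop a file into the monitored directory
echo "spooldir test line" > /tmp/demo.txt
mv /tmp/demo.txt /opt/data/spooldir/
# once consumed, the file is renamed with a .COMPLETED suffix inside the directory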
3. Collect directory to HDFS
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Note: do not put files with the same name into the monitored directory twice
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/data/spooldir
#a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
# use the local timestamp for the time escapes in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# the generated file type defaults to SequenceFile; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Parameter resolution:
· rollInterval
Default: 30
How many seconds the HDFS sink waits before rolling the temporary file into the final target file;
if set to 0, the file is never rolled based on time.
Note: "roll" means the HDFS sink renames the temporary file to the final target file and opens a new temporary file to write data into.
· rollSize
Default: 1024
When the temporary file reaches this size (in bytes), it is rolled into the target file;
if set to 0, the file is never rolled based on size.
· rollCount
Default: 10
When this many events have been written, the temporary file is rolled into the target file;
if set to 0, the file is never rolled based on event count.
· round
Default: false
Whether to round down ("discard") the timestamp used for the time escapes in hdfs.path, which controls how the output directories roll; discarding here is similar to rounding.
· roundValue
Default: 1
The amount by which the timestamp is rounded down.
· roundUnit
Default: second
The unit of the rounding value: second, minute or hour.
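Applied to the values used in the configuration above, a sketch of how these settings interact (each threshold is checked independently, and whichever is reached first rolls the file):

a1.sinks.k1.hdfs.rollInterval = 3    # roll every 3 seconds...
a1.sinks.k1.hdfs.rollSize = 20       # ...or once the temporary file exceeds 20 bytes...
a1.sinks.k1.hdfs.rollCount = 5       # ...or after 5 events, whichever happens first
a1.sinks.k1.hdfs.round = true        # round the %H%M in hdfs.path...
a1.sinks.k1.hdfs.roundValue = 10     # ...down to the nearest 10...
a1.sinks.k1.hdfs.roundUnit = minute  # ...minutes, so an event at 10:37 lands in the 1030 directory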
4. Collect files to HDFS
The exec source can only monitor a single file, but that file may be continuously written to.
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the sources
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/exec/test.log
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /opt/data/exec/test1.log
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /opt/data/exec/test2.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/exec/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# the generated file type defaults to SequenceFile; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the sources and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
Develop a shell script that appends to the monitored file at regular intervals
mkdir -p /opt/servers/shells/
cd /opt/servers/shells/
vim exec.sh
#!/bin/bash
while true
do
  date >> /opt/data/exec/test.log;
  sleep 0.5;
done
Start the script
sh /opt/servers/shells/exec.sh
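Once the script and the agent are both running, the rolled files can be checked on HDFS; a sketch using the path from the sink configuration above:

hdfs dfs -ls /flume/exec/
hdfs dfs -cat /flume/exec/*/*/events-*   # rolled files are plain text because fileType = DataStream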
5. Use of taildir
The taildir source monitors multiple continuously written files at the same time.
Fixed files: 1.txt, 2.txt
Continuously written files: test.log, 3.txt
source:taildir
sink:hdfs
channel:memory
# Define the core components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/data/taildir/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/data/taildir/a/.*txt.*
a1.sources.r1.filegroups.f2 = /opt/data/taildir/b/test.log
a1.sources.r1.maxBatchCount = 1000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /taildir/events/%y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Define the connection relationship between source, sink and channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
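A sketch of running this exercise, assuming the configuration is saved as conf/taildir_hdfs.conf (an example file name):

mkdir -p /opt/data/taildir/a /opt/data/taildir/b
bin/flume-ng agent -c conf -f conf/taildir_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
# in another window: append to the monitored files; taildir records its read offsets in
# /opt/data/taildir/taildir_position.json, so it can resume where it left off after a restart
echo "line one"  >> /opt/data/taildir/a/1.txt
echo "more data" >> /opt/data/taildir/b/test.log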
Cluster deployment
hadoop01: JDK, Hadoop, Flume
hadoop02: JDK, Flume
hadoop03: JDK, Flume
Simply copy the Flume directory installed on hadoop01 to the same location on hadoop02 and hadoop03, as sketched below.
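For example, with the installation path used earlier and SSH access between the nodes, the copy might be done with scp (a sketch):

scp -r /opt/servers/flume-1.9.0 hadoop02:/opt/servers/
scp -r /opt/servers/flume-1.9.0 hadoop03:/opt/servers/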
Case exercise
Multistage
Hadoop01
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
hadoop02
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
hadoop03
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agents in sequence, downstream first: hadoop03, then hadoop02, then hadoop01.
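A sketch of the start order and a test, assuming the three configurations above are saved as conf/multi_hop.conf on their respective nodes (an example file name):

# run on hadoop03 first, then hadoop02, then hadoop01
bin/flume-ng agent -c conf -f conf/multi_hop.conf -n a1 -Dflume.root.logger=INFO,console
# then post a test event to the first hop; it should be printed by the logger sink on hadoop03
curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello multi-hop"}]' http://hadoop01:22222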
Fan in
Hadoop01
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Hadoop02
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Hadoop03
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
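With hadoop03 started first and then the two upstream agents, events posted to either entry point should show up in hadoop03's logger output; a sketch (the header values are arbitrary):

curl -X POST -d '[{"headers":{"from":"node1"},"body":"fan-in via hadoop01"}]' http://hadoop01:22222
curl -X POST -d '[{"headers":{"from":"node2"},"body":"fan-in via hadoop02"}]' http://hadoop02:22222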
Fan out
Hadoop01
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 22222

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 22222

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Hadoop02
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Hadoop03
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
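Start hadoop02 and hadoop03 first, then hadoop01. Because the source feeds two channels (the default channel selector replicates events to every channel), a single event posted to hadoop01 should be printed by the logger sinks on both downstream nodes; a sketch:

curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello fan-out"}]' http://hadoop01:22222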