Hadoop master's road -- 06: Flume log collection

Flume log collection

1, Flume introduction


The Flume logo depicts logs (data) being carried from one place (the data source) to another (the data destination) through a water channel (the channel)

Official documentation

2, Installation configuration of Flume

1. Download and upload to the server

2. Decompression

3. Configuration

Copy flume-env.sh from the template flume-env.sh.template


Modify flume-env.sh (typically to set JAVA_HOME)

4. Configure environment variables

5. Apply the environment variables (a sketch of steps 1-5 is shown below)
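
A minimal sketch of the whole installation, assuming apache-flume-1.9.0-bin.tar.gz was uploaded to /opt/software and is installed under /opt/servers (the version and paths are assumptions; adjust to your environment):

# 2. Decompress (assumed paths and version)
tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/servers/

# 3. Copy and edit flume-env.sh
cd /opt/servers/apache-flume-1.9.0-bin/conf
cp flume-env.sh.template flume-env.sh
# in flume-env.sh, set JAVA_HOME, e.g.:
# export JAVA_HOME=/usr/local/jdk1.8.0_241

# 4./5. Configure and apply the environment variables
echo 'export FLUME_HOME=/opt/servers/apache-flume-1.9.0-bin' >> /etc/profile
echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile
source /etc/profile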

3, Getting started with Flume

1. Configure the collection scheme

View official website



Case: connect to and monitor a port, collect the data, and display it
Write the configuration with Notepad++

create a new file


Copy the example collection configuration from the official website:


Configuration:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# Define the name of the agent and the components sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Define data source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# Define the destination of the data (sink)
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# Define pipe
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Assembly components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. Start Flume with the specified collection scheme

View official website

Command details
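
For example, assuming the configuration above is saved as conf/netcat-logger.conf (the file name is an assumption), the agent can be started with:

flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console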

3. Data collection test

You can use the telnet command to send data to port 44444 of this machine
Install telnet first


After installing Telnet, start telnet
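
A sketch of the test, assuming a CentOS host where telnet is installed with yum:

# install the telnet client (assumes a yum-based system)
yum install -y telnet
# connect to the port the netcat source is listening on
telnet localhost 44444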


Send data in the telnet session

The data collected by Flume:

4, Case 1: monitor a folder for changes and save newly added files to HDFS

When a new file appears in a monitored folder (a log directory) on the collection server, the data in that file is transferred to a folder on HDFS

1. The collection scheme needs to define three parts: source, channel, and sink

Data source: view the official website


Data sink:


2. Collection configuration file:


Final configuration file:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# Define the name a1 of the agent and the components sources, sinks, and channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Define data source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
a1.sources.r1.fileHeader = true

# Describe the sink
# Define the destination of the data (sink)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true


# Use a channel which buffers events in memory
# Define pipe
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Assembly components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Start command
# flume-ng agent --conf conf --conf-file conf/spoolingdir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

3. Start flume

4. Test

Flume watches the /var/log/apache/flumeSpool/ directory for new files; when a file is uploaded to that directory, Flume collects it and writes the data to the HDFS cluster.

5. An error occurred

Caused by a Guava version mismatch between Flume and Hadoop
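
A commonly used fix, assuming Flume 1.9.0 (which ships guava-11.0.2.jar) and Hadoop 3.x, is to replace Flume's Guava jar with the one from the Hadoop installation; the jar versions and paths below are assumptions:

# remove the old Guava shipped with Flume (assumed version)
rm $FLUME_HOME/lib/guava-11.0.2.jar
# copy the newer Guava from the Hadoop installation (assumed version)
cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $FLUME_HOME/lib/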

6. Restart flume and upload a file to the log folder to view the results


If the file content on HDFS is garbled, the collection scheme needs to be improved

7. Modify the collection scheme (see the sketch below), restart Flume, and upload a file to test the result
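
A likely modification, assuming the garbled output came from the HDFS sink's default SequenceFile format, is to make the sink write plain text:

a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text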

The content is no longer garbled

5, Case 2: monitor changes to a file and store the appended content on HDFS

1. Collection scheme

Data source:


Do not modify the data


Configuration code:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# Define the name a1 of the agent and the components sources, sinks, and channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Define data source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/text.log


# Describe the sink
# Define the destination of the data (sink)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# Round down the event timestamp when creating time-based directories
a1.sinks.k1.hdfs.round = true
# Round the timestamp down to a multiple of 10 minutes
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use the local timestamp instead of a timestamp from the event header
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Roll interval in seconds
a1.sinks.k1.hdfs.rollInterval = 3
# Roll size in bytes
a1.sinks.k1.hdfs.rollSize = 20
# Number of events per file; whichever of these three roll conditions is met first triggers a new file
a1.sinks.k1.hdfs.rollCount = 5
# Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 1
# File format; DataStream writes plain, uncompressed text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
# Define pipe
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Assembly components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Start command
# flume-ng agent --conf conf --conf-file conf/exec-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

2. Test the collection function

Simulation scenario
First write a shell script that continuously appends the current date to the monitored file /var/log/text.log, simulating a server log file, as sketched below.
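
A minimal sketch of such a script (the path matches the exec source configured above):

while true; do date >> /var/log/text.log; sleep 1; done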

3. Clone another session and view the new content

4. Start flume

5. View the results on HDFS
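
For example, the collected files can be listed with (the path comes from hdfs.path in the configuration above):

hdfs dfs -ls -R /flume/events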

6, Flume's reliability assurance - load balancing

The collection schemes configured so far deliver data through a single sink. If that sink fails, or the volume of collection requests becomes too large for it, a single-sink configuration cannot guarantee reliability. Flume therefore provides sink processors to address this problem.
A sink processor lets you define a sink group, treating multiple sinks as one logical entity, and can load-balance the traffic across the sinks in the group.

1. Build and configure flume cluster

A Flume cluster of three servers: hadoop01, hadoop02, and hadoop03

1) Distribute the Flume installation directory from hadoop01 to hadoop02 and hadoop03


2) Distribute the environment variable profile

3) Apply the environment variables (a sketch of these steps is shown below)
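
A sketch of the distribution steps, assuming Flume is installed under /opt/servers/apache-flume-1.9.0-bin and the environment variables live in /etc/profile (both paths are assumptions):

# 1) distribute the Flume directory
scp -r /opt/servers/apache-flume-1.9.0-bin hadoop02:/opt/servers/
scp -r /opt/servers/apache-flume-1.9.0-bin hadoop03:/opt/servers/

# 2) distribute the environment variable profile
scp /etc/profile hadoop02:/etc/profile
scp /etc/profile hadoop03:/etc/profile

# 3) apply the environment variables on hadoop02 and hadoop03
source /etc/profile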

2. Configure the collection schemes

1) See the example in the official documentation

2) Configure the first-level collection scheme on hadoop01

Two-tier collection architecture:


Avro



First-level configuration (exec-avro.conf) on hadoop01:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# Define the name a1 of the agent and the components sources, sinks, and channels

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Define the sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

# Configure load balancing: random selection, with backoff for failed sinks
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000


# Describe/configure the source
# Define data source
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/test1.log

# Describe the sink
# Define data destination 1: avro sink to hadoop02
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 4545

# Define data destination 2: avro sink to hadoop03
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 4545


# Use a channel which buffers events in memory
# Define pipe
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Start command
# flume-ng agent --conf conf --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console
# flume-ng agent --conf conf --conf-file exec-avro.conf --name a1 -Dflume.root.logger=INFO,console


3) Configure the second-level collection scheme on hadoop02 and hadoop03

hadoop02 (avro-hdfs.conf):

# example.conf: A single-node Flume configuration

# Name the components on this agent
# Define the name a1 of the agent and the components sources, sinks, and channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Define data source

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02
a1.sources.r1.port = 4545
# Describe the sink
# Define the destination of the data (sink)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# Round down the event timestamp when creating time-based directories
a1.sinks.k1.hdfs.round = true
# Round the timestamp down to a multiple of 10 minutes
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use the local timestamp instead of a timestamp from the event header
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Roll interval in seconds
a1.sinks.k1.hdfs.rollInterval = 3
# Roll size in bytes
a1.sinks.k1.hdfs.rollSize = 20
# Number of events per file; whichever of these three roll conditions is met first triggers a new file
a1.sinks.k1.hdfs.rollCount = 5
# Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 1
# File format; DataStream writes plain, uncompressed text
a1.sinks.k1.hdfs.fileType = DataStream


# Use a channel which buffers events in memory
# Define pipe
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Assembly components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Start command
# flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
# flume-ng agent --conf conf --conf-file avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

hadoop03: use the same configuration, with the avro source bound to hadoop03, as shown below.
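
The only line that differs from the hadoop02 configuration:

a1.sources.r1.bind = hadoop03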

3. Start flume

1) Start Flume on hadoop02 and hadoop03

Start Flume from the last tier first (downstream agents before upstream ones)
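
For example, on hadoop02 and hadoop03, using the start command noted in the avro-hdfs.conf configuration above:

flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console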
Hadoop02:


Hadoop03:

2) Start Flume on hadoop01
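
For example, using the start command noted in exec-avro.conf above:

flume-ng agent --conf conf --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console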


At the same time, hadoop02 and hadoop03 also report that the connection was established successfully

4. Load balancing test

Clone a session on hadoop01, then write and run a test script (a sketch follows)
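
A minimal sketch of the test script (the path matches the exec source in exec-avro.conf; the message text is an assumption):

while true; do echo "load balance test $(date)" >> /var/log/test1.log; sleep 1; done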

5. View results

Hadoop02:


Hadoop03:


On the Hadoop cluster:


7, Flume's reliability assurance - failover (fault recovery)

1. Configure the collection scheme

Only the sink processor section of the first-level collection scheme needs to change


Change it to failover:
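
A sketch of the changed sink group section, using Flume's failover sink processor (the priority values are assumptions; the 10000 ms maxpenalty corresponds to the ten-second wait used in the test below):

# Define the sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# Use the failover processor instead of load_balance
a1.sinkgroups.g1.processor.type = failover
# The sink with the higher priority is used first; the other takes over on failure
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Maximum back-off period (in milliseconds) for a failed sink
a1.sinkgroups.g1.processor.maxpenalty = 10000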


The second-level collection scheme does not need to be modified

2. Start flume

Start Flume from the last tier first
Then restart the test script

3. Fault recovery

Stop Flume on hadoop03 and wait ten seconds (the timeout defined in the collection scheme)

8, Flume interceptor

1. Scenario:

In a real application scenario, two servers, A and B, generate log data in real time; the log types are mainly access.log, nginx.log, and web.log. The logs generated on servers A and B need to be collected, aggregated on server C, and then uploaded to the HDFS file system for storage. The files storing the log data on HDFS must be organized by log type and date as follows (20180723 represents the date the log data was collected):
/source/logs/access/20180723/**
/source/logs/nginx/20180723/**
/source/logs/web/20180723/**

2. Flow chart of log data collection

3. Configuration file for hadoop02 and hadoop03

# example.conf: A single-node Flume configuration

# Name the components on this agent
# Define the name a1 of the agent and the components sources, sinks, and channels

a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Define r1 data source

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/access.log

# Define a static interceptor i1 on source r1 (adds the header type=access)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

# Define r2 data sources

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /var/log/nginx.log

# Define a static interceptor i2 on source r2 (adds the header type=nginx)
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

# Define r3 data sources

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /var/log/web.log

# Define a static interceptor i3 on source r3 (adds the header type=web)
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# Describe the sink
# Define the destination of the data (sink)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01
a1.sinks.k1.port = 41414



# Use a channel which buffers events in memory
# Define pipe
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Assembly components
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

# Start command
# flume-ng agent --conf conf --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console

4. Collection scheme configuration file on hadoop01

# Name the components on this agent
# Define the name a1 of the agent and the components sources, sinks, and channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Define data source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop01
a1.sources.r1.port = 41414

# Define a timestamp interceptor (adds the timestamp header used by the %y%m%d escape in the HDFS path)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
# Define the destination of the data (sink)

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/logs/%{type}/%y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
# Round down the event timestamp when creating time-based directories
a1.sinks.k1.hdfs.round = true
# Round the timestamp down to a multiple of 10 minutes
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use the local timestamp instead of a timestamp from the event header
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Roll interval in seconds (0 = do not roll based on time)
a1.sinks.k1.hdfs.rollInterval = 0
# Roll size in bytes (10 MB)
a1.sinks.k1.hdfs.rollSize = 10485760
# Number of events per file (0 = do not roll based on event count)
a1.sinks.k1.hdfs.rollCount = 0
# Number of events written before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 10
# File format; DataStream writes plain, uncompressed text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.threadsPoolSize=10
a1.sinks.k1.hdfs.callTimeout=30000


# Use a channel which buffers events in memory
# Define pipe
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Assembly components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Start command
# flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

5. Start flume

Start Flume on hadoop01 first


Then start Flume on hadoop02 and hadoop03
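
For example, using the start commands noted in the configurations above:

# on hadoop01 (avro source -> HDFS)
flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

# on hadoop02 and hadoop03 (exec sources -> avro sink)
flume-ng agent --conf conf --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console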

6. Test effect

Clone three sessions across hadoop02 and hadoop03, and run the following three scripts to generate log data (the first is shown below; the other two are sketched after it)

while true;do echo "access log ......"  >> /var/log/access.log;sleep 5;done
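
By analogy, the scripts for the other two log types (assumed, mirroring the first and matching the paths tailed in the configuration):

while true;do echo "nginx log ......"  >> /var/log/nginx.log;sleep 5;done

while true;do echo "web log ......"  >> /var/log/web.log;sleep 5;done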



result:
