Flume data collection tool

Flume

1, Overview

1.Flume definition

Flume is a highly available, reliable, distributed system for massive log collection, aggregation and transmission, provided by Cloudera. Flume is based on a streaming architecture, which is flexible and simple.

2.Flume advantages

① It can be integrated with almost any storage system.

② When the input data rate is greater than the rate at which the destination storage can be written, Flume buffers the data, reducing the pressure on HDFS.

③ Transactions in Flume are based on the channel and use two transaction models (sender and receiver) to ensure that messages are delivered reliably.

Flume uses two independent transactions to deliver events from source to channel and from channel to sink. Only when all the data in a transaction has been successfully committed to the channel does the source consider the read complete. Similarly, only data that the sink has successfully written out is removed from the channel.
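
The put/take pattern behind these two transactions can be sketched with Flume's public Channel and Transaction interfaces. This is a minimal illustration of the pattern, not code taken from any particular built-in source or sink:

package test;

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

public class ChannelTransactionSketch {

    // Source side: put an event into the channel inside a transaction.
    public static void putWithTransaction(Channel channel, Event event) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            channel.put(event);
            tx.commit();              // only now does the source consider the read complete
        } catch (RuntimeException e) {
            tx.rollback();            // nothing is left half-written in the channel
            throw e;
        } finally {
            tx.close();
        }
    }

    // Sink side: take an event from the channel inside a transaction.
    public static void takeWithTransaction(Channel channel) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            Event event = channel.take();   // may be null if the channel is empty
            if (event != null) {
                // ... write the event to the destination here ...
            }
            tx.commit();              // the event is removed from the channel only after commit
        } catch (RuntimeException e) {
            tx.rollback();            // the event stays in the channel and will be re-delivered
            throw e;
        } finally {
            tx.close();
        }
    }
}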

3.Flume composition structure

Figure Flume composition architecture

Figure detailed explanation of Flume composition structure

4.Flume components

  • Agent

    Agent is a JVM process that sends data from the source to the destination in the form of events.

    Agent is mainly composed of three parts: Source, Channel and Sink.

  • Source (data source, i.e. receives data)

    Source is the component responsible for receiving data into the Flume Agent. The source component can handle various types and formats of log data, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http and legacy.

  • Channel (cache data to solve the problem of different rates between Source and Sink)

    Channel is the buffer between Source and Sink. Therefore, channel allows Source and Sink to operate at different rates. Channel is thread safe and can handle write operations of several sources and read operations of several Sink at the same time.

    Flume comes with two channels: Memory Channel and File Channel.

    Memory Channel is a queue in memory. It is suitable when data loss is not a concern. If data loss matters, Memory Channel should not be used, because data will be lost if the process dies or the machine goes down or restarts.

    File Channel writes all events to disk. Therefore, data will not be lost in case of program shutdown or machine downtime.

  • Sink (destination, i.e. where the data is delivered) (each sink takes data from exactly one channel)

    Sink constantly polls the Channel for events and removes them in batches, writing these events in batches to a storage or indexing system, or sending them to another Flume Agent.

    Sink is fully transactional. Before deleting a batch of data from the Channel, each sink starts a transaction with the Channel. Once the batch of events has been successfully written out to the storage system or the next Flume Agent, the sink commits the transaction with the Channel, and the Channel then deletes those events from its internal buffer.

    Sink component destinations include hdfs, kafka, logger, avro, thrift, ipc, file, null, HBase, solr, and custom.

  • Event (the data packet the source accepts)

    The transmission unit: the basic unit of Flume data transfer. Data is sent from the source to the destination in the form of events. An Event consists of an optional header and a byte array containing the data. The header is a HashMap of key/value string pairs.

    headers: the message header (stored as k-v pairs); body: the message body (the actual message content, printed in hexadecimal by the logger sink)
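
    As a concrete illustration (a hand-built sketch using the Flume SDK classes, not part of the setup steps; the header key "type" is arbitrary), an Event with headers and a body can be constructed and inspected like this:

    package test;

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class EventDemo {
        public static void main(String[] args) {
            // headers: key/value string pairs stored in a map
            Map<String, String> headers = new HashMap<>();
            headers.put("type", "demo");

            // body: a byte array holding the actual message content
            Event event = EventBuilder.withBody("hello flume".getBytes(StandardCharsets.UTF_8), headers);

            System.out.println(event.getHeaders());                                   // {type=demo}
            System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));  // hello flume
        }
    }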

2, Installation

1. Installation address

1) Flume official website address

http://flume.apache.org

2) Document viewing address

http://flume.apache.org/FlumeUserGuide.html

3) Download address

http://archive.apache.org/dist/flume/

2. Installation steps

Preparation: install JDK and configure environment variables (ip, firewall, domain name, hostname)

1) Upload apache-flume-1.9.0-bin.tar.gz to the /opt/installs directory on Linux

2) Extract apache-flume-1.9.0-bin.tar.gz to the /opt/installs directory

3) Under flume/conf, rename flume-env.sh.template to flume-env.sh and configure the flume-env.sh file

[root@flume45 installs]# tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs/
[root@flume45 installs]# mv apache-flume-1.9.0-bin/ flume1.9
[root@flume45 conf]#pwd
/opt/installs/flume1.9/conf
[root@flume45 conf]# mv flume-env.sh.template flume-env.sh
[root@flume45 conf]# vi flume-env.sh
export JAVA_HOME=/opt/installs/jdk1.8/

[root@flume45 flume1.9]# bin/flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

3, Enterprise development case

1. Monitoring port data cases

  1. Case requirements

First, start the Flume task to monitor the local 44444 port [server];
Then send a message [Client] to the local 44444 port through the netcat tool;
Finally, Flume displays the monitored data on the console in real time.

  1. requirement analysis

  2. Implementation steps

  1. Installing the netcat tool

    netcat can listen on a network port of this machine and transfer data over TCP/IP.

    [root@flume45 flume1.9]# yum install -y nc
    
  2. Create the Flume Agent configuration file demo1-netcat-memory-logger.conf

    # Create the job folder under the flume directory and enter it
    [root@flume45 flume1.9]# mkdir job
    [root@flume45 flume1.9]# cd job
    # Create the Flume Agent configuration file demo1-netcat-memory-logger.conf under the job folder
    [root@flume45 job]# touch demo1-netcat-memory-logger.conf
    [root@flume45 job]# ls
    demo1-netcat-memory-logger.conf
    
    # Add the following content to the demo1-netcat-memory-logger.conf file
    [root@flume45 job]# vim demo1-netcat-memory-logger.conf
    
    # Add the following
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

Note: the configuration file comes from the official manual http://flume.apache.org/FlumeUserGuide.html

  1. Start flume listening port first
[root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a1 --conf-file job/demo1-netcat-memory-logger.conf -Dflume.root.logger=INFO,console

Parameter Description:

--conf conf/: the configuration files are stored in the conf/ directory (can be abbreviated as -c)

--name a1: names the agent a1 (can be abbreviated as -n)

--conf-file job/xxxx.conf: the configuration file Flume reads for this run is the xxxx.conf file in the job folder (can be abbreviated as -f)

-Dflume.root.logger=INFO,console: -D dynamically overrides the flume.root.logger property while Flume is running, setting the console log level to INFO. Log levels include debug, info, warn and error.

  1. Use the netcat tool to send content to port 44444 of this machine
[root@flume45 flume1.9]# nc localhost 44444
hah
OK
8997534
OK
568974
OK
  1. Observe the received data on the Flume monitoring page
2021-07-05 17:43:16,961 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 61 68    		hah }
2021-07-05 17:43:33,406 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 38 39 39 37 35 33 34   8997534 }
2021-07-05 17:43:50,033 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 35 36 38 39 37 34      568974 }

2. Read local files to HDFS cases in real time

Preparation: set up hadoop environment and start hdfs

1) Case requirements: monitor the contents of log files in real time and upload them to HDFS

2) Demand analysis:

3) Implementation steps:

  1. To output data to HDFS, Flume must have the Hadoop-related jar packages on its classpath

    Copy the required Hadoop jar packages to the /opt/installs/flume1.9/lib folder.

  1. In the job directory, create the demo2-exec-memory-hdfs.conf file

    [root@flume45 job]# touch demo2-exec-memory-hdfs.conf
    
    # The contents are as follows
    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    # exec source: run the command given below and ingest its output
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /opt/a.log
    
    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://flume45:9000/flume/%y-%m-%d/%H
    # Use the local timestamp; otherwise the time escapes in the HDFS path cannot be resolved and the upload fails
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    # Set the file type: the default SequenceFile serialization is not human-readable, so change it to plain text (DataStream)
    a2.sinks.k2.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    

    For all time-related escape sequences, the Event Header must contain a key named "timestamp"

    (unless hdfs.useLocalTimeStamp is set to true, in which case a timestamp is added automatically, just as the TimestampInterceptor would). Here that is a2.sinks.k2.hdfs.useLocalTimeStamp = true.

  2. Perform monitoring configuration

    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a2 --conf-file job/demo2-exec-memory-hdfs.conf -Dflume.root.logger=INFO,console
    
  3. Use echo command to simulate log generation

    [root@flume45 test]# echo hello >> a.log
    
  4. Accessing hdfs http://flume45:50070

Note: clock synchronization problem

linux time synchronization command:

1. Install the software with yum; answer y to any prompts during installation. ntp provides the time synchronization command: yum -y install ntp

2. Run the synchronization command: ntpdate time1.aliyun.com

3. View the current system time: date

Time synchronization is especially important for distributed programs: it prevents hosts from rejecting or mis-ordering data in the distributed file system because another host's clock makes files appear to come from the future.

3. Read the directory file to the HDFS case in real time

1) Case requirement: use Flume to listen to files in the whole directory

2) Demand analysis:

3) Implementation steps:

  1. Create demo3-SpoolingDir-mem-hdfs.conf

    [root@flume45 job]# touch demo3-SpoolingDir-mem-hdfs.conf
    
    # The contents are as follows
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # Describe/configure the source monitoring directory (you need to create the monitored folder in advance)
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /opt/test/upload
    a3.sources.r3.fileSuffix = .done
    # The .done suffix marks files that have been fully read, so they are not read again if the agent goes down and restarts
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://flume45:9000/flume2/upload/%y-%m-%d/%H
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    a3.sinks.k3.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    
  2. Start monitor folder command

    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a3 --conf-file job/demo3-SpoolingDir-mem-hdfs.conf -Dflume.root.logger=INFO,console
    

    Note: when using Spooling Directory Source

    1) do not keep modifying a file after it has been placed in the monitored directory; content appended later will not be collected

    2) files that have been collected are renamed with the .done suffix

    3) the monitored folder is scanned for new files every 500 milliseconds

  3. Add files to the upload folder

    [root@flume45 test]# mkdir /opt/test/upload
    [root@flume45 test]# cp a.log /opt/test/upload/
    [root@flume45 upload]# ls
    a.log.done
    
  4. Accessing hdfs http://flume45:50070

4. Single data source, multiple outputs (replication; the selector is explained in the Channel selector section below)

The schematic diagram of single Source, multi-Channel and Sink is shown in the figure:

1) Case requirements: Flume-1 is used to monitor file changes, and Flume-1 passes the changes to Flume-2,

Flume-2 is responsible for storing to HDFS. At the same time, Flume-1 transmits the changes to Flume-3,

Flume-3 is responsible for outputting to the local file system.

2) Demand analysis:

3) Implementation steps:

  1. preparation

    Create the group4 folder under /opt/installs/flume1.9/job
    [root@flume45 job]# mkdir group4
    
    Create the datas/flume2 folder under /opt
    [root@flume45 job]# mkdir -p /opt/datas/flume2
    The a.log file in it will be monitored
    
  2. In the group4 directory, create demo4-exec-memory-avro.conf

    [root@flume45 job]# cd group4
    [root@flume45 group4]# touch demo4-exec-memory-avro.conf
    [root@flume45 group4]# touch demo4-avro-memory-hdfs.conf
    [root@flume45 group4]# touch demo4-avro-memory-file_roll.conf
    
    # Configure one source, two channels and two sinks for receiving the log file
    
    # Add the following
    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1 c2
    a1.sinks = k1 k2
    
    # Copy the data flow to all channels
    a1.sources.r1.selector.type = replicating
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/datas/flume2/a.log
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    
    # Describe the sink
    # avro on sink side is a data sender
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = flume45
    a1.sinks.k1.port = 4141
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = flume45
    a1.sinks.k2.port = 4242
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    

    Note: Avro is a language independent data serialization and RPC framework created by Doug Cutting, founder of Hadoop.

    Note: RPC (Remote Procedure Call) - Remote Procedure Call. It is a protocol that requests services from remote computer programs through the network without understanding the underlying network technology.

  3. In the group4 directory, create demo4-avro-memory-hdfs.conf

    [root@flume45 group4]# touch demo4-avro-memory-hdfs.conf
    
    # Add the following
    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    # avro on the source side is a data receiving service
    a2.sources.r2.type = avro
    a2.sources.r2.bind = 0.0.0.0
    a2.sources.r2.port = 4141
    
    # Describe the channel
    a2.channels.c2.type = memory
    
    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://flume45:9000/flume4/%y-%m-%d/%H
    #Data display storage format
    a2.sinks.k2.hdfs.fileType = DataStream
    #Use local timestamp
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    
  4. In the group4 directory, create demo4-avro-memory-file_roll.conf

    [root@flume45 group4]# touch demo4-avro-memory-file_roll.conf
    
    # Add the following
    # Name the components on this agent
    a3.sources = r3
    a3.channels = c3
    a3.sinks = k3
    
    # Describe/configure the source
    a3.sources.r3.type = avro
    a3.sources.r3.bind = 0.0.0.0
    a3.sources.r3.port = 4242
    
    # Describe the channel
    a3.channels.c3.type = memory
    
    # Describe the sink
    # Output to / opt / data / flume2
    a3.sinks.k3.type = file_roll
    a3.sinks.k3.sink.directory = /opt/datas/flume2
    
    # Configure the scroll time
    a3.sinks.k3.sink.rollInterval = 3000
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    

    Tip: the output local directory must be an existing directory. If the directory does not exist, a new directory will not be created.

  5. Execution profile

    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a3 --conf-file job/group4/demo4-avro-memory-file_roll.conf
    
    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a2 --conf-file job/group4/demo4-avro-memory-hdfs.conf
    
    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a1 --conf-file job/group4/demo4-exec-memory-avro.conf
    
  6. Submit test data

    [root@flume45 flume2]# echo hahaha >> a.log
    
  7. Check data on HDFS

  8. Check the files generated by the file_roll sink in /opt/datas/flume2/

5. Single data source, multiple Sinks (Sink group)

Sink group: only one sink in the group takes each event from the channel.

There are two schemes for handling events in a sink group:

I. Load balancing: load_balance (the sinks are treated equally; events are distributed round-robin by default)

II. Failover: failover (the higher the priority number, the higher the priority)

The schematic diagram of single Source, Channel and multi sink (load balancing) is shown in the figure.

1) Case requirements: Flume-1 monitors network port 44444 and sends the data through a sink group to Flume-2 and Flume-3; each of them outputs what it receives to its console
2) Demand analysis:

3) Implementation steps:

  1. preparation

    Create the group5 folder under /opt/installs/flume1.9/job
    [root@flume45 job]# mkdir group5
    
  2. In the group5 directory, create demo5-netcat-memory-avro.conf

    [root@flume45 job]# cd group5
    [root@flume45 group5]# touch demo5-netcat-memory-avro.conf
    
    # Configure one source, one channel and two sinks for receiving log data
    
    # Add the following
    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinkgroups = g1
    a1.sinks = k1 k2
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = flume45
    a1.sources.r1.port = 44444
    
    # Describe the channel
    a1.channels.c1.type = memory
    
    a1.sinkgroups.g1.sinks = k1 k2
    # The sink group's strategy for handling events: load balancing
    a1.sinkgroups.g1.processor.type = load_balance
    
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = flume45
    a1.sinks.k1.port = 4141
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = flume45
    a1.sinks.k2.port = 4242
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    
  3. In the group5 directory, create demo5-sink-to-first.conf

    [root@flume45 group5]# touch demo5-sink-to-first.conf
    
    # Add the following
    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    a2.sources.r2.type = avro
    a2.sources.r2.bind = 0.0.0.0
    a2.sources.r2.port = 4141
    
    # Describe the sink
    a2.sinks.k2.type = logger
    
    # Describe the channel
    a2.channels.c2.type = memory
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    
  4. In the group5 directory, create demo5-sink-to-second.conf

    [root@flume45 group5]# touch demo5-sink-to-second.conf
    
    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = 0.0.0.0
    a3.sources.r1.port = 4242
    
    # Describe the sink
    a3.sinks.k1.type = logger
    
    # Describe the channel
    a3.channels.c2.type = memory
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2
    # In fact, this file only changes the agent name to a3 and the port to 4242
    
  5. Execution profile

    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a3 --conf-file job/group5/demo5-sink-to-second.conf -Dflume.root.logger=INFO,console
    
    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a2 --conf-file job/group5/demo5-sink-to-first.conf -Dflume.root.logger=INFO,console
    
    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a1 --conf-file job/group5/demo5-netcat-memory-avro.conf
    
  6. Use the netcat tool to send content to port 44444 of this machine

    nc flume45 44444
    
  7. View the console print logs of Flume2 and Flume3

    The load_balance processor supports two selection strategies:
    	round_robin (default)
    	random
    

6. Multi data source summary cases

Schematic diagram of summarizing data from multiple sources to single Flume:

1) Case requirements:
Realize sending the data of multiple agents to one agent
2) Demand analysis:

3) Implementation steps:

  1. preparation

    Create the group6 folder under /opt/installs/flume1.9/job
    [root@flume45 job]# mkdir group6
    
  2. In the group6 directory, create demo6-agent1.conf

    [root@flume45 group6]# touch demo6-agent1.conf
    
    # The contents are as follows
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/test6/a.log
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = flume45
    a1.sinks.k1.port = 4141
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  3. In the group6 directory, create demo6-agent2.conf

    [root@flume45 group6]# touch demo6-agent2.conf
    
    # The contents are as follows
    a2.sources = r1
    a2.channels = c1
    a2.sinks = k1
    
    a2.sources.r1.type = exec
    a2.sources.r1.command = tail -F /opt/test6/b.log
    
    a2.channels.c1.type = memory
    
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = flume45
    a2.sinks.k1.port = 4141
    
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
  4. In the group6 directory, create demo6-agent3.conf

    [root@flume45 group6]# touch demo6-agent3.conf
    
    # The contents are as follows
    a3.sources = r1
    a3.channels = c1
    a3.sinks = k1
    
    a3.sources.r1.type = avro
    a3.sources.r1.bind = flume45
    a3.sources.r1.port = 4141
    
    a3.channels.c1.type = memory
    
    a3.sinks.k1.type = logger
    
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
  5. Execution profile

    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a3 --conf-file job/group6/demo6-agent3.conf -Dflume.root.logger=INFO,console
    
    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a2 --conf-file job/group6/demo6-agent2.conf
    
    [root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a1 --conf-file job/group6/demo6-agent1.conf
    
    
  6. Use the echo command to add content to a.log and b.log to view the results

7.Taildir Source: multi-directory monitoring and breakpoint resume (Flume 1.7+, important)

After reading a file, the Taildir Source keeps watching it to check whether new content has been appended; if there is new content, it reads it.

Note: Flume 1.7.0 introduced the Taildir Source component

Multiple directories: it can listen to multiple directories at the same time

Breakpoint resume: the read position is recorded, so if the agent crashes it can continue from the recorded position after restarting

  1. Requirements: monitor multiple directories to realize breakpoint continuous transmission
  2. Demand analysis:
  3. Implementation steps
  1. Create a conf file in the job directory of flume

    [root@flume45 job]# touch demo7-taildir-file-logger.conf
    
  2. Write an agent in the demo7-taildir-file-logger.conf file

a1.sources = r1
a1.channels = c1
a1.sinks = k1

#The directory needs to be created in advance
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/datas/flume/ceshi.log
#Regular expression notes: . matches any single character, * means zero or more occurrences, so .* matches any string
a1.sources.r1.filegroups.f2 = /opt/test/logs/.*log.*

a1.channels.c1.type = file

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Create the specified directory and corresponding files

Create the flume directory under /opt/datas/
Create the logs directory under /opt/test/

4. Start the agent

bin/flume-ng agent --conf conf --name a1 --conf-file job/demo7-taildir-file-logger.conf -Dflume.root.logger=INFO,console

5. Create a file to test log file

[root@flume45 flume]# touch ceshi.log
[root@flume45 flume]# echo hello >> ceshi.log
[root@flume45 flume]# echo hahaha >> ceshi.log

[root@flume45 logs]# touch aa.log
[root@flume45 logs]# echo hahaha99999 >> aa.log
[root@flume45 logs]# echo cartyuio >> aa.log

Why breakpoint resume works: the Taildir Source records the inode and read offset of every tailed file in a JSON position file (the positionFile property, ~/.flume/taildir_position.json by default), and after a restart it resumes reading from the recorded offsets.
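
If you want to see what is being recorded, the position file can simply be printed. A minimal sketch, assuming the default positionFile location (override it with a1.sources.r1.positionFile if you configured a different path):

package test;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PrintTaildirPositions {
    public static void main(String[] args) throws Exception {
        // Default location of the TAILDIR position file (~/.flume/taildir_position.json)
        Path positionFile = Paths.get(System.getProperty("user.home"), ".flume", "taildir_position.json");

        // Each JSON entry records the inode, the read offset (pos) and the path of a tailed file
        Files.readAllLines(positionFile).forEach(System.out::println);
    }
}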

4, Four topologies of Flume

1. Serial mode

This mode connects multiple Flume agents sequentially, from the initial source all the way to the final sink writing to the destination storage system. Bridging too many agents is not recommended: it lowers the transmission rate, and if any agent in the chain goes down during transmission the whole pipeline is affected.

Avro: the RPC technology of the Hadoop ecosystem; RPC: remote procedure call

RPC (dubbo/avro) technology: call remote methods as if they were local methods, hiding the underlying network details.

# When two agents are connected, it must be Avro sink --> Avro source
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/datas/a.log

a1.channels.c1.type = memory
# (to which machine)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flume45
a1.sinks.k1.port = 4545

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a2.sources = r2
a2.channels = c2
a2.sinks = k2

# (message received from which machine)
a2.sources.r2.type = avro
a2.sources.r2.bind = 0.0.0.0
a2.sources.r2.port = 4545

a2.channels.c2.type = memory

a2.sinks.k2.type = logger

a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
# Start a2 first and then a1 

2. Single Source, multi-Channel,Sink mode (copy mode)

Flume supports the flow of events to one or more destinations. This mode copies the data source to multiple channels, each channel has the same data, and sink can choose to transfer to different destinations.

3. Single Source,Channel and multi Sink mode (load balancing)

Flume supports logical grouping of multiple sinks into a sink group. Flume sends data to different sinks, which mainly solves the problems of load balancing and failover.

4. Aggregation mode

This combination of Flume agents solves the common scenario where many servers produce logs: each server deploys an agent to collect its own logs and forwards them to an aggregating agent, which then uploads the data to HDFS, Hive, HBase, JMS, etc. for log analysis.

5, Interceptor

Flume modifies and filters events in flight by using interceptors. For example, a website produces a large amount of data every day, but much of it may be incomplete (missing important fields) or redundant; if such data is not specially processed, system efficiency drops. This is where interceptors come in handy.

1. Interceptor position and function

Location: the interceptor is located between source and channel.

Function: it can tag the events passing through by adding entries to the event header.

2.flume built-in interceptor

Flume ships with a number of built-in interceptors; the most commonly used ones are covered below.

Since the interceptor usually handles the Event Header, let me introduce Event first

  • An event is the basic unit of message processing in Flume; it is composed of zero or more headers and a body.

  • The Header is in the form of key/value, which can be used to make routing decisions or carry other structured information (such as the timestamp of the event or the server hostname of the event source). You can think of it as providing the same function as HTTP headers - this method is used to transmit additional information outside the body.

  • Body is a byte array that contains the actual contents.

  • Different sources provided by Flume add different headers to the events they generate

1.1 timestamp interceptor

The Timestamp Interceptor interceptor can insert a timestamp with the keyword timestamp into the event header.

[root@flume45 job]# mkdir interceptors
[root@flume45 job]# cd interceptors/	
[root@flume45 interceptors]# touch demo1-timestamp.conf

#The contents of the document are as follows
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = flume45
a1.sources.r1.port = 44444

#timestamp and host interceptors (the test output below shows the headers both of them add)
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

test

[root@flume0 interceptors]# nc flume45 44444

test result

2021-07-09 03:09:18,635 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org .apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{host=192.168.153.45, timestamp=1625771356107} body: 68 61 68 68 61 20              hahha  }
2021-07-09 03:09:20,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org  .apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{host=192.168.153.45, timestamp=1625771360814} body: 31 32 33 34 35       			12345 }
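
The timestamp=... entry in the headers above is all the timestamp interceptor adds. Here is a hand-written sketch of the equivalent header, built with the Flume SDK for illustration only:

package test;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class TimestampHeaderSketch {
    public static void main(String[] args) {
        // A "timestamp" header holding the epoch time in milliseconds as a string,
        // which is what the timestamp interceptor attaches to each event
        Map<String, String> headers = new HashMap<>();
        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));

        Event event = EventBuilder.withBody("hello".getBytes(), headers);
        System.out.println(event.getHeaders());   // e.g. {timestamp=1625771356107}
    }
}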

1.2 host interceptor

The Host Interceptor inserts the hostname or IP address of the machine the agent runs on into the event header, under the default key host.

The configuration and test are the same as in section 1.1 above: the interceptor chain there already registers i2 with type host, and the host=192.168.153.45 entry in the test output comes from this interceptor.

1.3 Regex Filtering Interceptor

The Regex Filtering Interceptor filters events against a configured regular expression; it can be used either to keep only matching events or to exclude them. It is often used for data cleaning, filtering data out with regular expressions.

[root@flume45 interceptors]# touch demo3-regex-filtering.conf

#The contents of the document are as follows
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = flume45
a1.sources.r1.port = 44444

#regex filtering interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
# Regular expression: lines consisting only of digits
a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
# excludeEvents = true drops events that match the regex; false (the default) keeps only matching events. Choose according to the business requirements
a1.sources.r1.interceptors.i1.excludeEvents = true

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
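
To make the include/exclude semantics concrete, here is a small plain-Java sketch (not Flume code) that mirrors how the regex_filter interceptor decides whether an event is kept, using the same regular expression as above:

package test;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class RegexFilterDemo {
    public static void main(String[] args) {
        // Same regex as in the configuration above: lines consisting only of digits
        Pattern digitsOnly = Pattern.compile("^[0-9]*$");
        List<String> bodies = Arrays.asList("hah", "8997534", "568974");

        boolean excludeEvents = true;   // mirrors a1.sources.r1.interceptors.i1.excludeEvents
        for (String body : bodies) {
            boolean matches = digitsOnly.matcher(body).matches();
            // excludeEvents = true  -> drop matching events, keep the rest
            // excludeEvents = false -> keep only matching events (the default)
            boolean kept = excludeEvents ? !matches : matches;
            System.out.println(body + " -> matches=" + matches + ", kept=" + kept);
        }
    }
}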

A typical production scenario for Regex Filtering: troubleshooting error logs

2007-02-13 15:22:26 [com.sms.test.TestLogTool]-[INFO] this is info
2007-02-13 15:22:26 [com.sms.test.TestLogTool]-[ERROR] my exception  com.sms.test.TestLogTool.main(TestLogTool.java:8)

The corresponding regular expression (matches lines that do not contain "error"):

a1.sources.r1.interceptors.i3.regex = ^((?!error).)*$

1.4 multiple interceptors can be used at the same time

# Interceptors act on the Source and decorate or filter events in the configured order

a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i3.type = regex_filter
a1.sources.r1.interceptors.i3.regex = ^[0-9]*$

6, Custom interceptor

Overview: in actual development, there may be many types of logs generated by a server, and different types of logs may need to be sent to different analysis systems. At this time, the Multiplexing structure in Flume topology will be used. The principle of Multiplexing is to send different events to different channels according to the value of a key in the Header of the event. Therefore, we need to customize an Interceptor to give different values to the keys in the headers of different types of events.

Case demonstration: we simulate log input with port data, treating lines that start with a letter and lines that start with a digit as two different log types. A custom interceptor distinguishes them and sends them to different analysis systems (channels).

Implementation steps

1. Create a project and introduce the following dependencies

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

2. Write the custom interceptor by implementing the Interceptor interface

package test;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MyInterceptor implements Interceptor{
    @Override
    public void initialize() {}

    @Override
    //Process a single intercepted event: tag it with a "state" header
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        Map<String,String> header = new HashMap<>();
        //Only the first byte is checked; lowercase letters count as "letter", everything else as "number"
        if (body[0] >= 'a' && body[0] <= 'z'){
            header.put("state","letter");
        } else {
            header.put("state","number");
        }
        event.setHeaders(header);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event e : list) {
            intercept(e);
        }
        return list;
    }

    @Override
    public void close() {}

    //Define a static inner class
    public static class Builder implements Interceptor.Builder{
		@Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {}
    }
}

3. Package the project as a jar and upload it to the lib directory of the Flume installation

4. Write an agent configuration, created in the interceptors directory under the job directory and named my.conf

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = test.MyInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
a1.sources.r1.selector.default  = c2

a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory  = /root/t1
a1.sinks.k1.sink.rollInterval = 600

a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /root/t2
a1.sinks.k2.sink.rollInterval = 600

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

5. Create t1 and t2 folders in / root directory

6. Test

[root@flume45 flume1.9]# bin/flume-ng agent --conf conf --name a1 --conf-file job/interceptors/my.conf -Dflume.root.logger=INFO,console

7. View t1 and t2 files respectively

7, Flume internal process principle

1.Flume Agent internal principle

2.Flume composition structure and transaction details

batchSize (default 100): the number of events a source reads and sends to the channel per batch
	Max number of lines to read and send to the channel at a time. Using the default is usually fine.
transactionCapacity (default 100): the number of events a single channel transaction can hold
	The maximum number of events the channel will take from a source or give to a sink per transaction
capacity (default 100): the size of the channel buffer
	The maximum number of events stored in the channel
Relationship among the three:
	batchSize <= transactionCapacity <= capacity
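
Because Flume agent .conf files use the Java properties format, this relationship can be checked mechanically. A hypothetical helper sketch; the file path and the fallback value of 100 (the default for all three settings) are assumptions for illustration:

package test;

import java.io.FileReader;
import java.util.Properties;

public class CheckChannelSizing {
    public static void main(String[] args) throws Exception {
        // Load the agent configuration (Flume .conf files are plain Java properties)
        Properties conf = new Properties();
        try (FileReader reader = new FileReader("job/demo2-exec-memory-hdfs.conf")) {
            conf.load(reader);
        }

        // Fall back to 100, the default for batchSize, transactionCapacity and capacity
        int batchSize  = Integer.parseInt(conf.getProperty("a2.sinks.k2.hdfs.batchSize", "100").trim());
        int txCapacity = Integer.parseInt(conf.getProperty("a2.channels.c2.transactionCapacity", "100").trim());
        int capacity   = Integer.parseInt(conf.getProperty("a2.channels.c2.capacity", "100").trim());

        if (batchSize <= txCapacity && txCapacity <= capacity) {
            System.out.println("Sizing OK: " + batchSize + " <= " + txCapacity + " <= " + capacity);
        } else {
            System.out.println("Warning: expected batchSize <= transactionCapacity <= capacity");
        }
    }
}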

8, Channel selector

Before an event enters a Channel, a Channel selector can be used to route the event to the specified Channel(s).

Flume has two built-in selectors, replicating and multiplexing. If no selector is specified in the Source configuration, the replicating Channel selector is used automatically.

Property name    Default        Description
selector.type    replicating    replicating (copy the event to every channel)
selector.type    multiplexing   multiplexing (route the event to one selected channel)

1. Replicating Channel selector

Function: the source hands each event to all of its channels at the same time.

Features: the data is synchronized to every Channel.

[root@flume45 job]# mkdir selector
[root@flume45 job]# cd selector
[root@flume45 selector]# touch selector.conf

# The contents are as follows
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 666

a1.sources.r1.selector.type = replicating
a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /opt/datas/selector1
a1.sinks.k1.sink.rollInterval = 3000

a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /opt/datas/selector2
a1.sinks.k2.sink.rollInterval = 3000

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Code test

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>	

package test;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.util.HashMap;
import java.util.Map;

public class TestSelector {
    public static void main(String[] args) throws Exception {
      	//Copy selector
        //1. Create an rpc client object
        RpcClient client = RpcClientFactory.getDefaultInstance("flume45", 666);
        //2. Create an event object
        Event event = EventBuilder.withBody("hahaha".getBytes());
        //3. Send event object to server
        client.append(event);
        //4. Close the client connection
        client.close();
    }
}

Test result: both sink output directories (/opt/datas/selector1 and /opt/datas/selector2) receive the data

2. Multiplexing Channel selector

Features: data is shunted to the specified Channel

[root@flume45 selector]# touch selector2.conf

# The contents are as follows
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 666

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.android = c1
a1.sources.r1.selector.mapping.ios = c2

a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /opt/datas/selector1
a1.sinks.k1.sink.rollInterval = 600

a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /opt/datas/selector2
a1.sinks.k2.sink.rollInterval = 600

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Code test

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>

package test;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.util.HashMap;
import java.util.Map;

public class TestSelector {
    public static void main(String[] args) throws Exception {
        //1. Create an rpc client object
        RpcClient client = RpcClientFactory.getDefaultInstance("flume45", 666);
        //2. Create an event object
        Map map = new HashMap();
        map.put("state","android");
        Event event = EventBuilder.withBody("I'm an Android event".getBytes(),map);
        //3. Send the event object to the server
        client.append(event);
        //4. Close the client connection
        client.close();
    }
}

Test result: the value set with map.put("state","android") determines which channel the event is routed to; an android event goes to c1 and ends up in /opt/datas/selector1, while an ios event goes to c2 and ends up in /opt/datas/selector2
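
To verify the other branch of the mapping, a slightly modified client (same SDK calls as above, hypothetical class name) can send an event with state=ios, which should land in the c2 output directory instead:

package test;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class TestSelectorIos {
    public static void main(String[] args) throws Exception {
        RpcClient client = RpcClientFactory.getDefaultInstance("flume45", 666);

        // state=ios is mapped to c2, so this event should end up in /opt/datas/selector2
        Map<String, String> headers = new HashMap<>();
        headers.put("state", "ios");
        client.append(EventBuilder.withBody("I'm an iOS event".getBytes(), headers));

        client.close();
    }
}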

9, Apply for Alibaba cloud SMS

  1. Log in to Alibaba Cloud (https://account.aliyun.com/)

  2. Search SMS service

  3. Open SMS service

  4. Management console

  5. Fast learning

  6. Expected result: SMS sending is completed through the console

  7. Fill in the relevant parameters in the panel on the left and click to initiate the call. The debugging result on the right returns a normal JSON string, and the phone number filled in PhoneNumbers receives the verification-code SMS

10, flume interview questions

1. What are the functions of flume's source, sink and channel?

① The Source component collects data and can handle log data in various types and formats,
	including avro, exec, spooling directory, netcat and taildir
② The Channel component caches the collected data, which can be stored in Memory or in a File
③ The Sink component sends the data to its destination;
	destinations include HDFS, Logger, avro, file and Kafka

2. Function of flume's channel selector?

A channel selector lets logs from different sources flow through different channels to different sinks.
The official documentation lists two kinds of channel selector: the Replicating channel selector (default) and
the Multiplexing channel selector.

The difference between the two: replicating sends every event arriving at the source to all channels, while multiplexing chooses which channel each event is sent to.

3.flume parameter tuning?

1. Source
	Increasing the number of sources improves the ability to read files. For example, when a directory generates too many files, split it into several directories and configure one source per directory, so that the sources can keep up with the newly generated data.
	The batchSize parameter determines how many events a source ships to the channel per batch; increasing it appropriately improves source-to-channel throughput.

2. Channel
	type = memory gives the best performance but may lose data; type = file gives better fault tolerance but slightly worse performance.
	When using a File Channel, configuring dataDirs with directories on several different disks improves performance.
	The capacity parameter determines the maximum number of events the Channel can hold. The transactionCapacity parameter determines the maximum number of events written by a Source to the channel per transaction and read by a Sink from the channel per transaction; transactionCapacity must be greater than the batchSize of both the Source and the Sink.

capacity >= transactionCapacity >= (batchSize of source and sink)

3. Sink
	Increasing the number of sinks improves the ability to consume events, but more is not always better: excess sinks occupy system resources and cause unnecessary waste.
	The batchSize parameter determines how many events a sink reads from the channel per batch; increasing it appropriately improves channel-to-sink throughput.

4. Transaction mechanism of flume

Flume uses two separate transactions to deliver events from Source to Channel and from Channel to Sink. For example, the spooling directory source creates an event for each line of a file; only when all the events in the transaction have been delivered to the Channel and committed successfully does the source mark the file as complete. Transactions from Channel to Sink are handled in a similar way: if an event cannot be delivered for some reason, the transaction is rolled back, and all events stay in the Channel waiting to be re-delivered.

5. Will flume collected data be lost?

No. The Channel can be backed by a File, and data transmission is protected by transactions.

6.Flume uploads hdfs file scrolling

# Test case list
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/testFile.log

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

a1.channels.c1.type = memory

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://flume45:9000/flume6/%y-%m-%d/%H
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 10485760
a1.sinks.k1.hdfs.rollCount = 0

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

7. Dealing with small hdfs files

1. Properties that control when Flume rolls a new file on HDFS
	hdfs.rollInterval   time based           default 30 s
	hdfs.rollSize       file size based      default 1024 bytes
	hdfs.rollCount      event count based    default 10

2. To roll a new file every 10 MB, independent of time and of the event count:
hdfs.rollInterval=0
hdfs.rollSize=10485760
hdfs.rollCount=0

3. Problem after the three properties are configured
	In a fully distributed Hadoop environment the replication factor is 3, which matches Flume's internal default, so there is no problem.
	In a pseudo-distributed environment the replication factor is 1 (set in hdfs-site.xml), but Flume does not know this and still assumes 3 replicas when uploading files.
	This mismatch trips up Flume's underlying handling and makes rollInterval/rollSize/rollCount appear not to take effect.

4. Solution
	Copy hdfs-site.xml from the Hadoop configuration directory to Flume's conf directory; no extra configuration is needed, Flume picks it up automatically.
  
A simple shell script for generating test data: it loops, appending content to a.log, and sleep throttles the output
#!/bin/bash

i=0
while ((1))
do
   echo "hello world,$i" >> /opt/a.log
   #echo hello
   let i++
   sleep 0.01
done

The settings above only control file rolling; the settings below additionally roll the target directory by time
#Scroll folders by time
hdfs.round = true
#How many time units to create a new folder
hdfs.roundValue = 10
#Redefine time units
hdfs.roundUnit = minute

#How often do I generate a new file? 0 means disabled
a2.sinks.k2.hdfs.rollInterval = 0
#Set the scroll size of each file 1048576 = 1M
a2.sinks.k2.hdfs.rollSize = 1048576
#File scrolling is independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0

hdfs.round	     false	 Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue	 1	     Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
hdfs.roundUnit	 second	 The unit of the round down value - second, minute or hour.

Special note: if the test uses a pseudo-distributed Hadoop environment, copy Hadoop's hdfs-site.xml to Flume's conf directory

8.Flume summary

1. Flume's function: collecting data
2. Flume's three important components: Source / Channel / Sink
3. Common component types:
	Source: netcat, exec, spooldir, http, taildir (1.7, important)
	Channel: memory, file
	Sink: logger, hdfs, file_roll, kafka
4. Multi-channel / multi-sink topologies in Flume
	Two agents in series: the sink on the left and the source on the right are both avro.
	avro sink: hostname = (IP of the host that receives the data); avro source: bind = (the host the source is bound to)
	Multiple channels and sinks: one agent outputs data to two destinations; the source writes to the channels using the replicating selector.
	Sink group: only one sink in the group gets each event from the channel, for load balancing or failover.
	Data aggregation (first-layer agents forward data to a second-layer agent): reduces the write pressure on HDFS (acts as a buffer).
5. Interceptor
	Function: works between the source and the channel, where it can operate on the event header.
	It can also intercept and process the message body according to conditions such as regular expressions.
6. Selector
	Replicating selector: multiple channels get the same event.
	Multiplexing selector: decides which channel an event goes to based on its header.
7. Custom interceptor
	Define a class that implements the Interceptor interface and its intercept methods; purpose: process events.
	Define a static inner class that implements Interceptor.Builder; purpose: create the interceptor object.
	Build the project into a jar and drop it into the flume/lib directory.
	a1.sources.r1.interceptors = i1
	a1.sources.r1.interceptors.i1.type = package.ClassName$Builder
