Flume07: [Case] Source Interceptors, one of Flume's advanced components

1, Source Interceptors

Next, let's look at the first advanced component: Source Interceptors.

Flume ships with many built-in Source Interceptors.
Common types include the Timestamp Interceptor, Host Interceptor, Search and Replace Interceptor, Static Interceptor, and Regex Extractor Interceptor.

1, Timestamp Interceptor: adds a timestamp (the current time in milliseconds) to the event header under the key timestamp
2, Host Interceptor: adds a host entry to the event header; its value is the host name or IP address of the current machine
3, Search and Replace Interceptor: searches the event body with a specified regular expression and replaces the matches. Because it modifies the event body, this interceptor changes the original collected data
4, Static Interceptor: adds a fixed key and value to the event header
5, Regex Extractor Interceptor: extracts data from the event body with a specified regular expression, turns the matches into keys and values, and adds them to the header
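To make the header-versus-body distinction concrete, here is a minimal Python sketch that models an event as a header dict plus a string body. The helper names (make_event, timestamp_interceptor and so on) are hypothetical and this is not Flume's Java API. The regex steps use Python's re module, whose replacement syntax differs slightly from the Java regex engine Flume relies on.

```python
import re
import socket
import time


def make_event(body, headers=None):
    """Model a Flume event as a header dict plus a string body (hypothetical helper)."""
    return {"headers": dict(headers or {}), "body": body}


def timestamp_interceptor(event):
    # Header only: current time in milliseconds under the "timestamp" key.
    event["headers"]["timestamp"] = str(int(time.time() * 1000))
    return event


def host_interceptor(event, host=None):
    # Header only: the host name (Flume can also be configured to store the IP).
    event["headers"]["host"] = host or socket.gethostname()
    return event


def static_interceptor(event, key, value):
    # Header only: a fixed key/value pair.
    event["headers"][key] = value
    return event


def search_replace_interceptor(event, search_pattern, replace_string):
    # Body only: regex search and replace, so the original data changes.
    event["body"] = re.sub(search_pattern, replace_string, event["body"])
    return event


def regex_extractor_interceptor(event, regex, names):
    # Header only: each capture group is stored under the matching name in `names`.
    match = re.search(regex, event["body"])
    if match:
        for name, value in zip(names, match.groups()):
            event["headers"][name] = value
    return event
```

Running the header-only functions leaves the body untouched; only search_replace_interceptor rewrites it.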

Based on this analysis, we can summarize as follows:

The Timestamp, Host, Static, and Regex Extractor Interceptors add key-value data to the event header so that later components (channel and sink) can use it; they have no effect on the original collected data.

The Search and Replace Interceptor modifies the original data in the event body according to its rules and leaves the header untouched. Use it with care, since it changes the original data.

Of these, the Search and Replace Interceptor and the Regex Extractor Interceptor are the ones we use most often in practice.

2, Case

Let's look at a case:
Store the collected data by type and by day.

The raw data looks like the contents of this file: Flume test data format.txt

The data contains three types of records: video information, user information, and gift records. Each record is in JSON format, and each one has a type field whose value identifies the record's data type.

While our live-streaming platform is running, these logs are generated in real time. We want to collect them into HDFS and store them in separate directories by data type, so that video data, user data, and gift data each go into their own directory.

For this requirement, the agent is configured as follows: the source is an Exec Source that tails the log file; the channel is a File Channel, because we want to guarantee the integrity and accuracy of the data; and the sink is an HDFS Sink.

Note, however, that the path in the HDFS Sink cannot be hard-coded. First, it must pick up the date dynamically so that data is stored by day; second, different types of data must go into different directories.
This means the path must contain variables: a data-type variable in addition to the date variable.
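A minimal sketch of how such a path template resolves, assuming a template of the form /moreType/%Y%m%d/%{logType}: %Y, %m and %d are date escapes and %{name} reads an event header. The function resolve_path is hypothetical and handles only these few escapes; it is not the HDFS Sink's actual implementation.

```python
import re
from datetime import datetime


def resolve_path(template, headers, now):
    """Expand %{header} and the %Y/%m/%d date escapes in a path template (simplified sketch)."""
    # %{name} -> value of header `name`; this sketch substitutes "" when the header is missing.
    path = re.sub(r"%\{(\w+)\}", lambda m: headers.get(m.group(1), ""), template)
    # %Y, %m, %d -> year, month, day of `now` (with hdfs.useLocalTimeStamp = true,
    # the sink uses the agent's local clock for these).
    for escape in ("%Y", "%m", "%d"):
        path = path.replace(escape, now.strftime(escape))
    return path
```

For example, a videoInfo event written on 2 May 2020 resolves to /moreType/20200502/videoInfo.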

The data type values use an underscore between words (for example video_info), but our requirement is that directory names contain no underscores and use camel case (for example videoInfo).
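The intended naming rule can be written as a small function. This is only an illustration of the mapping (snake_to_camel is a hypothetical helper): the Search and Replace Interceptor takes a fixed replaceString and cannot change letter case on its own, which is why the configuration later uses one rule per data type instead of a general conversion.

```python
def snake_to_camel(name):
    """Convert an underscore name such as video_info to camel case (videoInfo)."""
    first, *rest = name.split("_")
    # Keep the first word as-is and capitalize every following word.
    return first + "".join(word.capitalize() for word in rest)
```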

Therefore, the directories finally generated in HDFS look roughly like /moreType/20200502/videoInfo, /moreType/20200502/userInfo and /moreType/20200502/giftRecord.

Getting the date variable is easy, but how do we get the data type?

Recall from the earlier analysis: a source interceptor can add key-value pairs to the event header, and later components (the channel or the sink) can then read those values.

So we can use the Regex Extractor Interceptor to extract the value of the type field from the raw data and store it in the header, which makes it available in the sink stage.

However, the type value extracted directly does not meet the requirement yet: it must be converted by removing the underscore and switching to camel case.
So we first use the Search and Replace Interceptor to convert the type value in the raw data, and then use the Regex Extractor Interceptor with a rule that extracts the type field's value and adds it to the header.

So the overall process is like this

Exec Source -> Search and Replace Interceptor -> Regex Extractor Interceptor -> File Channel -> HDFS Sink
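To see the interceptor part of this flow at work, here is a Python simulation of interceptors i1 to i4 from the configuration that follows, grouping the rewritten lines by the logType header just as the sink groups them into directories. The sample lines are hypothetical (the real sample file is not reproduced here), and intercept and group_by_type are illustrative helpers, not Flume code.

```python
import re

# Hypothetical sample lines: the real sample file is not reproduced in this article.
SAMPLE_LINES = [
    '{"id":"1","type":"video_info"}',
    '{"uid":"2","type":"user_info"}',
    '{"send_id":"3","type":"gift_record"}',
]

# i1..i3: the three search_replace rules, applied in the configured order.
RULES = [
    (r'"type":"video_info"', '"type":"videoInfo"'),
    (r'"type":"user_info"', '"type":"userInfo"'),
    (r'"type":"gift_record"', '"type":"giftRecord"'),
]

# i4: regex_extractor; capture group 1 is stored in the logType header.
TYPE_REGEX = re.compile(r'"type":"(\w+)"')


def intercept(line):
    """Run the simulated i1..i4 on one line and return (headers, body)."""
    body = line
    for pattern, replacement in RULES:
        body = re.sub(pattern, replacement, body)
    headers = {}
    match = TYPE_REGEX.search(body)
    if match:
        headers["logType"] = match.group(1)
    return headers, body


def group_by_type(lines):
    """Group rewritten bodies by logType, mirroring the per-type %{logType} directories."""
    groups = {}
    for line in lines:
        headers, body = intercept(line)
        # Lines without a type field land under "" in this sketch.
        groups.setdefault(headers.get("logType", ""), []).append(body)
    return groups
```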

Now let's configure the agent: on the bigdata04 machine, create file-to-hdfs-moreType.conf in the conf directory.

# The name of the agent is a1
# Specify the names of source component, channel component and Sink component
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Configure source component
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/moreType.log

# Configure interceptors [multiple interceptors are executed in sequence]
a1.sources.r1.interceptors = i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = "type":"video_info"
a1.sources.r1.interceptors.i1.replaceString = "type":"videoInfo"

a1.sources.r1.interceptors.i2.type = search_replace
a1.sources.r1.interceptors.i2.searchPattern = "type":"user_info"
a1.sources.r1.interceptors.i2.replaceString = "type":"userInfo"

a1.sources.r1.interceptors.i3.type = search_replace
a1.sources.r1.interceptors.i3.searchPattern = "type":"gift_record"
a1.sources.r1.interceptors.i3.replaceString = "type":"giftRecord"

a1.sources.r1.interceptors.i4.type = regex_extractor
a1.sources.r1.interceptors.i4.regex = "type":"(\\w+)"
a1.sources.r1.interceptors.i4.serializers = s1
a1.sources.r1.interceptors.i4.serializers.s1.name = logType

# Configuring channel components
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/moreType/checkpoint
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/moreType/data

# Configuring sink components
a1.sinks.k1.type = hdfs
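# The NameNode address bigdata01:9000 is an assumption; use your cluster's fs.defaultFS
# %Y%m%d is the date (from the local clock, see useLocalTimeStamp) and %{logType} reads the header set by i4
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/moreType/%Y%m%d/%{logType}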
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Add file prefix and suffix
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log

# Connect the components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
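The three roll settings above interact roughly as follows: a value of 0 disables that trigger, and the current file is closed as soon as any enabled trigger is reached. Here that means one hour or 134217728 bytes (128 MB, a common choice because it matches the default HDFS block size), with event-count rolling switched off. The function should_roll is a hypothetical simplified model; in the real sink the interval is enforced by a timer rather than checked per event.

```python
ROLL_INTERVAL_S = 3600        # hdfs.rollInterval: seconds
ROLL_SIZE_BYTES = 134217728   # hdfs.rollSize: 128 * 1024 * 1024 bytes
ROLL_COUNT = 0                # hdfs.rollCount: 0 disables count-based rolling


def should_roll(elapsed_s, bytes_written, events_written):
    """Simplified model: roll when any enabled threshold is reached (0 disables a threshold)."""
    if ROLL_INTERVAL_S > 0 and elapsed_s >= ROLL_INTERVAL_S:
        return True
    if ROLL_SIZE_BYTES > 0 and bytes_written >= ROLL_SIZE_BYTES:
        return True
    if ROLL_COUNT > 0 and events_written >= ROLL_COUNT:
        return True
    return False
```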

Note on interceptors: one or more interceptors can be configured on a source. Every event collected by the source passes through all of them, and they run in the order listed in the interceptors property.
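Because the interceptors run in order, the order matters for this case. The sketch below (hypothetical helpers, modelling one search_replace rule and the regex_extractor rule) shows that if extraction ran before replacement, the header would keep the underscore form video_info even though the body is still rewritten.

```python
import re


def search_replace(body):
    # i1-style rule: rewrite video_info to videoInfo in the body.
    return re.sub(r'"type":"video_info"', '"type":"videoInfo"', body)


def regex_extract(body, headers):
    # i4-style rule: copy the type value into the logType header.
    match = re.search(r'"type":"(\w+)"', body)
    if match:
        headers["logType"] = match.group(1)
    return headers


def run_chain(body, order):
    """Apply the two steps in the given order and return (headers, body)."""
    headers = {}
    for step in order:
        if step == "replace":
            body = search_replace(body)
        elif step == "extract":
            headers = regex_extract(body, headers)
    return headers, body
```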

The HDFS file prefix and suffix (data and .log) are also set at the end of the configuration, so every file follows the same naming pattern.

The test data does not need to be generated first; it is added manually to the moreType.log file in advance.

[root@bigdata04 ~]# cd /data/log/
[root@bigdata04 log]# vi moreType.log

Start the agent

[root@bigdata04 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-hdfs-moreType.conf -Dflume.root.logger=INFO,console

Check the results on the bigdata01 node

[root@bigdata01 soft]# hdfs dfs -ls -R /moreType/20200502/
drwxr-xr-x   - root supergroup          0 2020-05-02 23:57 /moreType/20200502/giftRecord
-rw-r--r--   2 root supergroup        138 2020-05-02 23:57 /moreType/20200502/giftRecord/data.1588435055112.log.tmp
drwxr-xr-x   - root supergroup          0 2020-05-02 23:57 /moreType/20200502/userInfo
-rw-r--r--   2 root supergroup        413 2020-05-02 23:57 /moreType/20200502/userInfo/data.1588435055069.log.tmp
drwxr-xr-x   - root supergroup          0 2020-05-02 23:57 /moreType/20200502/videoInfo
-rw-r--r--   2 root supergroup        297 2020-05-02 23:57 /moreType/20200502/videoInfo/data.1588435053696.log.tmp

Looking at the file content in HDFS, we can see that the value of the type field has indeed been modified by the interceptor:

[root@bigdata01 soft]# hdfs dfs -cat /moreType/20200502/videoInfo/data.1588435053696.log.tmp


Added by bdamod1 on Fri, 04 Mar 2022 07:35:30 +0200