Flume08: [case] Channel Selectors01: Replicating Channel Selector for multiple channels

1, Channel Selectors

Next, let's look at Channel Selectors.
There are two Channel Selectors: the Replicating Channel Selector and the Multiplexing Channel Selector.
The Replicating Channel Selector is the default. It sends every event collected by the Source to all of the configured channels.
The official documentation explains this default channel selector.

In the official documentation's example configuration, c3 is an optional channel, so a failure to write to c3 is ignored. Since c1 and c2 are not marked as optional, a failure to write to either of them causes the transaction to fail.
With this configuration, the source sends its data to all three channels c1, c2 and c3. c1 and c2 are guaranteed to receive all of the data, but c3 is not.

The selector.optional parameter is itself optional and does not have to be configured.
If there are multiple channels, list their names directly after the channels parameter, separated by spaces.
In fact, the parameter name itself, channels with an s, already shows that it supports multiple channels.
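
The example configuration discussed above is not reproduced in this post. The replicating example in the Flume User Guide is along the following lines; the agent name a1 and source name r1 follow the guide's naming, and the comments are added here for explanation.

```properties
# Replicating selector with one optional channel (after the Flume User Guide example)
a1.sources = r1
a1.channels = c1 c2 c3
# replicating is the default type, so this line could be left out
a1.sources.r1.selector.type = replicating
# multiple channel names are separated by spaces
a1.sources.r1.channels = c1 c2 c3
# a failed write to c3 is ignored; c1 and c2 remain required
a1.sources.r1.selector.optional = c3
```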

The other Channel Selector is the Multiplexing Channel Selector, which sends events to different channels according to the value of a header in the Event

Take a look at the introduction on the official website

In the configuration of this example, four channels are specified: c1, c2, c3 and c4
Which channel the data collected by the source is sent to depends on the value of the state attribute in the event header. The header to inspect is set with selector.header

If the value of the state attribute is CZ, the event is sent to c1
If the value of the state attribute is US, the event is sent to c2 and c3
If the state attribute has any other value, the event is sent to c4

These rules are controlled by selector.mapping and selector.default

In this way, the data can be distributed to different channels according to certain rules.
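
The routing rules above correspond to a multiplexing configuration like the following, which follows the example in the Flume User Guide; the agent name a1 and source name r1 are the guide's naming, and the comments are added here for explanation.

```properties
# Multiplexing selector routing on the "state" header (after the Flume User Guide example)
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
# name of the event header whose value decides the routing
a1.sources.r1.selector.header = state
# state = CZ -> c1
a1.sources.r1.selector.mapping.CZ = c1
# state = US -> c2 and c3
a1.sources.r1.selector.mapping.US = c2 c3
# any other value -> c4
a1.sources.r1.selector.default = c4
```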

Next, let's look at a case

2, Replicating Channel Selector for multiple channels

In this case, we use the Replicating selector to send a copy of the data collected by the source to each of two channels. Each channel is then followed by a sink, which stores the data in a different storage medium for later use.

In practical work, this requirement is quite common: after collecting a piece of data, we want to store it in several storage media. Different storage media have different characteristics and application scenarios; typical examples are the HDFS sink and the Kafka sink.

The HDFS sink stores offline data on disk, which is convenient for offline computation later
The Kafka sink stores real-time data, which is convenient for real-time computation later

Since we haven't learned Kafka yet, we use a logger sink in its place for now.

Next, configure the agent according to the source, channel and sink listed in the figure

Create tcp-to-replicatingchannel.conf in the conf directory of Flume on bigdata04

[root@bigdata04 conf]# vi tcp-to-replicatingchannel.conf
# The name of the agent is a1
# Specify the names of source component, channel component and Sink component
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Configure source component
a1.sources.r1.type = netcat
a1.sources.r1.bind =
a1.sources.r1.port = 44444

# Configure the channel selector [replicating is the default, so this line can be omitted]
a1.sources.r1.selector.type = replicating

# Configuring channel components
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Configuring sink components
a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.filePrefix = data
a1.sinks.k2.hdfs.fileSuffix = .log

# Connect the components
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
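
As a hypothetical variation that is not part of the original case: if the console output were treated as best-effort, c1 could be marked optional, following the optional-channel rule described in section 1. This is only a sketch of how the selector section might change; the rest of the configuration would stay the same.

```properties
# Hypothetical variation of the selector section (an assumption, not the original config)
a1.sources.r1.selector.type = replicating
# c1 feeds the logger sink; a failed write to c1 is ignored,
# while a failed write to c2 (HDFS) still fails the transaction
a1.sources.r1.selector.optional = c1
```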

Start Agent

[root@bigdata04 apache-flume-1.9.0-bin]#  bin/flume-ng agent --name a1 --conf conf --conf-file conf/tcp-to-replicatingchannel.conf -Dflume.root.logger=INFO,console

Generate test data by connecting to the socket with telnet

[root@bigdata04 ~]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.

To check the result, look at the log information output by Flume on the console

2020-05-03 09:51:51,123 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }
2020-05-03 09:51:51,141 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2020-05-03 09:51:51,396 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://

View the contents of the file generated by sink k2 in HDFS

[root@bigdata04 ~]# hdfs dfs -cat hdfs://

This is the application of Replicating Channel Selector.

Keywords: Hadoop flume

Added by deed02392 on Fri, 04 Mar 2022 21:08:31 +0200