Flume Learning: Enterprise Architecture Cases

Advanced Flume Learning

Flume Transactions

The primary purpose of a Flume transaction is to guarantee data consistency: a batch of events either succeeds as a whole or fails as a whole and is rolled back.
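As a reference for the sections below, here is a minimal sketch of what the put and take transactions look like against Flume's Java API (org.apache.flume). The class and method names are just for illustration; in a real agent the put side runs inside the ChannelProcessor and the take side inside a sink's process() loop.

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

public class TransactionSketch {

    // Put transaction: how events get from the Source side into the Channel
    public static void putExample(Channel channel) {
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = EventBuilder.withBody("hello", StandardCharsets.UTF_8);
            channel.put(event);   // staged inside the transaction, not yet visible
            txn.commit();         // success: the event becomes visible in the channel
        } catch (RuntimeException e) {
            txn.rollback();       // failure: nothing reaches the channel
            throw e;
        } finally {
            txn.close();
        }
    }

    // Take transaction: how a Sink drains events from the Channel
    public static void takeExample(Channel channel) {
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            // ... deliver the event to the destination here ...
            txn.commit();         // success: the event is removed for good
        } catch (RuntimeException e) {
            txn.rollback();       // failure: the event stays in the channel for re-delivery
            throw e;
        } finally {
            txn.close();
        }
    }
}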

Transaction schematics

Flume Agent Internal Principles

To summarize: events collected by a Source are not written directly to the Channel. They first go through a ChannelProcessor, which passes each event through the interceptor chain and then hands it to a ChannelSelector. There are two selector types, the Replicating Channel Selector and the Multiplexing Channel Selector: the former sends a copy of each event to every channel, while the latter uses configuration to decide which channel an event goes to. Once events have arrived in a channel, they pass through a SinkProcessor. There are three SinkProcessor types: DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor. The first is used when there is only a single sink, the second load-balances by choosing a sink according to a configurable algorithm to transmit events, and the third provides failover for high availability.
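For quick reference, these are the configuration keys (used throughout the cases below) that select the components just described. The names a1, r1, g1, k1, k2 are placeholders, and the DefaultSinkProcessor needs no sink group configuration at all:

# Channel selector, set on the source; the other possible value is multiplexing
a1.sources.r1.selector.type = replicating

# Sink processor, set on a sink group; the other possible value is failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance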

Schematic diagram

Flume Topology

Using multiple Flume agents together

Simple series

In this series structure, agent A must act as the client and agent B as the server: A's sink is an Avro sink and B's source is an Avro source.

When starting up, start the server side (B) first.

This mode chains multiple Flume agents sequentially, from the first source all the way to the final sink, which writes to the destination storage system. It is not recommended to chain too many agents: besides reducing the transfer rate, if any single Flume node goes down during transfer, the whole pipeline is affected.

Replication and multiplexing

Replication: a copy of the data is sent to each channel and ultimately delivered to HDFS, Kafka, other Flume agents, and so on.

Multiplexing: the data is split across different channels, with one part going to HDFS, another to Kafka, and so on. This is achieved with the multiplexing channel selector.

Flume supports sending the flow of events to one or more destinations. This mode lets you copy the same data to multiple channels, or route different data to different channels, with each sink then delivering to a different destination.

Load Balancing and Failover

Flume supports logically grouping multiple sinks into a single sink group, which works with the different SinkProcessors to achieve load balancing and failover.

Aggregation

This is the most common and practical mode for us. Everyday web applications are usually distributed across hundreds or even thousands of servers, and the logs they produce are very cumbersome to process. This combination of Flume agents solves the problem well: each server deploys a Flume agent to collect its logs and send them to a Flume agent that collects logs centrally, which then uploads them to HDFS, Hive, HBase, and so on for log analysis.

Flume Enterprise Development Case

Replication and multiplexing cases

Replication

Requirements: Use Flume-1 to monitor file changes,

Flume-1 passes changes to Flume-2, which stores them in HDFS.

Flume-1 passes changes to Flume-3, which is responsible for output to the Local FileSystem.

Architecture diagram

Preparation

# Create a replicating folder in the /opt/module/flume-1.9.0/jobs directory to store the flume1, flume2, and flume3 configurations
[atguigu@hadoop102 jobs]$ pwd
/opt/module/flume-1.9.0/jobs
[atguigu@hadoop102 jobs]$ mkdir replicating
[atguigu@hadoop102 jobs]$ cd replicating/
[atguigu@hadoop102 replicating]$ ls

Write the flume3.conf configuration according to the architecture

# Processing data, writing out to local file system
# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# source: connects to the upstream agent, so use avro
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

# sink 
a3.sinks.k1.type = file_roll
# Remember to create the fileroll folder
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/jobs/fileroll

# channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Write the flume2.conf configuration according to the architecture

# Processing data, writing out to hdfs
# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# source: connects to the upstream agent, so use avro
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

# sink 
a2.sinks.k1.type = hdfs
# Note: the path prefix here matches fs.defaultFS in the HDFS core-site.xml
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H 
#Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = logs-
#Whether to round the timestamp down (creates time-based folders)
a2.sinks.k1.hdfs.round = true
#Round down to a multiple of this value when creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
#Unit of the rounding value
a2.sinks.k1.hdfs.roundUnit = hour
#Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#Number of events flushed to HDFS per batch
a2.sinks.k1.hdfs.batchSize = 100
#File type; DataStream is plain text (CompressedStream would enable compression)
a2.sinks.k1.hdfs.fileType = DataStream
#Roll to a new file every 60 seconds
a2.sinks.k1.hdfs.rollInterval = 60
#Roll the file when it reaches about 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
#Do not roll based on the number of events
a2.sinks.k1.hdfs.rollCount = 0


# channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Write the flume1.conf configuration according to the architecture

# This is the upstream agent; it connects to the two downstream agents flume2 and flume3
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Specify source configuration
a1.sources.r1.type = TAILDIR
# Multiple groups can be set
a1.sources.r1.filegroups = f1
# Directory and filename regex to monitor
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
# Position file for breakpoint resume (JSON format)
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json
# channel Selector configuration
a1.sources.r1.selector.type = replicating

# Specify sink configuration (two sinks)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888


# Specify channel configuration (two channels)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

# Specify the binding relationship between source,sink,channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Execute flume command

# flume3
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console

# flume2
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console

# flume1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console

Append content to /opt/module/flume-1.9.0/jobs/taildir/.*.txt

[atguigu@hadoop102 taildir]$ pwd
/opt/module/flume-1.9.0/jobs/taildir
[atguigu@hadoop102 taildir]$ ll
 total 16
-rw-rw-r--. 1 atguigu atguigu 12 Oct 10 19:29 file1.txt
-rw-rw-r--. 1 atguigu atguigu 13 Oct 10 19:30 file2.txt
[atguigu@hadoop102 taildir]$ echo hello >> file1.txt 
[atguigu@hadoop102 taildir]$ echo pihao >> file2.txt 
[atguigu@hadoop102 taildir]$ 

View content in hdfs

View local system content

[atguigu@hadoop102 fileroll]$ pwd
/opt/module/flume-1.9.0/jobs/fileroll
[atguigu@hadoop102 fileroll]$ 
[atguigu@hadoop102 fileroll]$ ll
 total 8 # A bit odd here: the file_roll sink rolls to a new file every 30 s (its sink.rollInterval default), whether or not new content arrived
-rw-rw-r--. 1 atguigu atguigu 0 Oct 10 23:52 1633881154435-1
-rw-rw-r--. 1 atguigu atguigu 0 Oct 10 23:53 1633881154435-2
-rw-rw-r--. 1 atguigu atguigu 6 Oct 10 23:55 1633881154435-3
-rw-rw-r--. 1 atguigu atguigu 6 Oct 10 23:55 1633881154435-4
[atguigu@hadoop102 fileroll]$ cat 1633881154435-3
hello
[atguigu@hadoop102 fileroll]$ cat 1633881154435-4
pihao

The test is complete: one copy of the data was delivered to each destination, so the replication test passes.

Load Balancing and Failover Cases

Requirements: use Flume1 to monitor a port; the sinks in its sink group connect to Flume2 and Flume3 respectively; the FailoverSinkProcessor is used to provide failover.

Load Balancing First

Use Flume1 to monitor a port and send the monitored content to flume2 and flume3 in a round-robin or random manner.
flume2 outputs the data to the console.
flume3 outputs the data to the console.

Architecture diagram

flume3.conf

# Receive data and log it to the console
# Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# source: connects to the upstream agent, so use avro
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

# sink 
a3.sinks.k1.type = logger

# channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume2.conf

# Receive data and log it to the console
# Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# source: connects to the upstream agent, so use avro
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

# sink 
a2.sinks.k1.type = logger


# channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume1.conf

# This is the upstream agent; it connects to the two downstream agents flume2 and flume3
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Specify source configuration
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

# channel Selector configuration
a1.sources.r1.selector.type = replicating

# Specify sink configuration (two sinks)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

# sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# Selector can be round_robin (polling) or random
a1.sinkgroups.g1.processor.selector = round_robin


# Specify channel configuration (one channel)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100



# Specify the binding relationship between source,sink,channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

Create the above configuration files separately

[atguigu@hadoop102 jobs]$ pwd
/opt/module/flume-1.9.0/jobs
[atguigu@hadoop102 jobs]$ mkdir loadbalancing
[atguigu@hadoop102 jobs]$ cd loadbalancing/
[atguigu@hadoop102 loadbalancing]$ vim flume1.conf
[atguigu@hadoop102 loadbalancing]$ vim flume2.conf
[atguigu@hadoop102 loadbalancing]$ vim flume3.conf
[atguigu@hadoop102 loadbalancing]$ ll
 total 12
-rw-rw-r--. 1 atguigu atguigu 992 Oct 11 20:32 flume1.conf
-rw-rw-r--. 1 atguigu atguigu 430 Oct 11 20:32 flume2.conf
-rw-rw-r--. 1 atguigu atguigu 443 Oct 11 20:32 flume3.conf
[atguigu@hadoop102 loadbalancing]$ 

Execute flume command

# flume3
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console

# flume2
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console

# flume1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console

Test Send Data

[atguigu@hadoop102 loadbalancing]$ nc localhost 6666
hello
OK
pihao
OK
flume
OK

The test succeeded: only one sink receives each piece of data, so round-robin works. Note that the polling here is not per-event polling. Rather, the sinks take turns polling the channel for data, so it may happen that when flume2 polls the channel is empty while flume3's poll finds data, and the next time flume2 polls there is data available. Which downstream agent prints an event simply depends on whose poll finds data in the channel.

Test random

Simply change the sink processor in flume1.conf to:
a1.sinkgroups.g1.processor.selector = random

Test OK. Round-robin and random load balancing are both verified; next, test failover.

Failover Testing

flume1 monitors port data and sends the monitored content to the active downstream Flume agent; when the active agent goes down, the traffic is automatically transferred to another agent, giving high availability.

Architecture diagram

This example is similar in architecture to the load balancing example above. The only difference is the sink processor: the example above uses the load_balance sink processor, while this one uses the failover sink processor.

Configure flume1.conf

# This is the upstream agent; it connects to the two downstream agents flume2 and flume3
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Specify source configuration
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

# channel Selector configuration
a1.sources.r1.selector.type = replicating

# Specify sink configuration (two sinks)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

# sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# Configure failover sink processor
a1.sinkgroups.g1.processor.type = failover
# Configure priorities (the sink with the higher priority becomes active; here k2, which points to flume3)
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10

# Specify channel configuration (one channel)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100



# Specify the binding relationship between source,sink,channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

Create the flume configuration files separately

[atguigu@hadoop102 jobs]$ pwd
/opt/module/flume-1.9.0/jobs
[atguigu@hadoop102 jobs]$ mkdir failover
[atguigu@hadoop102 jobs]$ cd failover/
[atguigu@hadoop102 failover]$ vim flume1.conf
[atguigu@hadoop102 failover]$ vim flume2.conf
[atguigu@hadoop102 failover]$ vim flume3.conf
[atguigu@hadoop102 failover]$ ll
 total 12
-rw-rw-r--. 1 atguigu atguigu 1054 Oct 11 21:06 flume1.conf
-rw-rw-r--. 1 atguigu atguigu  430 Oct 11 21:06 flume2.conf
-rw-rw-r--. 1 atguigu atguigu  443 Oct 11 21:06 flume3.conf
[atguigu@hadoop102 failover]$ 

Execute flume command

# flume3
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console

# flume2
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console

# flume1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console

Test Send Data

[atguigu@hadoop102 failover]$ nc localhost 6666
hello
OK
pihao
OK
nihoaya
OK
haha 
OK
hhe
OK

Now manually shut down flume3 and send data again: the data now goes to flume2, so failover works.

Finally, restart flume3; once it reconnects, send data again and the data goes back to flume3, the higher-priority sink.

Aggregation Case

Requirements:

Flume1 on hadoop102 monitors the files /opt/module/flume-1.9.0/jobs/taildir/.*.txt.

Flume2 on hadoop103 monitors the data flow on port 6666.

Flume-1 and Flume-2 send the data to Flume3 on hadoop104, and Flume3 prints the final data to the console.

Architecture diagram

Write flume1.conf configuration

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 8888

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Write flume2.conf configuration

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 8888

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 

Write flume3.conf configuration

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 

Create configuration files separately

[atguigu@hadoop102 jobs]$ pwd
/opt/module/flume-1.9.0/jobs
[atguigu@hadoop102 jobs]$ mkdir aggregate
[atguigu@hadoop102 jobs]$ cd aggregate/
[atguigu@hadoop102 aggregate]$ vim flume1.conf
[atguigu@hadoop102 aggregate]$ vim flume2.conf
[atguigu@hadoop102 aggregate]$ vim flume3.conf
[atguigu@hadoop102 aggregate]$ ll
 total 12
-rw-rw-r--. 1 atguigu atguigu 538 Oct 11 21:35 flume1.conf
-rw-rw-r--. 1 atguigu atguigu 404 Oct 11 21:35 flume2.conf
-rw-rw-r--. 1 atguigu atguigu 354 Oct 11 21:35 flume3.conf
[atguigu@hadoop102 aggregate]$ 

Note: the configuration files are now created, but Flume is not yet present on hadoop103 and hadoop104, so the Flume installation and its environment variable configuration need to be distributed as well.

#Execute in hadoop102
[atguigu@hadoop102 module]$ pwd
/opt/module
[atguigu@hadoop102 module]$ my_rsync.sh flume-1.9.0/
[atguigu@hadoop102 module]$ scp -r /etc/profile.d/my_env.sh root@hadoop103:/etc/profile.d/
[atguigu@hadoop102 module]$ scp -r /etc/profile.d/my_env.sh root@hadoop104:/etc/profile.d/

# Test whether environment variables are valid at 103,104, respectively

Execute flume command

# Start each machine in the following order

# flume3 (104) Downstream Service Start First
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregate/flume3.conf -n a3 -Dflume.root.logger=INFO,console

# flume1 (102)
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregate/flume1.conf -n a1 -Dflume.root.logger=INFO,console

# flume2 (103)
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregate/flume2.conf -n a2 -Dflume.root.logger=INFO,console

All three agents started successfully!

Test

# Append to 102 test files
[atguigu@hadoop102 taildir]$ pwd
/opt/module/flume-1.9.0/jobs/taildir
[atguigu@hadoop102 taildir]$ ll
 total 16
-rw-rw-r--. 1 atguigu atguigu 18 Oct 10 23:54 file1.txt
-rw-rw-r--. 1 atguigu atguigu 19 Oct 10 23:55 file2.txt
-rw-rw-r--. 1 atguigu atguigu 11 Oct 10 19:30 log1.log
-rw-rw-r--. 1 atguigu atguigu 11 Oct 10 19:30 log2.log
[atguigu@hadoop102 taildir]$ echo hello >> file1.txt 
[atguigu@hadoop102 taildir]$ echo pihao >> file2.txt 
[atguigu@hadoop102 taildir]$ 

# Send content to the test port (6666) on hadoop103
[atguigu@hadoop103 ~]$ nc localhost 6666
hello2
OK
pihao2
OK

# View output from 104 console

OK, the aggregation case test is complete.

Customize Interceptor

Requirements: Imagine a scenario where the data I collect contains a variety of information, including bigdata and java. Then I want to distribute the data containing bigdata and Java to different destinations. How can I do this?

Analysis

# You can use the multiplexing channel selector to send logs to the channel (and hence sink) you specify. First take a look at the multiplexing selector configuration from the official documentation:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

# Roughly, this means reading the header from the event, checking the value of the state field, and distributing the event to the configured channel.

Now think about it: before an event enters the channel, we need to set a header on it. Where can that be done? In an interceptor; that is exactly what interceptors are for.

Write a java project

package com.pihao.flume.interceptor;

import java.nio.charset.StandardCharsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;

/**
 * Custom flume interceptor
 */
public class EventHeaderInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // Get header
        Map<String, String> headers = event.getHeaders();

        // Get body
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Determine whether the body contains bigdata or java
        if(body.contains("bigdata")){
            headers.put("whichChannel","bigdata");
        }else if(body.contains("java")){
            headers.put("whichChannel","java");
        }else{
            // headers.put("whichChannel","other"); // not needed: the multiplexing selector's default channel covers this case
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }


    /**
     * Builder inner class used to instantiate the interceptor class above
     */
    public static class MyBuilder implements Builder{
        @Override
        public Interceptor build() {
            return new EventHeaderInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
# Then package the project into a jar and copy it to Flume's lib directory
[atguigu@hadoop102 lib]$ ls|grep pihao
pihao_event_interceptor.jar
[atguigu@hadoop102 lib]$ 

Write a configuration file

Multiplexing case:
flume1 monitors port data and sends the monitored data to flume2, flume3, and flume4: events containing bigdata go to flume2, events containing java go to flume3, and all other events go to flume4.
flume2, flume3, and flume4 output directly to the console.

Architecture diagram

Write a configuration file

#flume1.conf 
#Named
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555

#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = whichChannel
a1.sources.r1.selector.mapping.bigdata = c1
a1.sources.r1.selector.mapping.java = c2
a1.sources.r1.selector.default = c3

# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.pihao.flume.interceptor.EventHeaderInterceptor$MyBuilder

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100


a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100


#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888

#Bind
a1.sources.r1.channels = c1 c2 c3   
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c2 
a1.sinks.k3.channel = c3 




#flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 


#flume3.conf 
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 7777

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 



#flume4.conf 
#Named
a4.sources = r1
a4.channels = c1
a4.sinks = k1 

#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = localhost
a4.sources.r1.port = 8888

#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100

#Sink
a4.sinks.k1.type = logger

#Bind
a4.sources.r1.channels = c1 
a4.sinks.k1.channel = c1 


Startup:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume4.conf -n a4 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume1.conf -n a1 -Dflume.root.logger=INFO,console

OK, all agents started successfully. Now send data to port 5555 and see whether the bigdata and java events are separated.

[atguigu@hadoop102 multi]$ nc localhost 5555
bigdata # flume2
OK
java	# flume3
OK
other	# flume4
OK

Multiplexing case test successful.

Customize Source, Customize Sink

Most of the Sources and Sinks you will need are already provided by Flume (see the official documentation), so writing custom ones is rarely necessary.
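That said, a custom Sink follows the same transaction pattern shown earlier. Below is a minimal sketch modeled on the custom sink pattern in the Flume developer guide; the class name, package, and the prefix property are made up for illustration, and a real sink would write to an external system instead of printing.

package com.pihao.flume.sink;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import java.nio.charset.StandardCharsets;

public class MySink extends AbstractSink implements Configurable {

    private String prefix;   // hypothetical parameter read from the agent .conf

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix", "my-sink");
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event != null) {
                // "Deliver" the event; a real sink would write to an external system here
                System.out.println(prefix + ": " + new String(event.getBody(), StandardCharsets.UTF_8));
            }
            txn.commit();
            status = Status.READY;
        } catch (Exception e) {
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            txn.close();
        }
        return status;
    }
}

After packaging it into Flume's lib directory, such a sink would be referenced in the agent configuration by its fully qualified class name, e.g. a1.sinks.k1.type = com.pihao.flume.sink.MySink.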

Flume Data Flow Monitoring

Use Ganglia for monitoring; the installation is left for you to study on your own.

Real Enterprise Interview Questions

How do you monitor Flume data transfer?
Use the third-party framework Ganglia to monitor Flume in real time.

What are the roles of Flume's Source, Sink, and Channel? What Source types do you use?
1) Roles
(1) The Source component collects data. It can handle log data of various types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
(2) The Channel component caches the collected data; it can be stored in memory or in files.
(3) The Sink component sends the data on to a destination, which can be HDFS, Logger, Avro, Thrift, IPC, File, HBase, Solr, or a custom sink.
2) The Source types our company uses:
(1) Monitoring background log files: exec
(2) Monitoring the port where background logs are generated: netcat

What Channel Selectors does Flume have?
There are two: the Replicating Channel Selector (the default) and the Multiplexing Channel Selector.

Flume Parameter tuning?

1) Source
Increasing the number of Sources (for the Taildir Source this can be done by increasing the number of FileGroups) improves the Source's ability to read newly generated data. For example, when one directory produces too many files, split it into several directories and configure a Source for each, so that the Sources have enough capacity to pick up the new data.
The batchSize parameter determines how many events the Source ships to the Channel in one batch; increasing it appropriately improves the throughput of carrying events from Source to Channel.
2) Channel
Choosing type = memory gives the best Channel performance, but data may be lost if the Flume process dies unexpectedly. Choosing type = file gives better fault tolerance, but worse performance than the memory channel.
When using the file Channel, configuring dataDirs with multiple directories on different disks improves performance.
The capacity parameter sets the maximum number of events the Channel can hold. transactionCapacity sets the maximum number of events the Source writes to the Channel per transaction and the maximum number of events the Sink reads from the Channel per transaction. transactionCapacity must be greater than or equal to the batchSize of the Source and the Sink.
3) Sink
Increasing the number of Sinks increases the capacity to consume events. More Sinks is not automatically better, though: extra Sinks occupy system resources and cause unnecessary waste.
The batchSize parameter determines how many events the Sink reads from the Channel in one batch; increasing it appropriately improves the throughput of draining events from the Channel.
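A sketch of where these knobs sit in an agent configuration (the names a1/r1/c1/k1 and the paths are placeholders, and the values are only examples, not recommendations):

# Source: ship events to the channel in batches
a1.sources.r1.type = TAILDIR
a1.sources.r1.batchSize = 100

# Channel: file channel with data directories on different disks
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data1/flume/checkpoint
a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
a1.channels.c1.capacity = 1000000
# transactionCapacity should be >= the batchSize of the source and the sink
a1.channels.c1.transactionCapacity = 100

# Sink: read events from the channel in batches
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.batchSize = 100
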
Flume transaction mechanism?

Flume's transaction mechanism is similar to a database's transaction mechanism: Flume uses two separate transactions to handle event delivery from Source to Channel and from Channel to Sink. For example, the spooling directory source creates an event for each line of a file; only once all events in the transaction have been delivered to the Channel and the transaction has committed successfully does the Source mark that file as complete. Delivery from Channel to Sink is handled the same way: if for some reason an event cannot be delivered, the transaction is rolled back and all of its events remain in the Channel, waiting to be re-delivered.
