3 Flume custom components

1. Customize the Interceptor

1.1 case requirements

When Flume is used to collect a server's local logs, logs of different types need to be sent to different analysis systems according to their log type.

1.2 requirement analysis

In real development, one server may produce many types of logs, and different types of logs may need to go to different analysis systems. This calls for the Multiplexing structure in a Flume topology. Multiplexing works by sending different events to different channels according to the value of a certain key in the event's Header, so we need to customize an Interceptor that assigns different values to that header key for different types of events.
In this case, we simulate logs with port data and use single digits and single letters to represent the different log types. The custom interceptor must distinguish digits from letters and send them to different analysis systems (channels).
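The resulting topology (using the host names from the configurations below) looks roughly like this:

netcat (hadoop102:44444) -> r1 + CustomInterceptor -> multiplexing selector
    type=letter -> c1 -> k1 (avro) -> hadoop103:4141 -> logger
    type=number -> c2 -> k2 (avro) -> hadoop104:4242 -> logger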

1.3 implementation steps

Create a Maven project and add the following dependency.

<dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>1.9.0</version>
</dependency>

Define the CustomInterceptor class and implement the Interceptor interface.

package com.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        // Route by the first byte of the event body:
        // lowercase letters are tagged "letter", digits are tagged "number"
        if (body[0] >= 'a' && body[0] <= 'z') {
            event.getHeaders().put("type", "letter");
        } else if (body[0] >= '0' && body[0] <= '9') {
            event.getHeaders().put("type", "number");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
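Package the project into a jar and copy it into Flume's lib directory on hadoop102 so the agent can load the class. A minimal sketch (the jar name and the FLUME_HOME path are assumptions; substitute whatever your build produces and wherever Flume is installed):

mvn clean package
cp target/flume-interceptor-1.0.jar $FLUME_HOME/lib/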

Edit the Flume configuration files.
hadoop102:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
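Note that events whose first body byte is neither a lowercase letter nor a digit receive no type header, so they match none of the mappings above and are not delivered to any channel. If you want to catch them instead, the multiplexing selector also accepts a default channel, e.g.:

a1.sources.r1.selector.default = c1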

hadoop103:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

hadoop104:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
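Start the downstream agents on hadoop103 and hadoop104 first, then the upstream agent on hadoop102. A sketch of the commands (the job file names are assumptions; point -f at wherever you saved each configuration):

bin/flume-ng agent -c conf/ -f job/interceptor-103.conf -n a1 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -f job/interceptor-104.conf -n a1 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -f job/interceptor-102.conf -n a1

Then send test data with nc localhost 44444 on hadoop102: lines beginning with a lowercase letter should appear in the hadoop103 console, lines beginning with a digit in the hadoop104 console.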

2. Customize the Source

2.1 introduction

Source is the component responsible for receiving data into the Flume Agent. The Source component can handle various types and formats of log data, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. The officially provided source types are numerous, but sometimes they cannot meet actual development needs, in which case we need to customize a source.

A custom source (MySource) needs to extend the AbstractSource class and implement the Configurable and PollableSource interfaces, providing the following methods:
getBackOffSleepIncrement()    // not used in this case
getMaxBackOffSleepInterval()  // not used in this case
configure(Context context)    // initialize the context (read the configuration file)
process()                     // wrap the fetched data into events and write them to the channel; called in a loop
Usage scenario: reading data from MySQL or other file systems.

2.2 requirements

Use Flume to receive data, add a prefix to each piece of data, and output it to the console. The prefix can be configured in the Flume configuration file.

2.3 coding

Add the same flume-ng-core dependency to the pom as in the interceptor case, then write the source class.

package com;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Fields to be read from the configuration file
    private Long delay;
    private String field;

    // Initialize the configuration
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "Hello!");
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // Create the event header
            HashMap<String, String> headerMap = new HashMap<>();
            // Create the event
            SimpleEvent event = new SimpleEvent();
            // Build and send five events per call
            for (int i = 0; i < 5; i++) {
                // Set the event header
                event.setHeaders(headerMap);
                // Set the event body
                event.setBody((field + i).getBytes());
                // Write the event to the channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    // Backoff timing for the source runner; not used in this case
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}

configuration file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = atguigu
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the task

bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
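Since field is left commented out in the configuration, the source falls back to its default "Hello!", so the logger sink should print events whose bodies cycle through Hello!0 to Hello!4, one per second with delay = 1000.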

3. Customize Sink

3.1 introduction

A Sink constantly polls the Channel for events, removes them in batches, and writes them to a storage or indexing system, or sends them to another Flume Agent.
Sinks are fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written to the storage system or the next Flume Agent, the Sink commits the transaction via the Channel. Only after the transaction is committed does the Channel delete the events from its internal buffer.
Sink component destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks. The officially provided sink types are numerous, but sometimes they cannot meet actual development needs, in which case we need to customize a sink.
A custom sink (MySink) needs to extend the AbstractSink class and implement the Configurable interface, providing the following methods:
configure(Context context)  // initialize the context (read the configuration file)
process()                   // read data (events) from the Channel; called in a loop
Usage scenario: reading Channel data and writing it to MySQL or other file systems.

3.2 requirements

Use Flume to receive data, add a prefix and suffix to each piece of data at the Sink, and output it to the console. The prefix and suffix can be configured in the Flume task configuration file.

3.3 coding

package com;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    // Logger for printing the processed events
    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        // Return status
        Status status;
        // Get the Channel bound to this Sink
        Channel ch = getChannel();
        // Get a transaction
        Transaction txn = ch.getTransaction();
        // Open the transaction
        txn.begin();
        try {
            // Read from the Channel until an event arrives
            Event event;
            while (true) {
                event = ch.take();
                if (event != null) {
                    break;
                }
            }
            // Process the event (print it)
            LOG.info(prefix + new String(event.getBody()) + suffix);
            // Commit the transaction
            txn.commit();
            status = Status.READY;
        } catch (Exception e) {
            // On any exception, roll the transaction back
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            // Close the transaction
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        // Read from the configuration file, with a default value
        prefix = context.getString("prefix", "hello:");
        // Read from the configuration file, no default value
        suffix = context.getString("suffix");
    }
}
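Note that take() returns null whenever the Channel happens to be empty, so the loop above spins inside the transaction until an event arrives. A gentler variant would commit the transaction and return Status.BACKOFF on a null take, letting the sink runner back off instead of busy-waiting.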

configuration file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.MySink
#a1.sinks.k1.prefix = atguigu:
a1.sinks.k1.suffix = :atguigu
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
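Start the task (the job file name is an assumption; use your own path):

bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console

Then send data with nc localhost 44444; each line should be printed by MySink wrapped with the default prefix hello: and the configured suffix :atguigu.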
