1. Custom Interceptor
1.1 Requirements
When Flume collects the local logs of a server, logs of different types need to be sent to different analysis systems according to their type.
1.2 Analysis
In practice, a single server may produce many types of logs, and each type may need to go to a different analysis system. This calls for the multiplexing structure of a Flume topology. Multiplexing sends each event to a channel according to the value of a particular key in the event's header. We therefore need a custom interceptor that gives that header key a different value for each type of event.
In this example, data arriving on a network port stands in for the logs. Single digits and single lowercase letters stand for two log types. The custom interceptor tells digits from letters so that each type is sent to its own analysis system (channel).
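The routing rule can be pictured with a small plain-Java model. This is not Flume's multiplexing channel selector. The class `MultiplexingModel` and its methods are hypothetical. The mapping mirrors the selector settings configured on hadoop102 in Section 1.3 (`letter` to `c1`, `number` to `c2`). In this model, an event whose header value has no mapping gets no channel. In Flume, such events go to the channels listed under `selector.default`, if that property is set.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of header-based (multiplexing) routing; not Flume code.
class MultiplexingModel {
    private final String headerKey;
    private final Map<String, List<String>> mapping = new HashMap<>();

    MultiplexingModel(String headerKey) {
        this.headerKey = headerKey;
    }

    // Plays the role of "selector.mapping.<value> = <channels>"
    MultiplexingModel map(String headerValue, String... channels) {
        mapping.put(headerValue, Arrays.asList(channels));
        return this;
    }

    // Returns the channels an event with these headers would be written to
    List<String> route(Map<String, String> headers) {
        List<String> channels = mapping.get(headers.get(headerKey));
        return channels != null ? channels : Collections.<String>emptyList();
    }

    public static void main(String[] args) {
        MultiplexingModel selector = new MultiplexingModel("type")
                .map("letter", "c1")
                .map("number", "c2");
        System.out.println(selector.route(Collections.singletonMap("type", "letter"))); // [c1]
        System.out.println(selector.route(Collections.singletonMap("type", "number"))); // [c2]
        System.out.println(selector.route(Collections.<String, String>emptyMap()));    // []
    }
}
```

The interceptor's only job is to make `headers.get("type")` meaningful. The selector then routes purely on that value and never looks at the event body.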
1.3 Implementation steps
Create a Maven project and add the following dependency.
```xml
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
```
Define the CustomInterceptor class, which implements the Interceptor interface. Its package must match the class name used in the Flume configuration (com.flume.interceptor.CustomInterceptor$Builder).
```java
package com.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        // Empty events (e.g. blank lines) carry no type; leave them untagged
        if (body == null || body.length == 0) {
            return event;
        }
        // Inclusive bounds so that 'a', 'z', '0' and '9' are classified too
        if (body[0] >= 'a' && body[0] <= 'z') {
            event.getHeaders().put("type", "letter");
        } else if (body[0] >= '0' && body[0] <= '9') {
            event.getHeaders().put("type", "number");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Tag every event of the batch in place
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    // Flume creates the interceptor through the Builder named in the configuration
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
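Package the project as a JAR, copy it into the lib directory of the Flume installation, and then edit the Flume configuration file on each host.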
hadoop102:
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242

# Use channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```
hadoop103:
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Avro source receiving the "letter" events from hadoop102
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
```
hadoop104:
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Avro source receiving the "number" events from hadoop102
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
```
2. Custom Source
2.1 Introduction
The source is the component that receives data into a Flume agent. It can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http and legacy. Flume ships many source types, but they do not always meet the needs of a real project. In that case, we write a custom source.
MySource extends the AbstractSource class and implements the Configurable and PollableSource interfaces.
It implements the following methods:
getBackOffSleepIncrement() // step by which the wait grows after consecutive BACKOFF results; not needed in this example
getMaxBackOffSleepInterval() // upper limit of that wait; not needed in this example
configure(Context context) // initialize the source from its context (the properties in the configuration file)
process() // wrap the acquired data into events and write them to the channel; Flume calls this method repeatedly
Typical use: reading data from MySQL or from other storage systems.
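Returning BACKOFF from process() asks Flume to pause before the next call, and the two backoff methods set how long that pause is. As far as we can tell from the Flume 1.9 polling source runner, the pause grows with the number of consecutive BACKOFF results. It is roughly min(consecutive × increment, maximum) and resets after a READY result. The plain-Java sketch below models only that arithmetic, under that assumption. `BackoffModel` and the 1000 ms / 5000 ms values are hypothetical.

```java
// Hypothetical model of the wait a polling runner applies after consecutive BACKOFF results.
class BackoffModel {
    private final long increment;   // stands for getBackOffSleepIncrement()
    private final long maxInterval; // stands for getMaxBackOffSleepInterval()
    private long consecutiveBackoffs = 0;

    BackoffModel(long increment, long maxInterval) {
        this.increment = increment;
        this.maxInterval = maxInterval;
    }

    // Returns how long (ms) the runner would sleep after one process() result
    long onResult(boolean backoff) {
        if (!backoff) {
            consecutiveBackoffs = 0; // a READY result resets the counter
            return 0;
        }
        consecutiveBackoffs++;
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        BackoffModel m = new BackoffModel(1000, 5000);
        StringBuilder waits = new StringBuilder();
        boolean[] results = {true, true, true, false, true};
        for (boolean backoff : results) {
            waits.append(m.onResult(backoff)).append(' ');
        }
        System.out.println(waits.toString().trim()); // 1000 2000 3000 0 1000
        // With both methods returning 0, every wait is 0
        System.out.println(new BackoffModel(0, 0).onResult(true)); // 0
    }
}
```

This is why returning 0 from both methods is harmless here. The source in this tutorial paces itself with its own `delay` instead.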
2.2 Requirements
Use a custom source to produce data, add a prefix to each record, and print it to the console. The prefix can be set in the Flume configuration file.
2.3 Coding
Add the flume-ng-core dependency from Section 1.3 to the pom, then write the source. Its package must match the type used in the configuration (com.MySource).
```java
package com;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Fields read from the configuration file
    private long delay;
    private String field;

    // Initialize configuration; both keys fall back to a default if absent
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay", 1000L);
        field = context.getString("field", "Hello!");
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            for (int i = 0; i < 5; i++) {
                // A fresh event per record: a memory channel keeps a reference to the
                // event, so reusing one object would let later records overwrite earlier ones.
                // SimpleEvent starts with an empty header map.
                SimpleEvent event = new SimpleEvent();
                event.setBody((field + i).getBytes(StandardCharsets.UTF_8));
                // Write the event to the channel(s) bound to this source
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Status.BACKOFF;
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
```
Configuration file (job/mysource.conf):
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = atguigu

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Start the agent:
```shell
bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
```
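configure() only sees the keys that follow the component's own prefix. With the configuration above, the source's Context holds `delay` rather than `a1.sources.r1.delay`. The plain-Java sketch below models that prefix stripping with `java.util.Properties`. `ComponentContextModel` is hypothetical, and Flume's real configuration loading does more, such as validation and grouping by component type.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical model of stripping a component prefix before configure() sees the keys.
class ComponentContextModel {
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>(); // sorted for stable printing
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
                "a1.sources.r1.type = com.MySource\n"
                + "a1.sources.r1.delay = 1000\n"
                + "#a1.sources.r1.field = atguigu\n"
                + "a1.sinks.k1.type = logger\n"));
        Map<String, String> source = subProperties(props, "a1.sources.r1.");
        System.out.println(source); // {delay=1000, type=com.MySource}
        // "field" is commented out, so getString("field", "Hello!") falls back to the default
        System.out.println(source.getOrDefault("field", "Hello!")); // Hello!
    }
}
```

Uncommenting `a1.sources.r1.field` would therefore change the prefix without recompiling MySource.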
3. Custom Sink
3.1 Introduction
The sink continuously polls the channel for events, removes them in batches, and writes each batch to a storage or indexing system, or sends it to another Flume agent.
Sinks are fully transactional. Before removing a batch of events from the channel, each sink starts a transaction with the channel. Once the batch has been written to the storage system or the next Flume agent, the sink commits the transaction. Only after the commit does the channel delete the events from its internal buffer.
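The take/commit/rollback contract can be illustrated with a toy in-memory channel. `ToyChannel` is hypothetical and far simpler than Flume's MemoryChannel: it is single-threaded, allows one open transaction and has no capacity limits. Taken events are held aside until commit() discards them or rollback() puts them back at the head of the queue in their original order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy, single-threaded model of a channel's take-side transaction; not Flume's MemoryChannel.
class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> takeList = new ArrayList<>(); // events taken in the open transaction

    void put(String event) {
        queue.addLast(event);
    }

    // Removes the next event from the queue but remembers it until commit or rollback
    String take() {
        String event = queue.pollFirst();
        if (event != null) {
            takeList.add(event);
        }
        return event;
    }

    // The sink delivered the batch: the taken events are gone for good
    void commit() {
        takeList.clear();
    }

    // Delivery failed: return the taken events to the head of the queue, preserving order
    void rollback() {
        for (int i = takeList.size() - 1; i >= 0; i--) {
            queue.addFirst(takeList.get(i));
        }
        takeList.clear();
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel();
        ch.put("e1");
        ch.put("e2");
        ch.take();
        ch.take();
        ch.rollback();                 // write-out failed
        System.out.println(ch.size()); // 2
        System.out.println(ch.take()); // e1
        ch.commit();                   // write-out succeeded
        System.out.println(ch.size()); // 1
    }
}
```

This is why a sink must always end its transaction with either commit() or rollback() before close(). Otherwise, events could be lost or stay invisible to later takes.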
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr and custom sinks. Flume ships many sink types, but they do not always meet the needs of a real project. In that case, we write a custom sink.
MySink extends the AbstractSink class and implements the Configurable interface.
It implements the following methods:
configure(Context context) // initialize the sink from its context (the properties in the configuration file)
process() // take events from the channel and handle them; Flume calls this method repeatedly
Typical use: reading channel data and writing it to MySQL or other storage systems.
3.2 Requirements
Use Flume to receive data, add a prefix and a suffix to each record on the sink side, and print it to the console. The prefix and suffix can be set in the Flume job configuration file.
3.3 Coding
```java
package com;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

public class MySink extends AbstractSink implements Configurable {

    // Logger for this sink
    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        // Channel bound to this sink, and a transaction on it
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // take() returns null when the channel is empty; do not spin inside the transaction
            Event event = ch.take();
            if (event == null) {
                // Nothing to handle: commit the empty transaction and let the runner back off
                status = Status.BACKOFF;
            } else {
                // Handle the event (print it)
                LOG.info(prefix + new String(event.getBody(), StandardCharsets.UTF_8) + suffix);
                status = Status.READY;
            }
            txn.commit();
        } catch (Exception e) {
            // On failure, roll back so the event stays in the channel
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        // Read from the configuration file, with a default value
        prefix = context.getString("prefix", "hello:");
        // Read from the configuration file, no default value (null if the key is missing)
        suffix = context.getString("suffix");
    }
}
```
Configuration file:
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.MySink
#a1.sinks.k1.prefix = atguigu:
a1.sinks.k1.suffix = :atguigu

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```