Research on Storm source code analysis

2021SC@SDUSC

Bolt source code analysis (III)

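This article introduces the Bolt interfaces.

The Bolt interfaces defined in Storm are IBolt, IRichBolt, IBasicBolt, and IBatchBolt.
As the definitions quoted in this article show, they are related as follows: IRichBolt extends both IBolt and IComponent, while IBasicBolt and IBatchBolt extend IComponent without extending IBolt.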

IBolt.java

IBolt defines the function set of Bolt, and its code is as follows:

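package org.apache.storm.task;

import java.io.Serializable;
import java.util.Map;
import org.apache.storm.tuple.Tuple;

public interface IBolt extends Serializable {
    // Called once on the worker after deserialization, before any tuple arrives.
    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);

    // Called for each input tuple.
    void execute(Tuple input);

    // Called on shutdown; Storm does not guarantee that it runs.
    void cleanup();
}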

A Bolt is the basic processing unit in Storm. Once it has started, Storm calls its execute method for each incoming message. As with ISpout, the IBolt object is serialized into a byte array when the topology is submitted. The node that runs the task deserializes the object and then calls the prepare callback. Users should therefore initialize complex objects in prepare, so that every deserialized instance is set up correctly.
When the object is destroyed, the cleanup callback is called, but Storm does not guarantee that it will run.
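The serialize-then-prepare lifecycle described above can be imitated with plain Java serialization. The following is only a minimal sketch that does not use Storm: SketchBolt, its no-argument prepare method, and BoltLifecycleSketch are hypothetical stand-ins, and the round trip through a byte array merely mimics what happens between topology submission and task start-up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a bolt; it is not a Storm class.
class SketchBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    // Plain configuration travels with the serialized object.
    private final String name;

    // Runtime state is transient: serialization skips it, so it has to be
    // rebuilt in prepare() after the object has been deserialized.
    private transient Map<String, Integer> counts;

    SketchBolt(String name) {
        this.name = name;
    }

    // Imitates the prepare callback that runs after deserialization.
    void prepare() {
        counts = new HashMap<>();
    }

    boolean isPrepared() {
        return counts != null;
    }

    String getName() {
        return name;
    }
}

class BoltLifecycleSketch {
    // Serializes the bolt to a byte array and reads a fresh copy back.
    static SketchBolt roundTrip(SketchBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchBolt) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SketchBolt copy = roundTrip(new SketchBolt("counter"));
        System.out.println("prepared after deserialization: " + copy.isPrepared());
        copy.prepare();
        System.out.println("prepared after prepare(): " + copy.isPrepared());
    }
}
```

In this sketch the transient map is null in the deserialized copy until prepare() runs, which is the reason complex objects belong in prepare rather than in the constructor.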
An implementation of execute usually processes the input message, possibly emits new messages to downstream nodes, and finally acks the input message. If processing fails, it must fail the input message instead. This is what keeps Storm's Ack mechanism working correctly.
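The process, emit, then ack-or-fail sequence can be sketched without Storm. In the example below, RecordingCollector and ManualAckSketch are hypothetical stand-ins: tuples are plain strings and the collector only records which calls were made, so it illustrates the control flow of an execute method rather than Storm's real OutputCollector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output collector; it only records calls.
class RecordingCollector {
    final List<String> emitted = new ArrayList<>();
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    // Records a new message together with the input it is anchored to.
    void emit(String anchor, String value) {
        emitted.add(anchor + "->" + value);
    }

    void ack(String input) {
        acked.add(input);
    }

    void fail(String input) {
        failed.add(input);
    }
}

class ManualAckSketch {
    // Imitates an execute method: process the input, emit downstream,
    // then ack on success or fail when processing throws.
    static void execute(String input, RecordingCollector collector) {
        try {
            int doubled = Integer.parseInt(input) * 2;
            collector.emit(input, Integer.toString(doubled));
            collector.ack(input);
        } catch (NumberFormatException e) {
            collector.fail(input);
        }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        execute("21", collector);
        execute("oops", collector);
        System.out.println("emitted=" + collector.emitted);
        System.out.println("acked=" + collector.acked);
        System.out.println("failed=" + collector.failed);
    }
}
```

Every input ends up either acked or failed exactly once, which is the invariant a hand-written execute method has to maintain.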

IRichBolt.java

package org.apache.storm.topology;

import org.apache.storm.task.IBolt;
public interface IRichBolt extends IBolt, IComponent {

}

IRichBolt extends both the IComponent and IBolt interfaces, so it represents a topology component with Bolt functionality. In practice, IRichBolt is the main interface for implementing Topology components.

IBasicBolt.java

public interface IBasicBolt extends IComponent {
    void prepare(Map<String, Object> topoConf, TopologyContext context);

    void execute(Tuple input, BasicOutputCollector collector);

    void cleanup();
}

The IBasicBolt interface is defined in much the same way as IBolt, and the same implementation requirements apply. It differs from IBolt in two respects:
1. Its output collector is a BasicOutputCollector, and it is passed as a parameter to execute rather than to prepare.
2. It extends the IComponent interface, which means it can be used to define Topology components.

Why this interface exists:
The main purpose of IBasicBolt is to give users a simpler way to write a Bolt. When a Bolt is written against IBasicBolt, the Storm framework handles the Ack, Fail, and Anchor operations for emitted messages on the user's behalf. This is implemented by the executor class BasicBoltExecutor.

BasicBoltExecutor implements the IRichBolt interface and holds an IBasicBolt member variable to which it forwards calls. It follows the decorator pattern and is defined as follows:

public class BasicBoltExecutor implements IRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);

    private IBasicBolt bolt;
    private transient BasicOutputCollector collector;

    public BasicBoltExecutor(IBasicBolt bolt) {
        this.bolt = bolt;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        bolt.declareOutputFields(declarer);
    }


    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        bolt.prepare(topoConf, context);
        this.collector = new BasicOutputCollector(collector);
    }

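    @Override
    public void execute(Tuple input) {
        // Anchor everything the wrapped bolt emits to the current input tuple.
        collector.setContext(input);
        try {
            bolt.execute(input, collector);
            // No exception was thrown: ack the input on the user's behalf.
            collector.getOutputter().ack(input);
        } catch (FailedException e) {
            // A ReportedFailedException is additionally reported as an error.
            if (e instanceof ReportedFailedException) {
                collector.reportError(e);
            }
            // Any FailedException fails the input tuple.
            collector.getOutputter().fail(input);
        }
    }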

    public void cleanup() {
        bolt.cleanup();
    }

    public Map<String, Object> getComponentConfiguration() {
        return bolt.getComponentConfiguration();
    }
}

Why BasicBoltExecutor needs to implement the IRichBolt interface:

After implementing a Bolt against the IBasicBolt interface, the user registers it by calling the setBolt method of TopologyBuilder when building the Topology. The setBolt overload that accepts an IBasicBolt wraps the user's implementation in a BasicBoltExecutor, which Storm provides, and then calls the overload that accepts an IRichBolt parameter to complete the registration. BasicBoltExecutor must therefore be an IRichBolt.
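The wrap-and-delegate step can be sketched with stand-in types. Everything in the example below is hypothetical and simplified (RichStyleBolt, BasicStyleBolt, SketchFailedException, AutoAckExecutor, and SketchTopologyBuilder are not Storm classes, and tuples are plain strings); it only illustrates how one overload can wrap a simpler bolt in a decorator and hand it to the overload that does the actual registration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-ins; none of these types are Storm classes.
interface RichStyleBolt {
    void execute(String input);
}

interface BasicStyleBolt {
    void execute(String input, List<String> output);
}

// Thrown by a BasicStyleBolt to signal that the input should be failed.
class SketchFailedException extends RuntimeException {
    SketchFailedException(String message) {
        super(message);
    }
}

// Decorator: adapts a BasicStyleBolt to RichStyleBolt and acks or fails for it.
class AutoAckExecutor implements RichStyleBolt {
    private final BasicStyleBolt delegate;
    final List<String> output = new ArrayList<>();
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    AutoAckExecutor(BasicStyleBolt delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(String input) {
        try {
            delegate.execute(input, output);
            acked.add(input);
        } catch (SketchFailedException e) {
            failed.add(input);
        }
    }
}

class SketchTopologyBuilder {
    final Map<String, RichStyleBolt> bolts = new LinkedHashMap<>();

    // The overload that performs the registration.
    void setBolt(String id, RichStyleBolt bolt) {
        bolts.put(id, bolt);
    }

    // The convenience overload wraps the bolt and delegates to the one above.
    void setBolt(String id, BasicStyleBolt bolt) {
        setBolt(id, new AutoAckExecutor(bolt));
    }

    public static void main(String[] args) {
        BasicStyleBolt upper = (input, out) -> {
            if (input.isEmpty()) {
                throw new SketchFailedException("empty input");
            }
            out.add(input.toUpperCase(Locale.ROOT));
        };
        SketchTopologyBuilder builder = new SketchTopologyBuilder();
        builder.setBolt("upper", upper);

        AutoAckExecutor executor = (AutoAckExecutor) builder.bolts.get("upper");
        executor.execute("storm");
        executor.execute("");
        System.out.println("output=" + executor.output);
        System.out.println("acked=" + executor.acked);
        System.out.println("failed=" + executor.failed);
    }
}
```

Because the builder in this sketch stores only RichStyleBolt instances, the wrapper has to implement that interface, which mirrors the reason given above for BasicBoltExecutor implementing IRichBolt.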

IBatchBolt.java

public interface IBatchBolt<T> extends Serializable, IComponent {
    void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, T id);

    void execute(Tuple tuple);

    void finishBatch();
}

Unlike the IBasicBolt interface, IBatchBolt is mainly used for batch processing in Storm. At present, Storm mainly uses this interface to implement reliable message delivery, where processing messages in batches is more efficient than processing them one at a time. Storm's transactional Topologies and Trident are mainly built on IBatchBolt.

Compared with the previous IBolt, IBasicBolt and IRichBolt, IBatchBolt has an additional finishBatch method, which is called at the end of a batch.

prepare method:
Initializes a Batch. The last parameter has the generic type T and serves as the unique identifier of the Batch. In BaseTransactionalBolt, which implements IBatchBolt, T is instantiated as TransactionAttempt.
In the current Storm implementation, each transaction corresponds to one Batch, and the data of each Batch is processed by a new IBatchBolt object. This is why prepare takes the identifier T; in a transactional Topology, Storm uses TransactionAttempt as that identifier. After a Batch has been processed successfully, the IBatchBolt object for that Batch is destroyed, so users cannot use the IBatchBolt object itself to keep data that must be shared across batches.
execute method:
Processes the messages that belong to this Batch.
finishBatch method:
Called only once all messages of this Batch have been processed. If the BatchBolt also implements the ICommitter interface, finishBatch is called only after all earlier Batches have been processed successfully. This guarantees strong ordering and is the basis on which transactional Topologies in Storm are implemented.
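The per-batch lifecycle of prepare, execute, and finishBatch can be sketched with stand-in types. In the example below, SketchBatchBolt, CountingBatchBolt, and BatchLifecycleSketch are hypothetical, tuples are plain strings, and the batch identifier is an Integer rather than a TransactionAttempt; the driver simply creates one new bolt object per batch, as the text describes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in with the same three-step shape as a batch bolt.
interface SketchBatchBolt<T> {
    void prepare(List<String> output, T id);

    void execute(String tuple);

    void finishBatch();
}

// Counts the tuples of one batch and emits the total when the batch ends.
class CountingBatchBolt implements SketchBatchBolt<Integer> {
    private List<String> output;
    private Integer batchId;
    private int count; // per-batch state, gone once this object is discarded

    @Override
    public void prepare(List<String> output, Integer id) {
        this.output = output;
        this.batchId = id;
    }

    @Override
    public void execute(String tuple) {
        count++;
    }

    @Override
    public void finishBatch() {
        output.add("batch " + batchId + ": " + count);
    }
}

class BatchLifecycleSketch {
    // Runs one batch on a brand-new bolt object obtained from the factory.
    static <T> void runBatch(Supplier<SketchBatchBolt<T>> factory, T id,
                             List<String> tuples, List<String> output) {
        SketchBatchBolt<T> bolt = factory.get();
        bolt.prepare(output, id);
        for (String tuple : tuples) {
            bolt.execute(tuple);
        }
        bolt.finishBatch();
    }

    public static void main(String[] args) {
        List<String> output = new ArrayList<>();
        runBatch(CountingBatchBolt::new, 1, Arrays.asList("a", "b", "c"), output);
        runBatch(CountingBatchBolt::new, 2, Arrays.asList("d"), output);
        System.out.println(output);
    }
}
```

The second batch starts counting from zero because it runs on a fresh object, so anything that must outlive a batch has to be stored somewhere other than the bolt's own fields.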

Keywords: Big Data Zookeeper storm

Added by ducey on Thu, 18 Nov 2021 16:21:01 +0200