Research on Storm source code analysis

2021SC@SDUSC

Spout source code analysis (I)


Introduction to core concepts

1. Structure:
Spout is one of the core components of Storm. Its methods come from the ISpout interface, and it also derives from IComponent: IRichSpout extends both ISpout and IComponent.

2. Send:
After a spout obtains data from an external source, it emits tuples into the topology, either reliably or unreliably. A spout can also emit to more than one stream: it declares several streams with OutputFieldsDeclarer.declareStream and then names the target stream when calling SpoutOutputCollector.emit.
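A minimal sketch of multi-stream emission, assuming no Storm dependency: the hypothetical MiniCollector below only records which stream each tuple was sent to. In the real API the streams are declared in declareOutputFields with OutputFieldsDeclarer.declareStream, and a tuple is sent to one of them with SpoutOutputCollector.emit(streamId, tuple); the stream names "evens" and "odds" are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a collector: it only records which stream each tuple went to.
// In Storm the equivalent calls are declarer.declareStream(...) and collector.emit(streamId, tuple).
class MiniCollector {
    private final Map<String, List<List<Object>>> streams = new LinkedHashMap<>();

    // Mirrors declaring a stream in declareOutputFields.
    void declareStream(String streamId) {
        streams.putIfAbsent(streamId, new ArrayList<>());
    }

    // Mirrors emit(streamId, tuple); this sketch rejects streams that were never declared.
    void emit(String streamId, List<Object> tuple) {
        List<List<Object>> target = streams.get(streamId);
        if (target == null) {
            throw new IllegalArgumentException("Undeclared stream: " + streamId);
        }
        target.add(tuple);
    }

    List<List<Object>> emitted(String streamId) {
        return streams.getOrDefault(streamId, List.of());
    }
}

class MultiStreamDemo {
    // Routes each number to the hypothetical "evens" or "odds" stream.
    static MiniCollector route(int... numbers) {
        MiniCollector collector = new MiniCollector();
        collector.declareStream("evens");
        collector.declareStream("odds");
        for (int n : numbers) {
            collector.emit(n % 2 == 0 ? "evens" : "odds", List.of(n));
        }
        return collector;
    }

    public static void main(String[] args) {
        MiniCollector c = route(1, 2, 3, 4, 5);
        System.out.println("evens=" + c.emitted("evens") + " odds=" + c.emitted("odds"));
    }
}
```

Here route(1, 2, 3, 4, 5) sends 2 and 4 to "evens" and 1, 3 and 5 to "odds"; the target stream is chosen per tuple at emit time.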

3. Important methods:
The most important method of a spout is nextTuple, which emits new tuples into the topology; if there is no new tuple to emit, it simply returns. Note that nextTuple must not block, because Storm calls all of a spout task's methods (nextTuple, ack and fail) on the same thread.
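To see why nextTuple must not block, the sketch below models the spout task as a hypothetical single-threaded loop (it is not Storm's executor code): each iteration first delivers acks for earlier tuples, then calls nextTuple once. NonBlockingSpout and SingleThreadLoop are illustrative names, and every tuple is assumed to succeed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical spout: nextTuple returns immediately when there is nothing to emit.
class NonBlockingSpout {
    final Deque<String> source = new ArrayDeque<>();
    final List<String> emitted = new ArrayList<>();
    final List<String> acked = new ArrayList<>();

    void nextTuple() {
        String next = source.poll(); // poll() returns null instead of waiting
        if (next == null) {
            return; // nothing to emit: return without blocking
        }
        emitted.add(next);
    }

    void ack(Object msgId) {
        acked.add((String) msgId);
    }
}

// Hypothetical single-threaded loop standing in for the spout executor.
class SingleThreadLoop {
    static NonBlockingSpout run(int iterations, List<String> input) {
        NonBlockingSpout spout = new NonBlockingSpout();
        spout.source.addAll(input);
        Deque<String> pendingAcks = new ArrayDeque<>();
        for (int i = 0; i < iterations; i++) {
            // Deliver acks for tuples emitted in earlier iterations (all assumed to succeed).
            while (!pendingAcks.isEmpty()) {
                spout.ack(pendingAcks.poll());
            }
            int before = spout.emitted.size();
            spout.nextTuple();
            if (spout.emitted.size() > before) {
                pendingAcks.add(spout.emitted.get(before));
            }
        }
        return spout;
    }

    public static void main(String[] args) {
        NonBlockingSpout s = run(5, List.of("a", "b"));
        System.out.println("emitted=" + s.emitted + " acked=" + s.acked);
    }
}
```

With run(5, List.of("a", "b")) both tuples are emitted and acked. If nextTuple instead waited for new input once the source was empty, the loop would stall inside that call, and no further ack or fail could be delivered on that thread.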
The other two important methods are ack and fail. Storm calls ack when a tuple emitted by the spout has been fully processed by the topology, and fail when its processing fails. ack and fail are only called for reliable spouts, that is, for tuples emitted with a message ID.
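A minimal sketch of the reliable/unreliable distinction, assuming a hypothetical ReliabilityTracker in place of Storm's acking machinery: passing a null message id models an unreliable emit (in Storm, collector.emit(tuple) without a message id), and such tuples never produce an ack or fail callback.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the acking decision: only tuples that carry a message id are tracked.
class ReliabilityTracker {
    private final Set<Object> tracked = new HashSet<>();
    final List<Object> ackCallbacks = new ArrayList<>();
    final List<Object> failCallbacks = new ArrayList<>();

    // msgId == null models an unreliable emit.
    void emit(Object msgId) {
        if (msgId != null) {
            tracked.add(msgId);
        }
    }

    // Called when downstream processing finishes; success decides ack vs fail.
    void complete(Object msgId, boolean success) {
        if (msgId == null || !tracked.remove(msgId)) {
            return; // untracked tuple: the spout gets no callback
        }
        if (success) {
            ackCallbacks.add(msgId);
        } else {
            failCallbacks.add(msgId);
        }
    }

    public static void main(String[] args) {
        ReliabilityTracker t = new ReliabilityTracker();
        t.emit("m1");  // reliable
        t.emit(null);  // unreliable
        t.emit("m2");  // reliable
        t.complete("m1", true);
        t.complete(null, false);
        t.complete("m2", false);
        System.out.println("acked=" + t.ackCallbacks + " failed=" + t.failCallbacks);
    }
}
```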

ISpout.java

ISpout interface:
Storm's spout implementation mainly relies on the following methods. The complete interface is:

package org.apache.storm.spout;

import java.io.Serializable;
import java.util.Map;
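import org.apache.storm.task.TopologyContext;

// SpoutOutputCollector is in the same package (org.apache.storm.spout), so it needs no import.
public interface ISpout extends Serializable {

    // Called once when a task for this spout is initialized in a worker.
    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);

    // Called when the spout is shutting down (not guaranteed to run).
    void close();

    // Called when the spout is switched from deactivated to activated mode.
    void activate();

    // Called when the spout is deactivated; nextTuple is not called while deactivated.
    void deactivate();

    // Asks the spout to emit tuples; must not block.
    void nextTuple();

    // The tuple emitted with this message id was fully processed.
    void ack(Object msgId);

    // The tuple emitted with this message id failed to be fully processed.
    void fail(Object msgId);
}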

Storm uses tuples as its data model, and each field in a tuple can be an object of any type. Out of the box, Storm supports all primitive types, strings and byte arrays as field values. To use a custom type, you need to implement and register a serializer for it. Each component must also declare the field names of the tuples it outputs.
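The point about declared field names can be illustrated with a hypothetical NamedTuple class: values are stored positionally and looked up through the field names declared for them. It borrows the name getValueByField from Storm's Tuple interface but is only a simplified sketch, not Storm's implementation, and it ignores serialization entirely.

```java
import java.util.List;

// Hypothetical, simplified tuple: a list of values plus the field names declared for them.
// It mimics the idea behind lookup by field name in Storm, not Storm's actual classes.
class NamedTuple {
    private final List<String> fields;
    private final List<Object> values;

    NamedTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("Expected " + fields.size() + " values, got " + values.size());
        }
        this.fields = fields;
        this.values = values;
    }

    // Looks up a value by its declared field name.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown field: " + field);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        NamedTuple t = new NamedTuple(List.of("word", "count"), List.of("storm", 3));
        System.out.println(t.getValueByField("word") + " " + t.getValueByField("count"));
    }
}
```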
Explanation of selected methods:
open():
Called when a task for this component is initialized within a worker on the cluster. It provides the spout with the environment in which it executes.
close():
Called when an ISpout is about to shut down. There is no guarantee that close will be called, because the supervisor kills worker processes on the cluster with kill -9.
activate():
Called when the spout is activated out of deactivated mode.
deactivate():
Called when the spout is deactivated. nextTuple is not called while the spout is deactivated.
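The activate/deactivate contract can be sketched with a hypothetical driver that only calls nextTuple while the spout is active. MiniSpout is a stand-in that reuses ISpout's method names without Storm's parameter types, and the driver starting in deactivated mode is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ISpout without Storm types, used only to show call order.
interface MiniSpout {
    void open();
    void close();
    void activate();
    void deactivate();
    void nextTuple();
}

// Records every callback it receives.
class RecordingSpout implements MiniSpout {
    final List<String> calls = new ArrayList<>();
    public void open() { calls.add("open"); }
    public void close() { calls.add("close"); }
    public void activate() { calls.add("activate"); }
    public void deactivate() { calls.add("deactivate"); }
    public void nextTuple() { calls.add("nextTuple"); }
}

// Hypothetical driver: nextTuple is only invoked while the spout is active.
class LifecycleDriver {
    private final MiniSpout spout;
    private boolean active = false;

    LifecycleDriver(MiniSpout spout) {
        this.spout = spout;
        spout.open();
    }

    void setActive(boolean nowActive) {
        if (nowActive && !active) {
            spout.activate();
        } else if (!nowActive && active) {
            spout.deactivate();
        }
        active = nowActive;
    }

    void tick() {
        if (active) {
            spout.nextTuple();
        }
    }

    void shutdown() {
        spout.close(); // on a real cluster this call is not guaranteed
    }

    public static void main(String[] args) {
        RecordingSpout spout = new RecordingSpout();
        LifecycleDriver driver = new LifecycleDriver(spout);
        driver.tick();            // deactivated: ignored
        driver.setActive(true);
        driver.tick();
        driver.setActive(false);
        driver.tick();            // deactivated again: ignored
        driver.shutdown();
        System.out.println(spout.calls);
    }
}
```

The recorded order is open, activate, nextTuple, deactivate, close: the two tick() calls made while deactivated never reach the spout.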

ShellSpout.java

The overridden methods are as follows:

    public void open(Map<String, Object> topoConf, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;

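        // Subprocess timeout in ms: prefer TOPOLOGY_SUBPROCESS_TIMEOUT_SECS, else fall back to SUPERVISOR_WORKER_TIMEOUT_SECS
        if (topoConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {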
            workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
        } else {
            workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
        }

        // Launch the external process that implements the spout logic
        process = new ShellProcess(command);
        if (!env.isEmpty()) {
            process.setEnv(env);
        }

        Number subpid = process.launch(topoConf, context, changeDirectory);
        LOG.info("Launched subprocess with pid " + subpid);

        logHandler = ShellUtils.getLogHandler(topoConf);
        logHandler.setUpContext(ShellSpout.class, process, this.context);

        // Single-thread scheduler for heartbeat checks; the check itself is scheduled in activate()
        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    @Override
    public void close() {
        heartBeatExecutorService.shutdownNow();
        process.destroy();
        running = false;
    }

    // nextTuple, ack and fail are forwarded to the subprocess as commands via sendSyncCommand
    @Override
    public void nextTuple() {
        this.sendSyncCommand("next", "");
    }

    @Override
    public void ack(Object msgId) {
        this.sendSyncCommand("ack", msgId);
    }

    @Override
    public void fail(Object msgId) {
        this.sendSyncCommand("fail", msgId);
    }

    @Override
    public void activate() {
        LOG.info("Start checking heartbeat...");
        // prevent timer to check heartbeat based on last thing before activate
        setHeartbeat();
        if (heartBeatExecutorService.isShutdown()) {
            //In case deactivate was called before
            heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
        }
        heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
        this.sendSyncCommand("activate", "");
    }

    @Override
    public void deactivate() {
        this.sendSyncCommand("deactivate", "");
        heartBeatExecutorService.shutdownNow();
    }
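ShellSpout.activate() recreates heartBeatExecutorService when deactivate() has already shut it down, because an executor that has been shut down rejects new tasks and cannot be restarted. The sketch below reproduces that pattern with the JDK's Executors.newSingleThreadScheduledExecutor instead of Guava's MoreExecutors.getExitingScheduledExecutorService, and with a counter in place of SpoutHeartbeatTimerTask; HeartbeatScheduler is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the activate/deactivate pattern in ShellSpout:
// a shut-down executor cannot be reused, so activate() creates a new one when needed.
class HeartbeatScheduler {
    private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger checks = new AtomicInteger();

    void activate(long periodMillis) {
        if (executor.isShutdown()) {
            // deactivate() was called before: replace the dead executor
            executor = Executors.newSingleThreadScheduledExecutor();
        }
        // Stand-in for scheduling SpoutHeartbeatTimerTask at a fixed rate
        executor.scheduleAtFixedRate(checks::incrementAndGet, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void deactivate() {
        executor.shutdownNow(); // stops the periodic heartbeat check
    }

    boolean isRunning() {
        return !executor.isShutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatScheduler s = new HeartbeatScheduler();
        s.activate(10);
        Thread.sleep(50);
        s.deactivate();
        s.activate(10); // works because a fresh executor is created
        Thread.sleep(50);
        s.deactivate();
        System.out.println("checks=" + s.checks.get() + " running=" + s.isRunning());
    }
}
```

Without the isShutdown() check, the second activate() would fail with a RejectedExecutionException, since the old executor no longer accepts tasks.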



void open(Map<String, Object> topoConf, TopologyContext context, SpoutOutputCollector collector)
Parameters:
topoConf:
The Storm configuration for this spout. This is the configuration provided to the topology, merged with the cluster configuration on this machine.
context:
Used to get information about this task, including the task id, component id, and input and output information.
collector:
Used to emit tuples from this spout. Tuples can be emitted at any time, including in the open and close methods. The collector is thread-safe and should be saved as an instance variable of the spout object.
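The timeout lookup at the start of ShellSpout.open() shows how a spout reads values from topoConf. In the sketch below the key strings are assumed to be the values of Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS and Config.SUPERVISOR_WORKER_TIMEOUT_SECS, and a plain cast to Number stands in for ObjectReader.getInt; SubprocessTimeout is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical re-creation of the timeout lookup in ShellSpout.open().
class SubprocessTimeout {
    // Assumed to match Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS and Config.SUPERVISOR_WORKER_TIMEOUT_SECS
    static final String TOPOLOGY_SUBPROCESS_TIMEOUT_SECS = "topology.subprocess.timeout.secs";
    static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";

    static long timeoutMillis(Map<String, Object> topoConf) {
        Object secs = topoConf.containsKey(TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)
                ? topoConf.get(TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)
                : topoConf.get(SUPERVISOR_WORKER_TIMEOUT_SECS);
        // Simplified stand-in for ObjectReader.getInt: assumes a numeric value is present
        return 1000L * ((Number) secs).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(SUPERVISOR_WORKER_TIMEOUT_SECS, 30);
        System.out.println(timeoutMillis(conf));   // falls back to the supervisor timeout
        conf.put(TOPOLOGY_SUBPROCESS_TIMEOUT_SECS, 5);
        System.out.println(timeoutMillis(conf));   // topology-level setting takes precedence
    }
}
```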

void ack(Object msgId)
Called by Storm to tell the spout that the tuple emitted with this msgId has been fully processed by the topology.
void activate()
Called when the spout switches from deactivated mode to activated mode; Storm then starts calling nextTuple to emit data.
void close()
Called when the spout is shut down.
void deactivate()
Called when the spout switches from activated mode to deactivated mode; Storm stops calling nextTuple.
void fail(Object msgId)
Called by Storm to tell the spout that the tuple emitted with this msgId was not fully processed. Spouts typically use it to put the message back on the source queue so that it is re-emitted later.
void nextTuple()
Called when Storm requests that the spout emit tuples to the output collector. This method should not block: if there is nothing to emit, it should return. When there are no tuples to emit, it is common to sleep for a short time (for example, one millisecond) so as not to waste CPU.
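The descriptions of ack, fail and nextTuple fit together in a common pattern: keep each emitted message pending under its msgId, forget it on ack, put it back on the queue on fail, and back off briefly in nextTuple when there is nothing to emit. QueueSpout below is a hypothetical sketch of that pattern; recording into the emitted list stands in for SpoutOutputCollector.emit(tuple, msgId), and the integer message ids are an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical queue-backed spout showing how ack, fail and nextTuple cooperate.
class QueueSpout {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Integer, String> pending = new HashMap<>();
    private int nextId = 0;
    final List<String> emitted = new ArrayList<>(); // stands in for collector.emit(tuple, msgId)

    QueueSpout(List<String> messages) {
        queue.addAll(messages);
    }

    void nextTuple() {
        String message = queue.poll();
        if (message == null) {
            try {
                Thread.sleep(1); // nothing to emit: back off briefly instead of spinning
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        int msgId = nextId++;
        pending.put(msgId, message); // remember it until Storm acks or fails it
        emitted.add(message);
    }

    void ack(Object msgId) {
        pending.remove(msgId); // fully processed: forget it
    }

    void fail(Object msgId) {
        String message = pending.remove(msgId);
        if (message != null) {
            queue.addLast(message); // put it back so a later nextTuple re-emits it
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        QueueSpout spout = new QueueSpout(List.of("a", "b"));
        spout.nextTuple();   // emits "a" with msgId 0
        spout.nextTuple();   // emits "b" with msgId 1
        spout.ack(0);
        spout.fail(1);
        spout.nextTuple();   // re-emits "b" with msgId 2
        spout.nextTuple();   // queue empty: sleeps 1 ms and returns
        System.out.println(spout.emitted);
    }
}
```

In main, "a" is acked and "b" fails once, so the emitted sequence is a, b, b, and only the re-emitted "b" is still pending at the end.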

Reference links:
https://blog.csdn.net/wdasdaw/article/details/48896321
https://xlucas.blog.csdn.net/article/details/55301577

Keywords: Java storm

Added by hobeau on Thu, 07 Oct 2021 17:57:24 +0300