Storm source code analysis

2021SC@SDUSC

Introduction to stream groupings and WordCountTopology

Stream groupings

Storm defines eight built-in stream grouping methods. A custom grouping can also be defined by implementing the CustomStreamGrouping interface. The built-in groupings are:

1. Shuffle grouping: tuples are distributed randomly across the bolt's tasks, so that each task receives roughly the same number of tuples.

2. Fields grouping: tuples are partitioned by the value of the specified fields. For example, if a stream is grouped by the "word" field, all tuples with the same "word" value are routed to the same bolt task.

3. All grouping: every tuple is replicated to all of the bolt's tasks, so each task subscribed to the stream receives its own copy.

4. Global grouping: all tuples are routed to a single task, the one with the smallest task ID. Note that setting the bolt's parallelism is meaningless with global grouping (the spout's parallelism still matters), because every tuple is forwarded to the same task. Use it with care: since all tuples end up in one JVM instance, that JVM or server can become a performance bottleneck or crash.

5. None grouping: currently equivalent to shuffle grouping; it is reserved for future use.

6. Direct grouping: the producer of the tuple decides which task of the consuming component receives it, by calling the emitDirect() method. It can only be used on streams that have been declared as direct streams.

7. Local or shuffle grouping: similar to shuffle grouping, but if the target bolt has tasks in the same worker process, tuples are distributed only among those in-process tasks. Otherwise it behaves like ordinary shuffle grouping. Depending on the topology's parallelism, this can reduce network transfer and improve performance.

8. Partial key grouping: the stream is partitioned by the fields specified in the grouping, as in fields grouping, but the load for each key is balanced between two downstream bolt tasks, which gives better resource utilization when the incoming data is skewed.
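
To make the routing rules above more concrete, here is a toy model in plain Java of how fields, global, and partial key grouping might pick a target task. It does not use any Storm classes and is not Storm's actual implementation: the class name GroupingSketch, its method names, and the "salt" used to derive a second hash are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Toy model of how three groupings choose a target task; not Storm's real code. */
final class GroupingSketch {

    /** Fields grouping: the same key always maps to the same task index. */
    static int fieldsTask(Object key, int numTasks) {
        return Math.floorMod(Objects.hashCode(key), numTasks);
    }

    /** Global grouping: every tuple goes to the task with the smallest ID. */
    static int globalTask(List<Integer> taskIds) {
        return Collections.min(taskIds);
    }

    /**
     * Partial key grouping: each key has two candidate tasks, and the tuple is
     * sent to whichever candidate has handled fewer tuples so far.
     * The load array is updated in place.
     */
    static int partialKeyTask(Object key, long[] load) {
        int first = Math.floorMod(Objects.hashCode(key), load.length);
        // Second candidate from a salted hash (the salt is an arbitrary assumption).
        int second = Math.floorMod(Objects.hash(key, "salt"), load.length);
        int chosen = load[first] <= load[second] ? first : second;
        load[chosen]++;
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println("fields  -> task " + fieldsTask("the", 12));
        System.out.println("global  -> task " + globalTask(Arrays.asList(7, 3, 9)));
        long[] load = new long[4];
        for (int i = 0; i < 6; i++) {
            System.out.println("partial -> task " + partialKeyTask("the", load));
        }
    }
}
```

The point of the sketch is the contrast: fields grouping pins a key to exactly one task, while partial key grouping lets a hot key spread over at most two tasks, which is why a downstream aggregation step is usually needed to merge the two partial results.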

WordCountTopology

WordCountTopology is a basic Storm topology made up of three components: RandomSentenceSpout, SplitSentence, and WordCount.

RandomSentenceSpout
This class defines a spout that extends BaseRichSpout. BaseRichSpout is an abstract class that implements the IRichSpout interface, one of the main interfaces in Storm. Its nextTuple method picks a random sentence from an array of sentences and emits it. The declareOutputFields method declares the schema of the spout's output; here the output has a single field, named word:

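import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RandomSentenceSpout extends BaseRichSpout {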
    private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);

    SpoutOutputCollector collector;
    Random rand;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{
            sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
            sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")
        };
        final String sentence = sentences[rand.nextInt(sentences.length)];

        LOG.debug("Emitting tuple: {}", sentence);

        collector.emit(new Values(sentence));
    }

    protected String sentence(String input) {
        return input;
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public static class TimeStamped extends RandomSentenceSpout {
        private final String prefix;

        public TimeStamped() {
            this("");
        }
        public TimeStamped(String prefix) {
            this.prefix = prefix;
        }
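        @Override
        protected String sentence(String input) {
            return prefix + currentDate() + " " + input;
        }

        // Helper missing from the listing as quoted; the date pattern here is an assumption.
        private String currentDate() {
            return new java.text.SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new java.util.Date());
        }
    }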
}

SplitSentence
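This class defines a bolt. The execute method is where a bolt handles its business logic: here, each sentence received from the spout is split on spaces, and every word is emitted as a separate tuple. Note that in the WordCountTopology listing further down, SplitSentence extends ShellBolt rather than BaseBasicBolt (an abstract class that implements the IBasicBolt interface), so the splitting is actually delegated to the Python script splitsentence.py.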
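
The splitting step itself can be sketched in plain Java, without any Storm classes. This is only an illustration of the logic, assuming words are separated by whitespace; SplitSketch and splitSentence are hypothetical names, and the returned list stands in for the tuples a bolt would emit.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone sketch of the sentence-splitting logic; not part of Storm. */
final class SplitSketch {

    /** Returns the words of a sentence in order, one per would-be emitted tuple. */
    static List<String> splitSentence(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.trim().split("\\s+")) {
            // A blank sentence yields a single empty string; skip it.
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        for (String word : splitSentence("the cow jumped over the moon")) {
            System.out.println(word);
        }
    }
}
```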

WordCount
Like SplitSentence, the WordCount class (named WordCountBolt in the listing below) defines a bolt. It keeps a running count of every word it receives: the execute method updates the cached count for the incoming word and emits the word together with its current count. The declareOutputFields method declares the bolt's output fields, word and count. A bolt can also override the cleanup method, which is called when the topology is stopped (though the call is not guaranteed), to print all cached words and counts to the log; the listing below omits it.

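import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseBasicBolt {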
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

WordCountTopology
The WordCountTopology class is where the topology itself is defined:

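import java.util.Map;

import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.ConfigurableTopology;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
// RandomSentenceSpout and WordCountBolt are the classes listed above;
// import them from whichever package they live in.

public class WordCountTopology extends ConfigurableTopology {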
    public static void main(String[] args) throws Exception {
        ConfigurableTopology.start(new WordCountTopology(), args);
    }

    @Override
    protected int run(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));

        conf.setDebug(true);

        String topologyName = "word-count";

        conf.setNumWorkers(3);

        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
}

First, a TopologyBuilder object is created; it is used to build the topology.

Next, the topology's spout is set: its id is spout, a RandomSentenceSpout object serves as the spout, and its parallelism is set to 5.

Then a bolt with the id split is added, with a SplitSentence object as the bolt and a parallelism of 8. It receives the tuples emitted by spout, and its grouping strategy is shuffle grouping, meaning that the instances of spout distribute their tuples randomly across the instances of split.

Then another bolt with the id count is added, with a WordCountBolt object as the bolt and a parallelism of 12. It receives the tuples emitted by split. Its grouping strategy is fields grouping: each instance of split decides which instance of count to send a tuple to based on the value of the tuple's word field, so all tuples with the same word value are processed by the same count task.
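
Why the count bolt needs fields grouping can be checked with a small simulation in plain Java. The sketch below assumes a simple hash-modulo routing rule as a stand-in for fields grouping and keeps one counter map per simulated task; WordCountRoutingSketch and its methods are hypothetical names, and none of this is Storm code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simulates split -> count with hash-based routing; an illustration only. */
final class WordCountRoutingSketch {

    /** Splits each sentence and counts every word in the task its hash selects. */
    static List<Map<String, Integer>> run(List<String> sentences, int numTasks) {
        List<Map<String, Integer>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(new HashMap<>());
        }
        for (String sentence : sentences) {
            for (String word : sentence.split("\\s+")) {
                if (word.isEmpty()) {
                    continue;
                }
                // Assumed routing rule: same word, same task index.
                int task = Math.floorMod(word.hashCode(), numTasks);
                perTask.get(task).merge(word, 1, Integer::sum);
            }
        }
        return perTask;
    }

    /** Number of simulated tasks that hold a counter for the given word. */
    static int owners(List<Map<String, Integer>> perTask, String word) {
        int owners = 0;
        for (Map<String, Integer> counts : perTask) {
            if (counts.containsKey(word)) {
                owners++;
            }
        }
        return owners;
    }

    /** Sum of the word's counters over all simulated tasks. */
    static int total(List<Map<String, Integer>> perTask, String word) {
        int total = 0;
        for (Map<String, Integer> counts : perTask) {
            total += counts.getOrDefault(word, 0);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> perTask = run(Arrays.asList(
            "the cow jumped over the moon",
            "snow white and the seven dwarfs"), 12);
        System.out.println("tasks holding 'the': " + owners(perTask, "the"));
        System.out.println("total for 'the': " + total(perTask, "the"));
    }
}
```

Because each word is owned by exactly one simulated task, that task's counter is the complete count for the word. With shuffle grouping on this edge instead, the same word could land in several tasks and each would only see a partial count.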

Reference link: https://www.jianshu.com/p/2c79be8b0403

Keywords: storm

Added by andychurchill on Sat, 25 Dec 2021 20:51:39 +0200