31 storm Word Count

The previous article, A Brief Look at Storm, briefly covered Storm's cluster architecture, core concepts, parallelism, and stream grouping. This article implements a word count with Storm and walks through the code to make those introductory concepts concrete.

Let's go straight to the code

The word count logic, in brief:

  • Build a Spout that serves as the data source for the downstream Bolts
  • Build a Bolt that splits each sentence from the upstream stream into words
  • Build a second Bolt that consumes the first Bolt's output and counts the words
  • Wire the Spout and Bolts together into a Topology
  • Submit the Topology to the Storm cluster and wait for the results
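
Before wiring these steps into Storm, the same flow can be sketched in plain Java with no Storm dependency. This is only an illustration under stated assumptions: the class name WordCountFlowSketch and its methods splitSentence and countWords are hypothetical stand-ins for the two Bolts, and a fixed list of sentences stands in for the Spout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch of the word count flow; not part of the Storm project.
public class WordCountFlowSketch {

    // Stand-in for the splitting Bolt: one sentence in, a list of words out.
    public static List<String> splitSentence(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    // Stand-in for the counting Bolt: keeps a running total per word.
    public static Map<String, Long> countWords(List<String> sentences) {
        Map<String, Long> counts = new LinkedHashMap<>();
        // The list plays the role of the Spout, feeding one sentence at a time
        for (String sentence : sentences) {
            for (String word : splitSentence(sentence)) {
                // Add 1 to the word's current total, starting from 1 if absent
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("she stood out", "she seemed so small");
        // prints {she=2, stood=1, out=1, seemed=1, so=1, small=1}
        System.out.println(countWords(sentences));
    }
}
```

In Storm, each of these stages runs as its own component and the hand-off between them is done with tuples rather than method calls.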

Create a standard Maven project named storm-wordcount

  • Add the required library dependencies to pom.xml
  <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>22.0</version>
        </dependency>
  </dependencies>
  • Add the build plugins used for packaging
<build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.sf</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.dsa</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                                <exclude>META-INF/*.rsa</exclude>
                                <exclude>META-INF/*.EC</exclude>
                                <exclude>META-INF/*.ec</exclude>
                                <exclude>META-INF/MSFTSIG.SF</exclude>
                                <exclude>META-INF/MSFTSIG.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
    
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
              <execution>
                <goals>
                  <goal>exec</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <executable>java</executable>
              <includeProjectDependencies>true</includeProjectDependencies>
              <includePluginDependencies>false</includePluginDependencies>
              <classpathScope>compile</classpathScope>
              <mainClass></mainClass>
            </configuration>
          </plugin>
        </plugins>
    </build>

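Create a new WordCountTopology class. The snippets below are members of this class and assume the following imports (package names as of Storm 1.x, matching the storm-core 1.1.0 dependency above):

import java.util.Map;
import java.util.Random;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;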

  • Write a RandomSentenceSpout static nested class in WordCountTopology that extends the BaseRichSpout abstract class
/**
     * 
     * The spout extends a base class and is responsible for fetching data from the data source
     * @author bill
     * @date September 16, 2017, 8:21:46 p.m.
     */
    public static class RandomSentenceSpout extends BaseRichSpout{
        
        private static final long serialVersionUID = 6102239192526611945L;

        private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);
        
        private SpoutOutputCollector collector;
        private Random random;

        /**
         * open() is called when a task is initialized.
         * The SpoutOutputCollector used to emit tuples, and the TopologyContext if it is needed, are usually stored here
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.random = new Random();
        }

        /**
         * As described in the previous article, this spout ends up running as a task inside an executor thread of a worker process.
         * nextTuple() is called for that task over and over in an endless loop,
         * so new data is emitted continuously and forms a stream
         */
        public void nextTuple() {
            String[] sentences = new String[]{
                     "I used to watch her from my kitchen window"
                    , "she seemed so small as she muscled her way through the crowd of boys on the playground"
                    , "The school was across the street from our home and I would often watch the kids as they played during recess"
                    , "A sea of children, and yet to me"
                    , "she stood out from them all"};
            String sentence = sentences[random.nextInt(sentences.length)];
            LOGGER.info(" ★★★  Emitting sentence > {}", sentence);
            // Values builds a tuple, the smallest unit of data in Storm; an unbounded sequence of tuples forms a stream, and emit sends each tuple to the downstream bolt
            this.collector.emit(new Values(sentence));
        }

        /**
         * Declares the fields of the tuples this spout emits. Named streams can be declared with OutputFieldsDeclarer.declareStream
         * In plain terms, this defines the name of each field in every emitted tuple;
         * the downstream bolt uses that name as the key when reading the data in execute
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }
  • Write a SplitSentenceBolt static nested class in WordCountTopology that extends the BaseRichBolt abstract class; it processes the sentences sent by the upstream Spout and splits them into words
/**
     * 
     * A bolt that splits each sentence into words and emits every word
     * @author bill
     * @date September 16, 2017, 8:27:45 p.m.
     */
    public static class SplitSentenceBolt extends BaseRichBolt{
        
        private static final long serialVersionUID = -4758047349803579486L;
        
        private OutputCollector collector;

        /**
         * prepare() is called when a task is initialized; it is the first method invoked on a bolt.
         * The OutputCollector, which the bolt uses to emit tuples, is usually stored here
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        
        /**
         * The most important method of a bolt: tuple processing goes here, and results are sent on with emit.
         * Every time a tuple arrives, it is passed to execute.
         * Here it splits the sentence into words
         */
        public void execute(Tuple input) {
            // Read the sentence sent from upstream
            String sentence = input.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for(String word : words){
                // Emit each word downstream
                this.collector.emit(new Values(word));
            }
        }

        /**
         * Declares the fields of the tuples this bolt emits. Named streams can be declared with OutputFieldsDeclarer.declareStream
         * In plain terms, this defines the name of each field in every emitted tuple;
         * the downstream bolt uses that name as the key when reading the data in execute
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }   
    }
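
Note that sentence.split(" ") keeps punctuation attached to words and yields empty strings on repeated spaces, so a sentence containing "children," would be counted under "children," rather than "children". One possible refinement, sketched here outside Storm, is to normalize each token before emitting it. The class TokenizerSketch and its tokenize method are hypothetical names, and lowercasing the words is an assumption about the desired counting behavior, not something the bolt above does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper illustrating a stricter word split; not part of the original topology.
public class TokenizerSketch {

    public static List<String> tokenize(String sentence) {
        List<String> words = new ArrayList<>();
        // Split on runs of whitespace instead of a single space
        for (String raw : sentence.trim().split("\\s+")) {
            // Strip leading and trailing characters that are neither letters nor digits
            String word = raw
                    .replaceAll("^[^\\p{L}\\p{N}]+|[^\\p{L}\\p{N}]+$", "")
                    .toLowerCase(Locale.ROOT);
            // Skip tokens that were only punctuation or whitespace
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // prints [a, sea, of, children, and, yet, to, me]
        System.out.println(tokenize("A sea of children, and yet to me"));
    }
}
```

Inside the bolt, the loop over sentence.split(" ") could iterate over the result of such a helper instead, emitting one tuple per normalized word.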
  • Write a WordCountBolt static nested class in WordCountTopology that extends the BaseRichBolt abstract class; it processes the words sent by the upstream Bolt and counts them
/**
     * 
     * Word count bolt
     * @author bill
     * @date September 16, 2017, 8:35:00 p.m.
     */
    public static class WordCountBolt extends BaseRichBolt{
        
        private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);

        private static final long serialVersionUID = -7114915627898482737L;
        
        private OutputCollector collector;
        
        Map<String,Long> countMap = Maps.newConcurrentMap();
        
        /**
         * prepare() is called when a task is initialized; it is the first method invoked on a bolt.
         * The OutputCollector, which the bolt uses to emit tuples, is usually stored here
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * The most important method of a bolt: tuple processing goes here, and results are sent on with emit.
         * Every time a tuple arrives, it is passed to execute.
         * Here it counts the words
         */
        public void execute(Tuple input) {
            // Read the word sent from upstream
            String word = input.getStringByField("word");
            Long count = countMap.get(word);
            if(null == count){
                count = 0L;
            }
            count ++;
            countMap.put(word, count);
            LOGGER.info(" ★★★  Word [{}] occurrences: {}", word, count); 
            // Emit the word together with its running count
            this.collector.emit(new Values(word,count));
        }
        
        /**
         * Declares the fields of the tuples this bolt emits. Named streams can be declared with OutputFieldsDeclarer.declareStream
         * In plain terms, this defines the name of each field in every emitted tuple;
         * a downstream bolt would use those names as keys when reading the data in execute
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","count"));
        }
    }
  • Build the Topology and submit it to a Storm cluster for execution (Note: Storm provides a simulated local cluster, so the topology can be compiled and run directly from the IDE)

Add the main method to WordCountTopology:

public static void main(String[] args) {
        // Combine the spout and bolts to build a topology
        TopologyBuilder builder = new TopologyBuilder();
        
        // The first parameter is the name given to this spout
        // The second parameter is the spout instance
        // The third parameter is the number of executors for the spout
        builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
        builder.setBolt("SplitSentence", new SplitSentenceBolt(), 5)
        // Set the number of tasks for the bolt
        .setNumTasks(10)
        // Set the stream grouping strategy
        .shuffleGrouping("RandomSentence");
        
        // fieldsGrouping matters here: every occurrence of the same word emitted by SplitSentenceBolt must go to the same downstream task
        // Only then can each word be counted accurately
        // With shuffle grouping, for example, five occurrences of "hello" could be split so that task1 receives three and task2 receives two
        // fieldsGrouping sends all five "hello" tuples to a single task
        builder.setBolt("wordCount", new WordCountBolt(), 10)
        // Set the number of tasks for the bolt
        .setNumTasks(20)
        // Set the stream grouping strategy
        .fieldsGrouping("SplitSentence", new Fields("word"));
        
        // Runtime configuration
        Config config = new Config();
        
        // If arguments are given, the program was launched from the command line and the topology should be submitted to the Storm cluster
        if(args != null && args.length > 0){
            /** 
             *  There are three levels at which Storm's parallelism can be raised:
             *  worker (process) > executor (thread) > task (instance)
             *  that is, add worker processes, executor threads, or task instances
             *  The number of available workers is bounded by the ports configured in supervisor.slots.ports
             *  The number actually used by this topology can be set here
             *  For one topology on one machine a single worker is usually best, mainly because it reduces data transfer between workers
             *  
             *  Note: if all worker slots are in use, a submitted topology does not run; it waits, and starts once a previously running topology is stopped and frees workers
             */
            config.setNumWorkers(3);
            try {
                // Submit the topology to the cluster
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }else{
            // No arguments: run locally, for example from Eclipse
            
            // Caps the number of threads spawned per component when running a topology in local mode
            config.setMaxTaskParallelism(20);
            
            // Submit the topology to the local cluster
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordCountTopology", config, builder.createTopology());
            
            // Let the topology run for 60 seconds for testing
            Utils.sleep(60000);
            // Shut down the local cluster when done
            cluster.shutdown();
        }
    }
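
The effect of fieldsGrouping described in the comments can be illustrated without Storm. The sketch below routes each word to a task index derived from the word's hash, so equal words always reach the same task, while a shuffle-style random choice may spread them across tasks. It is a conceptual model only: GroupingSketch, fieldsTask, and shuffleTask are hypothetical names, and hashing modulo the task count is an assumption about how such routing can work, not a copy of Storm's internal implementation.

```java
import java.util.Random;

// Hypothetical model of the two grouping strategies; Storm's real routing code may differ.
public class GroupingSketch {

    // Fields-style routing: the task index depends only on the word,
    // so the same word always maps to the same task.
    public static int fieldsTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Shuffle-style routing: the task index is chosen at random,
    // so the same word can land on different tasks.
    public static int shuffleTask(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 20; // matches setNumTasks(20) on the wordCount bolt
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println("hello -> fields task " + fieldsTask("hello", numTasks)
                    + ", shuffle task " + shuffleTask(random, numTasks));
        }
    }
}
```

Running it prints the same fields task five times, whereas the shuffle task usually varies. That is why per-word counters kept inside each task only add up correctly under fields grouping.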
  • Run the main method to see the result

Run output (screenshot not reproduced here)

Summary: This demo shows, at the code level, how Storm's cluster architecture, core concepts, parallelism, and stream grouping fit together. Combined with the previous article, it also illustrates Spout-to-Bolt and Bolt-to-Bolt communication.
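
The parallelism numbers used in main can also be worked out by hand. As a rough sketch, tasks are divided among a component's executors, and executors are spread over the worker processes. The class ParallelismSketch and its helpers are hypothetical, the figures are copied from the topology in this article, and Storm's system components (such as ackers) are deliberately left out, so the real executor count on a cluster may be higher.

```java
// Hypothetical arithmetic helper; it only restates the numbers configured in main.
public class ParallelismSketch {

    // Tasks handled by each executor, assuming the tasks divide evenly.
    public static int tasksPerExecutor(int numTasks, int numExecutors) {
        return numTasks / numExecutors;
    }

    // Executors requested by the topology's own components.
    public static int totalExecutors(int... executorsPerComponent) {
        int total = 0;
        for (int n : executorsPerComponent) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) {
        int spoutExecutors = 2;                     // setSpout(..., 2), one task per executor by default
        int splitExecutors = 5, splitTasks = 10;    // setBolt(..., 5).setNumTasks(10)
        int countExecutors = 10, countTasks = 20;   // setBolt(..., 10).setNumTasks(20)
        int workers = 3;                            // config.setNumWorkers(3)

        int executors = totalExecutors(spoutExecutors, splitExecutors, countExecutors);
        System.out.println("SplitSentence tasks per executor: " + tasksPerExecutor(splitTasks, splitExecutors));
        System.out.println("wordCount tasks per executor: " + tasksPerExecutor(countTasks, countExecutors));
        System.out.println("Component executors: " + executors + " across " + workers + " workers");
    }
}
```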

That is all for this chapter. If you spot any mistakes, corrections are welcome. Thank you.

For the convenience of readers, all the software used in this series is available at https://pan.baidu.com/s/1qYsJZfY

Next chapter preview: setting up a Storm cluster

Author: Secret Descender (reprint please indicate source)


Added by saeed42 on Fri, 24 May 2019 19:38:14 +0300