JStorm Advanced: Ack Mechanism and KafkaSpout

Installation, Deployment, and Use

Please refer to my other article, JStorm Deployment and Use.

ack mechanism

ack mechanism principle

Rather than explaining here what acks are, you can refer to the official documentation on the Ack mechanism.
All we really need to know is that it works using XOR:

A xor A = 0
A xor B xor B xor A = 0
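
As a rough illustration of the idea (this is only a toy sketch, not JStorm's actual acker code), the acker keeps one 64-bit value per spout tuple and XORs every tuple ID into it once when the tuple is created and once when it is acked; the tree is fully processed exactly when the value returns to zero:

    import java.util.Random;

    // Toy illustration of the XOR bookkeeping; not JStorm's real acker implementation.
    public class XorAckDemo {
        public static void main(String[] args) {
            Random random = new Random();
            long ackVal = 0L;

            // Spout emits the root tuple with a random 64-bit id.
            long rootId = random.nextLong();
            ackVal ^= rootId;

            // A bolt emits two child tuples anchored on the root, then acks the root.
            long child1 = random.nextLong();
            long child2 = random.nextLong();
            ackVal ^= child1 ^ child2 ^ rootId;

            // Downstream bolts ack the two children.
            ackVal ^= child1;
            ackVal ^= child2;

            // Every id has now been XORed in exactly twice, so the value is back to 0.
            System.out.println("fully processed: " + (ackVal == 0));
        }
    }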

Use ack mechanism

To use the ack mechanism, you need to do the following:

Processing of Topology

When building the topology, set the number of ackers to a non-zero value, as follows:

config.setNumAckers(1);

This method actually stores the value under the key Config.TOPOLOGY_ACKER_EXECUTORS, which is defined as follows:

     /**
     * How many executors to spawn for ackers.
     * <p/>
     * <p>
     * If this is set to 0, then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability.
     * </p>
     */
    public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
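
For reference, a minimal sketch of both ways to set this value (all other topology setup is omitted):

    Config config = new Config();

    // Convenience setter for the number of acker executors...
    config.setNumAckers(1);

    // ...which is equivalent to putting the key directly:
    config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);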

Processing of Spout

When emitting data from the spout, include a msgId; the interface is as follows:

    /**
     * Emits a new tuple to the default output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has
     * failed to be fully processed, the spout will receive an ack or fail
     * callback respectively with the messageId as long as the messageId was not
     * null. If the messageId was null, Storm will not track the tuple and no
     * callback will be received. The emitted values must be immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }
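
In a spout's nextTuple this simply means passing a non-null messageId as the second argument. A minimal sketch, where the queue and the UUID-based ID scheme are hypothetical (KafkaSpout's real approach is shown next):

    // Inside nextTuple() of a spout; `collector` is the SpoutOutputCollector from open().
    String msg = queue.poll();                     // hypothetical message source
    if (msg != null) {
        String msgId = UUID.randomUUID().toString();
        collector.emit(new Values(msg), msgId);    // non-null msgId enables tracking
    }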

Let's see what KafkaSpout does:

    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {

            try {
                // in case the number of managers decreased
                _currPartitionIndex = _currPartitionIndex % managers.size();
                EmitState state = managers.get(_currPartitionIndex).next(_collector);
                if (state != EmitState.EMITTED_MORE_LEFT) {
                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
                }
                if (state != EmitState.NO_EMITTED) {
                    break;
                }
            } catch (FailedFetchException e) {
                LOG.warn("Fetch failed", e);
                _coordinator.refresh();
            }
        }

        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }

Notice the call EmitState state = managers.get(_currPartitionIndex).next(_collector) above; let's step into it:

     public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if ((tups != null) && tups.iterator().hasNext()) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

As you can see, collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)) specifies a messageId using KafkaMessageId, a static inner class with two fields, partition and offset:

    static class KafkaMessageId {
        public Partition partition;
        public long offset;

        public KafkaMessageId(Partition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

Processing of Bolt

There are generally two ways to write a bolt: one uses the IRichBolt interface or its abstract base class BaseRichBolt, the other uses IBasicBolt or its abstract base class BaseBasicBolt. The two differ mainly in how they affect the ack mechanism.

Use IRichBolt

Using IRichBolt means you need to implement the following method:

void execute(Tuple input);

It also means that the class you work with is OutputCollector.
When using OutputCollector to emit a tuple to the next bolt, you must use anchored mode, via the following methods:

    /**
     * Emits a new tuple to the default stream anchored on a single tuple. The
     * emitted values must be immutable.
     *
     * @param anchor the tuple to anchor to
     * @param tuple the new output tuple from this bolt
     * @return the list of task ids that this new tuple was sent to
     */
    public List<Integer> emit(Tuple anchor, List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, anchor, tuple);
    }

    /**
     * Emits a new tuple to the default stream anchored on a group of input
     * tuples. The emitted values must be immutable.
     *
     * @param anchors the tuples to anchor to
     * @param tuple the new output tuple from this bolt
     * @return the list of task ids that this new tuple was sent to
     */
    public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, anchors, tuple);
    }

The anchor is the tuple passed into the bolt's execute method, i.e., the tuple sent to you from upstream.
Note that the unanchored emit shown below must not be used:

    /**
     * Emits a new unanchored tuple to the default stream. Because it's
     * unanchored, if a failure happens downstream, this new tuple won't affect
     * whether any spout tuples are considered failed or not. The emitted values
     * must be immutable.
     *
     * @param tuple the new output tuple from this bolt
     * @return the list of task ids that this new tuple was sent to
     */
    public List<Integer> emit(List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple);
    }

Also, after emitting, you must manually call the collector.ack(tuple) method.
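
Putting the two requirements together, here is a minimal sketch of a BaseRichBolt (class and field names are hypothetical) that anchors its output on the input tuple and then acks it manually:

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import java.util.Map;

    public class UpperCaseBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            try {
                String msg = input.getString(0);
                // Anchored emit: the input tuple is passed as the anchor.
                collector.emit(input, new Values(msg.toUpperCase()));
                // Manual ack after the emit.
                collector.ack(input);
            } catch (Exception e) {
                // Manual fail, so the spout's fail() callback gets triggered.
                collector.fail(input);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("msg"));
        }
    }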

Use IBasicBolt

Programming with IBasicBolt is much simpler because it does a lot of the work for us. All we have to do is call the emit method. First, look at the interface to be implemented:

    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     * 
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);

This execute method differs from the one above: a BasicOutputCollector is injected for us to work with. Internally, this class holds an OutputCollector field like the one above, and the input tuple is automatically injected into it so that emitted tuples are anchored. Only two emit methods are exposed:

    public List<Integer> emit(String streamId, List<Object> tuple) {
        return out.emit(streamId, inputTuple, tuple);
    }

    public List<Integer> emit(List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple);
    }

As the code above shows, it actually calls OutputCollector's emit method and supplies the anchor for us, using the familiar proxy pattern.
You may have noticed that there is no call to collector.ack(tuple) here. A reasonable guess is that the template-method pattern is used: the caller of this execute method calls ack after it returns. Looking at the code confirms this; in the BasicBoltExecutor class, the method is as follows:

    public void execute(Tuple input) {
        _collector.setContext(input);
        try {
            _bolt.execute(input, _collector);
            _collector.getOutputter().ack(input);
        } catch (FailedException e) {
            if (e instanceof ReportedFailedException) {
                _collector.reportError(e);
            }
            _collector.getOutputter().fail(input);
        }
    }

Look closely and you will see that it also handles exceptions for us: if we throw a FailedException, the fail method is executed automatically.
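
So the same bolt written against IBasicBolt only needs to emit and, on error, throw a FailedException. A minimal sketch (the class name is hypothetical):

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.FailedException;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class UpperCaseBasicBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                // The emit is anchored on the input tuple for us, and the input
                // is acked automatically by BasicBoltExecutor after execute returns.
                collector.emit(new Values(msg.toUpperCase()));
            } catch (Exception e) {
                // Throwing FailedException makes BasicBoltExecutor call fail(input).
                throw new FailedException(e);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("msg"));
        }
    }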

Turn off ack

The ack mechanism is not mandatory, and it costs some performance. If you can tolerate partial data loss, you can turn it off for better performance.

Method

  1. The spout emits data without a msgId
  2. Set the number of ackers to 0
  3. Use unanchored emits in bolts (see the sketch after this list)
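
A minimal sketch of those three steps (variable and field names are hypothetical; topology wiring is omitted):

    // 1. In the spout: emit without a messageId, so the tuple is not tracked.
    //    `spoutCollector` is the SpoutOutputCollector passed to open().
    spoutCollector.emit(new Values(msg));

    // 2. When building the topology: set the acker count to 0.
    Config config = new Config();
    config.setNumAckers(0);

    // 3. In a bolt: emit without an anchor, so downstream failures do not affect
    //    whether the spout tuple is considered failed.
    //    `boltCollector` is the OutputCollector passed to prepare().
    boltCollector.emit(new Values(result));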

performance

In terms of performance: Trident < transactional topologies < the ordinary interface with the ack mechanism enabled < the ordinary interface with the ack mechanism disabled.
We can also add acker threads by increasing the acker parallelism.

Other

ack and fail

The ack and fail methods exist only in the spout.
ack: the action triggered when the spout receives an ack message.
fail: the action triggered when the spout receives a fail message.

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

The parameter is the msgId. Whether you resend the tuple after a failure is entirely up to your implementation; KafkaSpout, for example, has its own implementation, which is not pasted here.
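
As an illustration of one possible resend strategy (this is not KafkaSpout's implementation), a spout can keep everything it has emitted in a pending map keyed by msgId, remove entries on ack, and re-emit on fail:

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    public class ReplayingSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        // Concurrent map, since ack/fail may run in a different thread than nextTuple.
        private final Map<Object, String> pending = new ConcurrentHashMap<>();

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            String msg = fetchNextMessage();            // hypothetical message source
            if (msg != null) {
                String msgId = UUID.randomUUID().toString();
                pending.put(msgId, msg);                // remember what was emitted
                collector.emit(new Values(msg), msgId);
            }
        }

        @Override
        public void ack(Object msgId) {
            pending.remove(msgId);                      // fully processed, forget it
        }

        @Override
        public void fail(Object msgId) {
            String msg = pending.get(msgId);
            if (msg != null) {
                collector.emit(new Values(msg), msgId); // simple resend with the same id
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("msg"));
        }

        private String fetchNextMessage() {
            return null;                                // placeholder for a real source
        }
    }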

Multithreaded

In JStorm, nextTuple and ack/fail run in different threads inside the spout, which encourages users to perform blocking operations inside nextTuple. In native Storm, nextTuple and ack/fail share the same thread, and none of them is allowed to block, otherwise data would time out. The problem with that design is that when there is no data, the whole spout spins in an empty loop and wastes a lot of CPU. JStorm therefore changed Storm's spout design to encourage blocking operations in user code (such as taking messages from a queue) in order to save CPU.
Further details:
When topology.max.spout.pending is not set to 1 (including when it is left null), an extra thread is started inside the spout to perform ack and fail operations independently, so nextTuple runs in its own thread and blocking actions are allowed inside it. In native Storm, nextTuple/ack/fail all run in a single thread: when there is little data, nextTuple returns immediately, and ack and fail also frequently have nothing to do, so the CPU spins idly and is wasted. In JStorm, nextTuple can fetch data in a blocking way, for example from a disruptor queue or a BlockingQueue; blocking directly when there is no data saves a lot of CPU.
However, thread safety then has to be considered when ack/fail and nextTuple touch shared state.
When topology.max.spout.pending is 1, the spout reverts to a single thread, i.e., nextTuple/ack/fail all run in one thread.
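
For reference, this setting can be made through Config's convenience method; the value 1 below just illustrates the single-thread case described above:

    Config config = new Config();
    // With max spout pending == 1, nextTuple/ack/fail run in a single spout thread;
    // any other value (including leaving it unset) lets JStorm run ack/fail in a
    // separate thread from nextTuple.
    config.setMaxSpoutPending(1);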

restart

It is recommended to force a restart of the supervisor every week or two, and at least once a month, because the supervisor is a daemon process that keeps creating child processes; after running for a long time it accumulates many open file handles, which slows down worker startup.
