Storm message format analysis
A Tuple is a message in Storm. This article traces how a tuple is read and created, from the user-level call down to the underlying source code.
1. ITuple interface
A Storm message is called a Tuple, and every message format must implement the ITuple interface. The interface mainly defines the methods for reading the content of a message. Its complete definition is as follows:
    public interface ITuple {
        public int size();
        public boolean contains(String field);
        public Fields getFields();
        public int fieldIndex(String field);
        public List<Object> select(Fields selector);

        public Object getValue(int i);
        public String getString(int i);
        public Integer getInteger(int i);
        public Long getLong(int i);
        public Boolean getBoolean(int i);
        public Short getShort(int i);
        public Byte getByte(int i);
        public Double getDouble(int i);
        public Float getFloat(int i);
        public byte[] getBinary(int i);

        public Object getValueByField(String field);
        public String getStringByField(String field);
        public Integer getIntegerByField(String field);
        public Long getLongByField(String field);
        public Boolean getBooleanByField(String field);
        public Short getShortByField(String field);
        public Byte getByteByField(String field);
        public Double getDoubleByField(String field);
        public Float getFloatByField(String field);
        public byte[] getBinaryByField(String field);

        public List<Object> getValues();
    }
Two main interfaces extend ITuple:
* Tuple: used by core Storm
* TridentTuple: used by Trident
These two are described in the following two sections.
2. Message format of core-storm
3. Message format of Trident
In Trident, a single bolt may contain several operations that pass messages to one another. Most operations either add new fields or filter existing ones, so copying the input message at every step would be inefficient.
Trident uses the TridentTupleView object to encapsulate messages. For example, a newly generated message consists of two parts, one taken from the input and the other produced by the computation. Instead of building a new message, TridentTupleView combines the two parts by updating its internal index, so that from the outside they look like a single message. This avoids copying messages and creating new objects, which improves efficiency.
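The idea can be illustrated with a minimal sketch. This is not Storm's implementation: `MergedView` and its `of` helper are hypothetical names, and plain `java.util` lists stand in for the Clojure vector. The view keeps references to the input values and to the computed values and resolves each position through a small index, so no value is copied.

```java
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for a tuple view (not Storm's class).
class MergedView extends AbstractList<Object> {
    // The underlying value lists; they are referenced, never copied.
    private final List<List<Object>> delegates;
    // index[i] = {delegate number, position inside that delegate}
    private final int[][] index;

    private MergedView(List<List<Object>> delegates, int[][] index) {
        this.delegates = delegates;
        this.index = index;
    }

    // Presents the input values followed by the computed values as one message.
    static MergedView of(List<Object> input, List<Object> computed) {
        List<List<Object>> delegates = new ArrayList<>();
        delegates.add(input);
        delegates.add(computed);
        int[][] index = new int[input.size() + computed.size()][];
        int k = 0;
        for (int i = 0; i < input.size(); i++) {
            index[k++] = new int[] {0, i};
        }
        for (int i = 0; i < computed.size(); i++) {
            index[k++] = new int[] {1, i};
        }
        return new MergedView(delegates, index);
    }

    @Override
    public Object get(int i) {
        int[] ptr = index[i];
        return delegates.get(ptr[0]).get(ptr[1]);
    }

    @Override
    public int size() {
        return index.length;
    }

    public static void main(String[] args) {
        List<Object> input = Arrays.<Object>asList("the cow jumped", 3);
        List<Object> computed = Arrays.<Object>asList("THE COW JUMPED");
        System.out.println(MergedView.of(input, computed));
    }
}
```

Because the view only stores references and an index, adding the computed part costs one small array rather than a copy of every input value.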
(1) Basic use of tuples in Trident
1. User Code
    // By position:
    String sentence = tuple.getString(0);
    // Or, as an alternative to the line above, by field name:
    String sentence = tuple.getStringByField("sentence");
Users can obtain the value of a field in TridentTuple by using one of the above two methods.
Let's continue with the first method as an example.
2. Source code of TridentTupleView
(1) getString(i) simply calls getValue(i) and casts the result:
    @Override
    public String getString(int i) {
        return (String) getValue(i);
    }

    // Get the ValuePointer object for position i, then call getValueByPointer().
    @Override
    public Object getValue(int i) {
        return getValueByPointer(_index[i]);
    }

    // The method that actually finds the data through the index.
    private Object getValueByPointer(ValuePointer ptr) {
        return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);
    }
(2) Basic structure of a Trident tuple
1. The main classes involved
- TridentTuple interface: extends the ITuple interface and the List interface.
- TridentTupleView class: implements the TridentTuple interface and extends AbstractList.
- ValuePointer: the index data structure of TridentTupleView; it records which position or field corresponds to which piece of actual data (held in an IPersistentVector).
- IPersistentVector: holds the actual data. Note that this is an interface in the clojure.lang package that ships with Clojure; clojure.lang.PersistentVector is an implementation of it.
2. Basic Flow of Data Location
(1) User code obtains data from a TridentTupleView through getValue(i) or getValueByField("fieldName"). If the type is known, it can also use getString(i), getStringByField("fieldName"), and the other typed methods.
(2) For the former, the ValuePointer object is taken from ValuePointer[] _index. For the latter, it is taken from Map<String, ValuePointer> _fieldIndex.
(3) The ValuePointer is the index of the actual data. Its public int delegateIndex first selects the delegate that holds the value, and its protected int index then selects the element inside that delegate, which is the actual data. Its protected String field records the name of the field.
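The three steps above can be sketched as follows. The names `LookupSketch`, `Pointer`, `positionIndex`, and `demo` are hypothetical, and `java.util` lists are assumed in place of the Clojure vector; the sketch only shows that the positional path and the field-name path both end in the same pointer-based lookup.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the two lookup paths (not Storm's classes).
class LookupSketch {
    // Simplified pointer: which delegate, which slot, and the field name.
    static final class Pointer {
        final int delegateIndex;
        final int index;
        final String field;

        Pointer(int delegateIndex, int index, String field) {
            this.delegateIndex = delegateIndex;
            this.index = index;
            this.field = field;
        }
    }

    private final List<List<Object>> delegates;    // the actual data
    private final Pointer[] positionIndex;         // lookup by position
    private final Map<String, Pointer> fieldIndex; // lookup by field name

    LookupSketch(List<List<Object>> delegates, Pointer[] positionIndex) {
        this.delegates = delegates;
        this.positionIndex = positionIndex;
        this.fieldIndex = new HashMap<>();
        for (Pointer p : positionIndex) {
            fieldIndex.put(p.field, p);
        }
    }

    Object getValue(int i) {
        return resolve(positionIndex[i]);
    }

    // Assumes the field exists; an unknown name fails with a NullPointerException.
    Object getValueByField(String field) {
        return resolve(fieldIndex.get(field));
    }

    // Both paths end here: pick the delegate, then the element inside it.
    private Object resolve(Pointer p) {
        return delegates.get(p.delegateIndex).get(p.index);
    }

    static LookupSketch demo() {
        List<List<Object>> delegates = Arrays.asList(
                Arrays.<Object>asList("a b", 2), // delegate 0
                Arrays.<Object>asList("A B"));   // delegate 1
        Pointer[] pointers = {
            new Pointer(0, 0, "sentence"),
            new Pointer(0, 1, "count"),
            new Pointer(1, 0, "upper"),
        };
        return new LookupSketch(delegates, pointers);
    }

    public static void main(String[] args) {
        LookupSketch t = demo();
        System.out.println(t.getValue(2) + " / " + t.getValueByField("upper"));
    }
}
```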
3. Other Descriptions
(1) Put simply, the process above finds a ValuePointer through a position or a field name; the pointer's delegateIndex and index then lead to the actual data.
(2) A TridentTupleView may hold multiple delegates, and each delegate holds multiple elements, each of which is the actual data of one field.
A TridentTupleView has only one _delegates object, but that object contains multiple delegates, each reachable through _delegates.nth(i). The delegateIndex in ValuePointer serves this purpose. The underlying data structure is implemented in Clojure and is not covered in detail here.
(3) TridentTuple interface
    public interface TridentTuple extends ITuple, List<Object> {
        public static interface Factory extends Serializable {
            Map<String, ValuePointer> getFieldIndex();
            List<String> getOutputFields();
            int numDelegates();
        }
    }
It has an inner interface, Factory, whose three methods return:
* The mapping between field names and ValuePointer objects
* The list of all output fields
* The number of delegates. Note that a TridentTupleView may hold multiple delegates.
(4) TridentTupleView
TridentTupleView mainly defines (1) how to construct a message and (2) how to read the content of a message.
1. Member variables
    ValuePointer[] _index;
    Map<String, ValuePointer> _fieldIndex;
    IPersistentVector _delegates;
Each TridentTupleView holds the index information (ValuePointer) and the actual data (IPersistentVector). The remaining methods are about finding the actual data in the IPersistentVector through the ValuePointer index.
_delegates can be understood as a collection of delegates; nth(i) selects one of them, and that delegate is itself a collection of values. IPersistentVector comes from the clojure.lang package, and its implementation is not analyzed here.
2. Main methods
As mentioned above, the methods of TridentTupleView are mainly used to read the data it holds. For example:
    @Override
    public Integer getInteger(int i) {
        return (Integer) getValue(i);
    }
Whatever method you use to get the message content, you end up calling this method:
    private Object getValueByPointer(ValuePointer ptr) {
        return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);
    }
That is, a ValuePointer object is used as an index to find the actual data in the IPersistentVector: first locate a specific delegate within _delegates, then locate the specific element inside it. Without going into the Clojure code, nth() here simply means selecting one delegate from _delegates.
3. Internal Classes
TridentTupleView has four internal classes that implement the Factory interface for creating Trident messages. The create() methods of these Factory subclasses are invoked by various operations of spout/bolt to create a TridentTuple, and we'll talk about who is calling these methods later. At present, we only need to know:
* ProjectionFactory: It does not create a new message, but retains only part of the parent field (defined by project fields)
* FreshOutputFactory: Generates a new message based on the input field name and value
* OperationOutputFactory: Creates a new delegate from selfFields and combines it with the parent to form a new TridentTupleView, so its number of delegates is the parent's plus one.
* RootFactory: An entry factory for operations that adapts incoming messages
4. ProjectionFactory
ProjectionFactory rebuilds the TridentTupleView from the input parent and the project fields. It does not create a new message; it only retains some of the parent's fields (those named by the project fields).
    public ProjectionFactory(Factory parent, Fields projectFields) {
        _parent = parent;
        if(projectFields==null) projectFields = new Fields();
        Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();
        _fieldIndex = new HashMap<>();
        // Reuse the parent's pointers for the projected fields.
        for(String f: projectFields) {
            _fieldIndex.put(f, parentFieldIndex.get(f));
        }
        _index = ValuePointer.buildIndex(projectFields, _fieldIndex);
    }

    public TridentTuple create(TridentTuple parent) {
        if(_index.length==0) return EMPTY_TUPLE;
        // The parent's _delegates are passed along unchanged.
        else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);
    }
It returns the same number of delegates as parent, thus proving again that it will not generate new delegates:
    @Override
    public int numDelegates() {
        return _parent.numDelegates();
    }
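A minimal sketch of the same idea, under simplifying assumptions: `ProjectionSketch`, `project`, and `read` are hypothetical names, a pointer is reduced to an `int[]` of {delegate number, position}, and `java.util` collections replace Storm's and Clojure's types. It shows that projection only narrows the field index while the delegates stay untouched.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of projection (not Storm's ProjectionFactory).
class ProjectionSketch {
    // A pointer is modelled as {delegate number, position inside the delegate}.
    static Map<String, int[]> project(Map<String, int[]> parentFieldIndex,
                                      List<String> projectFields) {
        Map<String, int[]> projected = new LinkedHashMap<>();
        for (String f : projectFields) {
            // Reuse the parent's pointer object; nothing is re-indexed.
            projected.put(f, parentFieldIndex.get(f));
        }
        return projected;
    }

    static Object read(List<List<Object>> delegates, int[] ptr) {
        return delegates.get(ptr[0]).get(ptr[1]);
    }

    public static void main(String[] args) {
        List<List<Object>> delegates = Arrays.asList(
                Arrays.<Object>asList("a b", 2),
                Arrays.<Object>asList("A B"));
        Map<String, int[]> parent = new HashMap<>();
        parent.put("sentence", new int[] {0, 0});
        parent.put("count", new int[] {0, 1});
        parent.put("upper", new int[] {1, 0});

        Map<String, int[]> projected = project(parent, Arrays.asList("upper"));
        // The delegates are used unchanged; only the index is smaller.
        System.out.println(read(delegates, projected.get("upper")));
    }
}
```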
5. FreshOutputFactory
FreshOutputFactory generates a new message based on the field name and value entered.
    public FreshOutputFactory(Fields selfFields) {
        _fieldIndex = new HashMap<>();
        for(int i=0; i<selfFields.size(); i++) {
            String field = selfFields.get(i);
            _fieldIndex.put(field, new ValuePointer(0, i, field));
        }
        _index = ValuePointer.buildIndex(selfFields, _fieldIndex);
    }

    public TridentTuple create(List<Object> selfVals) {
        return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
    }
(1) First, the constructor builds a ValuePointer(0, i, field) for each field: the value is the i-th element of delegate 0, and its field name is field.
(2) Then the create() method builds a PersistentVector containing the values and creates a TridentTupleView from it together with the ValuePointer index.
The number of delegates in a message created by FreshOutputFactory is always 1:
    @Override
    public int numDelegates() {
        return 1;
    }
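The two steps can be sketched roughly as below. `FreshOutputSketch` and its methods are hypothetical, a pointer is again reduced to an `int[]` of {delegate number, position}, and an `ArrayList` is assumed in place of the persistent vector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a "fresh output" factory (not Storm's class).
class FreshOutputSketch {
    // Every field points into delegate 0, at the field's own position.
    static Map<String, int[]> buildFieldIndex(List<String> selfFields) {
        Map<String, int[]> fieldIndex = new LinkedHashMap<>();
        for (int i = 0; i < selfFields.size(); i++) {
            fieldIndex.put(selfFields.get(i), new int[] {0, i});
        }
        return fieldIndex;
    }

    // The new message has exactly one delegate: the value list itself.
    static List<List<Object>> create(List<Object> selfVals) {
        List<List<Object>> delegates = new ArrayList<>();
        delegates.add(selfVals);
        return delegates;
    }

    public static void main(String[] args) {
        Map<String, int[]> index = buildFieldIndex(Arrays.asList("word", "count"));
        List<List<Object>> delegates = create(Arrays.<Object>asList("storm", 7));
        int[] ptr = index.get("count");
        System.out.println(delegates.get(ptr[0]).get(ptr[1]));
    }
}
```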
6. OperationOutputFactory
OperationOutputFactory creates a new delegate from selfFields and combines it with the parent to form a new TridentTupleView, so its number of delegates is the parent's plus one.
This also shows that a TridentTupleView can hold multiple delegates. "Multiple" refers to the number returned by numDelegates(), not to the number of IPersistentVector objects; each TridentTupleView has exactly one IPersistentVector object.
    public OperationOutputFactory(Factory parent, Fields selfFields) {
        _parent = parent;
        _fieldIndex = new HashMap<>(parent.getFieldIndex());
        // The new delegate is appended after all of the parent's delegates.
        int myIndex = parent.numDelegates();
        for(int i=0; i<selfFields.size(); i++) {
            String field = selfFields.get(i);
            _fieldIndex.put(field, new ValuePointer(myIndex, i, field));
        }
        List<String> myOrder = new ArrayList<>(parent.getOutputFields());

        Set<String> parentFieldsSet = new HashSet<>(myOrder);
        for(String f: selfFields) {
            if(parentFieldsSet.contains(f)) {
                throw new IllegalArgumentException(
                        "Additive operations cannot add fields with same name as already exists. "
                        + "Tried adding " + selfFields + " to " + parent.getOutputFields());
            }
            myOrder.add(f);
        }

        _index = ValuePointer.buildIndex(new Fields(myOrder), _fieldIndex);
    }

    public TridentTuple create(TridentTupleView parent, List<Object> selfVals) {
        IPersistentVector curr = parent._delegates;
        curr = (IPersistentVector) RT.conj(curr, selfVals);
        return new TridentTupleView(curr, _index, _fieldIndex);
    }
Confirm the number of delegates once more:
    @Override
    public int numDelegates() {
        return _parent.numDelegates() + 1;
    }
Indeed, it is the parent's number of delegates plus one.
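As a rough sketch of this additive behaviour, assuming `java.util` collections instead of the persistent vector: `OperationOutputSketch`, `extendIndex`, and `append` are hypothetical names, and a pointer is an `int[]` of {delegate number, position}. Unlike a persistent vector, the `ArrayList` here makes a shallow copy of the outer list, but the value lists themselves are still shared.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an additive operation (not Storm's class).
class OperationOutputSketch {
    // New fields point into the delegate appended after the parent's delegates.
    static Map<String, int[]> extendIndex(Map<String, int[]> parentIndex,
                                          int parentDelegateCount,
                                          List<String> selfFields) {
        Map<String, int[]> fieldIndex = new LinkedHashMap<>(parentIndex);
        for (int i = 0; i < selfFields.size(); i++) {
            String field = selfFields.get(i);
            if (parentIndex.containsKey(field)) {
                throw new IllegalArgumentException("Field already exists: " + field);
            }
            fieldIndex.put(field, new int[] {parentDelegateCount, i});
        }
        return fieldIndex;
    }

    // Copies only the outer list of references; the value lists are shared.
    static List<List<Object>> append(List<List<Object>> parentDelegates,
                                     List<Object> selfVals) {
        List<List<Object>> delegates = new ArrayList<>(parentDelegates);
        delegates.add(selfVals);
        return delegates;
    }

    public static void main(String[] args) {
        List<List<Object>> parentDelegates = new ArrayList<>();
        parentDelegates.add(Arrays.<Object>asList("a b"));
        Map<String, int[]> parentIndex = new LinkedHashMap<>();
        parentIndex.put("sentence", new int[] {0, 0});

        Map<String, int[]> index = extendIndex(parentIndex, 1, Arrays.asList("upper"));
        List<List<Object>> delegates = append(parentDelegates, Arrays.<Object>asList("A B"));
        int[] ptr = index.get("upper");
        System.out.println(delegates.size() + " delegates, upper = "
                + delegates.get(ptr[0]).get(ptr[1]));
    }
}
```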
7. RootFactory
RootFactory is the entry factory for operations and adapts incoming messages. It generates a TridentTupleView from the input message so that the other operations can work with it.
(5) ValuePointer
1. Member variables and constructor
    public int delegateIndex;
    protected int index;
    protected String field;

    public ValuePointer(int delegateIndex, int index, String field) {
        this.delegateIndex = delegateIndex;
        this.index = index;
        this.field = field;
    }
ValuePointer has three member variables:
(1) delegateIndex identifies which delegate in the TridentTupleView holds the value. As noted earlier, a TridentTupleView may hold multiple delegates.
(2) index identifies which element inside that delegate.
(3) field is the name of the field.
Therefore, a ValuePointer tells you which delegate holds the value, which element of that delegate it is, and what the corresponding field is called.
2. Two methods
These two methods are mainly used for ValuePointer [] and Map