Using Flink operator state

1. Types of operator state

The scope of operator state is limited to a single parallel subtask of an operator. All records processed by the same parallel subtask can access the same state, which is shared within that subtask. Operator state cannot be accessed by any other parallel subtask, whether that subtask belongs to the same operator or to a different one.

Flink provides three basic data structures for operator state. They differ mainly in how the state is redistributed when the parallelism changes (scaling up or down) and the application is restarted from a savepoint:

  1. List state: represents the state as a list of entries.

When an operator with list state is scaled up or down, its list entries are redistributed. Conceptually, the entries from all parallel tasks are collected into one list and then divided evenly among the new, smaller or larger, set of tasks. If there are fewer list entries than the new parallelism, some tasks will start with empty state.
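The even split can be simulated in plain Java, without a Flink runtime. This is a conceptual sketch, not Flink's repartitioning code: the class `ListStateRedistribution` is hypothetical, and handing out contiguous chunks with the first tasks taking one extra entry is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, conceptual model of list-state redistribution on rescale.
// Assumption: entries are split into contiguous chunks of near-equal size.
class ListStateRedistribution {

    static <T> List<List<T>> redistribute(List<List<T>> oldStates, int newParallelism) {
        // Gather the entries of every old subtask into one logical list
        List<T> all = new ArrayList<>();
        for (List<T> s : oldStates) {
            all.addAll(s);
        }
        // Split evenly; the first (size % newParallelism) subtasks get one extra entry
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int len = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + len)));
            start += len;
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old subtasks holding 2 and 1 entries, rescaled to 4 subtasks
        List<List<String>> old = List.of(List.of("a", "b"), List.of("c"));
        System.out.println(redistribute(old, 4)); // [[a], [b], [c], []]
    }
}
```

With three entries and a new parallelism of 4, the last subtask starts with an empty list, which is exactly the "empty state at startup" case described above.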

  2. Union list state: also represents the state as a list of entries. It differs from regular list state in how it is restored when the application is restarted from a savepoint or recovers from a failure. When the state is restored, for example after a change in parallelism, Flink does not split the entries; it broadcasts the complete list of state entries to every task, and each task then decides which entries to keep and which to discard.

For example, if an operator previously ran with parallelism 2, it had two subtasks and therefore two pieces of state. If the parallelism is changed to 3, each of the three new subtasks receives a copy of both previous states, so every subtask holds the complete state and decides for itself which parts to use.
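A similar plain-Java sketch for union list state, using the 2 to 3 rescale from the example. `UnionListStateRestore` and its methods are hypothetical, and the rule in `select` (keep the entries whose index modulo the parallelism equals the task index) is only one possible choice; Flink leaves that decision to the task's own code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, conceptual model of union list state on restore:
// every new subtask receives the full union of all old entries.
class UnionListStateRestore {

    static <T> List<List<T>> broadcastUnion(List<List<T>> oldStates, int newParallelism) {
        List<T> union = new ArrayList<>();
        for (List<T> s : oldStates) {
            union.addAll(s);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(union)); // each subtask gets its own full copy
        }
        return result;
    }

    // Hypothetical selection rule decided by the task itself:
    // keep every entry whose position modulo the parallelism equals this task's index
    static <T> List<T> select(List<T> fullState, int taskIndex, int parallelism) {
        List<T> kept = new ArrayList<>();
        for (int i = 0; i < fullState.size(); i++) {
            if (i % parallelism == taskIndex) {
                kept.add(fullState.get(i));
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Parallelism 2 -> 3
        List<List<String>> old = List.of(List.of("s0"), List.of("s1"));
        List<List<String>> restored = broadcastUnion(old, 3);
        for (int task = 0; task < 3; task++) {
            System.out.println(task + " receives " + restored.get(task)
                + ", keeps " + select(restored.get(task), task, 3));
        }
    }
}
```

Every task sees `[s0, s1]`; with this particular rule, task 2 ends up keeping nothing, but a different rule could just as well make every task keep everything.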

  3. Broadcast state: unlike regular operator state, the state is identical in every parallel subtask. Each subtask still accesses its own copy, but all copies hold the same contents. Broadcast state is therefore the best fit when every parallel subtask of an operator needs the same state.

When an operator with broadcast state is scaled up, the state is copied to all new tasks. This is safe because broadcast state guarantees that all tasks hold the same state. When scaling down, no state is lost, since every remaining task already holds a full copy, so the surplus tasks can simply be stopped.
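The rescaling behavior of broadcast state can be sketched the same way. The state is modeled as a `Map` because Flink's broadcast state is key-value state declared through a `MapStateDescriptor`; the class `BroadcastStateRescale` is hypothetical, uses no Flink API, and assumes new subtasks copy the old states in round-robin order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, conceptual model of broadcast state on rescale (no Flink API).
// Every subtask holds an identical key-value copy of the state.
class BroadcastStateRescale {

    static <K, V> List<Map<K, V>> rescale(List<Map<K, V>> oldStates, int newParallelism) {
        List<Map<K, V>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            if (i < oldStates.size()) {
                // An existing subtask keeps its own copy
                result.add(oldStates.get(i));
            } else {
                // A new subtask copies an old state (assumed round-robin); all copies are equal
                result.add(new HashMap<>(oldStates.get(i % oldStates.size())));
            }
        }
        // Old subtasks with index >= newParallelism are not carried over,
        // which models stopping the surplus tasks when scaling down
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> rules = Map.of("threshold", 30);
        List<Map<String, Integer>> old = List.of(rules, new HashMap<>(rules));
        System.out.println(rescale(old, 3)); // [{threshold=30}, {threshold=30}, {threshold=30}]
        System.out.println(rescale(old, 1)); // [{threshold=30}]
    }
}
```

Because all copies are equal, it does not matter which old copy a new subtask receives, and dropping subtasks on scale-down loses nothing.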

2. Using operator state

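import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// SensorReading is the tutorial's own POJO (id, timestamp, temperature), defined elsewhere in the project
public class StateTest1_OperatorState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read lines of text from a socket on localhost:7777
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // Convert each "id,timestamp,temperature" line to a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0].trim(), Long.valueOf(fields[1].trim()), Double.valueOf(fields[2].trim()));
        });

        // Stateful map: count the records processed by the current parallel subtask
        SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper());

        resultStream.print();

        env.execute();
    }

    // Custom MapFunction whose count is kept as operator state
    public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
        // A member variable serves as the operator state
        private Integer count = 0;

        @Override
        public Integer map(SensorReading value) throws Exception {
            count++;
            return count;
        }

        // Called on each checkpoint: save the current count as a one-element list
        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }

        // Called on recovery: add up all restored entries (there may be several after scaling down)
        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for (Integer num : state) {
                count += num;
            }
        }
    }
}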
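The operator state is declared just like an ordinary member variable, but the processing function must also implement a corresponding interface, such as ListCheckpointed, and supply user-defined logic for snapshotting and restoring the state. Here, snapshotState() saves the current count as a one-element list, and restoreState() adds up every count it receives, so the counts of several previous subtasks are merged when the parallelism is reduced.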
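To see what `snapshotState()` and `restoreState()` do when the job is rescaled, the following plain-Java walk-through replays that logic without a Flink runtime. `CounterRescaleDemo` and `CounterTask` are hypothetical stand-ins for `MyCountMapper`, and the even, contiguous split of snapshot entries is an assumption modeled on regular list state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical walk-through of MyCountMapper's snapshot/restore logic on rescale.
// No Flink runtime is involved; the redistribution is an assumed even, contiguous split.
class CounterRescaleDemo {

    // Mirrors the count / snapshotState / restoreState trio of MyCountMapper
    static class CounterTask {
        int count = 0;

        List<Integer> snapshotState() {
            return Collections.singletonList(count);
        }

        void restoreState(List<Integer> state) {
            for (Integer num : state) {
                count += num;
            }
        }
    }

    // Snapshot all old tasks, split the entries evenly, and restore new tasks
    static List<CounterTask> rescale(List<CounterTask> oldTasks, int newParallelism) {
        List<Integer> all = new ArrayList<>();
        for (CounterTask t : oldTasks) {
            all.addAll(t.snapshotState());
        }
        List<CounterTask> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int len = base + (i < remainder ? 1 : 0);
            CounterTask t = new CounterTask();
            t.restoreState(all.subList(start, start + len));
            result.add(t);
            start += len;
        }
        return result;
    }

    public static void main(String[] args) {
        CounterTask a = new CounterTask();
        a.count = 3;
        CounterTask b = new CounterTask();
        b.count = 5;
        // Scale 2 -> 1: the single task receives both entries and sums them
        System.out.println(rescale(List.of(a, b), 1).get(0).count); // 8
    }
}
```

Scaling two subtasks with counts 3 and 5 down to one yields a single counter of 8, while scaling up to three leaves the third task starting from 0. Note that `ListCheckpointed` is deprecated in newer Flink releases; new code would typically implement `CheckpointedFunction` and keep the count in a `ListState` obtained from the operator state store.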


Added by Qlubbie on Tue, 08 Mar 2022 04:59:25 +0200