Java type erasure, lambda expressions in Flink, lost type information, and Flink's type hint mechanism

Recently, while learning Flink, I found that because of Java type erasure, Flink often cannot detect generic types when lambda expressions are used, and Flink's type hint mechanism is needed to solve this. Let's analyze it in depth!

What is Java generic erasure

This article does not introduce Java generics themselves. Readers who are not familiar with generics are strongly encouraged to read this blog post: https://www.cnblogs.com/coprince/p/8603492.html

Look at two examples:

(1) Example 1

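import java.util.ArrayList;
import java.util.List;

// Raw type: no generic parameter, so the compiler cannot check element types
List arrayList = new ArrayList();
arrayList.add("abc");
arrayList.add(12);

for (int i = 0; i < arrayList.size(); i++) {
    // Succeeds for "abc", then fails on the second element, which is an Integer
    String item = (String) arrayList.get(i);
    System.out.println(item);
}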

Running it fails with:

Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String

Because no generic type is specified, the List can hold objects of any type. The code above first adds a String to the List and then an Integer. The compiler reports no error (at most an unchecked warning), but the program fails at runtime.

This is because the loop treats every element as a String: the cast succeeds for the first element, "abc", but fails for the second one, which is an Integer, and the program crashes. To catch this kind of problem at compile time, we can specify a generic type in the code:

List<String> arrayList = new ArrayList<String>();
// arrayList.add(100);  // rejected at compile time: 100 is not a String

(2) Example 2

import java.util.ArrayList;
import java.util.List;

List<String> stringArrayList = new ArrayList<String>();
List<Integer> integerArrayList = new ArrayList<Integer>();

// Both calls return the same runtime class object: java.util.ArrayList
Class<?> classStringArrayList = stringArrayList.getClass();
Class<?> classIntegerArrayList = integerArrayList.getClass();

System.out.println(classStringArrayList == classIntegerArrayList);

Output: true

The example above shows that generic type arguments are removed after compilation. In other words, Java generics are only enforced at compile time: once the compiler has verified that the generic types are used correctly, it erases them, so at runtime an ArrayList<String> and an ArrayList<Integer> are both just ArrayList objects. To keep the program type-safe, the compiler inserts casts at the points where values leave generic code (for example, a cast to String when an element is read from a List<String>). Because the type arguments do not survive into the runtime stage, this is called Java type erasure.
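To see where a compiler-inserted cast actually fires, here is a small sketch (the class name ErasureCastDemo and the helper polluted are made up for illustration). It uses a raw type to slip an Integer into a list declared as List<String>; no error is raised until the element is read back into a String variable.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureCastDemo {

    // Returns a List<String> that actually holds an Integer ("heap pollution")
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<String> polluted() {
        List raw = new ArrayList();
        raw.add(42); // raw type: no compile-time check, and the list has no runtime element type
        return (List<String>) raw; // unchecked cast: at runtime it is just a List
    }

    public static void main(String[] args) {
        List<String> strings = polluted();
        // size() needs no cast, so this works
        System.out.println("size = " + strings.size());
        try {
            // javac inserts a checkcast to String here, and that is where the error surfaces
            String s = strings.get(0);
            System.out.println(s);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException when reading element 0");
        }
    }
}
```

The list object itself never knew it was supposed to hold Strings; the check exists only as a cast at the read site.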

There are two ways to implement generics. Java uses the first, while C++ and C# use the second:

  • Method 1: Code sharing. All parameterizations of the same raw type share a single copy of the object code.
  • Method 2: Code specialization. Separate object code is generated for each parameterization.

These two approaches are informally known as "fake" generics and "true" generics, respectively. With code sharing, the program is not aware of type arguments at runtime: after compilation, both lists in example 2 are plain ArrayList objects, so both getClass() calls return the same Class object (ArrayList.class), and example 2 prints true.
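Reflection confirms this. In the sketch below (TypeParameterDemo and its typeParams helper are hypothetical names), asking the runtime class of either list for its type parameters returns only the type variable E declared by ArrayList, never String or Integer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TypeParameterDemo {

    // Lists the type variables declared by the runtime class of obj
    static String typeParams(Object obj) {
        return Arrays.toString(obj.getClass().getTypeParameters());
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> integers = new ArrayList<>();
        // Both print [E]: only ArrayList's declared type variable is known, not String or Integer
        System.out.println(typeParams(strings));
        System.out.println(typeParams(integers));
    }
}
```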

Why does Java implement generics with code sharing and type erasure? There are two reasons. First, generics were only added in Java 1.5; by then, the JVM had developed for a long time without generics. Adopting code specialization would have required sweeping changes to the JVM type system, and compatibility with existing code could not have been guaranteed. Second, code specialization generates separate object code for each parameterized type: ten lists with different type arguments would produce ten copies of the bytecode, leading to code bloat.

Impact of type erasure on Flink

Let's look at a simple piece of code:

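import java.util.Arrays;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
// Flink cannot extract the lambda's return type Tuple2<String, Integer> automatically
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(word -> new Tuple2<>(word, 1));
mapDataStream.print();
env.execute();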

The program fails at runtime with the following cause:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.

In other words, the type parameters of Tuple2 are missing: the lambda expression does not provide enough information for Flink to detect them automatically. The message suggests using an (anonymous) class that implements MapFunction instead, or specifying the type explicitly with type information.

Let's switch to an anonymous inner class:

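import java.util.Arrays;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
// The anonymous class declares MapFunction<String, Tuple2<String, Integer>>, so Flink can read the types
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        return new Tuple2<>(value, 1);
    }
});

mapDataStream.print();
env.execute();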

This version runs successfully and prints the tuples.

Why can't Flink automatically detect the type parameters of Tuple2 when a lambda expression is used, while it can with an anonymous inner class?

Tuple2 has two type parameters. An anonymous inner class is compiled into a real class file, and the generic types in its declaration (here, MapFunction<String, Tuple2<String, Integer>>) are recorded in that class file's signature metadata. Erasure removes type arguments from objects, not from class declarations, so Flink can read them back through reflection and correctly determine the parameter types of Tuple2. This works because the types are fixed statically at compile time.

Lambda expressions, on the other hand, are compiled into invokedynamic instructions, which the JVM introduced to support method calls in dynamic languages. invokedynamic abstracts the call site (CallSite) into a Java class and exposes method invocation and linking, which used to be controlled entirely by the Java virtual machine, to the application. At runtime, each invokedynamic instruction is bound to a call site and calls the method handle linked to that call site. When an invokedynamic instruction is executed for the first time, the JVM calls the instruction's bootstrap method (BootStrap Method) to create the call site and bind it to the instruction; on subsequent executions, the JVM directly calls the method handle linked to the bound call site. In other words, the class implementing the lambda is only produced when the code first runs, and it does not carry the generic type arguments of MapFunction. After erasure, there is therefore not enough information left to detect the two type parameters of Tuple2.
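The call-site and bootstrap-method steps described above can be imitated by hand with the java.lang.invoke API. This is only a sketch: javac does not generate anything like it for lambdas (real lambdas are bootstrapped by LambdaMetafactory), and the names CallSiteDemo, greet, bootstrap and run are hypothetical. It resolves a target method handle, wraps it in a ConstantCallSite, and then invokes the target through the call site.

```java
import java.lang.invoke.CallSite;
import java.lang.invoke.ConstantCallSite;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class CallSiteDemo {

    // The target method that the call site will be linked to
    public static String greet(String name) {
        return "hello " + name;
    }

    // Plays the role of a bootstrap method: resolves the target and wraps it in a call site
    static CallSite bootstrap(MethodHandles.Lookup lookup, String name, MethodType type)
            throws ReflectiveOperationException {
        MethodHandle target = lookup.findStatic(CallSiteDemo.class, name, type);
        return new ConstantCallSite(target);
    }

    // Links a call site, then invokes through the method handle it is bound to
    static String run(String arg) {
        try {
            MethodType type = MethodType.methodType(String.class, String.class);
            CallSite site = bootstrap(MethodHandles.lookup(), "greet", type);
            MethodHandle invoker = site.dynamicInvoker();
            return (String) invoker.invokeExact(arg);
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    public static void main(String[] args) {
        System.out.println(run("flink")); // hello flink
    }
}
```

A real invokedynamic instruction performs the bootstrap step only once and then reuses the linked call site; this sketch links on every call to keep the example short.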

The explanation of invokedynamic above may be a little vague; to really understand it, you need to know how invokedynamic works in the JVM (ha ha, to be honest I haven't dug into it deeply yet, and may add more when I get the chance).
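The practical consequence can be observed with plain JDK reflection, without Flink. The sketch below (ErasureDemo and describe are illustrative names, and java.util.function.Function stands in for Flink's MapFunction) asks an anonymous class and a lambda for the generic interface their runtime classes implement. On OpenJDK, the anonymous class reports Function with its type arguments, while the lambda's generated class reports only the raw Function interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {

    // Describes the first interface implemented by fn's runtime class,
    // including its type arguments if the class file recorded them
    static String describe(Object fn) {
        Type iface = fn.getClass().getGenericInterfaces()[0];
        if (iface instanceof ParameterizedType) {
            StringBuilder sb = new StringBuilder("parameterized:");
            for (Type arg : ((ParameterizedType) iface).getActualTypeArguments()) {
                sb.append(' ').append(arg.getTypeName());
            }
            return sb.toString();
        }
        return "raw: " + iface.getTypeName();
    }

    public static void main(String[] args) {
        // Anonymous class: compiled to its own class file with a generic signature
        Function<String, Integer> anon = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        // Lambda: its runtime class is generated when the invokedynamic call site is linked
        Function<String, Integer> lambda = s -> s.length();

        System.out.println(describe(anon));   // parameterized: java.lang.String java.lang.Integer
        System.out.println(describe(lambda)); // raw: java.util.function.Function
    }
}
```

Flink's real type extraction does more than this sketch, so treat it as an illustration of the root cause rather than of Flink's own code path.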

To work around the problems caused by type erasure, Flink's type system provides a type hint mechanism: calling the returns method after map explicitly specifies the return type.

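import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
DataStream<Tuple2<String, Integer>> mapDataStream = dataStream.map(word -> new Tuple2<>(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
mapDataStream.print();
env.execute();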
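The returns call above passes a TypeInformation built with Types.TUPLE. Flink also accepts returns(new TypeHint<Tuple2<String, Integer>>(){}), and as far as I know TypeHint relies on the same effect seen with anonymous classes: the trailing {} creates an anonymous subclass whose class file records the type argument. The pure-Java sketch below shows that trick with a hypothetical TypeToken class (not a Flink or JDK API), using Map<String, Integer> in place of Tuple2 so that it runs without Flink.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

// Hypothetical "super type token" (not a Flink or JDK class): subclassing it anonymously
// records the type argument T in the subclass's class file, where erasure does not remove it
abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // For `new TypeToken<X>() {}`, the generic superclass is TypeToken<X>
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

public class TypeTokenDemo {
    public static void main(String[] args) {
        // The trailing {} defines an anonymous subclass; that is what preserves Map<String, Integer>
        TypeToken<Map<String, Integer>> token = new TypeToken<Map<String, Integer>>() {};
        System.out.println(token.getType().getTypeName());
    }
}
```

Making TypeToken abstract is a deliberate choice in this sketch: new TypeToken<...>() without the {} does not compile, which forces callers to create the subclass that carries the type.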

On the other hand, for non-generic data types (types without type parameters, such as String), lambda expressions can be used freely in Flink. For example:

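import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink"));
// String has no type parameters, so erasure removes nothing Flink needs
DataStream<String> mapDataStream = dataStream.map(word -> word + "_1");

mapDataStream.print();
env.execute();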

The above code will execute normally.

Reference material

  1. https://www.cnblogs.com/coprince/p/8603492.html
  2. https://time.geekbang.org/column/article/12564
  3. https://time.geekbang.org/column/article/12574
  4. https://zhuanlan.zhihu.com/p/26389041
  5. https://blog.csdn.net/nazeniwaresakini/article/details/104220123

Keywords: Java flink

Added by spider.nick on Thu, 30 Dec 2021 02:18:50 +0200