Flink on yarn: specifying third-party jar packages

1. Background

When submitting a flink task to yarn, we usually use the shade plug-in to package all the required jar packages into one large jar, and then submit it to yarn with the flink run command. However, if the code changes frequently, or many colleagues in the team develop multiple business modules in the same project, each business module is packaged into a very large jar even when the change itself is small, and uploading the final task jar becomes very troublesome.

Therefore, we put the third-party jar packages on the server once, and then upload only the user's thin jar each time we start the corresponding flink task, which greatly reduces the time spent on data transmission.

2. Parameters

To specify a third-party jar package, you need to add two parameters after the flink run command: -yt and -C.

  1. -yt: uploads all files in the specified directory to the hdfs directory corresponding to the flink task. Then, when the flink task runs, the entire directory on hdfs is copied to a local directory on the machine where the TM is located. You can find the machine where the TM is located through the flink UI, use the jps command to find the pid of the corresponding task, and then go to the /proc/<pid>/fd directory to view all the files in it; there you can see all the jar packages the process needs to run.
  2. -C: specifies the classpath of the java program run by both the driver and the taskManager. The file path given to this parameter must be in URI format. A local file starts with file:/// and the file wildcard "*" cannot be used. If it is a relative path (relative to the directory where you run the flink run command), it can start with file:.
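As a rough illustration of how the two parameters fit together, the sketch below only prints a submit command rather than running it. The directory `lib`, the class `com.example.ThinJob`, and `thin-job.jar` are hypothetical placeholders, and `-m yarn-cluster` is assumed as the submission mode; only `-yt`, `-C`, and the name `third-part-package-1.0.jar` (the default Maven name of the module introduced later in this article) relate to the text.

```shell
#!/usr/bin/env bash
# Sketch only: prints the submit command instead of executing it.
# Directory, class, and job-jar names are hypothetical placeholders.

build_flink_cmd() {
  local ship_dir="$1"    # local directory uploaded with -yt
  local lib_jar="$2"     # jar inside ship_dir, added to the classpath with -C
  local main_class="$3"  # entry class of the thin job jar
  local job_jar="$4"     # the thin user jar
  # -C expects a URL rather than a plain path; a relative location is
  # written here as "file:" followed by the relative path.
  echo "flink run -m yarn-cluster -yt ${ship_dir} -C file:${ship_dir}/${lib_jar} -c ${main_class} ${job_jar}"
}

build_flink_cmd lib third-part-package-1.0.jar com.example.ThinJob thin-job.jar
```

Note that -yt takes a directory while -C takes one URL per occurrence, which is the limitation discussed next.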

However, what if the flink program uses many third-party jar packages?

For example, suppose I use 30 third-party jar packages. The -yt parameter is fine, because it accepts a directory, but each -C parameter can only specify one file. Writing the -C parameter 30 times is far too troublesome.

In fact, this problem is easy to solve.

Set up your project as a multi-module project and set the scope of all third-party dependencies in the project's pom file to provided. Then create a new module and declare in its pom file every jar package that needs to be uploaded to the server, without specifying a scope, so that these dependencies are bundled into the module's final jar during packaging. The result is a single jar containing all the third-party packages, so one -C parameter is enough.

The project-level pom file of the multi-module project:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.baishancloud.log</groupId>
    <artifactId>thunderfury-flink-maven</artifactId>
    <version>1.0</version>

    <name>thunderfury-flink-maven</name>

    <modules>
        <module>common</module>
        <module>streaming-directories-aggregate</module>
        <module>streaming-fm-single-machine-statistics</module>
        <module>streaming-netease-analyzer</module>
        <module>streaming-bilibili-quality</module>
        <module>streaming-icbc-report</module>
        <module>streaming-live</module>
        <module>streaming-miguict-audit</module>
        <module>streaming-murloc</module>
        <module>streaming-302-traffic</module>
        <module>third-part-package</module>
    </modules>

    <packaging>pom</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.13.3</flink.version>
        <scala.version>2.12.15</scala.version>
        <scala.package.version>2.12</scala.package.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-io</artifactId>
                    <groupId>commons-io</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <!-- connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>log4j-api</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-codec</artifactId>
                    <groupId>commons-codec</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>httpclient</artifactId>
                    <groupId>org.apache.httpcomponents</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>httpcore</artifactId>
                    <groupId>org.apache.httpcomponents</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks_${scala.package.version}</artifactId>
            <version>1.1.14_flink-${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <!-- other -->
        <dependency>
            <groupId>com.baishancloud.log</groupId>
            <artifactId>log-format-scala_${scala.package.version}</artifactId>
            <version>3.1.43</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.15</version>
            <scope>provided</scope>
        </dependency>
        <!--slf4j+log4j2-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.1</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- Test runner plug-in; configured here so that packaging skips the tests, i.e. all classes under the test package that follow the naming convention -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <!-- Packaging plug-in -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Java compiler plug-in -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Scala compiler plug-in -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <!-- Directory of resource files to be included, write relative path, relative to the root path of the project -->
                <directory>src/main/resources</directory>
                <includes>
                    <!-- The file to include, relative to the directory specified above -->
                    <include>*</include>
                </includes>
            </resource>
        </resources>
    </build>

    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/nexus/content/repositories/central/</url>
        </repository>
    </repositories>

</project>

The pom file of the module used to bundle the third-party jar packages:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>third-part-package</artifactId>
    <version>1.0</version>

    <parent>
        <groupId>com.baishancloud.log</groupId>
        <artifactId>thunderfury-flink-maven</artifactId>
        <version>1.0</version>
    </parent>

    <dependencies>
        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.package.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>log4j-api</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-codec</artifactId>
                    <groupId>commons-codec</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>httpclient</artifactId>
                    <groupId>org.apache.httpcomponents</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>httpcore</artifactId>
                    <groupId>org.apache.httpcomponents</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks_${scala.package.version}</artifactId>
            <version>1.1.14_flink-${flink.version}</version>
        </dependency>


        <!-- other -->
        <dependency>
            <groupId>com.baishancloud.log</groupId>
            <artifactId>log-format-scala_${scala.package.version}</artifactId>
            <version>3.1.43</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.15</version>
        </dependency>

    </dependencies>

</project>
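To make the packaging step concrete, here is a small sketch of how the bundled jar could be built and where it would be expected to land. It assumes the module directory has the same name as its artifactId and relies on Maven's default jar naming (`<artifactId>-<version>.jar` under `target`); the helper name `bundle_jar_path` is made up for this illustration.

```shell
#!/usr/bin/env bash
# Sketch: compute the expected location of the bundled third-party jar.
# Assumes the module directory is named after the artifactId and that
# Maven's default <artifactId>-<version>.jar naming is in effect.

bundle_jar_path() {
  local module="$1"   # module directory / artifactId
  local version="$2"  # <version> from the module pom
  echo "${module}/target/${module}-${version}.jar"
}

# Build only the third-party module (plus the modules it needs) from the
# project root:
#   mvn -pl third-part-package -am clean package
bundle_jar_path third-part-package 1.0
```

This is the jar that gets uploaded to the server once and is then referenced with -C on every submission.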

2.1. Example

Submit the flink jar to yarn through DS.

First upload the third-party dependency jar to DS.

I uploaded the jar that bundles all third-party jar packages used in the project to the flink/other directory of DS.

Then, in the submission settings of the flink task, specify the -yt and -C parameters in the option parameters. Both are relative paths (relative to where the flink run command is run). In the resources, select the jar package just uploaded to the flink/other directory.

Then start the task and observe the startup log.

From the log, you can see that a temporary startup directory is created on the machine that starts the task, and that the startup command is written to a command script in that directory. The startup command contains the -yt and -C parameters just specified.

Then log in to the machine on which DS started the task and switch to the temporary startup directory. You can see that the jar package has been downloaded into it.

After the flink on yarn job has started, view the files in the hdfs directory that corresponds to its yid.

You can see that the files we uploaded in DS and the files under flink's own lib are uploaded to hdfs.
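One way to locate that directory from the command line is sketched below. It assumes the default staging location, `.flink/<application id>` under the submitting user's HDFS home, and that the HDFS home is `/user/<name>`; both may differ on your cluster. The user name and application id are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: build the HDFS staging path of a flink on yarn application.
# Assumes the default layout /user/<user>/.flink/<application id>.

staging_dir() {
  local user="$1"    # user who submitted the job
  local app_id="$2"  # yarn application id of the flink job
  echo "/user/${user}/.flink/${app_id}"
}

# List everything that was uploaded for the job (hypothetical id):
#   hdfs dfs -ls -R "$(staging_dir flink application_1640000000000_0001)"
staging_dir flink application_1640000000000_0001
```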

Then check the files on the machine corresponding to TM.

You can see all the jar packages used by the process running the yarn task, including the jar package packaged by the third-party jar package specified by us.
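The check on the TM machine can be sketched as a small helper that reads the /proc/<pid>/fd directory mentioned in section 2. It is Linux only, and the function name `list_open_jars` is invented for this example; you need permission to read the target process's fd directory.

```shell
#!/usr/bin/env bash
# Sketch: list the jar files a running process currently has open (Linux).

list_open_jars() {
  local pid="$1"
  local fd
  # Every entry in /proc/<pid>/fd is a symlink to a file the process has open.
  for fd in /proc/"$pid"/fd/*; do
    readlink "$fd"
  done 2>/dev/null | grep '\.jar$' | sort -u
}

# Usage: find the TM pid with jps, then run
#   list_open_jars <pid>
```

If the bundled third-party jar appears in the output, the TM process has loaded it from the local machine.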

3. Summary

-yt uploads all files in the specified local directory to hdfs. When the flink on yarn task runs, those files are copied to the machine running the TM (which is also the machine running the container). The files specified with -C are then added to the classpath, so when the flink TM runs it can read the jar package directly from the local machine.

Keywords: flink Yarn jar

Added by rschneid on Mon, 03 Jan 2022 08:13:20 +0200