Flink Tutorial - Writing Streaming Data to Files in ORC Format with Flink 1.11


In Flink, StreamingFileSink is an important sink for writing streaming data to the file system. It supports writing data in row-encoded formats (JSON, CSV, etc.) as well as bulk-encoded, columnar formats (ORC, Parquet).
Hive is widely used for data storage, and ORC, as a columnar storage format specifically optimized for Hive, plays an important role among Hive's storage formats. Today we will focus on using StreamingFileSink to write streaming data to the file system in ORC format, which is supported as of Flink 1.11.

Introduction to StreamingFileSink

StreamingFileSink provides two static methods for constructing the corresponding sink: forRowFormat builds a sink that writes row-encoded data, and forBulkFormat builds a sink that writes bulk-encoded (columnar) data.

Let's look at the method forBulkFormat.

/**
 * Creates the builder for a {@link StreamingFileSink} with bulk-encoding format.
 *
 * @param basePath the base path where all the buckets are going to be created as
 *     sub-directories.
 * @param writerFactory the {@link BulkWriter.Factory} to be used when writing elements in the
 *     buckets.
 * @param <IN> the type of incoming elements
 * @return The builder where the remaining of the configuration parameters for the sink can be
 *     configured. In order to instantiate the sink, call {@link BulkFormatBuilder#build()}
 *     after specifying the desired parameters.
 */
public static <IN> StreamingFileSink.DefaultBulkFormatBuilder<IN> forBulkFormat(
        final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
    return new StreamingFileSink.DefaultBulkFormatBuilder<>(
            basePath, writerFactory, new DateTimeBucketAssigner<>());
}

forBulkFormat takes two parameters: the first is the base path to write to, and the second is an implementation of the BulkWriter.Factory interface, used to create the writers.
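
As a quick sketch of typical usage (MyType and writerFactory are placeholders here, and the path is illustrative), the builder returned by forBulkFormat also lets you override the default DateTimeBucketAssigner before calling build():

StreamingFileSink<MyType> sink = StreamingFileSink
        .forBulkFormat(new Path("hdfs:///data/out"), writerFactory)
        // Optional: bucket by processing time with a custom directory format.
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
        .build();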

The ORC writer factory class

First, add the flink-orc dependency to the project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

Flink provides OrcBulkWriterFactory, a factory class for writing in ORC format. Let's take a brief look at its member variables and constructors.

@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {

    private static final Path FIXED_PATH = new Path(".");

    private final Vectorizer<T> vectorizer;
    private final Properties writerProperties;
    private final Map<String, String> confMap;
    private OrcFile.WriterOptions writerOptions;

    public OrcBulkWriterFactory(Vectorizer<T> vectorizer) {
        this(vectorizer, new Configuration());
    }

    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration) {
        this(vectorizer, null, configuration);
    }

    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration) {
        // ...
    }

    // ...
}

Vectorization

Flink uses Hive's VectorizedRowBatch to write data in ORC format, so the input data has to be organized into VectorizedRowBatch objects. This conversion is performed by the vectorizer field of OrcBulkWriterFactory, an implementation of the abstract class Vectorizer; the method that does the work is org.apache.flink.orc.vector.Vectorizer#vectorize.
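
To make this concrete, here is a minimal sketch of a custom Vectorizer for a hypothetical Person POJO with getName() and getAge() accessors (the POJO and the schema string are assumptions for illustration; the pattern mirrors Flink's own vectorizers):

import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class PersonVectorizer extends Vectorizer<Person> {

    public PersonVectorizer(String schema) {
        super(schema); // e.g. "struct<name:string,age:int>"
    }

    @Override
    public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
        // Claim the next row slot in the batch.
        int row = batch.size++;
        // Column 0 holds the string field, column 1 the int field.
        BytesColumnVector nameCol = (BytesColumnVector) batch.cols[0];
        LongColumnVector ageCol = (LongColumnVector) batch.cols[1];
        nameCol.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
        ageCol.vector[row] = element.getAge();
    }
}

Such a vectorizer would then be handed to the factory, e.g. new OrcBulkWriterFactory<>(new PersonVectorizer("struct<name:string,age:int>")).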

For RowData input, Flink ships a RowDataVectorizer. Its vectorize method converts a RowData record into the columns of a VectorizedRowBatch according to the field types:

@Override
public void vectorize(RowData row, VectorizedRowBatch batch) {
    int rowId = batch.size++;
    for (int i = 0; i < row.getArity(); ++i) {
        setColumn(rowId, batch.cols[i], fieldTypes[i], row, i);
    }
}
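
Note that vectorize only fills in one row at a time; the surrounding OrcBulkWriter calls it for every incoming element and, once the batch is full, flushes it to the underlying ORC writer via addRowBatch and resets it.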

Construct OrcBulkWriterFactory

The factory class provides three constructors in total; the most complete one accepts three parameters: the Vectorizer described above, a Properties object holding the ORC writer settings, and a Hadoop Configuration.

The available writer properties are documented at https://orc.apache.org/docs/hive-config.html.
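
As a sketch, a few commonly used keys (the values shown are just examples, not recommendations):

// ORC writer properties; the keys come from the ORC configuration docs linked above.
Properties writerProps = new Properties();
// Compression codec, e.g. NONE, ZLIB, SNAPPY or LZ4.
writerProps.setProperty("orc.compress", "SNAPPY");
// Memory buffer size in bytes for each stripe (64 MB here).
writerProps.setProperty("orc.stripe.size", "67108864");
// Number of rows between index entries.
writerProps.setProperty("orc.row.index.stride", "10000");
// Comma-separated list of columns to build bloom filters for.
writerProps.setProperty("orc.bloom.filter.columns", "a1");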

A complete example

Construct source

First, we define a custom source that simulates generating RowData records. It is quite simple: it mainly emits rows containing a random int and a random double.

public static class MySource implements SourceFunction<RowData> {

    // Flag toggled by cancel() so the loop below can terminate cleanly.
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<RowData> sourceContext) throws Exception {
        while (isRunning) {
            // Two fields: a random int in [0, 100) and a random double in [0, 100).
            GenericRowData rowData = new GenericRowData(2);
            rowData.setField(0, (int) (Math.random() * 100));
            rowData.setField(1, Math.random() * 100);
            sourceContext.collect(rowData);
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

Construct OrcBulkWriterFactory

Next, define the parameters required to construct the OrcBulkWriterFactory.

// ORC writer properties
final Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "LZ4");

// Define the field types and field names
LogicalType[] orcTypes = new LogicalType[]{new IntType(), new DoubleType()};
String[] fields = new String[]{"a1", "b2"};
TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(RowType.of(
        orcTypes,
        fields));

// Construct the factory class OrcBulkWriterFactory
final OrcBulkWriterFactory<RowData> factory = new OrcBulkWriterFactory<>(
        new RowDataVectorizer(typeDescription.toString(), orcTypes),
        writerProps,
        new Configuration());
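
For the types and field names above, typeDescription.toString() yields the ORC schema string struct<a1:int,b2:double>, which RowDataVectorizer passes on to its parent Vectorizer.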

Construct StreamingFileSink

StreamingFileSink<RowData> orcSink = StreamingFileSink
        .forBulkFormat(new Path("file:///tmp/aaaa"), factory)
        .build();
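
Finally, a minimal sketch of the complete job wiring (the checkpoint interval and job name are illustrative). One important caveat: bulk-encoded sinks roll their part files on every checkpoint, so checkpointing must be enabled, otherwise the files stay in the in-progress/pending state forever.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Bulk formats roll part files on checkpoint, so checkpointing is required
// for in-progress files to be finalized.
env.enableCheckpointing(10_000L);

env.addSource(new MySource())
        .addSink(orcSink);

env.execute("orc streaming file sink");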

