Spark Doris Connector design

Spark Doris Connector is a new feature introduced by Doris in version 0.12. Users can use this feature to read and write data stored in Doris directly through Spark, with support for SQL, DataFrame, RDD, and other access methods.

From Doris's perspective, bringing its data into Spark opens it up to Spark's rich ecosystem of products, broadens what can be built on top of Doris, and makes joint queries between Doris and other data sources possible.

1. Technical selection

In the early scheme, we simply exposed Doris's JDBC interface to Spark. For a JDBC data source, Spark works as follows: the Spark Driver accesses Doris's FE through the JDBC protocol to obtain the schema of the target Doris table, then splits the query into multiple partition sub-queries according to a chosen field and distributes them to the Spark Executors. Each Executor converts the partitions it is responsible for into the corresponding JDBC queries and accesses Doris's FE interface directly to fetch the data. This scheme requires almost no code changes, but because Spark cannot perceive Doris's data distribution, it puts heavy query pressure on Doris.
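
For reference, such a JDBC-based read might look like the sketch below; the partition column, the bounds, and the default FE MySQL-protocol port 9030 are illustrative assumptions:

// Early scheme (sketch): read Doris through plain JDBC.
// Spark splits the read into numPartitions range sub-queries on partitionColumn
// and sends each of them to Doris FE as an ordinary JDBC query.
val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://$YOUR_DORIS_FE_HOSTNAME:9030/$YOUR_DORIS_DATABASE_NAME")
  .option("dbtable", "$YOUR_DORIS_TABLE_NAME")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .option("partitionColumn", "id")   // hypothetical numeric column used for splitting
  .option("lowerBound", "0")
  .option("upperBound", "1000000")
  .option("numPartitions", "8")
  .load()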

So the community developed a new data source for Doris: Spark Doris Connector. Under this scheme, Doris exposes its data and data distribution to Spark. The Spark Driver accesses Doris FE to obtain the schema and the underlying data distribution of the Doris table, then assigns data query tasks to the Executors based on that distribution. Finally, the Spark Executors access the different BEs directly to execute their queries. This greatly improves query efficiency.

2. Usage

Compile doris-spark-1.0.0-SNAPSHOT.jar in the extension/spark-doris-connector/ directory of the Doris code base, then add this jar package to Spark's ClassPath to use the Spark on Doris functionality.
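
For example, when building a SparkSession programmatically, the jar can be put on the classpath via spark.jars (the path below is an assumption; passing --jars to spark-shell or spark-submit works equally well):

// Sketch: register the connector jar when creating the SparkSession.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-on-doris")
  .config("spark.jars", "/path/to/doris-spark-1.0.0-SNAPSHOT.jar")  // assumed local path
  .getOrCreate()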

2.1 reading

2.1.1 SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  "user"="$YOUR_DORIS_USERNAME",
  "password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;

2.1.2 DataFrame

val dorisSparkDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
	.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()

dorisSparkDF.show(5)

2.1.3 RDD

import org.apache.doris.spark._
val dorisSparkRDD = sc.dorisRDD(
  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  cfg = Some(Map(
    "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
    "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
    "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
  ))
)

dorisSparkRDD.collect()

2.2 writing

2.2.1 SQL mode

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  "user"="$YOUR_DORIS_USERNAME",
  "password"="$YOUR_DORIS_PASSWORD"
);

INSERT INTO spark_doris VALUES ("VALUE1", "VALUE2", ...);
-- or
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE;

2.2.2 DataFrame(batch/stream) mode

// batch sink
import spark.implicits._  // needed for .toDF on a local collection outside spark-shell

val mockDataDF = List(
  (3, "440403001005", "21.cn"),
  (1, "4404030013005", "22.cn"),
  (33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)

mockDataDF.write.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
	.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .save()

// stream sink (Structured Streaming)
val kafkaSource = spark.readStream
  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  .option("startingOffsets", "latest")
  .option("subscribe", "$YOUR_KAFKA_TOPICS")
  .format("kafka")
  .load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
  .writeStream
  .format("doris")
  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
	.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .start()
  .awaitTermination()

2.3 applicable scenarios

2.3.1 processing historical data changes

Before Spark Doris Connector, modifying data in Doris was costly, yet requirements to modify or delete data appear frequently in real business.

Before Spark Doris Connector

Scheme 1: do not delete the wrong data imported earlier; instead, import offsetting negative values to brush the stored value back to 0, and then import the correct data.

Scheme 2: delete the wrong data, and then insert the correct data.

The problem with both schemes is that there is always a window of time during which the data value is 0, which is intolerable for external systems. For example, advertisers need to view their account information; if an account shows 0 because the data is being corrected, that is unacceptable and unfriendly to them.

Spark Doris Connector solution

With Spark Doris Connector, handling historical data changes becomes much more convenient.

As shown in the figure above, the first row is the erroneous data and the second row is the correct data. Spark can join two streams: one stream uses Spark Doris Connector to read from Doris, and the other reads the correct external data (for example, a Parquet file generated by the business department). Spark performs a diff operation and computes the difference for every value, i.e. the result in the last row of the figure, and imports that diff into Doris. The advantage is that the intermediate time window is eliminated, and it is also convenient and friendly for business teams that already use Spark heavily.
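
A minimal sketch of this diff-and-backfill idea, assuming a SUM-aggregated Doris table with hypothetical columns id and cost, and the correct data stored as Parquet:

// Read the current (erroneous) data from Doris.
val dorisDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()

// Read the correct data produced by the business side, e.g. a Parquet file.
val correctDF = spark.read.parquet("$YOUR_CORRECT_DATA_PATH")

// diff = correct - current, per key; importing the diff into a SUM-aggregated
// table corrects the stored value without any window where it reads as 0.
val diffDF = correctDF.as("c")
  .join(dorisDF.as("d"), Seq("id"))
  .selectExpr("id", "c.cost - d.cost AS cost")

diffDF.write.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .save()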

Use Spark to jointly analyze the data in Doris and other data sources

Many business departments store their data on different systems: for example, some online analysis and report data in Doris, some structured retrieval data in Elasticsearch, and some data requiring transactions in MySQL. Businesses often need to analyze across these storage sources. After connecting Spark and Doris through Spark Doris Connector, businesses can use Spark directly to run joint queries and calculations over the data in Doris together with multiple external data sources, as in the sketch below.
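
A minimal sketch of such a joint query, assuming a MySQL table reachable over JDBC and a hypothetical join key id:

// Read the Doris table through the connector.
val dorisDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()

// Read a MySQL table through plain JDBC.
val mysqlDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://$YOUR_MYSQL_HOST:3306/$YOUR_MYSQL_DATABASE")
  .option("dbtable", "$YOUR_MYSQL_TABLE")
  .option("user", "$YOUR_MYSQL_USERNAME")
  .option("password", "$YOUR_MYSQL_PASSWORD")
  .load()

// Join the two sources with ordinary Spark SQL.
dorisDF.createOrReplaceTempView("doris_tbl")
mysqlDF.createOrReplaceTempView("mysql_tbl")
spark.sql("SELECT d.*, m.* FROM doris_tbl d JOIN mysql_tbl m ON d.id = m.id").show()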

2.3.2 real-time data processing and writing

At present, Spark Doris Connector supports reading data from the data source through SQL and DataFrame, as well as writing data to Doris through SQL and DataFrame. At the same time, Spark's computing power can be used to perform real-time calculations on the data before it is written.
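
A minimal sketch of such a pipeline, assuming the Kafka value is JSON with hypothetical fields id and cost (the derived column is purely illustrative):

// Parse the Kafka value and derive a column on the fly, then sink to Doris.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val valueSchema = new StructType()
  .add("id", IntegerType)
  .add("cost", DoubleType)

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  .option("subscribe", "$YOUR_KAFKA_TOPICS")
  .load()
  .select(from_json(col("value").cast("string"), valueSchema).as("v"))
  .select(col("v.id").as("id"), (col("v.cost") * 100).as("cost_cents"))  // real-time computation
  .writeStream
  .format("doris")
  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .start()
  .awaitTermination()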

3. Relevant configuration parameters

3.1 general configuration

Key | Default Value | Comment
doris.fenodes | -- | Doris FE HTTP address; multiple addresses are supported, separated by commas
doris.table.identifier | -- | Doris table name, such as db1.tbl1
doris.request.retries | 3 | Number of retries when sending a request to Doris
doris.request.connect.timeout.ms | 30000 | Connection timeout for requests sent to Doris
doris.request.read.timeout.ms | 30000 | Read timeout for requests sent to Doris
doris.request.query.timeout.s | 3600 | Timeout for querying Doris. The default is 1 hour; -1 means no timeout limit
doris.request.tablet.size | Integer.MAX_VALUE | Number of Doris tablets corresponding to one RDD partition. The smaller the value, the more partitions are generated, which improves parallelism on the Spark side but puts more pressure on Doris
doris.batch.size | 1024 | Maximum number of rows read from BE at a time. Increasing this value reduces the number of connections between Spark and Doris and thus the extra overhead caused by network latency
doris.exec.mem.limit | 2147483648 | Memory limit for a single query, in bytes. The default is 2GB
doris.deserialize.arrow.async | false | Whether to asynchronously convert the Arrow format to the RowBatch needed by the Spark Doris Connector iteration
doris.deserialize.queue.size | 64 | Size of the internal queue used for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true
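
These general options are passed like any other read/write option. A short sketch (the values 10 and 4096 are illustrative, not tuning recommendations):

// Sketch: lower doris.request.tablet.size to get more read partitions,
// and raise doris.batch.size to fetch more rows per round trip.
val tunedDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .option("doris.request.tablet.size", "10")
  .option("doris.batch.size", "4096")
  .load()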

3.2 SQL and DataFrame specific configuration

Key | Default Value | Comment
user | -- | Username for accessing Doris
password | -- | Password for accessing Doris
doris.filter.query.in.max.count | 100 | Maximum number of elements in an IN expression's value list for predicate pushdown. If this number is exceeded, the IN expression filtering is handled on the Spark side

3.3 RDD specific configuration

Key | Default Value | Comment
doris.request.auth.user | -- | Username for accessing Doris
doris.request.auth.password | -- | Password for accessing Doris
doris.read.field | -- | List of column names to read from the Doris table, separated by commas
doris.filter.query | -- | Expression used to filter the data being read; it is passed to Doris, which uses it to filter data on the source side
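
A short sketch of these RDD-specific options (the column names and the filter expression are illustrative assumptions):

// Read only two columns and push the filter down to Doris.
import org.apache.doris.spark._

val prunedRDD = sc.dorisRDD(
  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  cfg = Some(Map(
    "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
    "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
    "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD",
    "doris.read.field" -> "id,mi_code",   // column pruning
    "doris.filter.query" -> "id > 100"    // filtering done on the Doris side
  ))
)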

3.4 Doris and Spark column type mapping

Doris Type | Spark Type
NULL_TYPE | DataTypes.NullType
BOOLEAN | DataTypes.BooleanType
TINYINT | DataTypes.ByteType
SMALLINT | DataTypes.ShortType
INT | DataTypes.IntegerType
BIGINT | DataTypes.LongType
FLOAT | DataTypes.FloatType
DOUBLE | DataTypes.DoubleType
DATE | DataTypes.StringType (see note below)
DATETIME | DataTypes.StringType (see note below)
BINARY | DataTypes.BinaryType
DECIMAL | DecimalType
CHAR | DataTypes.StringType
LARGEINT | DataTypes.StringType
VARCHAR | DataTypes.StringType
DECIMALV2 | DecimalType
TIME | DataTypes.DoubleType
HLL | Unsupported datatype
  • Note: in the Connector, DATE and DATETIME are mapped to String. Due to the processing logic of Doris's underlying storage engine, using the native time types directly cannot cover the required time range, so the String type is used to return the corresponding readable time text directly.
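
If a native Spark time type is needed, the returned text can be cast back on the Spark side. A short sketch, assuming a DATETIME column named event_time in the text format yyyy-MM-dd HH:mm:ss:

// Convert the string column returned by the connector into a Spark timestamp.
import org.apache.spark.sql.functions._

val withTs = dorisSparkDF
  .withColumn("event_time", to_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss"))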
