Time series database InfluxDB IOx source code learning VI-1 (partitioning of data writes)


The previous chapter covered how a database is created and how its description information is saved. For details, see: https://my.oschina.net/u/3374539/blog/5025128

This chapter records how data is written and saved, split into two parts:

  • An introduction to how partitioning is done
  • An introduction to the write path itself

Before any data can be written, a client has to connect to the server. The IOx project provides several ways to interact with it, namely gRPC and HTTP. On top of these two transports it also ships the influxdb2_client and influxdb_iox_client client libraries.

Based on influxdb_iox_client, I wrote an example that writes some data and queries it back, to observe how the interface is organized. The code is as follows:

// Imports assumed from influxdb_iox_client's module layout at the time:
use influxdb_iox_client::{connection::Builder, flight, format, write};

#[tokio::main]
async fn main() {
    {
        let connection = Builder::default()
            .build("http://127.0.0.1:8081")
            .await
            .unwrap();
        write::Client::new(connection)
            .write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#)
            .await
            .expect("failed to write data");
    }

    let connection = Builder::default()
        .build("http://127.0.0.1:8081")
        .await
        .unwrap();

    let mut query = flight::Client::new(connection)
        .perform_query("a", "select * from myMeasurement")
        .await
        .expect("query request should work");

    let mut batches = vec![];

    while let Some(data) = query.next().await.expect("valid batches") {
        batches.push(data);
    }

    let output_format = format::QueryOutputFormat::Pretty;
    println!("{}", output_format.format(&batches).unwrap());
}


+------------+--------+--------+-------------------------+
| fieldKey   | tag1   | tag2   | time                    |
+------------+--------+--------+-------------------------+
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
+------------+--------+--------+-------------------------+

Because I ran the write several times, you can see duplicate rows in the result.

It should also be mentioned that the format of the written statement is the InfluxDB Line Protocol, documented at:

[Line Protocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
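As a quick illustration of the parsing step the server performs later in this article, here is a minimal sketch using the influxdb_line_protocol crate. The field names follow the ParsedLine structure shown below, but treat the exact API as an assumption:

use influxdb_line_protocol::parse_lines;

fn main() {
    let lp = r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#;
    // parse_lines returns an iterator of Result<ParsedLine, Error>
    for line in parse_lines(lp) {
        let line = line.expect("valid line protocol");
        // measurement: "myMeasurement"
        println!("measurement: {}", line.series.measurement);
        // tag_set: [("tag1", "value1"), ("tag2", "value2")]
        // field_set: [("fieldKey", "123")]
        // timestamp: Some(1556813561098000000)
        println!("timestamp: {:?}", line.timestamp);
    }
}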

The write method in write::Client builds a WriteRequest structure and calls the remote write method over gRPC. The specific implementation of that method can be found at src/influxdb_ioxd/rpc/write.rs, line 22:

async fn write(
    &self,
    request: tonic::Request<WriteRequest>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
    let request = request.into_inner();
    // Get the database name sent by the client ("a" in the example above)
    let db_name = request.db_name;
    // Get the raw line protocol payload
    let lp_data = request.lp_data;
    let lp_chars = lp_data.len();
    // Parse the line protocol; the line in the example resolves to:
    //   measurement: "myMeasurement"
    //   tag_set: [("tag1", "value1"), ("tag2", "value2")]
    //   field_set: [("fieldKey", "123")]
    //   timestamp: 1556813561098000000
    let lines = parse_lines(&lp_data)
        .collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
        .map_err(|e| FieldViolation {
            field: "lp_data".into(),
            description: format!("Invalid Line Protocol: {}", e),
        })?;

    let lp_line_count = lines.len();
    debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database");
    // Hand the parsed lines to the server for storage
    self.server
        .write_lines(&db_name, &lines)
        .await
        .map_err(default_server_error_handler)?;
    // Report success with the number of lines written
    let lines_written = lp_line_count as u64;
    Ok(Response::new(WriteResponse { lines_written }))
}
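For orientation, judging only from the fields the handler reads and returns, the message types exchanged here look roughly like the hypothetical mirror below (the real types are generated from the .proto definitions):

// Hypothetical mirror of the generated protobuf types, shown for orientation only.
pub struct WriteRequest {
    /// Target database name ("a" in the example above)
    pub db_name: String,
    /// Raw line protocol payload
    pub lp_data: String,
}

pub struct WriteResponse {
    /// Number of lines successfully parsed and written
    pub lines_written: u64,
}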

Continue into the implementation of self.server.write_lines:

pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
    self.require_id()?;
    // Validate the name and fetch the in-memory state recorded when the database was created
    let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
    let db = self
        .config
        .db(&db_name)
        .context(DatabaseNotFound { db_name: &*db_name })?;
    // The sharding strategy is applied here
    let (sharded_entries, shards) = {
        // Read the shard configuration set when the database was created
        let rules = db.rules.read();
        let shard_config = &rules.shard_config;
        // Route each line to its shard according to the data and the shard strategy,
        // producing a list of ShardedEntry (roughly: a shard id plus the data for it);
        // the concrete structure is shown below
        let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules)
            .context(LineConversion)?;
        // Also return the configuration of all shards to the caller
        let shards = shard_config
            .as_ref()
            .map(|cfg| Arc::clone(&cfg.shards))
            .unwrap_or_default();

        (sharded_entries, shards)
    };

    // Iterate over the entries built above and write to each shard in parallel
    futures_util::future::try_join_all(
        sharded_entries
            .into_iter()
            .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
    )
    .await?;

    Ok(())
}

This is the main write path for a batch of data: first route every line to its shard (collecting the data for each shard into a list), then write to all shards in parallel.
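The fan-out itself is just try_join_all over one future per shard. Here is a minimal standalone sketch of the same pattern, using toy types rather than the real IOx ones:

use futures_util::future::try_join_all;

async fn write_to_shard(shard_id: u32, lines: Vec<String>) -> Result<(), String> {
    // Pretend to send `lines` to the shard; a real implementation would do I/O here.
    println!("shard {} <- {} lines", shard_id, lines.len());
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let sharded_entries = vec![
        (0, vec!["cpu,host=a usage=1".to_string()]),
        (1, vec!["cpu,host=b usage=2".to_string()]),
    ];

    // One future per shard, all awaited concurrently; the first error aborts the batch.
    try_join_all(
        sharded_entries
            .into_iter()
            .map(|(id, lines)| write_to_shard(id, lines)),
    )
    .await?;

    Ok(())
}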

Next, let's look at how the data is partitioned:

pub fn lines_to_sharded_entries(
    lines: &[ParsedLine<'_>],
    sharder: Option<&impl Sharder>,
    partitioner: &impl Partitioner,
) -> Result<Vec<ShardedEntry>> {
    let default_time = Utc::now();
    let mut sharded_lines = BTreeMap::new();

    // Iterate over all lines to be inserted
    for line in lines {
        // First find the shard
        let shard_id = match &sharder {
            Some(s) => Some(s.shard(line).context(GeneratingShardId)?),
            None => None,
        };
        // Then decide which partition the line belongs to
        let partition_key = partitioner
            .partition_key(line, &default_time)
            .context(GeneratingPartitionKey)?;
        let table = line.series.measurement.as_str();
        // Finally store it in a nested map:
        // shard -> partition -> table -> Vec<line>
        sharded_lines
            .entry(shard_id)
            .or_insert_with(BTreeMap::new)
            .entry(partition_key)
            .or_insert_with(BTreeMap::new)
            .entry(table)
            .or_insert_with(Vec::new)
            .push(line);
    }

    let default_time = Utc::now();
    // Finally flatten the map into the list structure mentioned earlier
    let sharded_entries = sharded_lines
        .into_iter()
        .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time))
        .collect::<Result<Vec<_>>>()?;
    Ok(sharded_entries)
}

Here, a shard can be understood as a machine or a group of machines that is responsible for the actual storage of the data.

A partition can be understood as a folder: the concrete storage path on a shard.
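To make the shard -> partition -> table nesting concrete, here is a toy version of the grouping that lines_to_sharded_entries performs, using plain strings instead of ParsedLine:

use std::collections::BTreeMap;

fn main() {
    // (shard_id, partition_key, table, line)
    let lines = vec![
        (Some(1), "2019-05-02", "myMeasurement", "line A"),
        (Some(1), "2019-05-02", "myMeasurement", "line B"),
        (Some(2), "2019-05-03", "otherTable", "line C"),
    ];

    // shard -> partition -> table -> Vec<line>
    let mut grouped: BTreeMap<Option<u32>, BTreeMap<&str, BTreeMap<&str, Vec<&str>>>> =
        BTreeMap::new();

    for (shard, partition, table, line) in lines {
        grouped
            .entry(shard)
            .or_insert_with(BTreeMap::new)
            .entry(partition)
            .or_insert_with(BTreeMap::new)
            .entry(table)
            .or_insert_with(Vec::new)
            .push(line);
    }

    println!("{:#?}", grouped);
}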

Next, here is how the shard assignment is implemented:

impl Sharder for ShardConfig {
    fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> {
        if let Some(specific_targets) = &self.specific_targets {
            // If the line matches the configured rule, use that specific shard.
            // The official code currently only implements matching by table name.
            // This configuration seems to be settable only through gRPC; the upside
            // may be that a management interface can modify it dynamically later.
            if specific_targets.matcher.match_line(line) {
                return Ok(specific_targets.shard);
            }
        }
        // If no specific target matches, fall back to hashing:
        // hash the line, then compare against the node hashes to find the right node;
        // if no node's hash is large enough, wrap around to the first node.
        // The hash algorithm is shown below.
        if let Some(hash_ring) = &self.hash_ring {
            return hash_ring
                .shards
                .find(LineHasher { line, hash_ring })
                .context(NoShardsDefined);
        }

        NoShardingRuleMatches {
            line: line.to_string(),
        }
        .fail()
    }
}
// If the hash is configured over enough columns, the distribution becomes very
// scattered: almost every series ends up in a different place
impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // If configured, include the table name in the hash
        if self.hash_ring.table_name {
            self.line.series.measurement.hash(state);
        }
        // Then hash the values of the configured columns
        for column in &self.hash_ring.columns {
            if let Some(tag_value) = self.line.tag_value(column) {
                tag_value.hash(state);
            } else if let Some(field_value) = self.line.field_value(column) {
                field_value.to_string().hash(state);
            }
            state.write_u8(0); // column separator
        }
    }
}
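The find call above is a consistent-hash lookup. The sketch below shows the general idea (not IOx's exact HashRing implementation): hash the item, then take the first node on the ring whose position is at least the item's hash, wrapping around to the first node otherwise:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick a node for `item`: the first node whose ring position is >= the item's
/// hash, wrapping around to the first node on the ring otherwise.
fn find_node<T: Hash>(ring: &[(u64, u32)], item: &T) -> Option<u32> {
    let mut hasher = DefaultHasher::new();
    item.hash(&mut hasher);
    let h = hasher.finish();
    ring.iter()
        .find(|(node_hash, _)| *node_hash >= h)
        .or_else(|| ring.first())
        .map(|(_, id)| *id)
}

fn main() {
    // (hash position on the ring, shard id), sorted by position
    let ring = vec![(u64::MAX / 4, 1), (u64::MAX / 2, 2), (u64::MAX / 4 * 3, 3)];
    println!("{:?}", find_node(&ring, &"myMeasurement"));
}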

Next, let's look at the default partition method:

impl Partitioner for PartitionTemplate {
    fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
        let parts: Vec<_> = self
            .parts
            .iter()
            // Evaluate each configured template part, which can be a single part
            // or a composite: the table name, a column value, or a time format.
            // The remaining variants (regular expressions and strftime columns)
            // are not implemented yet.
            .map(|p| match p {
                TemplatePart::Table => line.series.measurement.to_string(),
                TemplatePart::Column(column) => match line.tag_value(&column) {
                    Some(v) => format!("{}_{}", column, v),
                    None => match line.field_value(&column) {
                        Some(v) => format!("{}_{}", column, v),
                        None => "".to_string(),
                    },
                },
                TemplatePart::TimeFormat(format) => match line.timestamp {
                    Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(),
                    None => default_time.format(&format).to_string(),
                },
                _ => unimplemented!(),
            })
            .collect();
        // Finally return the combined partition key: either "a-b-c" or a single value
        Ok(parts.join("-"))
    }
}
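To tie this back to the example data: for the line written at the start of this article, a TimeFormat("%Y-%m-%d") template part would produce the partition key like this (using chrono, the same as the TimeFormat arm above):

use chrono::{TimeZone, Utc};

fn main() {
    // Timestamp from the example line at the top of the article
    let ts: i64 = 1556813561098000000;
    // Mirrors the TemplatePart::TimeFormat arm above
    let partition_key = Utc.timestamp_nanos(ts).format("%Y-%m-%d").to_string();
    assert_eq!(partition_key, "2019-05-02");
    println!("{}", partition_key);
}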

This completes the partitioning work. The next article continues with how the data is actually written.

Have fun
