1. Background
Recently, our group decided to build a real-time data warehouse on top of the existing environment. After a comprehensive evaluation, Doris was chosen as the database for the real-time data warehouse. The data sources include message data and business database data.
2. Data source access
The message data is the easy part: Pulsar, Kafka, and Flink all provide ready-made source connectors, and configuring them according to the official documentation is enough. However, because of the mysterious organization known as the DBAs, who worried that enabling binlog would increase the load on their databases, we could not get binlog access. As the team lead, I could only accept this silently. So monitoring the PG database's binlog with Flink CDC to capture change data was off the table; the only option left was to write our own code that polls the table by its operation_time column to fetch the latest data and produce the Kafka messages ourselves.
The relevant code is as follows:
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class MysqlToKafka {
    public static void main(String[] args) throws Exception {
        StudentInfo studentInfo = null;

        // Kafka producer configuration
        Properties pro = new Properties();
        pro.put("bootstrap.servers", "hadoop102:9092");
        pro.put("acks", "all");
        pro.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.put("retries", 3);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(pro);

        MysqlJDBC mysqlJDBC = new MysqlJDBC();
        ResultSet resultSet = null;
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String sql = null;
        mysqlJDBC.init();

        while (true) {
            // Every minute, pull the rows whose operation time falls within the last two minutes
            String timeStamp = sdf.format(new Date());
            sql = "select " +
                    "stu_id," +
                    "stu_name," +
                    "course," +
                    "stu_score," +
                    "stu_operation_time " +
                    "from student_info " +
                    "where timestampdiff(minute, stu_operation_time, \"" + timeStamp + "\") <= " + 2 + ";";
            resultSet = mysqlJDBC.select(sql);
            while (resultSet.next()) {
                studentInfo = new StudentInfo(
                        resultSet.getString("stu_id"),
                        resultSet.getString("stu_name"),
                        resultSet.getString("course"),
                        resultSet.getString("stu_score"),
                        resultSet.getString("stu_operation_time"),
                        ""
                );
                System.out.println(studentInfo);
                // Serialize the record as JSON and send it to the MysqlToKafka topic
                kafkaProducer.send(new ProducerRecord<>("MysqlToKafka", "mysql", JSON.toJSONString(studentInfo)));
            }
            Thread.sleep(60 * 1000);
        }
    }
}
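The MysqlJDBC helper (and the StudentInfo POJO) referenced above are custom classes that are not shown in this post. A minimal sketch of what the helper could look like, assuming a plain JDBC connection (the URL, user, and password below are placeholders, not our actual values):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Minimal sketch of the JDBC helper used above; connection details are placeholders.
public class MysqlJDBC {
    private Connection conn;
    private Statement stmt;

    public void init() throws SQLException {
        conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop102:3306/test?useSSL=false&characterEncoding=utf8",
                "root", "123456");
        stmt = conn.createStatement();
    }

    public ResultSet select(String sql) throws SQLException {
        return stmt.executeQuery(sql);
    }

    public void close() throws SQLException {
        if (stmt != null) stmt.close();
        if (conn != null) conn.close();
    }
}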
3. Data processing and writing
Data processing
The ingestion logic above queries, once a minute, the rows changed in the table within the last two minutes, so duplicate records are inevitable.
Next, the data needs to be deduplicated. The deduplication can be done in either of two places:
The first: exact deduplication through Flink state variables.
The second: when designing the Doris table, set the modification method of the value columns to REPLACE, so newly loaded rows overwrite the old ones. For the specific CREATE TABLE statement, refer to the Doris official documentation; a rough sketch is given below.
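For the second option, here is a rough sketch of what such a table could look like. The key columns, bucket count, and replication setting are assumptions made for illustration, not our actual table definition; check the Doris documentation for the exact syntax of your version.

-- Hypothetical example: value columns use REPLACE, so rows with the same key
-- keep only the most recently loaded values.
CREATE TABLE student_info_gxd_test (
    stu_id             VARCHAR(32),
    course             VARCHAR(64),
    stu_name           VARCHAR(64) REPLACE,
    stu_score          VARCHAR(32) REPLACE,
    stu_operation_time VARCHAR(64) REPLACE
)
AGGREGATE KEY(stu_id, course)
DISTRIBUTED BY HASH(stu_id) BUCKETS 10
PROPERTIES ("replication_num" = "1");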
Data writing
There are many ways to write data to Doris; refer to the Doris official documentation. We first considered writing to Doris through JDBC, but the INSERT INTO approach is not suitable for continuously inserting large volumes of data over a long period. That leaves Stream Load or the flink-connector-doris extension provided by Doris. Since this is not a sink officially shipped with Flink, the dependency cannot be found in the Maven central repository. According to Doris's instructions, you can compile the Doris sink yourself; it turns out the dependency can also be added from their repository (shown below), after which you write data by calling the DorisSink.sink method. According to the introduction, the underlying implementation is also Stream Load. In the end, the debugging succeeded.
I am still curious about the relationship between the DorisDB enterprise documentation and the official Doris documentation. If you know anything about it, please share.
Add the repository and dependency as shown here:
<repositories>
    <repository>
        <id>dorisdb-maven-releases</id>
        <url>http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-releases/</url>
    </repository>
    <repository>
        <id>dorisdb-maven-snapshots</id>
        <url>http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-snapshots/</url>
    </repository>
</repositories>
<dependency>
    <groupId>com.dorisdb.connector</groupId>
    <artifactId>flink-connector-doris</artifactId>
    <!-- keep only the version that matches your Flink version -->
    <version>1.0.32-SNAPSHOT</version>                 <!-- for flink-1.11 ~ flink-1.12 -->
    <!-- <version>1.0.32_1.13-SNAPSHOT</version> -->   <!-- for flink-1.13 -->
</dependency>
I am not sure what the purpose of the following step is, and skipping it did not seem to affect me. My guess is that it is the Java SPI registration that Flink's Table/SQL API uses to discover connector factories, which the DataStream-style DorisSink.sink call used below does not go through:
Add com.dorisdb.table.connector.flink.DorisDynamicTableSinkFactory to: src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
The relevant code is as follows:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

// DorisSink and DorisSinkOptions come from the flink-connector-doris dependency added above.

public class SinkConnectorToDoris {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(3000);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "test");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("max.poll.records", "10000");

        DataStreamSource<String> dataStream =
                env.addSource(new FlinkKafkaConsumer<>("MysqlToKafka", new SimpleStringSchema(), properties));

        dataStream
                // .map(t -> JSON.parseObject(t))  // unnecessary, see the note below
                .keyBy(t -> t)
                // deduplicate messages with a RichFlatMapFunction backed by keyed state
                .flatMap(new RichFlatMapFunction<String, String>() {
                    private transient ValueState<Boolean> isExist;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Boolean> vsd = new ValueStateDescriptor<>("isExist", Boolean.class);
                        isExist = getRuntimeContext().getState(vsd);
                    }

                    @Override
                    public void flatMap(String s, Collector<String> collector) throws Exception {
                        // only emit the first occurrence of each message
                        if (isExist.value() == null) {
                            collector.collect(s);
                            isExist.update(true);
                        }
                    }
                })
                .addSink(
                        DorisSink.sink(
                                DorisSinkOptions.builder()
                                        .withProperty("jdbc-url", "jdbc:mysql://172.17.60.10:19030/doris_qa")
                                        .withProperty("load-url", "172.17.60.10:18030")
                                        .withProperty("username", "root")
                                        .withProperty("password", "")
                                        .withProperty("table-name", "student_info_gxd_test")
                                        .withProperty("database-name", "doris_qa")
                                        .withProperty("sink.properties.format", "json")
                                        .withProperty("sink.properties.strip_outer_array", "true")
                                        .withProperty("sink.buffer-flush.interval-ms", "1000")
                                        .build()
                        )
                ).setParallelism(1);

        env.execute();
    }
}
Note:
The data stream passed to addSink should be a DataStream with String elements. At first I had mapped the stream with .map(t -> JSON.parseObject(t)), and the data could not be inserted. I hope you don't make the same mistake.
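To make the pitfall concrete, here is a minimal sketch assuming the same dataStream as in the code above; sinkOptions stands for the DorisSinkOptions built there and the variable names are made up for illustration.

// Problematic: mapping to JSONObject changes the element type away from String;
// in my test the records mapped this way could not be loaded into Doris.
SingleOutputStreamOperator<JSONObject> parsed = dataStream.map(t -> JSON.parseObject(t));

// Working: keep the elements as raw JSON strings and pass them to the sink directly;
// "sink.properties.format" = "json" lets Doris parse them during Stream Load.
dataStream.addSink(DorisSink.sink(sinkOptions));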