Flink -- transform (keyed stream transformation operators)

Keyed stream transformation operators

keyBy

If you want to aggregate, you must group first, so keyBy is very important
The keyBy operator is special: it is not a one-to-one transformation
It is not a real operator by itself
It defines the relationship between two tasks
and the way data is transmitted between them
keyBy groups the stream based on the defined keys
A repartition is performed based on the hashCode of each key

When records are repartitioned, all records with the same key must land in the same partition, so that partition holds all the data for that key. A single partition may hold multiple keys.
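A minimal sketch of the idea (a simplified illustration only; Flink's real implementation hashes keys into key groups rather than directly into partitions):

// Simplified: records with equal keys always map to the same partition
def partitionFor(key: Any, numPartitions: Int): Int =
  math.abs(key.hashCode % numPartitions)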
keyBy also converts the data structure:
DataStream ==> KeyedStream
Looking at the source code:
DataStream itself has no aggregation methods; once converted to a KeyedStream, the aggregation methods become available

This KeyedStream essentially inherits from DataStream
keyBy is not an end in itself; the purpose is to aggregate
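In code, the conversion looks like this (a sketch reusing the mappedData stream built in the example below; when keying by field name, the key type is org.apache.flink.api.java.tuple.Tuple):

val keyed: KeyedStream[SensorReading, Tuple] = mappedData.keyBy("id") // DataStream => KeyedStream
val result: DataStream[SensorReading] = keyed.sum("temperature")      // aggregation returns a DataStream again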

Aggregation operators

Rolling aggregation operators: during aggregation the data keeps arriving, and each new result is produced by updating the previous result.
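A self-contained sketch of that rolling behaviour (the element values are made up for illustration):

import org.apache.flink.streaming.api.scala._

object RollingSumSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // For the keyed input ("a",1), ("a",2), ("a",3), sum(1) emits ("a",1), ("a",3), ("a",6):
    // every incoming element updates the previous aggregation result
    env.fromElements(("a", 1), ("a", 2), ("a", 3))
      .keyBy(_._1)
      .sum(1)
      .print()
    env.execute("rolling sum sketch")
  }
}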
Common operators
sum
min
max
minBy
maxBy
The difference between min and minBy, for a specified field:
minBy outputs the entire source record that contains the minimum value of the specified field
min only takes the minimum value of the specified field; all other fields keep the values of the first record

min / minBy code example

Create the environment and read the data

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val sensorData: DataStream[String] = env.readTextFile("F:\\idea1\\flinktest\\src\\main\\resources\\sensorfile.txt")
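The sensor file itself is not shown here; given the split(", ") below and the printed results, each line is assumed to look like:

Sensor_1, 1547718145, 59.0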

Convert the read data to a case class

val mappedData: DataStream[SensorReading] = sensorData.map(data => {
  val arr = data.split(", ")
  SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
})
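The SensorReading case class is not defined in this section; inferred from the field accesses (id, timeStamp, temperature), it would be:

case class SensorReading(id: String, timeStamp: Long, temperature: Double)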

Grouping and min aggregation

val resultData: DataStream[SensorReading] = mappedData.keyBy("id").min("temperature")

Grouping and minBy aggregation

val resultData: DataStream[SensorReading] = mappedData.keyBy("id").minBy("temperature")

Print and execute

resultData.print()
env.execute("aaaatest")

result

min =>
3> SensorReading("Sensor_1",1547718145,59.0)
3> SensorReading("Sensor_1",1547718145,59.0)
3> SensorReading("Sensor_1",1547718145,36.0)
3> SensorReading("Sensor_1",1547718145,6.0)
3> SensorReading("Sensor_1",1547718145,3.0)
In the output, only the field that min is applied to changes; the other fields keep the values of the first record

minBy =>
3> SensorReading("Sensor_1",1547718129,136.0)
3> SensorReading("Sensor_1",1547718144,6.0)
3> SensorReading("Sensor_1",1547718152,3.0)
3> SensorReading("Sensor_1",1547718152,3.0)
3> SensorReading("Sensor_1",1547718152,3.0)
minBy outputs the entire record that contains the minimum value

Note: because processing is parallel, and transmission and computation take time, the output order can differ considerably from the order in the file. If you want the processing order to match the file order, set the global parallelism to 1.
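A one-line sketch of that setting, placed right after the environment is created:

env.setParallelism(1) // a single task processes everything, so input order is preserved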

Complete code

//Create environment first
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val sensorData: DataStream[String] = env.readTextFile("F:\\idea1\\flinktest\\src\\main\\resources\\sensorfile.txt")
// Convert to a case class
val mappedData: DataStream[SensorReading] = sensorData.map(data => {
  val arr = data.split(", ")
  SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
})

val resultData: DataStream[SensorReading] = mappedData.keyBy("id").min("temperature")

val resultData2: DataStream[SensorReading] = mappedData.keyBy("id").minBy("temperature")
resultData.print()
resultData2.print()
env.execute("aaaatest")

reduce code example

Requirement: for each sensor, output the latest timestamp together with the minimum temperature value so far

Create the environment and read the data

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val sensorData: DataStream[String] = env.readTextFile("F:\\idea1\\flinktest\\src\\main\\resources\\sensorfile.txt")

Convert the read data to a case class

val mappedData: DataStream[SensorReading] = sensorData.map(data => {
  val arr = data.split(", ")
  SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
})

reduce 1: use a lambda expression

val resultData: DataStream[SensorReading] = mappedData.keyBy("id").reduce { (curdata, newdata) =>
  SensorReading(curdata.id, newdata.timeStamp.max(curdata.timeStamp), curdata.temperature.min(newdata.temperature))
}

reduce 2: a user-defined class that implements ReduceFunction

val resultData: DataStream[SensorReading] = mappedData.keyBy("id").reduce(new MyReduce)

// Implement the function yourself: extend ReduceFunction[SensorReading] (the type of the processed data)
class MyReduce extends ReduceFunction[SensorReading] {
  override def reduce(curdata: SensorReading, newdata: SensorReading): SensorReading = {
    SensorReading(curdata.id, newdata.timeStamp.max(curdata.timeStamp), curdata.temperature.min(newdata.temperature))
  }
}
// reduce folds the data: the first parameter is the result of the previous reduction,
// the second parameter is the newly arrived record; the method returns the new reduced result
// The core logic is the same as in the lambda version
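A short trace of what this reduce emits for one key (input values made up for illustration):

// input record:                       emitted result:
// SensorReading("s1", 1000, 5.0)  =>  SensorReading("s1", 1000, 5.0)  (first record passes through)
// SensorReading("s1", 2000, 3.0)  =>  SensorReading("s1", 2000, 3.0)  (newer timestamp, lower temperature)
// SensorReading("s1", 1500, 7.0)  =>  SensorReading("s1", 2000, 3.0)  (timestamp stays at max, temperature at min)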

Print and execute

resultData.print()
env.execute("aaaa")

Complete code

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

object transformtest {
  def main(args: Array[String]): Unit = {

    // Create the environment first
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val sensorData: DataStream[String] = env.readTextFile("F:\\idea1\\flinktest\\src\\main\\resources\\sensorfile.txt")
    // Convert to a case class
    val mappedData: DataStream[SensorReading] = sensorData.map(data => {
      val arr = data.split(", ")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // reduce with a lambda expression
    val resultData: DataStream[SensorReading] = mappedData.keyBy("id").reduce { (curdata, newdata) =>
      SensorReading(curdata.id, newdata.timeStamp.max(curdata.timeStamp), curdata.temperature.min(newdata.temperature))
    }

    // reduce with a user-defined ReduceFunction
    val resultData2: DataStream[SensorReading] = mappedData.keyBy("id").reduce(new MyReduce)
    resultData.print()
    resultData2.print()
    env.execute("aaaa")
  }
}

class MyReduce extends ReduceFunction[SensorReading] {
  override def reduce(curdata: SensorReading, newdata: SensorReading): SensorReading = {
    SensorReading(curdata.id, newdata.timeStamp.max(curdata.timeStamp), curdata.temperature.min(newdata.temperature))
  }
}

// The sensor record case class (inferred from the field accesses above)
case class SensorReading(id: String, timeStamp: Long, temperature: Double)
