Keyed stream transformation operators
keyBy
To aggregate, you must group first, so keyBy is very important
keyBy is special: it is not an operation applied record by record
It is not a real operator
Rather, it defines the relationship between two tasks
i.e., the mode of data transmission between them
keyBy groups the stream by the specified keys
A repartition is performed based on the hashCode of each key
Records with the same key are always sent to the same partition, so that partition holds all the data for that key; one partition may still contain several different keys (a simplified sketch follows)
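A minimal sketch of the idea behind this repartitioning (Flink's real implementation additionally uses murmur hashing and key groups; the partition count here is a hypothetical stand-in for the downstream parallelism):
def targetPartition(key: String, numPartitions: Int): Int =
  math.abs(key.hashCode % numPartitions)

// Records with the same key always map to the same partition,
// and one partition can receive several different keys
Seq("Sensor_1", "Sensor_2", "Sensor_3", "Sensor_1")
  .foreach(k => println(s"$k -> partition ${targetPartition(k, 4)}"))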
The keyBy conversion also involves a conversion of the data structure:
DataStream ==> KeyedStream
Looking at the source code:
DataStream itself has no aggregation methods; only after conversion to a KeyedStream do aggregation methods become available
KeyedStream itself extends DataStream
keyBy is not an end in itself; the purpose is to aggregate
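A minimal type sketch of this conversion (legacy Flink Scala API; SensorReading is the case class used in the examples below, and "id" is one of its fields):
import org.apache.flink.streaming.api.scala._

val stream: DataStream[SensorReading] = ???   // DataStream: no sum/min/max here
val keyed = stream.keyBy("id")                // DataStream ==> KeyedStream
keyed.min("temperature")                      // compiles: KeyedStream has aggregation methods
// stream.min("temperature")                  // would not compile: DataStream has none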
Aggregation operator
Rolling aggregation operators: during aggregation, data keeps arriving, and the aggregation result is continuously updated on top of the previous result (see the sum sketch after the list of operators below).
Common operators
sum
min
max
minBy
maxBy
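A minimal sketch of the rolling behavior using sum, with hypothetical (word, count) tuples from fromElements instead of the sensor file:
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Each new record updates the previous result for its key:
// ("a",1) => ("a",1); ("b",2) => ("b",2); ("a",3) => ("a",4)
env.fromElements(("a", 1), ("b", 2), ("a", 3))
  .keyBy(0)
  .sum(1)
  .print()
env.execute("rolling sum sketch")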
The difference between min and minBy, given a specified field:
minBy outputs the entire source record that holds the minimum value of the specified field
min only takes the minimum value of the specified field; all other fields keep the values of the first record
min / minBy code example
Create the environment and read the data
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val SensorData: DataStream[String] = env.readTextFile("F:\\idea1\\flinktest\\src\\main\\resources\\sensorfile.txt")
Convert the read data to a case class
val mapedData: DataStream[SensorReading] = SensorData.map(data => {
  val arr = data.split(", ")
  SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
})
Grouping and min aggregation
val resultData: DataStream[SensorReading] = mapedData.keyBy("id").min("temperature")
Grouping and minBy aggregation
val resultData: DataStream[SensorReading] = mapedData.keyBy("id").minBy("temperature")
Print execution
resultData.print()
env.execute("aaaatest")
result
min =>
3> SensorReading("Sensor_1",1547718145,59.0)
3> SensorReading("Sensor_1",1547718145,59.0)
3> SensorReading("Sensor_1",1547718145,36.0)
3> SensorReading("Sensor_1",1547718145,6.0)
3> SensorReading("Sensor_1",1547718145,3.0)
In the result, only the field that min aggregates changes; the other fields stay consistent with the first record
minBy =>
3> SensorReading("Sensor_1",1547718129,136.0)
3> SensorReading("Sensor_1",1547718144,6.0)
3> SensorReading("Sensor_1",1547718152,3.0)
3> SensorReading("Sensor_1",1547718152,3.0)
3> SensorReading("Sensor_1",1547718152,3.0)
The records output by minBy are the full source records that hold the minimum value
Note: because processing is parallel, transmission and computation can make the output order differ greatly from the order in the file. If you want the processing order to match the file order, set the global parallelism to 1.
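For example, right after creating the environment:
env.setParallelism(1)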
Complete code
import org.apache.flink.streaming.api.scala._

// Case class for the sensor records
case class SensorReading(id: String, timeStamp: Long, temperature: Double)

object transformtest {
  def main(args: Array[String]): Unit = {
    // Create environment first
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val SensorData: DataStream[String] = env.readTextFile("F:\\idea1\\flinktest\\src\\main\\resources\\sensorfile.txt")
    // Convert to the case class
    val mapedData: DataStream[SensorReading] = SensorData.map(data => {
      val arr = data.split(", ")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    val resultData: DataStream[SensorReading] = mapedData.keyBy("id").min("temperature")
    val resultData2: DataStream[SensorReading] = mapedData.keyBy("id").minBy("temperature")
    resultData.print()
    resultData2.print()
    env.execute("aaaatest")
  }
}
reduce code example
Requirement: the output should contain the latest timestamp and the minimum temperature
Create the environment and read the data
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val SensorData: DataStream[String] = env.readTextFile("F:\\idea1\\flinktest\\src\\main\\resources\\sensorfile.txt")
Convert the read data to a case class
val mapedData: DataStream[SensorReading] = SensorData.map(data => {
  val arr = data.split(", ")
  SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
})
reduce 1: using a lambda expression
val resultData: DataStream[SensorReading] = mapedData.keyBy("id").reduce {
  (curdata, newdata) =>
    SensorReading(curdata.id, newdata.timeStamp.max(curdata.timeStamp), curdata.temperature.min(newdata.temperature))
}
reduce 2: a user-defined class implementing ReduceFunction
val resultData: DataStream[SensorReading] = mapedData.keyBy("id").reduce(new myReduce)

// Implement the function yourself: extend ReduceFunction[T], where T is the type of the processed data
class myReduce extends ReduceFunction[SensorReading] {
  override def reduce(curdata: SensorReading, newdata: SensorReading): SensorReading = {
    SensorReading(curdata.id, newdata.timeStamp.max(curdata.timeStamp), curdata.temperature.min(newdata.temperature))
  }
}
// reduce performs the reduction: parameter 1 is the previous reduction result,
// parameter 2 is the newly arrived record; it returns the new reduction result.
// The core logic is the same as in the lambda version.
Print execution
resultData.print()
env.execute("aaaa")
Complete code
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

// Case class for the sensor records
case class SensorReading(id: String, timeStamp: Long, temperature: Double)

object transformtest {
  def main(args: Array[String]): Unit = {
    // Create environment first
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val SensorData: DataStream[String] = env.readTextFile("F:\\idea1\\flinktest\\src\\main\\resources\\sensorfile.txt")
    // Convert to the case class
    val mapedData: DataStream[SensorReading] = SensorData.map(data => {
      val arr = data.split(", ")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    val resultData: DataStream[SensorReading] = mapedData.keyBy("id").reduce {
      (curdata, newdata) =>
        SensorReading(curdata.id, newdata.timeStamp.max(curdata.timeStamp), curdata.temperature.min(newdata.temperature))
    }
    val resultData2: DataStream[SensorReading] = mapedData.keyBy("id").reduce(new myReduce)
    resultData.print()
    resultData2.print()
    env.execute("aaaa")
  }
}

// Custom reduce function: extend ReduceFunction[SensorReading]
class myReduce extends ReduceFunction[SensorReading] {
  override def reduce(curdata: SensorReading, newdata: SensorReading): SensorReading = {
    SensorReading(curdata.id, newdata.timeStamp.max(curdata.timeStamp), curdata.temperature.min(newdata.temperature))
  }
}