[Spark] RDD action operators


An action operator is a method that triggers job execution: calling it submits a job and returns a result to the driver rather than producing another RDD.
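
The examples below are JUnit-style test methods that assume an existing SparkContext named sc; a minimal sketch of that setup (the master URL and app name are placeholder assumptions):

  import org.apache.spark.{SparkConf, SparkContext}

  //Local SparkContext shared by all the test methods below
  val conf = new SparkConf().setMaster("local[*]").setAppName("ActionOperators")
  val sc = new SparkContext(conf)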

reduce

Function signature: def reduce(f: (T, T) => T): T
Function Description: aggregates all elements in the RDD, first aggregating the data within each partition and then aggregating the per-partition results across partitions

  @Test
  def reduce(): Unit = {
    val rdd = sc.makeRDD(List(1,2,3,4))
    //Actions return a result directly, unlike transformation operators,
    //which turn one RDD into another
    val i: Int = rdd.reduce(_+_)
    println(i) // 10
  }
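
Because the data is reduced within partitions first and across partitions afterwards, the function passed to reduce should be associative and commutative for the result to be deterministic. A minimal sketch using subtraction (not associative) to show how the result changes with partitioning (the partition counts here are illustrative):

  @Test
  def reduceNonAssociative(): Unit = {
    val data = List(1, 2, 3, 4)
    //One partition: evaluated left to right, ((1 - 2) - 3) - 4 = -8
    println(sc.makeRDD(data, 1).reduce(_-_))
    //Two partitions: 1 - 2 = -1 and 3 - 4 = -1, combined to (-1) - (-1) = 0
    println(sc.makeRDD(data, 2).reduce(_-_))
  }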


collect

Function signature: def collect(): Array[T]
Function Description: returns all elements of the dataset to the driver as an Array

  @Test
  def collect(): Unit = {
    val rdd = sc.makeRDD(List(1,2,3,4))
    //collect gathers the data from all partitions into driver-side memory,
    //in partition order, to form an array
    val ints: Array[Int] = rdd.collect()
    println(ints.mkString(",")) // 1,2,3,4
  }
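
Because collect pulls every partition into driver memory, it can overwhelm the driver on large datasets; a minimal sketch of one common alternative, toLocalIterator, which streams the data to the driver one partition at a time:

  @Test
  def toLocalIteratorDemo(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    //Only one partition needs to fit in driver memory at a time
    rdd.toLocalIterator.foreach(println)
  }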


count

Function signature: def count(): Long
Function Description: returns the number of elements in the RDD

  @Test
  def count(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    //Number of elements
    val cnt = rdd.count()
    println(cnt) // 4
  }


first

Function signature: def first(): T
Function Description: returns the first element in the RDD

  @Test
  def first(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    //Returns the first element
    println(rdd.first()) // 1
  }


take

Function signature: def take(num: Int): Array[T]
Function Description: returns an array made up of the first num elements of the RDD

  @Test
  def take(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    //Returns the first num elements
    println(rdd.take(2).mkString(",")) // 1,2
  }


takeOrdered

Function signature: def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
Function Description: returns an array made up of the first num elements of the RDD in sorted order

  @Test
  def takeOrdered(): Unit = {
    val rdd = sc.makeRDD(List(1, 6, 2, 4))
    //Returns the first num elements in ascending order
    println(rdd.takeOrdered(2).mkString(",")) // 1,2
  }
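
takeOrdered sorts ascending by default; supplying a custom Ordering retrieves, for example, the largest elements instead. A minimal sketch (top(num) is the built-in shorthand for the reversed ordering):

  @Test
  def takeOrderedDesc(): Unit = {
    val rdd = sc.makeRDD(List(1, 6, 2, 4))
    //Reverse the implicit ordering to take the largest elements
    println(rdd.takeOrdered(2)(Ordering[Int].reverse).mkString(",")) // 6,4
    println(rdd.top(2).mkString(",")) // 6,4
  }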


aggregate

Function signature:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
Function Description: the data within each partition is aggregated with the initial value (seqOp), and then the per-partition results are aggregated with the initial value again (combOp)

  @Test
  def aggregate(): Unit = {
    //No partition count is given, so makeRDD uses the default parallelism;
    //because the zero value is applied once per partition and once more at
    //the combine step, the printed result depends on the partition count
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    println(rdd.aggregate(1)(_+_,_+_))
  }


In aggregate, the zero value participates not only in the calculation within partitions but also in the calculation between partitions.
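
A minimal sketch with an explicit partition count, making that arithmetic visible:

  @Test
  def aggregateWithPartitions(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    //Within partitions: 1 + 1 + 2 = 4 and 1 + 3 + 4 = 8
    //Between partitions: 1 + 4 + 8 = 13
    println(rdd.aggregate(1)(_+_,_+_)) // 13
  }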


fold

Function signature: def fold(zeroValue: T)(op: (T, T) => T): T
Function Description: folding operation; a simplified version of aggregate for the case where the intra-partition and inter-partition functions are the same

  @Test
  def fold(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    //Within partitions: 1 + 1 + 2 = 4 and 1 + 3 + 4 = 8; between: 1 + 4 + 8 = 13
    println(rdd.fold(1)(_ + _)) // 13
  }
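
fold(zeroValue)(op) behaves exactly like aggregate(zeroValue)(op, op); a minimal sketch checking the equivalence:

  @Test
  def foldEqualsAggregate(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    //fold is aggregate with the same function at both levels
    assert(rdd.fold(1)(_ + _) == rdd.aggregate(1)(_ + _, _ + _))
  }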


countByKey

Function signature: def countByKey(): Map[K, Long]
Function Description: counts the number of occurrences of each key

  @Test
  def countByKey(): Unit = {
    val rdd = sc.makeRDD(List(("a",1),("b",1),("c",1),("a",1),("c",1)), 2)
    println(rdd.countByKey()) // a -> 2, b -> 1, c -> 2 (entry order may vary)
  }
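
A closely related action, countByValue, counts occurrences of each distinct element and works on any RDD, not only key-value RDDs; a minimal sketch:

  @Test
  def countByValue(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 1, 3, 1, 2), 2)
    //Counts occurrences of each distinct element
    println(rdd.countByValue()) // 1 -> 3, 2 -> 2, 3 -> 1 (entry order may vary)
  }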


save-related operators

Function signatures:

def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
	 path: String,
	 codec: Option[Class[_ <: CompressionCodec]] = None): Unit

Function Description: save data to files in different formats

  @Test
  def save(): Unit = {
    val rdd = sc.makeRDD(List(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1)), 2)
    rdd.saveAsTextFile("output")
    rdd.saveAsObjectFile("output1")
    rdd.saveAsSequenceFile("output2")
  }

Result display

saveAsSequenceFile requires that the RDD's elements be key-value pairs
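
A minimal sketch of reading the saved files back (the paths match the example above; the element types are the ones the write example implies):

  @Test
  def load(): Unit = {
    //Text files come back as strings; the other two formats keep the pair type
    println(sc.textFile("output").collect().mkString(","))
    println(sc.objectFile[(String, Int)]("output1").collect().mkString(","))
    println(sc.sequenceFile[String, Int]("output2").collect().mkString(","))
  }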

foreach

Function signature (from the Spark source):

def foreach(f: T => Unit): Unit = withScope {
	 val cleanF = sc.clean(f)
	 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

Function Description: traverses each element of the RDD in a distributed manner, applying the specified function on the executors

  @Test
  def foreach(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    //collect first: printing happens on the driver, in partition order
    rdd.collect().foreach(println(_))
    println("#######################")
    //distributed foreach: printing happens on the executors, so the output
    //order is not guaranteed
    rdd.foreach(println)
  }
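
When the function needs an expensive per-use resource (a database connection, for example), foreachPartition sets it up once per partition instead of once per element; a minimal sketch (the resource handling is only a placeholder):

  @Test
  def foreachPartition(): Unit = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    rdd.foreachPartition { iter =>
      //Acquire a per-partition resource here (e.g. open a connection)
      iter.foreach(println)
      //Release the resource here (e.g. close the connection)
    }
  }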

