Python big data processing library PySpark Practice II

Creating Spark RDDs with PySpark

  • Each RDD can be divided into multiple partitions. Each partition is a fragment of the data set and can be stored on a different node of the Spark cluster
  • An RDD is fault-tolerant and read-only: it cannot be modified in place, and a new RDD can only be produced through a transformation. Because it is partitioned, an RDD can be processed in parallel on multiple machines. Part of the data can be cached in memory and reused many times; when memory is insufficient, the data can be spilled to disk
  • Ways to create an RDD
    • parallelize: sc.parallelize(collection, numPartitions)
    • range: sc.range(1, 10, 2), with the arguments start, end, step
    • From a file on HDFS, for example with sc.textFile

pyspark shell

 #pyspark shell
 rdd = sc.parallelize(["hello world", "hello spark"])
 rdd2 = rdd.flatMap(lambda line: line.split(" "))
 rdd3 = rdd2.map(lambda word: (word, 1))
 rdd5 = rdd3.reduceByKey(lambda a, b: a + b)
 rdd5.collect()
 quit()

VScode

 # vscode
 #pip install findspark
 #fix:ModuleNotFoundError: No module named 'pyspark'
 import findspark
 findspark.init()
 
 #############################
 from pyspark import SparkConf, SparkContext
 
 # Create SparkContext
 conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
 sc = SparkContext(conf=conf)
  
 rdd = sc.parallelize(["hello world", "hello spark"])
 rdd2 = rdd.flatMap(lambda line: line.split(" "))
 rdd3 = rdd2.map(lambda word: (word, 1))
 rdd5 = rdd3.reduceByKey(lambda a, b: a + b)
 #Outside the shell the result is only shown if it is printed
 #[('spark', 1), ('hello', 2), ('world', 1)] (the order may vary)
 print(rdd5.collect())
 #Stop the context so that a second SparkContext is not created on the next run
 sc.stop()

Jupyter notebook

 #jupyter
 from pyspark.sql import SparkSession
 spark = SparkSession.builder.master("local[*]").appName("WordCount").getOrCreate()
 sc = spark.sparkContext
 rdd = sc.parallelize(["hello world", "hello spark"])
 rdd2 = rdd.flatMap(lambda line: line.split(" "))
 rdd3 = rdd2.map(lambda word: (word, 1))
 rdd5 = rdd3.reduceByKey(lambda a, b: a + b)
 #The result is only shown if it is printed
 #[('spark', 1), ('hello', 2), ('world', 1)] (the order may vary)
 print(rdd5.collect())
 #Stop the context so that a second SparkContext is not created on the next run
 sc.stop()

Action operators

  • first: returns the first element of the RDD

  • max: returns the largest element

  • sum: returns the sum of the elements

  • take: take(n) returns the first n elements

  • top: top(n) returns the n largest elements in descending order. top(10, key=str) compares the elements by their string form, so it returns the 10 largest in dictionary order

  • count: returns the number of elements

  • collect: pulls all elements of the RDD from the cluster to the driver and returns them as a list

  • collectAsMap: returns the pairs of a key-value RDD to the driver as a dictionary, keeping the key-value structure. If a key occurs more than once, only one of its values is kept

  • countByKey: counts how often each key K occurs in an RDD of (K, V) pairs. Every occurrence of a key adds one to its count; the values V are not added up

  • countByValue: counts the occurrences of each distinct element in an RDD and returns a dictionary. Each key is an element and each value is its number of occurrences

    #sc.parallelize(range(2,100)) is equivalent to sc.range(2,100)
    
    rdd3 = sc.parallelize([("a",1),("a",1),("b",2),("a",1)])
    print(rdd3.countByKey())
    #defaultdict(<class 'int'>, {'a': 3, 'b': 1})
    print(rdd3.countByValue())
    #defaultdict(<class 'int'>, {('a', 1): 3, ('b', 2): 1})
    
  • stats: returns the count, mean, standard deviation, maximum and minimum of the RDD elements

    rdd = sc.parallelize(range(100))
    print(rdd.stats())
    #(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99, min: 0)
    
  • aggregate: aggregate(zeroValue, seqOp, combOp) first aggregates the elements of each partition with seqOp, starting from zeroValue, and then merges the per-partition results with combOp, again starting from zeroValue

    data=[1,3,5,7,9,11,13,15,17]
    rdd=sc.parallelize(data,2)
    print(rdd.glom().collect()) 
    # [[1, 3, 5, 7], [9, 11, 13, 15, 17]]
    seqOp = (lambda x, y: (x[0] + y, x[1] + 1))  #x is the running (sum, count), y is the next element
    combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))  #Merge two (sum, count) pairs
    a=rdd.aggregate((0,0),seqOp,combOp)
    print(a)
    #(81, 9)=(0,0)+(16,4)+(65,5)
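
To see where (81, 9) comes from, the two phases of aggregate can be modeled in plain Python, without a cluster. This is only a sketch: `aggregate_model` is a hypothetical helper, not a PySpark API, and the partition layout is copied by hand from the glom() output of the example.

```python
from functools import reduce

def aggregate_model(partitions, zero_value, seq_op, comb_op):
    """Hypothetical stand-in for rdd.aggregate, run on plain lists."""
    # Phase 1: fold every partition separately, starting from zero_value.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # Phase 2: merge the per-partition results, again starting from zero_value.
    return reduce(comb_op, partials, zero_value)

# Partition layout taken from the glom() output of the example (an assumption).
partitions = [[1, 3, 5, 7], [9, 11, 13, 15, 17]]
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)    # acc is (sum, count)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])   # merge two (sum, count) pairs

total, count = aggregate_model(partitions, (0, 0), seq_op, comb_op)
print((total, count))   # (81, 9)
print(total / count)    # 9.0
```

Carrying a (sum, count) pair through both phases is what makes it possible to compute the mean in a single pass.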
    

Transformation operators

coalesce, repartition: change the number of partitions
filter: filters elements
map: transforms each element
flatMap: transforms each element and flattens the result
mapPartitions: transforms one partition at a time
mapValues: for key-value RDDs, keeps the key and transforms the value
reduce: reduces all elements to a single value; reduceByKey: for key-value RDDs, reduces the values of each key
join: inner join; fullOuterJoin: full outer join
groupBy: groups elements, using the return value of the function as the key; groupByKey: groups key-value pairs by key
keys, values: obtain the keys or the values of a key-value RDD
zip: pairs elements one-to-one; both RDDs must have the same number of elements
union: union; subtract: difference; intersection: intersection; cartesian: Cartesian product
cache, persist: caching
glom: shows the contents of each partition
sortBy: sorts RDD elements

  • coalesce: rdd.coalesce(numPartitions, shuffle=False) changes the number of partitions of the RDD. The second argument decides whether the data is shuffled while repartitioning

    rdd=sc.parallelize([1, 2, 3, 4, 5], 3).glom()
    print(rdd.collect())
    #[[1], [2, 3], [4, 5]]
    rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6], 3).coalesce(1,False)
    print(rdd2.collect())
    #[1, 2, 3, 4, 5, 6]
    
  • repartition: repartition(numPartitions) changes the number of partitions and always shuffles, like coalesce(numPartitions, True)

  • distinct: removes duplicate elements

  • filter: returns a new RDD containing only the elements for which the function returns True, for example filter(lambda x: x%2 == 0)

    #filter
    rdd5 = sc.parallelize([1,2,3,4,5]).filter(lambda x: x%2 == 0)
    print(rdd5.collect())
    #[2, 4]
    
  • map: map(func, preservesPartitioning=False) applies func to each element of the RDD. It is commonly used when counting words

    rdd = sc.parallelize(["b", "a", "c", "d"])
    rdd2 = rdd.map(lambda x: (x, 1))
    #[('b', 1), ('a', 1), ('c', 1), ('d', 1)]
    
  • flatMap: applies func to each element of the RDD and flattens the results

    #flatMap
    rdd5 = sc.parallelize([1,2,3,4,5]).flatMap(lambda x:[(x,x*2)])
    print(rdd5.collect())
    #[(1, 2), (2, 4), (3, 6), (4, 8), (5, 10)]
    
  • flatMapValues: for an RDD of key-value pairs, applies func to each value, flattens the result, and pairs every resulting item with the original key

    #flatMapValues
    rdd = sc.parallelize([("a", [1, 2, 3]), ("c", ["w", "m"])])
    ret = rdd.flatMapValues(lambda x: x)
    #[('a', 1), ('a', 2), ('a', 3), ('c', 'w'), ('c', 'm')]
    
  • mapPartitions: applies func once per partition. func receives an iterator over the elements of the partition and returns an iterable of results

    rdd = sc.parallelize([1, 2, 3, 4 , 5], 2)
    def f(iter):
        yield sum(iter) #yield turns f into a generator, so each partition produces one value
    
    rdd2 = rdd.mapPartitions(f)
    print(rdd2.collect())
    #[3,12]
    
  • mapValues: for an RDD of key-value pairs, applies a function to each value. The key is left unchanged and the original partitioning is retained

    rdd = sc.parallelize([("a", ["hello", "spark", "!"]), ("b", ["cumt"])])
    rdd2 = rdd.mapValues(lambda x:len(x))
    #[('a', 3), ('b', 1)]
    
  • mapPartitionsWithIndex: applies func once per partition, like mapPartitions, and also passes func the index of the partition it is processing

    rdd = sc.parallelize([1, 2, 3, 4 ,5 ,6], 3)
    def f(index, iter): 
      #Partition index 0,1,2
      print(index)
      for x in iter:
        #1,2;3,4;5,6
        print(x)
      #Yield the index once per partition, after its elements have been consumed
      yield index
    ret = rdd.mapPartitionsWithIndex(f).sum()
    #3=0+1+2
    print(ret)
    
  • reduce: combines the elements of the RDD two at a time with func until a single value is left

    rdd = sc.parallelize([1, 2, 3, 4, 5])
    ret = rdd.reduce(lambda x,y : x+y)
    #15
    
  • reduceByKey: for an RDD of key-value pairs, combines the values of each key with func, leaving one element per key

    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2),("b", 3)])
    rdd2 = rdd.reduceByKey(lambda x,y:x+y)
    #[('a', 3), ('b', 4)]
    
  • join: returns all pairs of elements with matching keys from this RDD and another one. Each result is a (k, (v1, v2)) tuple, where (k, v1) is in this RDD and (k, v2) is in the other

    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("a", 3)])
    ret = x.join(y).collect()
    #[('a', (1, 2)), ('a', (1, 3))]
    
  • fullOuterJoin: full outer join. Every key from either RDD appears in the result; where one side has no match, its value is None

    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("c", 8)])
    rdd = x.fullOuterJoin(y)
    # [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
    
  • leftOuterJoin and rightOuterJoin: left outer join and right outer join

    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("c", 8)])
    rdd = x.leftOuterJoin(y)
    #[('b', (4, None)), ('a', (1, 2))]
    rdd = x.rightOuterJoin(y)
    #[('c', (None, 8)), ('a', (1, 2))]
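
The four join variants differ only in which keys they keep. The following plain-Python sketch models that rule on ordinary lists of pairs; `group_values` and `join_model` are hypothetical helpers, not PySpark APIs. The model sorts the keys to make the output reproducible, whereas Spark makes no promise about result order.

```python
from collections import defaultdict

def group_values(pairs):
    """Collect the values of each key: [("a", 1), ("a", 3)] -> {"a": [1, 3]}."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def join_model(left, right, how="inner"):
    """Hypothetical model of join / leftOuterJoin / rightOuterJoin / fullOuterJoin."""
    lg, rg = group_values(left), group_values(right)
    if how == "inner":
        keys = lg.keys() & rg.keys()   # keys present on both sides
    elif how == "left":
        keys = lg.keys()               # every key of the left side
    elif how == "right":
        keys = rg.keys()               # every key of the right side
    else:                              # "full"
        keys = lg.keys() | rg.keys()   # keys present on either side
    result = []
    for key in sorted(keys):
        # A side with no value for this key contributes a single None.
        for v1 in lg.get(key, [None]):
            for v2 in rg.get(key, [None]):
                result.append((key, (v1, v2)))
    return result

x = [("a", 1), ("b", 4)]
y = [("a", 2), ("c", 8)]
print(join_model(x, y, "inner"))  # [('a', (1, 2))]
print(join_model(x, y, "left"))   # [('a', (1, 2)), ('b', (4, None))]
print(join_model(x, y, "right"))  # [('a', (1, 2)), ('c', (None, 8))]
print(join_model(x, y, "full"))   # [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
```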
    
  • groupBy: groupBy(func, numPartitions=None, partitionFunc=portable_hash) uses the return value of func as the key, groups the elements by that key, and returns a new RDD

    rdd = sc.parallelize([1, 2, 3, 4, 5, 10])
    rdd = rdd.groupBy(lambda x:x%2)
    result = rdd.collect()
    #[(0, <pyspark.resultiterable.ResultIterable object at 0x110ef9c50>), (1, <pyspark.resultiterable.ResultIterable object at 0x110ef94d0>)]
    ret = sorted([(x, sorted(y)) for (x, y) in result])
    #[(0, [2, 4, 10]), (1, [1, 3, 5])]
    
  • groupByKey: groups the values of each key in the RDD into a single sequence and hash-partitions the resulting RDD into numPartitions partitions. To compute a sum or an average per key, reduceByKey or aggregateByKey is recommended instead

    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    rdd2 = rdd.groupByKey().mapValues(lambda x: sum(x))
    rdd3 = rdd.reduceByKey(lambda x,y: x+y) #Same as rdd2
    # [('a', 2), ('b', 1)]
    print(sorted(rdd2.collect()))
    
  • keyBy: keyBy(func) turns each element x into the tuple (func(x), x): the return value of func becomes the key and the original element becomes the value

    rdd = sc.parallelize(range(0,3))
    rdd = rdd.keyBy(lambda x: x*x)
    #[(0, 0), (1, 1), (4, 2)]
    
  • keys: get the Key sequence in KV format and return a new RDD

    rdd1 = sc.parallelize([("a",1),("b",2),("a",3)])
    print(rdd1.keys().collect())
    #['a', 'b', 'a']
    
  • values: returns the values of a key-value RDD as a new RDD

    rdd1 = sc.parallelize([("a",1),("b",2),("a",3)])
    print(rdd1.values().collect())
    #[1, 2, 3]
    
  • zip: rdd.zip(otherRDD) pairs the elements of the two RDDs one-to-one, with the element of the first RDD as the key and the element of the second as the value. Both RDDs must have the same number of partitions and the same number of elements in each partition

    x = sc.parallelize(range(1,6))
    y = sc.parallelize(range(801, 806))
    print(x.zip(y).collect())
    #[(1, 801), (2, 802), (3, 803), (4, 804), (5, 805)]
    #x and y must have the same length
    
  • zipWithIndex: pairs each element with its index; the element is the key and the index is the value

    rdd = sc.parallelize(["a", "b", "c", "d"], 3)
    print(rdd.zipWithIndex().collect())
    #[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
    
  • union: returns the elements of the first RDD followed by those of the second; duplicates are kept

    rdd = sc.parallelize(range(1,10))
    rdd2 = sc.parallelize(range(11,20))
    rdd3 = rdd.union(rdd2)
    #[1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    
  • subtract: returns the elements of the first RDD that do not appear in the second

    x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
    y = sc.parallelize([("a", 1), ("b", 5)])
    z = x.subtract(y)
    #[('b', 4), ('a', 3)]
    
  • subtractByKey: for key-value RDDs, removes every element whose key appears in the other RDD, whatever its value

    x = sc.parallelize([("a", 1), ("b", 4), ("c", 5), ("a", 3)])
    y = sc.parallelize([("a", 7), ("b", 0)])
    z = x.subtractByKey(y)
    #[('c', 5)]
    
  • intersection: returns the elements that appear in both RDDs, without duplicates

    rdd1 = sc.parallelize([("a", 2), ("b", 1), ("a", 2),("b", 3)])
    rdd2 = sc.parallelize([("a", 2), ("b", 1), ("e", 5)])
    ret = rdd1.intersection(rdd2).collect()
    #[('a', 2), ('b', 1)]
    
  • cartesian: returns the Cartesian product of two RDDs. The result can contain a very large number of elements, which may exhaust memory

    rdd = sc.parallelize([1, 2])
    rdd2 = sc.parallelize([3, 7])
    rdd3 = sorted(rdd.cartesian(rdd2).collect())
    #[(1, 3), (1, 7), (2, 3), (2, 7)]
    print(rdd3)
    
  • sortBy: sortBy(keyfunc, ascending=True, numPartitions=None) sorts the RDD elements by the value that keyfunc returns, in ascending order by default

    data = [('a', 6), ('f', 2), ('c', 7), ('d', 4), ('e', 5)]
    rdd2 = sc.parallelize(data).sortBy(lambda x: x[0])
    #[('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
    rdd3 = sc.parallelize(data).sortBy(lambda x: x[1])
    #[('f', 2), ('d', 4), ('e', 5), ('a', 6), ('c', 7)]
    rdd3 = sc.parallelize(data).sortBy(lambda x: x[1],False)
    #[('c', 7), ('a', 6), ('e', 5), ('d', 4), ('f', 2)]
    
  • sortByKey: sortByKey(ascending=True, numPartitions=None, keyfunc=...) sorts a key-value RDD by key

    x = [('a', 6), ('f', 2), ('c', 7), ('d', 4), ('e', 5)]
    rdd = sc.parallelize(x).sortByKey(True, 1)
    #[('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
    print(rdd.collect())
    
  • takeOrdered: takeOrdered(num, key=None) returns the first num elements of the RDD after sorting, as a list. The order is ascending by default, and an optional key function can change it

    rdd =sc.parallelize(range(2,100))
    print(rdd.takeOrdered(10))
    #[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    print(rdd.takeOrdered(10, key=lambda x: -x))
    #[99, 98, 97, 96, 95, 94, 93, 92, 91, 90]
    
  • takeSample: takeSample(withReplacement, num, seed=None) returns a random sample of fixed size as a list. The first parameter is a Boolean that says whether an element may be drawn more than once, the second is the number of samples, and the third is the seed of the random number generator

    rdd = sc.parallelize(range(2,10))
    print(rdd.takeSample(True, 20, 1))
    #True means that an element can appear more than once
    #[5, 9, 5, 3, 2, 2, 7, 7, 5, 7, 9, 9, 5, 3, 2, 4, 5, 5, 6, 8]
    print(rdd.takeSample(False, 20, 1))
    #False means that an element can only appear once
    #[5, 8, 3, 7, 9, 2, 6, 4]
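
The difference in result size between the two calls follows from the sampling rule alone. The sketch below models that rule with Python's random module; `take_sample_model` is a hypothetical helper, and because it uses a different random number generator than Spark, the sampled values will not match the output shown above.

```python
import random

def take_sample_model(data, with_replacement, num, seed=None):
    """Hypothetical plain-Python model of the size rule of takeSample."""
    rng = random.Random(seed)
    if with_replacement:
        # Every draw is independent, so num may exceed len(data).
        return [rng.choice(data) for _ in range(num)]
    # Without replacement each element is used at most once,
    # so at most len(data) elements can come back.
    return rng.sample(data, min(num, len(data)))

data = list(range(2, 10))            # 8 elements, as in the example
with_repl = take_sample_model(data, True, 20, seed=1)
without_repl = take_sample_model(data, False, 20, seed=1)
print(len(with_repl))                # 20
print(len(without_repl))             # 8
print(sorted(without_repl) == data)  # True
```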
    
  • sample: sample(withReplacement, fraction, seed) returns a random sample as a new RDD. When sampling without replacement, fraction is the expected proportion of elements to keep, in the range [0, 1]

    rdd = sc.parallelize(range(100), 1)
    ret = rdd.sample(False, 0.1, 1)
    #Possible output of ret.collect(): [9, 11, 13, 39, 49, 55, 61, 65, 90, 91, 93, 94]
    
  • randomSplit: randomly splits the RDD according to the given weights and returns a list of RDDs

    rdd = sc.parallelize(range(100), 1)
    rdd1, rdd2 = rdd.randomSplit([2, 3], 10)
    print(len(rdd1.collect())) #about 40; the exact size depends on the random split
    print(len(rdd2.collect())) #about 60
    
  • lookup: lookup(key) returns the list of values stored under the given key in a key-value RDD

    rdd = sc.parallelize([('a', 'b'), ('c', 'd')])
    print(rdd.lookup('a')) #['b']
    
  • fold: fold(zeroValue, func) combines the elements of the RDD with func, which takes two parameters a and b: a starts at zeroValue and then holds the accumulated result, and b is the current element. It can be used for sums and products. zeroValue should be the neutral element of func (0 for addition, 1 for multiplication)

    #fold
    ret=sc.parallelize([1, 2, 3, 4, 5]).fold(0, lambda x,y:x+y)
    #15
    ret=sc.parallelize([1, 2, 3, 4, 5]).fold(1, lambda x,y:x*y)
    #120
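
Why the zero value should be neutral becomes clear once the partitions are taken into account: the zero value seeds the fold inside every partition and then seeds the final merge as well. The plain-Python sketch below models this; `fold_model` and the hand-written partition layouts are hypothetical and only illustrate the idea.

```python
from functools import reduce
from operator import add

def fold_model(partitions, zero_value, op):
    """Hypothetical plain-Python model of rdd.fold on a list of partitions."""
    # zero_value seeds the fold inside every partition ...
    partials = [reduce(op, part, zero_value) for part in partitions]
    # ... and seeds the final merge of the partial results as well.
    return reduce(op, partials, zero_value)

two_parts = [[1, 2], [3, 4, 5]]
three_parts = [[1], [2, 3], [4, 5]]

print(fold_model(two_parts, 0, add))    # 15
print(fold_model(two_parts, 1, add))    # 18 = 15 + 1 * (2 partitions + 1)
print(fold_model(three_parts, 1, add))  # 19 = 15 + 1 * (3 partitions + 1)
```

With a non-neutral zero value the result depends on the number of partitions, which is rarely what is wanted.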
    
  • foldByKey: foldByKey(zeroValue, func) folds the values of each key in a key-value RDD with func, starting from zeroValue. It can be used for per-key sums and products

    #foldByKey
    rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3),("b", 5)])
    rdd2=rdd.foldByKey(0, lambda x,y:x+y)
    # [('a', 4), ('b', 7)]
    rdd3=rdd.foldByKey(1, lambda x,y:x*y)
    # [('a', 3), ('b', 10)]
    
  • foreach: applies func to each element of the RDD. It is run for its side effects and returns nothing

  • foreachPartition: applies func once to each partition of the RDD, passing it an iterator over the elements of that partition. Because it handles a whole partition per call, foreachPartition is generally more efficient than foreach; when writing to a database, for example, it usually performs much better than processing element by element

    rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3),("b", 5)])
    def f(x):
        print(x)
        return (x[0],x[1]*2) #foreach ignores this return value
        
    def f2(iter):
        for x in iter:
            print(x)
            
    ret = rdd.foreach(f) #ret is None
    ret2 = sc.parallelize([1,2,3,4,5,6,7,8],2).foreachPartition(f2) #ret2 is None
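
One common reason for the efficiency gain is setup cost, such as opening a database connection. The plain-Python sketch below counts how often a connection would be opened in each style. `FakeConnection`, `write_element` and `write_partition` are hypothetical names used for illustration, and the partition layout is assumed to match the 2-partition RDD above.

```python
class FakeConnection:
    """Hypothetical stand-in for a database connection; it only counts openings."""
    opened = 0

    def __init__(self):
        FakeConnection.opened += 1
        self.rows = []

    def write(self, row):
        self.rows.append(row)

def write_element(x):
    # foreach style: the function sees one element, so it opens a connection each time.
    FakeConnection().write(x)

def write_partition(iterator):
    # foreachPartition style: one connection serves the whole partition.
    conn = FakeConnection()
    for x in iterator:
        conn.write(x)

partitions = [[1, 2, 3, 4], [5, 6, 7, 8]]   # assumed layout of the 2-partition RDD

FakeConnection.opened = 0
for part in partitions:
    for x in part:
        write_element(x)
per_element = FakeConnection.opened

FakeConnection.opened = 0
for part in partitions:
    write_partition(iter(part))
per_partition = FakeConnection.opened

print(per_element, per_partition)   # 8 2
```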
    
  • aggregateByKey: aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=...) aggregates the values of each key within each partition using seqFunc, starting from zeroValue, and then merges the per-partition results of each key with combFunc

    data=[("a",1),("b",2),("a",3),("b",4),("a",5),("b",6),("a",7),("b",8),("a",9),("b",10)]
    rdd=sc.parallelize(data,2)
    print(rdd.glom().collect())
    #[[('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5)], [('b', 6), ('a', 7), ('b', 8), ('a', 9), ('b', 10)]]
    def seqFunc(x,y):
        return x + y
    def combFunc(x,y):
        return x + y
    a=rdd.aggregateByKey(0,seqFunc,combFunc)
    # [('b', 30), ('a', 25)]
    print(a.collect())
    
  • combineByKey: combineByKey(createCombiner, mergeValue, mergeCombiners) is the general per-key aggregation. It takes three functions, where V is the value type and C is the combined type:

    • createCombiner: V => C turns the first value seen for a key into an initial combiner
    • mergeValue: (C, V) => C merges a value V into an existing combiner C (performed within each partition)
    • mergeCombiners: (C, C) => C merges two combiners (performed across partitions)

    a = [1,2]
    b = [10,11]
    a.extend(b) #a is now [1, 2, 10, 11]: extend adds the elements of b one by one
    a = [1,2]
    a.append(b) #a is now [1, 2, [10, 11]]: append adds b as a single element
    
    #combineByKey
    rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2),("b", 4)],2)
    def to_list(a):
        return [a]
    def append(a, b): #mergeValue: runs within a partition
        a.append(b)
        return a
    def extend(a, b): #mergeCombiners: merges results from different partitions
        a.extend(b)
        return a
    print(rdd.glom().collect())
    #[[('a', 1), ('b', 3)], [('a', 2), ('b', 4)]]
    ret = sorted(rdd.combineByKey(to_list, append, extend).collect())
    print(ret)
    #[('a', [1, 2]), ('b', [3, 4])]  
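
The interplay of the three functions can be traced in plain Python. The sketch below models combineByKey on a list of partitions and uses it to compute a per-key average, a typical case where the combined type C (a (sum, count) pair) differs from the value type V. `combine_by_key_model` is a hypothetical helper, not a PySpark API, and the partition layout is copied from the glom() output above.

```python
def combine_by_key_model(partitions, create_combiner, merge_value, merge_combiners):
    """Hypothetical plain-Python model of combineByKey on a list of partitions."""
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                # Key already seen in this partition: fold the value in.
                combiners[key] = merge_value(combiners[key], value)
            else:
                # First value of this key in this partition.
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)
    merged = {}
    for combiners in per_partition:
        for key, comb in combiners.items():
            # Combine what different partitions produced for the same key.
            merged[key] = merge_combiners(merged[key], comb) if key in merged else comb
    return merged

partitions = [[("a", 1), ("b", 3)], [("a", 2), ("b", 4)]]
sum_count = combine_by_key_model(
    partitions,
    lambda v: (v, 1),                                # createCombiner: V => C
    lambda c, v: (c[0] + v, c[1] + 1),               # mergeValue: (C, V) => C
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),   # mergeCombiners: (C, C) => C
)
averages = {k: s / n for k, (s, n) in sum_count.items()}
print(sum_count)   # {'a': (3, 2), 'b': (7, 2)}
print(averages)    # {'a': 1.5, 'b': 3.5}
```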
    
  • glom: turns the elements of each partition into a single list, so that every partition holds exactly one list element

    #glom
    rdd2 = sc.parallelize([1,2,3,4,5],3)
    print(rdd2.collect())
    #[1, 2, 3, 4, 5]
    print(rdd2.glom().collect())
    #[[1], [2, 3], [4, 5]]
    print(rdd2.coalesce(1).glom().collect())
    #[[1, 2, 3, 4, 5]]
    
  • cache: caches the RDD with the default storage level (MEMORY_ONLY)

  • persist: caches the RDD with a storage level chosen through the storageLevel parameter

  • saveAsTextFile: saves the RDD as text files, using the string representation of each element

Keywords: Python Database Big Data Spark Pyspark

Added by pete07920 on Sun, 30 Jan 2022 16:23:19 +0200