PySpark and Spark RDDs
- An RDD is divided into multiple partitions; each partition is a fragment of the dataset and can be stored on different nodes of the Spark cluster
- An RDD has a built-in fault-tolerance mechanism and is a read-only data structure: new RDDs can only be produced through transformations. Thanks to its partitions, an RDD can be processed in parallel on multiple machines; part of the data can be cached in memory and reused many times, and when memory is insufficient the data can be spilled to disk
- Ways to create an RDD (see the sketch below):
  - sc.parallelize(collection, numPartitions)
  - sc.range(1, 10, 2): start, end, step
  - reading files from HDFS (e.g. sc.textFile)
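A minimal sketch of the three creation methods. It assumes an existing SparkContext `sc` (created in the shell / VS Code / Jupyter setups below); the HDFS URL is only a placeholder for illustration.

```python
# assumes an existing SparkContext `sc`
rdd1 = sc.parallelize(["hello world", "hello spark"], 2)   # collection + number of partitions
rdd2 = sc.range(1, 10, 2)                                  # start, end, step
# hypothetical HDFS path, shown for illustration only
rdd3 = sc.textFile("hdfs://namenode:9000/data/words.txt")

print(rdd1.getNumPartitions())   # 2
print(rdd2.collect())            # [1, 3, 5, 7, 9]
```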
PySpark shell
```python
# pyspark shell
rdd = sc.parallelize(["hello world", "hello spark"])
rdd2 = rdd.flatMap(lambda line: line.split(" "))
rdd3 = rdd2.map(lambda word: (word, 1))
rdd5 = rdd3.reduceByKey(lambda a, b: a + b)
rdd5.collect()
quit()
```
VS Code
```python
# VS Code
# pip install findspark
# fixes: ModuleNotFoundError: No module named 'pyspark'
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext

# Create the SparkContext
conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(["hello world", "hello spark"])
rdd2 = rdd.flatMap(lambda line: line.split(" "))
rdd3 = rdd2.map(lambda word: (word, 1))
rdd5 = rdd3.reduceByKey(lambda a, b: a + b)

# print, otherwise the result is not displayed
print(rdd5.collect())  # [('spark', 1), ('hello', 2), ('world', 1)]

# stop the context to avoid creating multiple SparkContexts
sc.stop()
```
Jupyter notebook
```python
# Jupyter notebook
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("WordCount").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(["hello world", "hello spark"])
rdd2 = rdd.flatMap(lambda line: line.split(" "))
rdd3 = rdd2.map(lambda word: (word, 1))
rdd5 = rdd3.reduceByKey(lambda a, b: a + b)

# print, otherwise the result is not displayed
print(rdd5.collect())  # [('spark', 1), ('hello', 2), ('world', 1)]

# stop the context to avoid creating multiple SparkContexts
sc.stop()
```
Action operators
- first: returns the first element of the RDD (a combined example for the actions without their own snippet follows the countByValue snippet below)
- max: returns the largest element
- sum: returns the sum of the elements
- take(n): returns the first n elements
- top(n): returns the first n elements in descending order; top(10, key=str) returns the top 10 elements in lexicographic order
- count: returns the number of elements in the RDD
- collect: converts the RDD into an array, pulling the data from the cluster to the driver
- collectAsMap: converts a key-value RDD into a dict, keeping its key-value structure
- countByKey: counts the occurrences of each key K in an RDD[K,V]; each matching K adds one, the V values are not summed
- countByValue: counts the occurrences of each element in the RDD and returns a dict whose keys are the elements and whose values are the occurrence counts
```python
# sc.parallelize(range(2, 100)) is equivalent to sc.range(2, 100)
rdd3 = sc.parallelize([("a", 1), ("a", 1), ("b", 2), ("a", 1)])
print(rdd3.countByKey())    # defaultdict(<class 'int'>, {'a': 3, 'b': 1})
print(rdd3.countByValue())  # defaultdict(<class 'int'>, {('a', 1): 3, ('b', 2): 1})
```
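The actions above that have no snippet of their own (first, max, sum, take, top, count, collectAsMap) can be illustrated with a minimal sketch, assuming the same `sc`:

```python
rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6])
print(rdd.first())   # 3
print(rdd.max())     # 9
print(rdd.sum())     # 31
print(rdd.take(3))   # [3, 1, 4]
print(rdd.top(3))    # [9, 6, 5]
print(rdd.count())   # 8

kv = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
print(kv.collectAsMap())  # {'a': 2, 'b': 3} -- for duplicate keys the later value wins
```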
- stats: returns the count, mean, standard deviation, maximum and minimum of the RDD elements
```python
rdd = sc.parallelize(range(100))
print(rdd.stats())
# (count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99, min: 0)
```
- aggregate(zeroValue, seqOp, combOp): aggregates the elements of each partition using seqOp and the given zeroValue, then merges all the partition results using combOp and zeroValue
```python
data = [1, 3, 5, 7, 9, 11, 13, 15, 17]
rdd = sc.parallelize(data, 2)
print(rdd.glom().collect())  # [[1, 3, 5, 7], [9, 11, 13, 15, 17]]

seqOp = lambda x, y: (x[0] + y, x[1] + 1)         # running (sum, count) within a partition
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])  # merge partition results
a = rdd.aggregate((0, 0), seqOp, combOp)          # (81, 9) = (0, 0) + (16, 4) + (65, 5)
```
Transformation operators
- coalesce, repartition: change the number of partitions
- filter: keep the elements that satisfy a predicate
- map: transform each element
- flatMap: transform each element and flatten the result
- mapPartitions: transform one partition at a time
- mapValues: for KV data, keep the key and operate on the value
- reduce: aggregate the elements; reduceByKey: aggregate the values of each key in KV data
- join: inner join; fullOuterJoin: full outer join
- groupBy: group by the return value of a function, used as the key; groupByKey: group KV data by key
- keys, values: get the sequence of keys or values
- zip: pair two RDDs with the same number of elements one-to-one
- union: union; subtract: difference; intersection: intersection; cartesian: Cartesian product
- cache, persist: cache the RDD
- glom: view the contents of each partition
- sortBy: sort the RDD elements
- coalesce: rdd.coalesce(numPartitions, shuffle=False) repartitions the RDD; the second argument controls whether a shuffle happens during repartitioning
```python
rdd = sc.parallelize([1, 2, 3, 4, 5], 3).glom()                   # [[1], [2, 3], [4, 5]]
rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6], 3).coalesce(1, False)   # [1, 2, 3, 4, 5, 6]
```
- repartition: repartitions with a shuffle, equivalent to coalesce(numPartitions, shuffle=True)
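A minimal sketch, assuming the same `sc`; the element-to-partition layout after the shuffle is not guaranteed, so only the partition count is printed:

```python
rdd = sc.parallelize(range(1, 7), 3)
print(rdd.glom().collect())      # [[1, 2], [3, 4], [5, 6]]
rdd2 = rdd.repartition(2)        # same as coalesce(2, shuffle=True)
print(rdd2.getNumPartitions())   # 2
```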
- distinct: removes duplicate elements
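A minimal sketch (sorted() is used only to make the output deterministic, since distinct does not guarantee order):

```python
rdd = sc.parallelize([1, 1, 2, 3, 3, 3])
print(sorted(rdd.distinct().collect()))  # [1, 2, 3]
```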
- filter: returns a new RDD containing the elements for which the filter function is True, e.g. filter(lambda x: x % 2 == 0)
```python
# filter
rdd5 = sc.parallelize([1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 0)
print(rdd5.collect())  # [2, 4]
```
- map(func, preservesPartitioning=False): processes each element of the RDD according to the logic defined by func; commonly used, e.g. for counting words
rdd = sc.parallelize(["b", "a", "c", "d"]) rdd2 = rdd.map(lambda x: (x, 1)) #[('b', 1), ('a', 1), ('c', 1), ('d', 1)]
- flatMap: applies func to each element of the RDD and flattens the result
```python
# flatMap
rdd5 = sc.parallelize([1, 2, 3, 4, 5]).flatMap(lambda x: [(x, x * 2)])
print(rdd5.collect())  # [(1, 2), (2, 4), (3, 6), (4, 8), (5, 10)]
```
- flatMapValues: applies func to the value of each KV pair in the RDD, producing new KV pairs, and flattens the result
```python
# flatMapValues
rdd = sc.parallelize([("a", [1, 2, 3]), ("c", ["w", "m"])])
ret = rdd.flatMapValues(lambda x: x)
# [('a', 1), ('a', 2), ('a', 3), ('c', 'w'), ('c', 'm')]
```
- mapPartitions: processes the elements of a whole RDD partition at a time according to the defined logic and returns the results per partition
```python
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)

def f(iter):
    yield sum(iter)  # yield turns the function into a generator that returns an iterable

rdd2 = rdd.mapPartitions(f)
print(rdd2.collect())  # [3, 12]
```
- mapValues: applies a function to the value of each element of a KV-format RDD; the key stays unchanged and the original partitioning is preserved
rdd = sc.parallelize([("a", ["hello", "spark", "!"]), ("b", ["cumt"])]) rdd2 = rdd.mapValues(lambda x:len(x)) #[('a', 3), ('b', 1)]
- mapPartitionsWithIndex: like mapPartitions, processes the elements of each partition according to the defined logic, while also tracking the index of the original partition
```python
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)

def f(index, iter):   # partition indexes 0, 1, 2
    print(index)
    for x in iter:    # 1, 2; 3, 4; 5, 6
        print(x)
    yield index

ret = rdd.mapPartitionsWithIndex(f).sum()  # 3 = 0 + 1 + 2
print(ret)
```
- reduce: aggregates the RDD elements with func, reducing them to a single value
```python
rdd = sc.parallelize([1, 2, 3, 4, 5])
ret = rdd.reduce(lambda x, y: x + y)  # 15
```
- reduceByKey: aggregates the values of each key in KV data, reducing the number of elements
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2),("b", 3)]) rdd2 = rdd.reduceByKey(lambda x,y:x+y) #[('a', 3), ('b', 4)]
- join: returns all pairs of elements with matching keys from the two RDDs. Each pair is returned as a (k, (v1, v2)) tuple, where (k, v1) comes from the first RDD and (k, v2) from the other
```python
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
ret = x.join(y).collect()  # [('a', (1, 2)), ('a', (1, 3))]
```
- fullOuterJoin: full outer join; keys without a match on the other side get None
```python
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
rdd = x.fullOuterJoin(y)
# [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
```
- leftOuterJoin and rightOuterJoin: left outer join and right outer join
```python
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
rdd = x.leftOuterJoin(y)   # [('b', (4, None)), ('a', (1, 2))]
rdd = x.rightOuterJoin(y)  # [('c', (None, 8)), ('a', (1, 2))]
```
- groupBy(func, numPartitions=None, partitionFunc=<function portable_hash>): the return value of func is used as the key, the elements are grouped by that key, and a new RDD is returned
```python
rdd = sc.parallelize([1, 2, 3, 4, 5, 10])
rdd = rdd.groupBy(lambda x: x % 2)
result = rdd.collect()
# [(0, <pyspark.resultiterable.ResultIterable object at 0x110ef9c50>),
#  (1, <pyspark.resultiterable.ResultIterable object at 0x110ef94d0>)]
ret = sorted([(x, sorted(y)) for (x, y) in result])  # [(0, [2, 4, 10]), (1, [1, 3, 5])]
```
- groupByKey: groups the values of each key in the RDD into a single sequence, hash-partitioning the resulting RDD with numPartitions partitions. If you are computing a sum or an average, reduceByKey or aggregateByKey is recommended instead
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) rdd2 = rdd.groupByKey().mapValues(lambda x: sum(x)) rdd3 = rdd.reduceByKey(lambda x,y: x+y) #Same as rdd2 # [('a', 2), ('b', 1)] print(sorted(rdd2.collect()))
- keyBy: creates tuples whose key is the value returned by func and whose value is the original RDD element
```python
rdd = sc.parallelize(range(0, 3))
rdd = rdd.keyBy(lambda x: x * x)  # [(0, 0), (1, 1), (4, 2)]
```
- keys: returns a new RDD containing the keys of a KV-format RDD
```python
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print(rdd1.keys().collect())  # ['a', 'b', 'a']
```
- values: returns a new RDD containing the values of a KV-format RDD
```python
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print(rdd1.values().collect())  # [1, 2, 3]
```
- zip: rdd.zip(otherRDD) uses the elements of the first RDD as keys and the elements of the second RDD as values to form a new RDD; the two RDDs must contain the same number of elements
```python
x = sc.parallelize(range(1, 6))
y = sc.parallelize(range(801, 806))
print(x.zip(y).collect())  # x and y must have the same length
# [(1, 801), (2, 802), (3, 803), (4, 804), (5, 805)]
```
- zipWithIndex: pairs each RDD element (as key) with its index (as value)
rdd = sc.parallelize(["a", "b", "c", "d"], 3) print(rdd.zipWithIndex().collect()) #[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
- union: merges the elements of the first RDD with those of the second
```python
rdd = sc.parallelize(range(1, 10))
rdd2 = sc.parallelize(range(11, 20))
rdd3 = rdd.union(rdd2)
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19]
```
- subtract: removes from the first RDD the elements that also appear in the second
```python
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 1), ("b", 5)])
z = x.subtract(y)  # [('b', 4), ('a', 3)]
```
- subtractByKey: removes from a KV-format RDD the elements whose key appears in another RDD; any element with a matching key is removed, regardless of its value
```python
x = sc.parallelize([("a", 1), ("b", 4), ("c", 5), ("a", 3)])
y = sc.parallelize([("a", 7), ("b", 0)])
z = x.subtractByKey(y)  # [('c', 5)]
```
- intersection: returns the intersection, with duplicates removed
```python
rdd1 = sc.parallelize([("a", 2), ("b", 1), ("a", 2), ("b", 3)])
rdd2 = sc.parallelize([("a", 2), ("b", 1), ("e", 5)])
ret = rdd1.intersection(rdd2).collect()  # [('a', 2), ('b', 1)]
```
- cartesian: returns the Cartesian product of two RDDs; the result can contain many elements and may cause out-of-memory errors
```python
rdd = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3, 7])
rdd3 = sorted(rdd.cartesian(rdd2).collect())
print(rdd3)  # [(1, 3), (1, 7), (2, 3), (2, 7)]
```
- sortBy(keyfunc, ascending=True, numPartitions=None): sorts the RDD elements, in ascending order by default
```python
x = [('a', 6), ('f', 2), ('c', 7), ('d', 4), ('e', 5)]
rdd2 = sc.parallelize(x).sortBy(lambda t: t[0])
# [('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
rdd3 = sc.parallelize(x).sortBy(lambda t: t[1])
# [('f', 2), ('d', 4), ('e', 5), ('a', 6), ('c', 7)]
rdd4 = sc.parallelize(x).sortBy(lambda t: t[1], False)
# [('c', 7), ('a', 6), ('e', 5), ('d', 4), ('f', 2)]
```
- sortByKey(ascending=True, numPartitions=None, keyfunc=...): sorts a KV RDD by key
```python
x = [('a', 6), ('f', 2), ('c', 7), ('d', 4), ('e', 5)]
rdd = sc.parallelize(x).sortByKey(True, 1)
print(rdd.collect())  # [('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
```
- takeOrdered: returns the first num elements of the RDD after sorting, ascending by default; an optional key function is supported
```python
rdd = sc.parallelize(range(2, 100))
print(rdd.takeOrdered(10))                    # [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
print(rdd.takeOrdered(10, key=lambda x: -x))  # [99, 98, 97, 96, 95, 94, 93, 92, 91, 90]
```
- takeSample(withReplacement, num, seed=None): samples a fixed-size subset of the RDD. The first (boolean) parameter indicates whether an element may be sampled more than once, the second is the number of samples, and the third is the random-number seed
```python
rdd = sc.parallelize(range(2, 10))
print(rdd.takeSample(True, 20, 1))
# True: an element may appear more than once
# [5, 9, 5, 3, 2, 2, 7, 7, 5, 7, 9, 9, 5, 3, 2, 4, 5, 5, 6, 8]
print(rdd.takeSample(False, 20, 1))
# False: an element appears at most once
# [5, 8, 3, 7, 9, 2, 6, 4]
```
- sample(withReplacement, fraction, seed): the second parameter is the sampling fraction, in [0, 1]
```python
rdd = sc.parallelize(range(100), 1)
ret = rdd.sample(False, 0.1, 1)
# possible output: [9, 11, 13, 39, 49, 55, 61, 65, 90, 91, 93, 94]
```
- randomSplit: randomly splits the RDD according to the given weights and returns a list of RDDs
```python
rdd = sc.parallelize(range(100), 1)
rdd1, rdd2 = rdd.randomSplit([2, 3], 10)
print(len(rdd1.collect()))  # 40
print(len(rdd2.collect()))  # 60
```
- lookup: finds the elements with the given key in a KV-format RDD and returns the corresponding values V
```python
rdd = sc.parallelize([('a', 'b'), ('c', 'd')])
print(rdd.lookup('a'))  # ['b']
```
- fold(value, func): processes each element of the RDD with func, which takes two parameters a and b; a starts at the initial value and then holds the accumulated result, and b is the current element. It can be used for cumulative sums and products
```python
# fold
ret = sc.parallelize([1, 2, 3, 4, 5]).fold(0, lambda x, y: x + y)  # 15
ret = sc.parallelize([1, 2, 3, 4, 5]).fold(1, lambda x, y: x * y)  # 120
```
- foldByKey: applies the logic defined by func to the values of each key in a KV-format RDD; can be used for per-key grouping, sums and products
```python
# foldByKey
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 5)])
rdd2 = rdd.foldByKey(0, lambda x, y: x + y)  # [('a', 4), ('b', 7)]
rdd3 = rdd.foldByKey(1, lambda x, y: x * y)  # [('a', 3), ('b', 10)]
```
- foreach: processes each element of the RDD according to the logic defined by func
- foreachPartition: processes the elements of each RDD partition according to the logic defined by func. foreachPartition is generally more efficient than foreach because it handles one whole partition of data at a time; when writing to a database its performance is much higher than doing it element by element with map or foreach
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3),("b", 5)]) def f(x): print(x) return (x[0],x[1]*2) def f2(iter): for x in iter: print(x) ret = rdd.foreach(f) ret2 = sc.parallelize([1,2,3,4,5,6,7,8],2).foreachPartition(f2)
- aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=...): for each key, aggregates the values on each partition using seqFunc and the given zeroValue, then merges all the partition results with combFunc
data=[("a",1),("b",2),("a",3),("b",4),("a",5),("b",6),("a",7),("b",8),("a",9),("b",10)] rdd=sc.parallelize(data,2) print(rdd.glom().collect()) #[[('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5)], [('b', 6), ('a', 7), ('b', 8), ('a', 9), ('b', 10)]] def seqFunc(x,y): return x + y def combFunc(x,y): return x + y a=rdd.aggregateByKey(0,seqFunc,combFunc) # [('b', 30), ('a', 25)] print(a.collect())
- combineByKey(createCombiner, mergeValue, mergeCombiners):
  - createCombiner: V => C, takes the current value as a parameter and returns the initial combiner
  - mergeValue: (C, V) => C, merges the value V into the previous combiner C (performed within each partition)
  - mergeCombiners: (C, C) => C, merges two combiners (performed across partitions)
```python
# append vs. extend
a = [1, 2]
b = [10, 11]
a.extend(b)  # a becomes [1, 2, 10, 11]
a = [1, 2]
a.append(b)  # a becomes [1, 2, [10, 11]]

# combineByKey
rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2), ("b", 4)], 2)

def to_list(a):
    return [a]

def append(a, b):   # merge a value into the combiner within a partition
    a.append(b)
    return a

def extend(a, b):   # merge combiners from different partitions
    a.extend(b)
    return a

print(rdd.glom().collect())  # [[('a', 1), ('b', 3)], [('a', 2), ('b', 4)]]
ret = sorted(rdd.combineByKey(to_list, append, extend).collect())
# [('a', [1, 2]), ('b', [3, 4])]
```
- glom: turns the elements of type T in each RDD partition into an Array[T] (a list in Python), so each partition contributes a single array element
```python
# glom
rdd2 = sc.parallelize([1, 2, 3, 4, 5], 3)
print(rdd2.collect())                     # [1, 2, 3, 4, 5]
print(rdd2.glom().collect())              # [[1], [2, 3], [4, 5]]
print(rdd2.coalesce(1).glom().collect())  # [[1, 2, 3, 4, 5]]
```
- cache: caches the RDD with the default storage level (MEMORY_ONLY)
- persist: caches the RDD with a user-specified storage level (StorageLevel)
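A minimal sketch of both, using StorageLevel from pyspark; the cache is released with unpersist() before persisting again with a different level:

```python
from pyspark import StorageLevel

rdd = sc.parallelize(range(10))
rdd.cache()                                # default level, MEMORY_ONLY
rdd.count()                                # the first action materializes the cache
rdd.unpersist()                            # release the cached data
rdd.persist(StorageLevel.MEMORY_AND_DISK)  # custom level: spill to disk when memory is short
print(rdd.getStorageLevel())
```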
- saveAsTextFile: saves the RDD as a text file, writing each element as one line of text
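A minimal sketch; `output/words` is a placeholder path and must not already exist, otherwise Spark raises an error:

```python
rdd = sc.parallelize(["hello world", "hello spark"], 2)
rdd.saveAsTextFile("output/words")             # writes one part-xxxxx file per partition
print(sc.textFile("output/words").collect())   # read the lines back
```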