PySpark Feature Tool
1. Data preparation
We define some test data to verify that each function works. At the same time, for beginners, knowing what each function takes as input and produces as output makes it much easier to understand and use the feature functions:
df = spark.createDataFrame([
    ('zhu', "Hi I heard about pySpark"),
    ('xiang', "I wish python could use case classes"),
    ('yu', "Logistic regression models are neat")
], ["id", "sentence"])

# functionTestData
+-----+------------------------------------+
|id   |sentence                            |
+-----+------------------------------------+
|zhu  |Hi I heard about pySpark.           |
|xiang|I wish python could use case classes|
|yu   |Logistic regression models are neat |
+-----+------------------------------------+
2. Data reading
# !/usr/bin/env python
# -*- coding: utf-8 -*-
########################################################################################################################
#   Creater         : Zhu Xiangyu.DOTA
#   Creation Time   : 2020-2-17 12:45:09
#   Description     : pyspark Feature Engineering Toolset
#   Modify By       :
#   Modify Time     :
#   Modify Content  :
#   Script Version  : 2.0.0.9
########################################################################################################################
import math
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('DOTAd_Features_Tool').enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 1000)
spark.conf.set("spark.default.parallelism", 2000)

def get_params():
    return {
        # Functions that can be used
        'column1' : "TFIDF",            # Term frequency - inverse document frequency
        'column2' : "Word2Vec",
        'column3' : "CountVectorizer",
        'column4' : "OneHotEncoder",
        'column5' : "StringIndexer",
        'column6' : "IndexToString",
        'column7' : "PCA",
        'column8' : "Binarizer",
        'column9' : "Tokenizer",
        'column10': "StopWordsRemover",
        # 'column11': "NGram",
        # 'column12': "DCT",            # Discrete cosine transform
        'column13': "ChiSqSelector",    # Chi-square feature selection
        'column14': "PearsonCorr",      # Pearson correlation coefficient
    }

def main():
    # Reset params
    ######################################################################################
    # Source database name . table name
    dataset_Name = ""
    dataset = spark.sql("select * from {dataset_Name}".format(dataset_Name=dataset_Name)).fillna(0)
    # Target database name . table name for storing the result
    saveAsTable_Name = ""
    # Specify which function to apply to which column: {col: function}
    params = {'sentence': "TFIDF"}
    ######################################################################################

    # functionTestData
    df = spark.createDataFrame([
        ('zhu', "Hi I heard about pySpark"),
        ('xiang', "I wish python could use case classes"),
        ('yu', "Logistic regression models are neat")
    ], ["id", "sentence"])

    # Feature transform
    features = featureTool(dataset, params)  # Test-Model : dataset = df
    features.show(5)

    # Save features as a table
    saveResult(features, saveAsTable_Name)
3. Data storage
# SaveTableAs
def saveResult(res, saveAsTable_Name='dota_tmp.dota_features_tool_save_result',
               saveFormat="orc", saveMode="overwrite"):
    res.write.saveAsTable(name=saveAsTable_Name, format=saveFormat, mode=saveMode)
4. Feature functions
def featureTool(df, params):
    dataCols, targetCols = df.columns, params.keys()
    # Only the first {column: function} pair in params is executed
    exeColumns = list(params.keys())[0]
    exeDefFunction = params[exeColumns]
    print(exeColumns + "-->" + exeDefFunction + "(df,{exeColumns})".format(exeColumns=exeColumns))

    exeOrder = "feat={exeDef}(df,'{exeCols}','{outputCol}')".format(
        exeCols=exeColumns,
        exeDef=exeDefFunction,
        outputCol=exeDefFunction + '_' + exeColumns)
    print("exeOrder : " + exeOrder)

    # Under Python 3, exec() cannot create new local variables inside a function,
    # so run the generated call in an explicit namespace and read the result back from it.
    namespace = {'df': df, exeDefFunction: globals()[exeDefFunction]}
    exec(exeOrder, namespace)
    return namespace['feat']
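For orientation, here is a minimal usage sketch, assuming the SparkSession, the test df from section 1, and the TFIDF function from section 4.1 are all defined:

# Minimal usage sketch (assumes `spark`, the test `df`, and TFIDF() from section 4.1 exist)
params = {'sentence': "TFIDF"}       # apply TFIDF to the 'sentence' column
features = featureTool(df, params)   # generates and runs: feat=TFIDF(df,'sentence','TFIDF_sentence')
features.show(3, truncate=False)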
4.1 TFIDF
This weighting scheme is often used in the vector space model together with cosine similarity to judge the similarity between two documents. TF-IDF is widely used in practice, for example in search engines. Its main idea: if a word w appears frequently in one document D and rarely in other documents, then w has good discriminating power and is well suited to distinguishing D from other documents.
def TFIDF(df, inputCol="sentence", outputCol="tfidf", numFeatures=20):
    """
    Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely
    used in text mining. It reflects how important a word in a document is with respect to the
    whole corpus.
    Conclusion: the more often a word appears in a document and the less often it appears across
    all documents, the better it represents that document.
    """
    from pyspark.ml.feature import HashingTF, IDF, Tokenizer
    tokenizerX = Tokenizer(inputCol=inputCol, outputCol="words")
    wordsDataX = tokenizerX.transform(df)
    hashingTFX = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=numFeatures)
    featurizedData = hashingTFX.transform(wordsDataX)
    idfX = IDF(inputCol="rawFeatures", outputCol=outputCol)
    idfModel = idfX.fit(featurizedData)
    tfidfRes = idfModel.transform(featurizedData).drop('words', 'rawFeatures')
    return tfidfRes
The output of the above code is as follows:

TFIDF Output
+-----+------------------------------------+------------------------------------------------------------------------+
|id   |sentence                            |tfidf                                                                   |
+-----+------------------------------------+------------------------------------------------------------------------+
|zhu  |Hi I heard about pySpark            |(20,[0,9,17],[0.693147189,0.5753641037,1.3862943611198906])             |
|xiang|I wish python could use case classes|(20,[2,9,13,15],[0.69314853,1.150720234,0.2851785,0.2876072175])        |
|yu   |Logistic regression models are neat |(20,[4,6,13,15,18],[0.69314,0.6931474,0.2876820785,0.28785,0.69349453]) |
+-----+------------------------------------+------------------------------------------------------------------------+
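For reference, a direct invocation along these lines (a sketch assuming the test df from section 1) produces output of the shape shown above:

# Direct invocation sketch (assumes the test `df` from section 1)
tfidfRes = TFIDF(df, inputCol="sentence", outputCol="tfidf", numFeatures=20)
tfidfRes.show(truncate=False)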
4.2 Word2Vec
The word2vec model is essentially a simplified neural network that turns text data into vectors. Word vectors have good semantic properties and are a common way to represent word features: the value in each dimension of a word vector corresponds to a feature with some semantic or grammatical interpretation, so each dimension can be regarded as a word feature. Word vectors come in several forms, and the distributed representation is one of them. A distributed representation is a dense, low-dimensional, real-valued vector in which each dimension represents a latent feature of the word, capturing useful syntactic and semantic properties. The word "distributed" reflects exactly this: the different syntactic and semantic features of a word are distributed across the dimensions of the vector.
def Word2Vec(df, inputCol="sentence", outputCol="w2v", vectorSize=100, minCount=5, numPartitions=1,
             stepSize=0.025, maxIter=1, seed=None, windowSize=5, maxSentenceLength=1000):
    """
    Word2Vec converts a word into a vector. A "word" here is any ordered, meaningful entity,
    e.g. the words of a document, or the items a user clicked on in sequence.
    The entity vectors produced by Word2Vec can be used to measure similarity between entities,
    and on that basis can be applied to classification, clustering, recommendation,
    sentence vectors and short-text classification.

    Two training schemes exist:
      Skip-gram: uses a word as input to predict the context around it.
      CBOW     : uses the context of a word as input to predict the word itself.

    Spark's Word2Vec is an Estimator: it takes sequences of words representing documents and
    trains a Word2VecModel. [Spark implements the Skip-gram model.]
    The model maps each word to a fixed-size vector and converts each document into a vector by
    averaging the vectors of its words; this vector can then be used as a feature for prediction,
    document-similarity computation and so on.
    """
    from pyspark.ml.feature import Word2Vec
    from pyspark.sql.functions import split
    # Input data: each row is a bag of words from a sentence or document.
    df = df.withColumn("words", split(df[inputCol], ' '))
    word2VecX = Word2Vec(
        vectorSize        = vectorSize,
        minCount          = minCount,
        inputCol          = "words",
        outputCol         = outputCol,
        numPartitions     = numPartitions,
        stepSize          = stepSize,
        maxIter           = maxIter,
        seed              = seed,
        windowSize        = windowSize,
        maxSentenceLength = maxSentenceLength
    )
    w2vModel = word2VecX.fit(df)
    w2vRes = w2vModel.transform(df).drop('words')
    return w2vRes
The output of the above code is as follows:

Word2Vec Output
+-----+--------------------+--------------------+
|   id|            sentence|                 w2v|
+-----+--------------------+--------------------+
|  zhu|Hi I heard about ...|[0.08936496693640...|
|xiang|I wish python cou...|[7.36715538161141...|
|   yu|Logistic regressi...|[-0.0063562680035...|
+-----+--------------------+--------------------+
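Note that with the default minCount=5 every token of the tiny three-sentence test set falls below the count threshold, so an invocation along these lines (a sketch with minCount lowered; the exact settings behind the sample output are not given) yields vectors of this shape:

# Invocation sketch for the toy data (assumes the test `df` from section 1);
# minCount is lowered because every word occurs only once or twice in the test sentences.
w2vRes = Word2Vec(df, inputCol="sentence", outputCol="w2v", minCount=1)
w2vRes.show()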
4.3 CountVectorizer
CountVectorizer converts a document into a vector of term counts.
def CountVectorizer(df, inputCol="sentence", outputCol="cv", vectorSize=200000, minCount=1.0):
    """
    CountVectorizer converts a document into a vector of term counts. When no a-priori dictionary
    is available, CountVectorizer can be used as an Estimator to extract the vocabulary and produce
    a CountVectorizerModel. The model yields a sparse representation of each document over the
    vocabulary, which can then be passed to other algorithms such as LDA.

    During fitting, CountVectorizer keeps the vocabSize most frequent words in the corpus.
    The optional parameter minDF also affects fitting: it specifies the minimum number (or fraction)
    of documents a term must appear in to enter the vocabulary. Another optional parameter, binary,
    controls the output vector: if set to True, all non-zero counts become 1, which is useful for
    binary discrete probability models.
    """
    from pyspark.ml.feature import CountVectorizer
    from pyspark.sql.functions import split
    df = df.withColumn("words", split(df[inputCol], ' '))
    CountVectorizerX = CountVectorizer(inputCol="words", outputCol=outputCol, vocabSize=vectorSize, minDF=minCount)
    cvModelX = CountVectorizerX.fit(df)
    cvRes = cvModelX.transform(df).drop('words')
    return cvRes
The output of the above code is as follows:

CountVectorizer Output
+-----+------------------------------------+----------------------------------------------------+
|id   |sentence                            |cv                                                  |
+-----+------------------------------------+----------------------------------------------------+
|zhu  |Hi I heard about pySpark            |(16,[0,2,4,12,13],[1.0,1.0,1.0,1.0,1.0])            |
|xiang|I wish python could use case classes|(16,[0,3,5,6,8,10,14],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|yu   |Logistic regression models are neat |(16,[1,7,9,11,15],[1.0,1.0,1.0,1.0,1.0])            |
+-----+------------------------------------+----------------------------------------------------+
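The vocabulary size of 16 in the output corresponds to the 16 distinct words in the test sentences. A minimal invocation sketch (assuming the test df from section 1):

# Invocation sketch (assumes the test `df` from section 1); vectorSize/minCount keep their defaults
cvRes = CountVectorizer(df, inputCol="sentence", outputCol="cv")
cvRes.show(truncate=False)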
4.4 OneHotEncoder
Maps a categorical feature to a binary vector with a single valid value (one element is 1, the rest are 0).
def OneHotEncoder(df, inputCol="category", outputCol="categoryVec"):
    """
    Maps a categorical feature to a binary vector with a single valid value (one element is 1, the rest are 0).
    """
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    # First index the string categories, then one-hot encode the indices.
    stringIndexerX = StringIndexer(inputCol=inputCol, outputCol="categoryIndex")
    modelX = stringIndexerX.fit(df)
    indexed = modelX.transform(df)
    # Note: this uses the Spark 2.x API, where OneHotEncoder is a Transformer.
    # In Spark 3.x OneHotEncoder is an Estimator and requires fit() before transform().
    encoderX = OneHotEncoder(inputCol="categoryIndex", outputCol=outputCol)
    encodedX = encoderX.transform(indexed).drop("categoryIndex")
    return encodedX
The output of the above code is as follows:

OneHotEncoder Output
+-------+--------+-------------+-------------+
|     id|category|categoryIndex|  categoryVec|
+-------+--------+-------------+-------------+
|    zhu|       a|          0.0|(2,[0],[1.0])|
|xiangyu|       b|          2.0|    (2,[],[])|
|     yu|       c|          1.0|(2,[1],[1.0])|
|     is|       a|          0.0|(2,[0],[1.0])|
| coming|       a|          0.0|(2,[0],[1.0])|
|    now|       c|          1.0|(2,[1],[1.0])|
+-------+--------+-------------+-------------+
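The output above was evidently produced from a small categorical frame rather than the section-1 test df. A sketch of such an input and the call (the frame is reconstructed from the output above, and the categoryIndex column shown there would be dropped by the function as written):

# Assumed categorical test data, reconstructed from the output above
catDf = spark.createDataFrame([
    ('zhu', 'a'), ('xiangyu', 'b'), ('yu', 'c'),
    ('is', 'a'), ('coming', 'a'), ('now', 'c')
], ["id", "category"])
encoded = OneHotEncoder(catDf, inputCol="category", outputCol="categoryVec")
encoded.show()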
4.5 StringIndexer
Encodes string labels as numeric indices; the indices are assigned by label frequency (the most frequent label receives index 0).
def StringIndexer(df, inputCol="category", outputCol="categoryVec"):
    """
    Encodes string labels as numeric indices; the indices are assigned by label frequency.
    """
    from pyspark.ml.feature import StringIndexer
    indexerX = StringIndexer(inputCol=inputCol, outputCol=outputCol)
    indexedX = indexerX.fit(df).transform(df)
    return indexedX
The output of the above code is as follows:

StringIndexer Output
+-----+--------------------+-----------+
|   id|            sentence|categoryVec|
+-----+--------------------+-----------+
|  zhu|Hi I heard about ...|        2.0|
|xiang|I wish python cou...|        0.0|
|   yu|Logistic regressi...|        1.0|
+-----+--------------------+-----------+
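A minimal invocation sketch on the test df from section 1 that yields indexing like the above:

# Invocation sketch (assumes the test `df` from section 1)
indexed = StringIndexer(df, inputCol="sentence", outputCol="categoryVec")
indexed.show()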
4.6 IndexToString
Corresponding to StringIndexer, IndexToString restores the indexed label to the original string.
def IndexToString(df, inputCol="categoryVec", outputCol="category"):
    """
    The counterpart of StringIndexer: IndexToString maps indexed labels back to the original strings.
    """
    from pyspark.ml.feature import IndexToString
    # Without an explicit labels parameter, IndexToString reads the label metadata
    # attached to the input column (e.g. by StringIndexer).
    converterX = IndexToString(inputCol=inputCol, outputCol=outputCol)
    convertedX = converterX.transform(df)
    return convertedX
The output of the above code is as follows:

IndexToString Output
IndexToString(StringIndexer(df,"sentence"))
+-----+--------------------+-----------+--------------------+
|   id|            sentence|categoryVec|            category|
+-----+--------------------+-----------+--------------------+
|  zhu|Hi I heard about ...|        2.0|Hi I heard about ...|
|xiang|I wish python cou...|        0.0|I wish python cou...|
|   yu|Logistic regressi...|        1.0|Logistic regressi...|
+-----+--------------------+-----------+--------------------+
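As the header of the output suggests, IndexToString is applied on top of the StringIndexer result. A sketch of that chain (assumes both functions above and the test df from section 1):

# Chained invocation sketch
indexed = StringIndexer(df, inputCol="sentence", outputCol="categoryVec")
restored = IndexToString(indexed, inputCol="categoryVec", outputCol="category")
restored.show()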
4.7 PCA
Principal component analysis (PCA) is a statistical method that rotates the data. In essence it performs a change of basis in a linear space so that the variance of the data projected onto a new set of "coordinate axes" is maximized. The transformed "axes" with small variance are then discarded; the remaining new "axes" are called the principal components, and they represent the original data as faithfully as possible in a lower-dimensional subspace.
PCA Input
+--------------------+
|            features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+
Given an input of this shape, the PCA function is defined as follows:
def PCA(df, vectorSize=3, inputCol="features", outputCol="pcaFeatures"):
    """
    Principal component analysis is a statistical method that rotates the data. In essence it is a
    change of basis in a linear space that maximizes the variance of the data projected onto a new
    set of "coordinate axes". The transformed "axes" with small variance are then discarded; the
    remaining new "axes" are the principal components, which represent the original data as
    faithfully as possible in a lower-dimensional subspace.
    """
    from pyspark.ml.feature import PCA
    pcaX = PCA(k=vectorSize, inputCol=inputCol, outputCol=outputCol)
    modelX = pcaX.fit(df)
    pcaRes = modelX.transform(df)
    return pcaRes
The output of the above code is as follows:

PCA Output
+-----------------------------------------------------------+
|pcaFeatures                                                 |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+
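A sketch of how the input frame shown above could be built and passed in; the truncated dense vectors are completed from the standard Spark PCA example, which is an assumption here:

# Input construction sketch (last elements of the truncated vectors are assumed)
from pyspark.ml.linalg import Vectors
pcaDf = spark.createDataFrame([
    (Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
    (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
    (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)
], ["features"])
pcaRes = PCA(pcaDf, vectorSize=3, inputCol="features", outputCol="pcaFeatures")
pcaRes.select("pcaFeatures").show(truncate=False)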
4.8 Binarizer
Converts a numerical feature into a binary (0/1) output given a threshold: values greater than the threshold become 1, values less than or equal to it become 0.
def Binarizer(df, threshold=0.5, inputCol="feature", outputCol="binarized_feature"):
    """
    Converts a numerical feature into a binary (0/1) output given a threshold:
    values greater than the threshold become 1, values less than or equal to it become 0.
    """
    from pyspark.ml.feature import Binarizer
    binarizerX = Binarizer(threshold=threshold, inputCol=inputCol, outputCol=outputCol)
    binarizedX = binarizerX.transform(df)
    return binarizedX
The output of the above code is as follows:

Binarizer Output
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+
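A sketch of the numeric input implied by the output above and the call that binarizes it:

# Assumed numeric test data, reconstructed from the output above
binDf = spark.createDataFrame([(0, 0.1), (1, 0.8), (2, 0.2)], ["id", "feature"])
binarized = Binarizer(binDf, threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarized.show()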
4.9 Tokenizer
Tokenizer: Spark provides both a default (whitespace) tokenizer and a regular-expression tokenizer; the function below uses the regular-expression variant.
def Tokenizer(df, inputCol="sentence", outputCol="words", pattern="\\W"):
    """
    Tokenizer: Spark provides both a default (whitespace) tokenizer and a regular-expression
    tokenizer; this function uses RegexTokenizer, splitting on the given pattern.
    """
    from pyspark.ml.feature import RegexTokenizer
    regexTokenizer = RegexTokenizer(inputCol=inputCol, outputCol=outputCol, pattern=pattern)
    regexTokenized = regexTokenizer.transform(df)
    return regexTokenized
The output of the above code is as follows:

Tokenizer Output
+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+
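The sample output comes from a slightly different set of sentences than the section-1 test df, and its tokens column appears to be a separate word count that the function itself does not produce. A sketch of the assumed input and the call:

# Assumed test sentences, reconstructed from the output above
tokDf = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenized = Tokenizer(tokDf, inputCol="sentence", outputCol="words")
tokenized.show(truncate=False)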
4.10 StopWordsRemover
Stop word filtering
def StopWordsRemover(df, inputCol="words", outputCol="words2", add_stopwords=[]):
    """
    Stop word filtering.
    """
    from pyspark.ml.feature import StopWordsRemover
    remover = StopWordsRemover(inputCol=inputCol, outputCol=outputCol)
    # Keep the default English stop-word list and only extend it with any additional words;
    # calling setStopWords(add_stopwords) with an empty list would disable stop-word removal entirely.
    if add_stopwords:
        remover = remover.setStopWords(remover.getStopWords() + add_stopwords)
    removed = remover.transform(df)
    return removed
The output of the above code is as follows:

StopWordsRemover Output
+---+----------------------------+--------------------+
|id |words                       |words2              |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+
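The result above relies on the default English stop-word list, which is why the function keeps the defaults when add_stopwords is empty. A sketch of the assumed, already-tokenized input and the call:

# Assumed input, reconstructed from the output above
swDf = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "words"])
removed = StopWordsRemover(swDf, inputCol="words", outputCol="words2")
removed.show(truncate=False)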
4.11 NGram
Combines adjacent words into n-grams, i.e. sequences of n consecutive words.
def NGram(df, n=2, inputCol="words", outputCol="ngrams"):
    """
    Combines adjacent words into n-grams (sequences of n consecutive words).
    """
    from pyspark.ml.feature import NGram
    # Use the n passed by the caller (the original hard-coded n=2 here).
    ngram = NGram(n=n, inputCol=inputCol, outputCol=outputCol)
    ngramDF = ngram.transform(df)
    return ngramDF
The output of the above code is as follows:

NGram Output
+---+--------------------------------------------+----------------------------------------------------------------------+
|id |words                                       |ngrams                                                                |
+---+--------------------------------------------+----------------------------------------------------------------------+
|0  |[Hi, I, heard, about, Spark]                |[Hi I, I heard, heard about, about Spark]                             |
|1  |[I, wish, python, could, use, case, classes]|[I wish, wish python, python could, could use, use case, case classes]|
|2  |[Logistic, regression, models, are, neat]   |[Logistic regression, regression models, models are, are neat]        |
+---+--------------------------------------------+----------------------------------------------------------------------+
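A sketch of the tokenized input implied by the output above and the bigram call:

# Assumed tokenized input, reconstructed from the output above
ngDf = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "python", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
ngrams = NGram(ngDf, n=2, inputCol="words", outputCol="ngrams")
ngrams.select("ngrams").show(truncate=False)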
4.12 DCT
Discrete cosine transform is the process of converting the N-dimensional real number sequence in time domain into the N-dimensional real number sequence in frequency domain (a bit similar to discrete Fourier transform).
def DCT(df, inverse=False, inputCol="features", outputCol="featuresDCT"):
    """
    The discrete cosine transform (DCT) converts an N-dimensional real-valued sequence in the time
    domain into an N-dimensional real-valued sequence in the frequency domain
    (somewhat similar to the discrete Fourier transform).
    """
    from pyspark.ml.feature import DCT
    dct = DCT(inverse=inverse, inputCol=inputCol, outputCol=outputCol)
    dctDf = dct.transform(df)
    return dctDf
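No sample output is given for DCT; a minimal invocation sketch with illustrative vectors (the data here is an assumption, modeled on the other examples):

# Invocation sketch with assumed vectors
from pyspark.ml.linalg import Vectors
dctDf = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),)
], ["features"])
DCT(dctDf, inverse=False, inputCol="features", outputCol="featuresDCT").show(truncate=False)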
4.13 ChiSqSelector
ChiSqSelector stands for chi-squared feature selection. It uses the chi-squared test of independence to select the features on which the class label most depends. selectorType supports the following options: numTopFeatures (default), percentile and fpr.
- 1. numTopFeatures: selects the top num features with the most predictive power according to the chi-squared test
- 2. percentile: similar to the previous option, but selects a fraction of the features instead of a fixed number
- 3. fpr: selects the features whose p-value is below a threshold, so that the false positive rate of the selection can be controlled
def ChiSqSelector(df, featuresCol='features', labelCol='label', numTopFeatures=50, outputCol="selectedFeatures",
                  selectorType='numTopFeatures', percentile=0.1, fpr=0.05):
    """
    ChiSqSelector stands for chi-squared feature selection. Based on the chi-squared test of
    independence, it selects the features on which the class label most depends.

    selectorType supports: numTopFeatures (default), percentile and fpr.
      1. numTopFeatures: selects the top num features with the most predictive power
      2. percentile: like the previous option, but selects a fraction of the features instead of a fixed number
      3. fpr: selects features whose p-value is below a threshold, controlling the false positive rate
    """
    from pyspark.ml.feature import ChiSqSelector
    selector = ChiSqSelector(
        numTopFeatures = numTopFeatures,
        featuresCol    = featuresCol,
        outputCol      = outputCol,
        labelCol       = labelCol,
        selectorType   = selectorType,
        percentile     = percentile,
        fpr            = fpr
    )
    result = selector.fit(df).transform(df)
    print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
    return result
The output of the above code is as follows:

# ChiSqSelector Output with top 1 features selected
+---+------------------+-----+----------------+
| id|          features|label|selectedFeatures|
+---+------------------+-----+----------------+
|  7|[0.0,0.0,18.0,1.0]|  1.0|          [18.0]|
|  8|[0.0,1.0,12.0,0.0]|  0.0|          [12.0]|
|  9|[1.0,0.0,15.0,0.1]|  0.0|          [15.0]|
+---+------------------+-----+----------------+
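A sketch of the labeled input implied by the output above and the call selecting a single feature:

# Assumed labeled test data, reconstructed from the output above
from pyspark.ml.linalg import Vectors
chiDf = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)
], ["id", "features", "label"])
selected = ChiSqSelector(chiDf, featuresCol='features', labelCol='label', numTopFeatures=1)
selected.show()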
4.14 PearsonCorr
The Pearson correlation coefficient measures the (linear) correlation between two variables X and Y; its value lies between -1 and 1.
def PearsonCorr(df, featureCol='feature', labelCol='label'):
    """
    The Pearson correlation coefficient measures the (linear) correlation between two
    variables X and Y; its value lies between -1 and 1.
    """
    # DataFrame.corr currently supports only the Pearson method; method=None defaults to it.
    return df.corr(featureCol, labelCol, method=None)
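A minimal invocation sketch with illustrative numeric columns (the data here is an assumption):

# Invocation sketch with assumed numeric data
corrDf = spark.createDataFrame([(1.0, 2.0), (2.0, 3.9), (3.0, 6.1), (4.0, 8.0)], ["feature", "label"])
print(PearsonCorr(corrDf, featureCol='feature', labelCol='label'))  # close to 1.0 for this data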