A Summary of PySpark Feature Engineering

PySpark Feature Tool

1. Data preparation

We first define some test data to check that each function works. For beginners in particular, seeing the input and output of a function makes it easier to understand what the feature function does and how to use it:

df = spark.createDataFrame([
    ('zhu', "Hi I heard about pySpark"),
    ('xiang', "I wish python could use case classes"),
    ('yu', "Logistic regression models are neat")
], ["id", "sentence"])
# functionTestData
+-----+------------------------------------+
|id   |sentence                            |
+-----+------------------------------------+
|zhu  |Hi I heard about pySpark            |
|xiang|I wish python could use case classes|
|yu   |Logistic regression models are neat |
+-----+------------------------------------+

2. Data reading

#!/usr/bin/env python
# -*- coding: utf-8 -*-

########################################################################################################################
#  Creator        : Zhu Xiangyu.DOTA
#  Creation Time  : 2020-2-17 12:45:09
#  Description    : PySpark feature engineering toolset
#  Modify By      :
#  Modify Time    :
#  Modify Content :
#  Script Version : 2.0.0.9
########################################################################################################################

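from pyspark.sql import SparkSession

# spark.default.parallelism is read when the SparkContext is created, so it is
# set on the builder rather than through spark.conf.set() afterwards.
spark = (SparkSession.builder
         .appName('DOTAd_Features_Tool')
         .config("spark.default.parallelism", "2000")
         .enableHiveSupport()
         .getOrCreate())
spark.conf.set("spark.sql.shuffle.partitions", "1000")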

def get_params():
    return {
    # Available functions
    'column1' : "TFIDF",            # Term frequency-inverse document frequency
    'column2' : "Word2Vec",
    'column3' : "CountVectorizer",
    'column4' : "OneHotEncoder",
    'column5' : "StringIndexer",
    'column6' : "IndexToString",
    'column7' : "PCA",
    'column8' : "Binarizer",
    'column9' : "Tokenizer",
    'column10': "StopWordsRemover",
    'column11': "NGram",
    'column12': "DCT",              # Discrete cosine transform
    'column13': "ChiSqSelector",    # Chi-squared feature selection
    'column14': "PearsonCorr",      # Pearson correlation coefficient
    }

def main():
    # Reset params
    ######################################################################################
    #
    # Source table name, in the form "database.table"
    dataset_Name = ""
    dataset = spark.sql("select * from {dataset_Name}".format(dataset_Name = dataset_Name)).fillna(0)
    #
    # Target table name for the result, in the form "database.table"
    saveAsTable_Name = ""
    #
    # Function to apply to a column, in the form {column: function}
    params = {'sentence': "TFIDF"}
    #
    ######################################################################################
    #
    # functionTestData
    df = spark.createDataFrame([
        ('zhu', "Hi I heard about pySpark"),
        ('xiang', "I wish python could use case classes"),
        ('yu', "Logistic regression models are neat")
    ], ["id", "sentence"])
    # Feature transform (for a test run, pass df instead of dataset)
    features = featureTool(dataset,params)
    features.show(5)
    # Save the features as a table
    saveResult(features,saveAsTable_Name)

3. Data storage

# Save the result as a table
def saveResult(res,saveAsTable_Name='dota_tmp.dota_features_tool_save_result', saveFormat="orc",saveMode="overwrite"):
    res.write.saveAsTable(name=saveAsTable_Name, format=saveFormat,mode=saveMode)

4. Feature functions

def featureTool(df,params):
    # Apply the function configured for the first column in params
    exeColumns = list(params.keys())[0]
    exeDefFunction = params[exeColumns]
    outputCol = exeDefFunction+'_'+exeColumns
    print("exeOrder : {exeDef}(df,'{exeCols}','{outputCol}')".format(exeCols=exeColumns, exeDef=exeDefFunction, outputCol=outputCol))
    # Look the function up by name and call it directly. Building a string for exec()
    # does not work here: in Python 3, a name assigned inside exec() is not visible
    # as a local variable of the enclosing function, so "return feat" would fail.
    feat = globals()[exeDefFunction](df, exeColumns, outputCol)
    return feat

4.1 TFIDF

TF-IDF is a term-weighting method that is often used in the vector space model together with cosine similarity to judge how similar two documents are. The TF-IDF model is widely used in practical applications such as search engines. Its main idea is that if a word w appears frequently in one document d and rarely in other documents, then w discriminates well and is suitable for distinguishing d from other documents.

def TFIDF(df,inputCol="sentence",outputCol="tfidf", numFeatures=20):
    """
    Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining. It reflects how important a word is to a document in the corpus.
    # Conclusion: the more often a word appears in a document and the less often it appears across all documents, the better it represents that document
    """
    from pyspark.ml.feature import HashingTF, IDF, Tokenizer
    tokenizerX = Tokenizer(inputCol=inputCol, outputCol="words")
    wordsDataX = tokenizerX.transform(df)
    hashingTFX = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=numFeatures)
    featurizedData = hashingTFX.transform(wordsDataX)
    idfX = IDF(inputCol="rawFeatures", outputCol=outputCol)
    idfModel = idfX.fit(featurizedData)
    tfidfRes = idfModel.transform(featurizedData).drop('words','rawFeatures')
    return tfidfRes

The output of the above code is as follows:

TFIDF Output
+-----+------------------------------------+---------------------------------------------------------------------------+
|id   |sentence                            |tfidf                                                                      |
+-----+------------------------------------+---------------------------------------------------------------------------+
|zhu  |Hi I heard about pySpark            |(20,[0,9,17],[0.693147189,0.5753641037,1.3862943611198906])                |
|xiang|I wish python could use case classes|(20,[2,9,13,15],[0.69314853,1.150720234,0.2851785,0.2876072175])           |
|yu   |Logistic regression models are neat |(20,[4,6,13,15,18],[0.69314,0.6931474,0.2876820785,0.28785,0.69349453])    |
+-----+------------------------------------+---------------------------------------------------------------------------+
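
The weighting idea can be sketched in plain Python without Spark. This is only an illustration under stated assumptions: the helper names tokenize and tf_idf are made up here, terms are kept as strings instead of being hashed into numFeatures buckets as HashingTF does, and the IDF uses the smoothed form log((N + 1) / (df + 1)) given in the Spark documentation.

```python
import math
from collections import Counter

docs = [
    "Hi I heard about pySpark",
    "I wish python could use case classes",
    "Logistic regression models are neat",
]

def tokenize(text):
    # Lowercase and split on whitespace, like a basic tokenizer.
    return text.lower().split()

def tf_idf(documents):
    tokenized = [tokenize(d) for d in documents]
    n_docs = len(tokenized)
    # Document frequency: the number of documents each term occurs in.
    doc_freq = Counter(term for tokens in tokenized for term in set(tokens))
    # Smoothed inverse document frequency: log((N + 1) / (df + 1)).
    idf = {t: math.log((n_docs + 1) / (df + 1)) for t, df in doc_freq.items()}
    # TF-IDF weight = raw term count in the document * IDF of the term.
    return [{t: c * idf[t] for t, c in Counter(tokens).items()}
            for tokens in tokenized]

weights = tf_idf(docs)
```

With three documents, a term that occurs in one document gets the weight log(4/2), about 0.693, and a term that occurs in two documents (here "i") gets log(4/3), about 0.288, so the rarer term counts for more.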

4.2 Word2Vec

The Word2Vec model is essentially a shallow neural network that turns text into vectors. Word vectors have good semantic properties and are a common way to represent word features. The value in each dimension of a word vector represents a feature with some semantic or grammatical interpretation, so each dimension can be called a word feature. Word vectors take many forms, and the distributed representation is one of them: a dense, low-dimensional, real-valued vector. Each dimension represents a latent feature of the word that captures useful syntactic and semantic properties. The word "distributed" reflects exactly this: the different syntactic and semantic features of a word are spread across the dimensions of the vector.

def Word2Vec(df,inputCol="sentence",outputCol="w2v",vectorSize=100, minCount=5, numPartitions=1,
    stepSize=0.025, maxIter=1, seed=None, windowSize=5, maxSentenceLength=1000):
    """
    Word2Vec converts words into vectors. A "word" is any entity that occurs in a meaningful sequence, such as the words in a document or the goods a user clicks on in turn.
    The resulting vectors can be used to measure the similarity between entities, which supports applications such as classification, clustering, recommendation, sentence vectors and short text classification.
    #
    # Two training methods
    # Skip-gram: use a word as input to predict the context around it.
    # CBOW: use the context of a word as input to predict the word itself.
    #
    In Spark, Word2Vec is an Estimator that takes sequences of words representing documents and trains a Word2VecModel. [Spark implements the Skip-gram model]
    The model maps each word to a fixed-size vector.
    The Word2VecModel converts each document into a vector by averaging the vectors of all words in the document.
    This vector can then be used as a feature for prediction, document similarity calculation and so on.
    #
    # Note: minCount=5 drops every word that occurs fewer than 5 times in the corpus.
    # For a tiny corpus such as the test data, pass a smaller value (e.g. minCount=1).
    """
    from pyspark.ml.feature import Word2Vec
    from pyspark.sql.functions import split
    # Input data: Each row is a bag of words from a sentence or document.
    df = df.withColumn("words",split(df[inputCol],' '))
    word2VecX = Word2Vec(
                        vectorSize = vectorSize,
                        minCount = minCount,
                        inputCol = "words",
                        outputCol = outputCol,
                        numPartitions = numPartitions,
                        stepSize = stepSize,
                        maxIter = maxIter,
                        seed = seed,
                        windowSize = windowSize,
                        maxSentenceLength = maxSentenceLength
                        )
    w2vModel = word2VecX.fit(df)
    w2vRes = w2vModel.transform(df).drop('words')
    return w2vRes

The output of the above code is as follows:

Word2Vec Output
+-----+--------------------+--------------------+
|   id|            sentence|                 w2v|
+-----+--------------------+--------------------+
|  zhu|Hi I heard about ...|[0.08936496693640...|
|xiang|I wish python cou...|[7.36715538161141...|
|   yu|Logistic regressi...|[-0.0063562680035...|
+-----+--------------------+--------------------+
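
The step from word vectors to a document vector is a plain average, which can be sketched without Spark. The 2-dimensional vectors below are invented for illustration (a trained model would learn them), the function name document_vector is hypothetical, and the sketch assumes every word of the document is in the vocabulary.

```python
# Hypothetical 2-dimensional word vectors, chosen only for illustration.
word_vectors = {
    "logistic": [1.0, 0.0],
    "regression": [0.0, 1.0],
    "models": [1.0, 1.0],
}

def document_vector(words, vectors):
    # Average the word vectors dimension by dimension.
    # Assumes every word in `words` has an entry in `vectors`.
    dim = len(next(iter(vectors.values())))
    total = [0.0] * dim
    for word in words:
        for i, value in enumerate(vectors[word]):
            total[i] += value
    return [value / len(words) for value in total]

doc_vec = document_vector(["logistic", "regression", "models"], word_vectors)
```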

4.3 CountVectorizer

CountVectorizer converts a document into a vector of word counts.

def CountVectorizer(df,inputCol="sentence",outputCol="cv",vectorSize=200000, minCount=1.0):
    """
    CountVectorizer converts a document into a vector of word counts.
    When no a priori dictionary is available, CountVectorizer can be used as an Estimator to extract the vocabulary and generate a CountVectorizerModel.
    The model produces a sparse representation of the documents over the vocabulary, which can be passed to other algorithms such as LDA.
    #
    During fitting, CountVectorizer selects the top vocabSize words ordered by term frequency across the corpus.
    The optional parameter minDF also affects fitting: it specifies the minimum number of documents a word must appear in to be included in the vocabulary.
    Another optional parameter, binary, controls the output vector. If set to true, all non-zero counts are set to 1, which is useful for discrete probabilistic models that work with binary rather than integer counts.
    """
    from pyspark.ml.feature import CountVectorizer
    from pyspark.sql.functions import split
    df = df.withColumn("words",split(df[inputCol],' '))
    CountVectorizerX = CountVectorizer(inputCol="words", outputCol=outputCol, vocabSize=vectorSize, minDF=minCount)
    cvModelX = CountVectorizerX.fit(df)
    cvRes = cvModelX.transform(df).drop('words')
    return cvRes

The output of the above code is as follows:

CountVectorizer Output
+-----+------------------------------------+----------------------------------------------------+
|id   |sentence                            |cv                                                  |
+-----+------------------------------------+----------------------------------------------------+
|zhu  |Hi I heard about pySpark            |(16,[0,2,4,12,13],[1.0,1.0,1.0,1.0,1.0])            |
|xiang|I wish python could use case classes|(16,[0,3,5,6,8,10,14],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|yu   |Logistic regression models are neat |(16,[1,7,9,11,15],[1.0,1.0,1.0,1.0,1.0])            |
+-----+------------------------------------+----------------------------------------------------+
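
The two phases, fitting a vocabulary and then counting, can be sketched in plain Python. The names fit_vocabulary and count_vector are hypothetical, min_df is treated as an absolute document count only, and ties in frequency are broken alphabetically here just to make the result deterministic (the order of tied words in the Spark output above differs).

```python
from collections import Counter

def fit_vocabulary(token_lists, vocab_size=200000, min_df=1):
    # Corpus-wide term counts and per-term document frequencies.
    term_counts = Counter(t for tokens in token_lists for t in tokens)
    doc_freq = Counter(t for tokens in token_lists for t in set(tokens))
    candidates = [t for t in term_counts if doc_freq[t] >= min_df]
    # Most frequent terms first; alphabetical tie-break is an assumption.
    candidates.sort(key=lambda t: (-term_counts[t], t))
    return candidates[:vocab_size]

def count_vector(tokens, vocabulary):
    # Dense vector of counts, one position per vocabulary term.
    counts = Counter(tokens)
    return [float(counts[t]) for t in vocabulary]

sentences = [
    "Hi I heard about pySpark",
    "I wish python could use case classes",
    "Logistic regression models are neat",
]
token_lists = [s.split(" ") for s in sentences]
vocabulary = fit_vocabulary(token_lists)
vectors = [count_vector(tokens, vocabulary) for tokens in token_lists]
```

The vocabulary has 16 entries, matching the vector size in the table, and "I" takes index 0 because it is the only word that occurs twice.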

4.4 OneHotEncoder

OneHotEncoder maps a categorical feature to a binary vector with at most one non-zero entry (a single 1; every other entry is 0).

def OneHotEncoder(df,inputCol="category",outputCol="categoryVec"):
    """
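    Maps a categorical feature to a binary vector with at most one non-zero entry (a single 1; every other entry is 0).
    With the default dropLast=True, the last category index is encoded as an all-zero vector.
    """
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    stringIndexerX = StringIndexer(inputCol=inputCol, outputCol="categoryIndex")
    modelX = stringIndexerX.fit(df)
    indexed = modelX.transform(df)
    encoderX = OneHotEncoder(inputCol="categoryIndex", outputCol=outputCol)
    # In Spark 3.0+ OneHotEncoder is an Estimator and must be fitted first;
    # in Spark 2.x it was a plain Transformer without fit().
    if hasattr(encoderX, "fit"):
        encoderX = encoderX.fit(indexed)
    encodedX = encoderX.transform(indexed).drop("categoryIndex")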
    return encodedX

The output of the above code is as follows (shown with the intermediate categoryIndex column, which the function drops before returning):

OneHotEncoder Output
+-------+--------+-------------+-------------+
|     id|category|categoryIndex|  categoryVec|
+-------+--------+-------------+-------------+
|    zhu|       a|          0.0|(2,[0],[1.0])|
|xiangyu|       b|          2.0|    (2,[],[])|
|     yu|       c|          1.0|(2,[1],[1.0])|
|     is|       a|          0.0|(2,[0],[1.0])|
| coming|       a|          0.0|(2,[0],[1.0])|
|    now|       c|          1.0|(2,[1],[1.0])|
+-------+--------+-------------+-------------+
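
The table can be reproduced by hand with a small pure-Python sketch: index the labels by descending frequency, then one-hot encode the index with the last category dropped. The helper names string_index and one_hot are made up for this illustration, vectors are written densely rather than in Spark's sparse notation, and the alphabetical tie-break is an assumption.

```python
from collections import Counter

def string_index(values):
    # The most frequent label gets index 0, the next one index 1, and so on.
    counts = Counter(values)
    labels = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: i for i, label in enumerate(labels)}

def one_hot(index, num_categories, drop_last=True):
    # With drop_last, the vector has one slot fewer than there are categories
    # and the last category is represented by all zeros.
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

categories = ["a", "b", "c", "a", "a", "c"]
mapping = string_index(categories)
encoded = [one_hot(mapping[c], len(mapping)) for c in categories]
```

Here "a" maps to index 0, "c" to 1 and "b" to 2, so "b" becomes the all-zero vector, which is the (2,[],[]) row in the table.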

4.5 StringIndexer

StringIndexer encodes a string column of labels into a column of label indices. Indices are assigned in order of label frequency, so the most frequent label gets index 0.

def StringIndexer(df,inputCol="category",outputCol="categoryVec"):
    """
    Encodes a string column of labels into label indices, ordered by label frequency (the most frequent label gets index 0).
    """
    from pyspark.ml.feature import StringIndexer
    indexerX = StringIndexer(inputCol=inputCol, outputCol=outputCol)
    indexedX = indexerX.fit(df).transform(df)
    return indexedX

The output of the above code is as follows:

StringIndexer Output
+-----+--------------------+-----------+
|   id|            sentence|categoryVec|
+-----+--------------------+-----------+
|  zhu|Hi I heard about ...|        2.0|
|xiang|I wish python cou...|        0.0|
|   yu|Logistic regressi...|        1.0|
+-----+--------------------+-----------+

4.6 IndexToString

IndexToString is the inverse of StringIndexer: it maps a column of label indices back to the original label strings.

def IndexToString(df,inputCol="categoryVec",outputCol="category"):
    """
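    The inverse of StringIndexer: maps a column of label indices back to the original label strings.
    No labels are passed here, so the mapping is read from the metadata of inputCol, which StringIndexer writes.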
    """
    from pyspark.ml.feature import IndexToString
    converterX = IndexToString(inputCol=inputCol, outputCol=outputCol)
    convertedX = converterX.transform(df)
    return convertedX

The output of the above code, called as IndexToString(StringIndexer(df,"sentence")), is as follows:

IndexToString Output
+-----+--------------------+-----------+--------------------+
|   id|            sentence|categoryVec|            category|
+-----+--------------------+-----------+--------------------+
|  zhu|Hi I heard about ...|        2.0|Hi I heard about ...|
|xiang|I wish python cou...|        0.0|I wish python cou...|
|   yu|Logistic regressi...|        1.0|Logistic regressi...|
+-----+--------------------+-----------+--------------------+

4.7 PCA

Principal component analysis (PCA) is a statistical method that rotates the data. In essence it performs a change of basis in a linear space so that the variance of the data projected onto the new set of "coordinate axes" is maximized. The axes with small variance are then discarded, and the remaining new axes are called the principal components. They represent the nature of the original data as faithfully as possible in a lower-dimensional subspace.

PCA Input
+--------------------+
|            features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+

The function is defined as follows:

def PCA(df,vectorSize=3, inputCol="features", outputCol="pcaFeatures"):
    """
    Principal component analysis is a statistical method that rotates the data. In essence it is a change of basis in a linear space
    that maximizes the variance of the data projected onto the new set of "coordinate axes".
    The axes with small variance are then discarded, and the remaining new axes are called the principal components.
    They represent the nature of the original data as faithfully as possible in a lower-dimensional subspace.
    """
    from pyspark.ml.feature import PCA
    pcaX = PCA(k=vectorSize, inputCol=inputCol, outputCol=outputCol)
    modelX = pcaX.fit(df)
    pcaRes = modelX.transform(df)
    return pcaRes

The output of the above code is as follows:

PCA Output
+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+

4.8 Binarizer

Binarizer converts numerical feature values into binary (0/1) output according to a threshold: values greater than the threshold become 1.0, and values less than or equal to the threshold become 0.0.

def Binarizer(df,threshold=0.5, inputCol="feature", outputCol="binarized_feature"):
    """
    Converts numerical feature values into binary (0/1) output: values greater than the threshold become 1.0, all other values become 0.0
    """
    from pyspark.ml.feature import Binarizer
    binarizerX = Binarizer(threshold=threshold, inputCol=inputCol, outputCol=outputCol)
    binarizedX = binarizerX.transform(df)
    return binarizedX

The output of the above code is as follows:

Binarizer Output
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+
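
The thresholding rule is simple enough to check by hand. A minimal pure-Python sketch (the function name binarize is hypothetical) applied to the feature values from the table:

```python
def binarize(values, threshold=0.5):
    # Strictly greater than the threshold -> 1.0, otherwise 0.0.
    return [1.0 if v > threshold else 0.0 for v in values]

binarized = binarize([0.1, 0.8, 0.2])
```

Note that a value exactly equal to the threshold maps to 0.0.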

4.9 Tokenizer

Tokenizer: splits text into words. Spark provides both a default tokenizer and a regular-expression tokenizer; the function below uses the latter.

def Tokenizer(df,inputCol="sentence", outputCol="words", pattern="\\W"):
    """
    Tokenizer: splits text into words with RegexTokenizer. The pattern is a regular expression that matches the gaps between tokens (default: any non-word character).
    """
    from pyspark.ml.feature import RegexTokenizer
    regexTokenizer = RegexTokenizer(inputCol=inputCol, outputCol=outputCol, pattern=pattern)
    regexTokenized = regexTokenizer.transform(df)
    return regexTokenized

The output of the above code is as follows (the tokens column, a per-row word count, is not produced by the function above):

Tokenizer Output
+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+
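
What the regular-expression tokenizer does to one sentence can be sketched with the standard re module. The function name regex_tokenize and its keyword arguments are made up for this illustration; they mirror the behaviour used above, where the pattern matches the gaps between tokens, text is lowercased and empty tokens are discarded.

```python
import re

def regex_tokenize(sentence, pattern=r"\W", to_lowercase=True, min_token_length=1):
    # Lowercase first, then split wherever the pattern matches.
    text = sentence.lower() if to_lowercase else sentence
    # Drop empty strings produced by consecutive separators.
    return [t for t in re.split(pattern, text) if len(t) >= min_token_length]

tokens = regex_tokenize("Logistic,regression,models,are,neat")
```

Because the pattern matches any non-word character, the comma-separated sentence is split the same way as the space-separated ones.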

4.10 StopWordsRemover

StopWordsRemover filters stop words out of a sequence of words.

def StopWordsRemover(df,inputCol="words", outputCol="words2",add_stopwords=[]):
    """
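    Filters stop words out of a sequence of words.
    add_stopwords: extra stop words to filter in addition to the default list.
    """
    from pyspark.ml.feature import StopWordsRemover
    remover = StopWordsRemover(inputCol=inputCol, outputCol=outputCol)
    # Extend the default stop words. Calling setStopWords([]) would replace
    # the defaults with an empty list, so nothing would be filtered.
    if add_stopwords:
        remover = remover.setStopWords(remover.getStopWords() + list(add_stopwords))
    removed = remover.transform(df)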
    return removed

The output of the above code is as follows:

StopWordsRemover Output
+---+----------------------------+--------------------+
|id |words                       |words2              |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+
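
The filtering itself is a case-insensitive membership test. A minimal pure-Python sketch, where the function name remove_stop_words is hypothetical and the four-word stop list is a tiny stand-in for Spark's much longer default English list:

```python
def remove_stop_words(words, stop_words, case_sensitive=False):
    if case_sensitive:
        blocked = set(stop_words)
        return [w for w in words if w not in blocked]
    # Compare in lowercase so that "I" matches the stop word "i".
    blocked = {w.lower() for w in stop_words}
    return [w for w in words if w.lower() not in blocked]

# Tiny illustrative stop list, not Spark's default list.
stop_words = ["i", "the", "had", "a"]
filtered = remove_stop_words(["I", "saw", "the", "red", "balloon"], stop_words)
```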

4.11 NGram

NGram converts a sequence of words into a sequence of n-grams, each made of n consecutive words joined by spaces.

def NGram(df,n=2, inputCol="words", outputCol="ngrams"):
    """
    Converts a sequence of words into n-grams, each made of n consecutive words joined by spaces
    """
    from pyspark.ml.feature import NGram
    ngram = NGram(n=n, inputCol=inputCol, outputCol=outputCol)
    ngramDF = ngram.transform(df)
    return ngramDF

The output of the above code is as follows:

NGram Output
+---+--------------------------------------------+----------------------------------------------------------------------+
|id |words                                       |ngrams                                                                |
+---+--------------------------------------------+----------------------------------------------------------------------+
|0  |[Hi, I, heard, about, Spark]                |[Hi I, I heard, heard about, about Spark]                             |
|1  |[I, wish, python, could, use, case, classes]|[I wish, wish python, python could, could use, use case, case classes]|
|2  |[Logistic, regression, models, are, neat]   |[Logistic regression, regression models, models are, are neat]        |
+---+--------------------------------------------+----------------------------------------------------------------------+
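
The sliding-window idea behind n-grams fits in one line of plain Python. The function name ngrams is made up for this sketch:

```python
def ngrams(words, n=2):
    # Slide a window of n words over the sequence and join each window with spaces.
    # If there are fewer than n words, the range is empty and no n-gram is produced.
    return [" ".join(words[i:i + n]) for i in range(len(words) - n + 1)]

bigrams = ngrams(["Hi", "I", "heard", "about", "Spark"])
```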

4.12 DCT

The discrete cosine transform (DCT) converts a real-valued sequence of length N in the time domain into a real-valued sequence of length N in the frequency domain (somewhat like the discrete Fourier transform).

def DCT(df, inverse=False, inputCol="features", outputCol="featuresDCT"):
    """
    The discrete cosine transform (DCT) converts a real-valued sequence of length N in the time domain into a real-valued sequence of length N in the frequency domain (somewhat like the discrete Fourier transform).
    """
    from pyspark.ml.feature import DCT
    dct = DCT(inverse=inverse, inputCol=inputCol, outputCol=outputCol)
    dctDf = dct.transform(df)
    return dctDf
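
No output is shown for this function, so here is a small pure-Python sketch of the transform itself. It assumes the DCT-II variant with orthonormal scaling, which is how the Spark documentation describes its DCT (scaled so that the transform matrix is unitary); the function name dct_ii is made up for this illustration.

```python
import math

def dct_ii(x):
    # Orthonormal DCT-II:
    # X_k = s_k * sum_i x_i * cos(pi * (i + 0.5) * k / N),
    # with s_0 = sqrt(1/N) and s_k = sqrt(2/N) for k > 0.
    n = len(x)
    result = []
    for k in range(n):
        total = sum(x[i] * math.cos(math.pi * (i + 0.5) * k / n) for i in range(n))
        scale = math.sqrt(1.0 / n) if k == 0 else math.sqrt(2.0 / n)
        result.append(scale * total)
    return result

coeffs = dct_ii([1.0, 1.0, 1.0, 1.0])
```

A constant input has all of its energy in the first (zero-frequency) coefficient, and the orthonormal scaling keeps the sum of squares the same before and after the transform.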

4.13 ChiSqSelector

ChiSqSelector stands for chi-squared feature selection. It uses the chi-squared test of independence to select the features on which the category label depends most. selectorType supports the following options: numTopFeatures (default), percentile and fpr.

  • numTopFeatures: selects a fixed number of top features, those with the most predictive power according to the chi-squared test
  • percentile: similar to numTopFeatures, but selects a fraction of all features instead of a fixed number
  • fpr: selects all features whose p-value is below a threshold, thereby controlling the false positive rate of selection

def ChiSqSelector(df, featuresCol='features', labelCol='label',numTopFeatures=50,outputCol="selectedFeatures",
    selectorType='numTopFeatures', percentile=0.1, fpr=0.05):
    """
    ChiSqSelector stands for chi-squared feature selection. It uses the chi-squared test of independence to select the features on which the category label depends most.
    """
    # selectorType supported options: numTopFeatures (default), percentile and fpr.
    # 1. numTopFeatures: selects a fixed number of top features, those with the most predictive power according to the chi-squared test
    # 2. percentile: similar to numTopFeatures, but selects a fraction of all features instead of a fixed number
    # 3. fpr: selects all features whose p-value is below a threshold, thereby controlling the false positive rate of selection
    from pyspark.ml.feature import ChiSqSelector
    selector = ChiSqSelector(
                            numTopFeatures = numTopFeatures,
                            featuresCol = featuresCol,
                            outputCol = outputCol,
                            labelCol = labelCol,
                            selectorType = selectorType,
                            percentile = percentile,
                            fpr = fpr
                            )
    result = selector.fit(df).transform(df)
    print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
    return result

The output of the above code is as follows:

# ChiSqSelector Output with top 1 features selected
+---+------------------+-------+----------------+
| id|          features|  label|selectedFeatures|
+---+------------------+-------+----------------+
|  7|[0.0,0.0,18.0,1.0]|    1.0|          [18.0]|
|  8|[0.0,1.0,12.0,0.0]|    0.0|          [12.0]|
|  9|[1.0,0.0,15.0,0.1]|    0.0|          [15.0]|
+---+------------------+-------+----------------+
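
To make the test behind the selector concrete, the following is a minimal pure-Python sketch of the chi-squared statistic for one categorical feature against the label. The function name chi_square_statistic and the toy data are invented for illustration; this is not Spark's implementation, and it treats every distinct feature value as its own category.

```python
from collections import Counter

def chi_square_statistic(feature, label):
    # Sum over all (feature value, label value) cells of
    # (observed - expected)^2 / expected, where the expected count
    # assumes the feature and the label are independent.
    n = len(feature)
    observed = Counter(zip(feature, label))
    feature_totals = Counter(feature)
    label_totals = Counter(label)
    stat = 0.0
    for f, f_total in feature_totals.items():
        for l, l_total in label_totals.items():
            expected = f_total * l_total / n
            stat += (observed[(f, l)] - expected) ** 2 / expected
    return stat

labels = [1, 1, 0, 0]
dependent = [1, 1, 0, 0]     # toy feature that follows the label exactly
independent = [1, 0, 1, 0]   # toy feature unrelated to the label

stat_dependent = chi_square_statistic(dependent, labels)
stat_independent = chi_square_statistic(independent, labels)
```

A larger statistic means stronger evidence that the label depends on the feature, so a selector of the numTopFeatures kind would rank the first toy feature above the second.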

4.14 PearsonCorr

The Pearson correlation coefficient measures the linear correlation between two variables X and Y. Its value lies between -1 and 1.

def PearsonCorr(df,featureCol='feature',labelCol='label'):
    """
    Pearson correlation coefficient:
    measures the linear correlation between two variables X and Y; its value lies between -1 and 1.
    """
    return df.corr(featureCol,labelCol,method=None)
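
For reference, the coefficient is the covariance of the two variables divided by the product of their standard deviations. A minimal pure-Python sketch of that formula (the function name pearson is made up here, and it assumes that neither input is constant):

```python
import math

def pearson(x, y):
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    # Sums of centered products and centered squares; the 1/n factors cancel.
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / math.sqrt(var_x * var_y)

r_up = pearson([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])
r_down = pearson([1.0, 2.0, 3.0], [6.0, 4.0, 2.0])
```

A perfectly increasing linear relationship gives 1, a perfectly decreasing one gives -1, and values near 0 indicate no linear relationship.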

Keywords: Python Spark Machine Learning

Added by gumby51 on Tue, 14 Dec 2021 21:04:34 +0200