Spark MLlib Data Preprocessing: Feature Transformation

Tokenizer

Introduction to the algorithm:

Tokenization is the process of splitting text into individual terms (usually words).

RegexTokenizer provides more advanced tokenization based on regular expressions. By default, the parameter "pattern" (default: "\\s+") is the delimiter used to split the text. Alternatively, users can set the parameter "gaps" to false, indicating that the regex "pattern" denotes the "tokens" rather than the delimiters, in which case all matches of the pattern are returned as the tokenization result.

Call:

Scala:

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)
StopWordsRemover

Introduction to the algorithm:

Stop words are words that appear frequently in a document but carry little meaning; they usually should not be included in the input to an algorithm.

StopWordsRemover takes a sequence of strings as input (such as the output of a Tokenizer) and drops all stop words from it. The list of stop words is specified by the stopWords parameter; default stop word lists for several languages can be obtained via StopWordsRemover.loadDefaultStopWords(language). The Boolean parameter caseSensitive indicates whether matching is case sensitive (false by default).

Example:

Suppose we have the following DataFrame with id and raw columns:

id  | raw
----|------------------------------
0   | [I, saw, the, red, baloon]
1   | [Mary, had, a, little, lamb]

Calling StopWordsRemover on the raw column, we get the following result:

id  | raw                          | filtered
----|------------------------------|---------------------
0   | [I, saw, the, red, baloon]   | [saw, red, baloon]
1   | [Mary, had, a, little, lamb] | [Mary, little, lamb]

Here, "I", "the", "had", and "a" have been removed.

Call:

Scala:

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "baloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show()
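
The default English stop word list is used when stopWords is not set explicitly. As a minimal sketch (assuming Spark 2.0+), the default list can be loaded, extended with custom words, and matched case-sensitively as follows; the extra stop word "baloon" is purely illustrative:

// Load the built-in English stop words and append a custom word.
val customStopWords = StopWordsRemover.loadDefaultStopWords("english") :+ "baloon"

val customRemover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")
  .setStopWords(customStopWords)   // override the default stop word list
  .setCaseSensitive(true)          // default is false (case-insensitive matching)

customRemover.transform(dataSet).show()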
n-gram

Introduction to the algorithm:

An n-gram is a sequence of n consecutive tokens (typically words) for some integer n. The NGram class can be used to transform input features into n-grams.

NGram takes a sequence of strings as input (such as the output of a Tokenizer). The parameter n determines the number of terms in each n-gram. The output is a sequence of n-grams, where each n-gram is a string of n consecutive words joined by spaces. If the input sequence contains fewer than n strings, no output is produced.

Call:

Scala:

import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("label", "words")

val ngram = new NGram().setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.take(3).map(_.getAs[Stream[String]]("ngrams").toList).foreach(println)
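
By default NGram produces bigrams (n = 2). As a minimal sketch, trigrams can be produced instead by setting n to 3, reusing wordDataFrame from above:

val trigram = new NGram().setN(3).setInputCol("words").setOutputCol("trigrams")
trigram.transform(wordDataFrame).select("trigrams").show(false)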
Binarizer

Introduction to the algorithm:

Binarization is the process of thresholding continuous numerical features into binary (0/1) features.

Binarizer takes the parameters inputCol, outputCol, and threshold. Feature values greater than the threshold are mapped to 1.0; values less than or equal to the threshold are mapped to 0.0. For example, with a threshold of 0.5 the values 0.1, 0.8, 0.2 in the call below become 0.0, 1.0, 0.0.

Call:

Scala:

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("label", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)
val binarizedFeatures = binarizedDataFrame.select("binarized_feature")
binarizedFeatures.collect().foreach(println)
PCA

 

Introduction to the algorithm:

Principal component analysis (PCA) is a statistical method that uses an orthogonal transformation to convert a set of possibly correlated variables into a set of linearly uncorrelated variables called principal components. PCA can be used to reduce the dimensionality of a variable set. The following example shows how to project 5-dimensional feature vectors onto 3-dimensional principal components.

Call:

Scala:

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()
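
The fitted PCAModel also exposes the principal components matrix via pc and, assuming Spark 2.0 or later, the variance explained by each component via explainedVariance; a brief sketch:

// Principal components (a 5 x 3 matrix here) and the per-component explained variance.
println(pca.pc)
println(pca.explainedVariance)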
PolynomialExpansion

Introduction to the algorithm:

Polynomial expansion maps the original features into a polynomial space formed by n-degree combinations of the original dimensions. The following example shows how to expand features into a degree-3 polynomial space.
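
As a concrete illustration (leaving the exact ordering of output terms aside), the degree-2 expansion of a two-dimensional vector (x, y) consists of the monomials x, x^2, y, x*y, and y^2, so a 2-dimensional input becomes a 5-dimensional output; degree 3 adds the cubic combinations as well.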

Call:

Scala:

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(-2.0, 2.3),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(0.6, -1.1)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polynomialExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)
val polyDF = polynomialExpansion.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)
Discrete Cosine Transform (DCT)

Introduction to the algorithm:

The discrete cosine transform (DCT) is a Fourier-related transform. It is similar to the discrete Fourier transform (DFT) but uses only real numbers: a DCT is equivalent to a DFT of roughly twice the length operating on real data with even symmetry (since the Fourier transform of a real, even function is itself real and even). The DCT is widely used in signal processing and image processing for lossy compression of signals and images (including still and moving images).
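
For reference, a minimal statement of the transform under the common DCT-II convention: for an input sequence x_0, ..., x_(N-1), the k-th output is

X_k = sum_{n=0..N-1} x_n * cos( (pi / N) * (n + 1/2) * k ),   k = 0, ..., N-1

Spark's DCT transformer scales the result so that the representing matrix of the transform is unitary; setInverse(true) applies the inverse transform instead.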

Call:

Scala:

import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(3)
StringIndexer

Introduction to the algorithm:

StringIndexer encodes a string column of labels into a column of label indices. The indices are in [0, numLabels), ordered by label frequency, so the most frequent label receives index 0. If the input column is numeric, it is first cast to string and the string values are then indexed. When a downstream pipeline stage needs to use the string-indexed labels, its input column must be set to the string-indexed column name.

Example:

Suppose we have the following DataFrame with id and category columns:

id  | category
----|----------
0   | a
1   | b
2   | c
3   | a
4   | a
5   | c

category is a string column with three distinct values. After transforming it with StringIndexer, we get the following output:

id  | category | categoryIndex
----|----------|--------------
0   | a        | 0.0
1   | b        | 2.0
2   | c        | 1.0
3   | a        | 0.0
4   | a        | 0.0
5   | c        | 1.0

In addition, when StringIndexer transforms new data containing a label that was not seen during fitting, it will either throw an error (the default) or skip the rows containing the unseen label.

Call:

Scala:

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()
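
If new data may contain labels that were not seen during fitting, the handleInvalid parameter controls the behaviour; a minimal sketch of skipping such rows instead of raising an error:

val indexerSkipUnseen = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .setHandleInvalid("skip")   // drop rows with unseen labels; "error" is the default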
IndexToString

Introduction to the algorithm:

IndexToString is the counterpart of StringIndexer: it maps a column of label indices back to the original string labels. A common use case is to generate index labels with StringIndexer, train a model on those indices, and then use IndexToString to recover the original label strings for the predictions.

Example:

Suppose we have the following DataFrame with id and categoryIndex columns:

id  | categoryIndex
----|--------------
0   | 0.0
1   | 2.0
2   | 1.0
3   | 0.0
4   | 0.0
5   | 1.0

Applying IndexToString with categoryIndex as the input column and originalCategory as the output column, we can recover the original labels as follows:

id  | categoryIndex | originalCategory
----|---------------|-----------------
0   | 0.0           | a
1   | 2.0           | b
2   | 1.0           | c
3   | 0.0           | a
4   | 0.0           | a
5   | 1.0           | c

Call:

Scala:

import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()
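
By default IndexToString reads the label metadata that StringIndexer attaches to the indexed column; if that metadata is not available, the labels can be supplied explicitly (a minimal sketch reusing the fitted indexer from above):

val converterWithLabels = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")
  .setLabels(indexer.labels)   // explicit label array instead of column metadata

converterWithLabels.transform(indexed).select("id", "originalCategory").show()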
OneHotEncoder

Introduction to the algorithm:

One-hot encoding maps a column of label indices to a column of binary vectors, each with at most one non-zero value. This encoding allows algorithms that expect continuous features, such as logistic regression, to make use of categorical features.

Call:

Scala:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
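
By default OneHotEncoder drops the last category so that the output vectors are linearly independent; to keep a slot for every category, dropLast can be disabled (a minimal sketch):

val fullEncoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVecFull")
  .setDropLast(false)   // keep all categories; the default is true

fullEncoder.transform(indexed).select("id", "categoryVecFull").show()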
VectorIndexer

Introduction to the algorithm:

VectorIndexer indexes categorical features in a dataset of Vector columns. It can automatically decide which features are categorical and convert the original values into category indices. It works as follows:

1. Take an input column of Vector type and the maxCategories parameter.

2. Decide which features should be treated as categorical based on their distinct values: a feature with at most maxCategories distinct values is treated as categorical.

3. Compute a 0-based category index for each categorical feature.

4. Index the categorical features, converting their original values into the indices.

Indexing categorical features allows algorithms such as decision trees and tree ensembles to treat categorical features appropriately, often improving results.

In the following example, we read in a dataset and then use VectorIndexer to decide which features should be treated as categorical. The values of those categorical features are then converted to their indices.

Call examples:

Scala:

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
  categoricalFeatures.mkString(", "))

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
Normalizer (Normalization)

Introduction to the algorithm:

Normalizer is a Transformer that normalizes each Vector row to unit norm. The parameter p (default: 2) specifies the p-norm used for normalization, i.e. each row vector v is replaced by v / ||v||_p. Normalization can standardize the scale of the input data and often improves the behaviour of downstream learning algorithms.

The following example shows how to read data in libsvm format and then normalize each row to unit L^1 norm and unit L^infinity norm.

Call examples:

Scala:

import org.apache.spark.ml.feature.Normalizer

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
lInfNormData.show()
StandardScaler

Introduction to the algorithm:

StandardScaler processes Vector rows, standardizing each feature to have unit standard deviation and/or zero mean. It takes the following parameters:

1. withStd: true by default; scales the data to unit standard deviation.

2. withMean: false by default; centers the data with the mean before scaling. This produces a dense output, so it is not suitable for sparse input.

StandardScaler is an Estimator: fitting it on a dataset computes summary statistics and produces a StandardScalerModel. The model can then transform Vector columns into features with unit standard deviation and/or zero mean.

Note that if the standard deviation of a feature is zero, the transformed value for that feature defaults to 0.0.
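
Roughly speaking, with both withMean and withStd enabled, each feature value x is transformed as x' = (x - mu) / sigma, where mu and sigma are that feature's mean and standard deviation computed during fitting; with withMean = false (the default), only the division by sigma is applied.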

The following example shows how to read data in libsvm format and standardize the features to unit standard deviation.

Call examples:

Scala:

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MinMaxScaler

Introduction to the algorithm:

MinMaxScaler rescales Vector columns to a specified range, commonly [0, 1]. Its parameters are:

1. min: 0.0 by default; the lower bound of every feature after transformation.

2. max: 1.0 by default; the upper bound of every feature after transformation.

MinMaxScaler computes summary statistics on the dataset and produces a MinMaxScalerModel, which can then rescale each feature individually to the specified range.

For a feature E with observed minimum E_min and maximum E_max, the rescaled value of e_i is:

Rescaled(e_i) = (e_i - E_min) / (E_max - E_min) * (max - min) + min

If E_max == E_min, then Rescaled(e_i) = 0.5 * (max + min).

Note that the output may be a dense vector even when the input is sparse, because zero values may be mapped to non-zero values.

The following example shows how to read data in libsvm format and rescale its feature values to [0, 1].

Call examples:

Scala:

import org.apache.spark.ml.feature.MinMaxScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
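
The target range defaults to [0, 1]; rescaling to a different range, for example [-1, 1], is a matter of setting min and max (a minimal sketch):

val scalerMinusOneToOne = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setMin(-1.0)
  .setMax(1.0)

scalerMinusOneToOne.fit(dataFrame).transform(dataFrame).show()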
MaxAbsScaler

Introduction to the algorithm:

MaxAbsScaler rescales each feature of the input vectors to [-1, 1] by dividing by the maximum absolute value of that feature. Because it does not shift or center the data, it does not destroy sparsity.

The following example shows how to read data in libsvm format and rescale its feature values to [-1, 1].

Call examples:

Scala:

import org.apache.spark.ml.feature.MaxAbsScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Bucketizer

Introduction to the algorithm:

Bucketizer transforms a column of continuous features into a column of feature buckets, where the buckets are specified by the user. It takes the following parameter:

1. splits: with n+1 splits there are n buckets. A bucket defined by splits x, y holds values in the range [x, y), except the last bucket, which also includes y. The splits must be strictly increasing. Values outside the specified splits are treated as errors. Two examples of splits are Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) and Array(0.0, 1.0, 2.0).

Note that Double.NegativeInfinity and Double.PositiveInfinity should be added as the outermost splits when the upper and lower bounds of the target column are unknown, in order to avoid out-of-bounds errors.

Next, I'll show you how Bucketizer works.
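
With the splits used below, the buckets are [-inf, -0.5), [-0.5, 0.0), [0.0, 0.5), and [0.5, +inf), so the input values -0.5, -0.3, 0.0, and 0.2 should land in buckets 1.0, 1.0, 2.0, and 2.0 respectively.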

Call examples:

Scala:

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
ElementwiseProduct

Introduction to the algorithm:

ElementwiseProduct multiplies each input vector element-wise by a provided "weight" vector. In other words, it scales each column of the input data by a scalar multiplier, producing the Hadamard product of the input vector v and the weight vector w.
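
For instance, with the weight vector (0, 1, 2) used in the example below, the input (1, 2, 3) is mapped to (1*0, 2*1, 3*2) = (0, 2, 6) and (4, 5, 6) is mapped to (0, 5, 12).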

The following example shows how to transform vectors using a transforming (weight) vector.

Call examples:

Scala:

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
SQLTransformer

Introduction to the algorithm:

SQLTransformer implements transformations that are defined by SQL statements. Currently only statements of the form "SELECT ... FROM __THIS__ ..." are supported, where "__THIS__" stands for the underlying table of the input dataset. The select clause specifies the fields, constants, and expressions to appear in the output and can be any select clause that Spark SQL supports; users can also apply Spark SQL built-in functions and UDFs to the selected columns. Examples of statements supported by SQLTransformer include:

1. SELECT a, a + b AS a_b FROM __THIS__

2. SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5

3. SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b

Example:

Suppose we have the following DataFrame with columns id, v1, v2:

id  | v1  | v2
----|-----|-----
0   | 1.0 | 3.0
2   | 2.0 | 5.0

Using SQLTransformer with the statement "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__", the output is as follows:

id  | v1  | v2  | v3  | v4
----|-----|-----|-----|------
0   | 1.0 | 3.0 | 4.0 | 3.0
2   | 2.0 | 5.0 | 7.0 | 10.0

Call examples:

Scala:

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
VectorAssembler

Introduction to the algorithm:

VectorAssembler is a Transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features produced by other transformers into one feature vector, for training machine-learning models such as logistic regression and decision trees. VectorAssembler accepts input columns of numeric, Boolean, and vector types. The values of the input columns are concatenated into a new vector in the specified order.

Example:

Suppose we have the following DataFrame columns: id, hour, mobile, userFeatures, and clicked:

id  | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|--------
0   | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

The userFeatures column contains three user features. We want to combine hour, mobile, and userFeatures into a single feature column. Setting VectorAssembler's input columns to hour, mobile, and userFeatures and its output column to features, the transformation yields the following result:

id  | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
0   | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

Call examples:

Scala:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())
QuantileDiscretizer

Introduction to the algorithm:

QuantileDiscretizer transforms a column of continuous features into a column of binned categorical features. The number of bins is set by the numBuckets parameter. The bin ranges are chosen by an approximate quantile algorithm, whose precision is controlled by the relativeError parameter; setting relativeError to 0 computes exact quantiles (at a high computational cost). The lower and upper bin bounds are negative infinity and positive infinity, covering the whole range of real values.

Example:

Suppose we have the following DataFrame with columns id and hour:

id  | hour
----|------
0   | 18.0
1   | 19.0
2   | 8.0
3   | 5.0
4   | 2.2

hour is a continuous feature of type Double. Setting the parameter numBuckets to 3, we can bin hour into the following categorical feature:

id  | hour | result
----|------|-------
0   | 18.0 | 2.0
1   | 19.0 | 2.0
2   | 8.0  | 1.0
3   | 5.0  | 1.0
4   | 2.2  | 0.0

Call examples:

Scala:

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()
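
The precision of the quantile approximation can also be tuned; a minimal sketch with an explicit relativeError (setting it to 0 would request exact quantiles at a higher cost):

val preciseDiscretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)
  .setRelativeError(0.01)

preciseDiscretizer.fit(df).transform(df).show()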
