Spark's LinearRegressionWithSGD is very sensitive to feature scaling


I have a problem fitting a model with LinearRegressionWithSGD in Spark's MLlib. I used the fitting example from their documentation at https://spark.apache.org/docs/latest/mllib-linear-methods.html (via the Python interface).

In their example all features are roughly standardized, with mean around 0 and standard deviation around 1. Now if I un-scale one of them by a factor of 10, the regression breaks (it gives nans or very large coefficients):

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from numpy import array

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    # UN-SCALE one of the features by a factor of 10
    values[3] *= 10

    return LabeledPoint(values[0], values[1:])

data = sc.textFile(spark_home+"data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)

# Build the model
model = LinearRegressionWithSGD.train(parsedData)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda vp: (vp[0] - vp[1]) ** 2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
print("Model coefficients: " + str(model))

So I guess I need to do feature scaling. If I pre-scale the data it works (because I'm back to scaled features). However, now I don't know how to get the coefficients back in the original space.

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

# Load and parse the data
def parseToDenseVector(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    # UN-SCALE one of the features by a factor of 10
    values[3] *= 10
    return Vectors.dense(values)  # the label (values[0]) is included, so it gets scaled too

# Split the scaled vector back into label and features
def parseToLabel(values):
    return LabeledPoint(values[0], values[1:])

data = sc.textFile(spark_home+"data/mllib/ridge-data/lpsa.data")

parsedData = data.map(parseToDenseVector)
scaler = StandardScaler(True, True)
scaler_model = scaler.fit(parsedData)
parsedData_scaled = scaler_model.transform(parsedData)

parsedData_scaled_transformed = parsedData_scaled.map(parseToLabel)

# Build the model
model = LinearRegressionWithSGD.train(parsedData_scaled_transformed)

# Evaluate the model on training data
valuesAndPreds = parsedData_scaled_transformed.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda vp: (vp[0] - vp[1]) ** 2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
print("Model coefficients: " + str(model))

So here I have all the coefficients in the transformed space. Now how do I get back to the original space? I also have scaler_model, which is a StandardScalerModel object, but I can't get either the means or the variances out of it. The only public method this class has is transform, which maps points from the original space to the transformed one, and I can't reverse it.

There are 3 answers below.

Answer 1

I just ran into this problem. The model cannot even learn f(x) = x when the x values in the training data are large (say greater than 3). It's that fragile.
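
For instance, here is a hypothetical minimal reproduction of what I mean (assuming a pyspark shell with sc available); the data points lie exactly on the line y = x:

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

# Ten points on the line y = x, with x ranging up to 10
points = sc.parallelize([LabeledPoint(float(x), [float(x)]) for x in range(1, 11)])
model = LinearRegressionWithSGD.train(points, iterations=100)
# With the default step size this diverges: huge or nan weights instead of [1.0]
print(model.weights)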

I think that rather than scaling the data, another option is to change the step size. This is discussed in SPARK-1859. To paraphrase from there:

The step size should be smaller than 1 over the Lipschitz constant L. For quadratic loss and GD, the best convergence happens at stepSize = 1/(2L). Spark applies a (1/n) multiplier to the loss function.

Let's say you have n = 5 data points and the largest feature value is 1500. Then L = 1500 * 1500 / 5 = 450,000, and the best convergence happens around stepSize = 1/(2L) = 5 / (2 * 1500^2), roughly 1.1e-6.

I had never heard of a Lipschitz constant before, so take the exact constant with a grain of salt. Anyway, in practice you can just try different step sizes until training starts to work, as sketched below.
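
Here is a minimal sketch of such a sweep, assuming parsedData is the unscaled RDD from the question; the grid of step sizes is just an illustration:

from pyspark.mllib.regression import LinearRegressionWithSGD

# Try progressively smaller step sizes until the MSE stops being nan or huge
for step in [1.0, 0.1, 0.01, 0.001, 0.0001]:
    model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=step)
    mse = parsedData.map(lambda p: (p.label - model.predict(p.features)) ** 2).mean()
    print("step = %g, MSE = %g, weights = %s" % (step, mse, model.weights))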

Answer 2

As you pointed out, the StandardScalerModel object in pyspark doesn't expose the std and mean attributes. There is an open issue for this: https://issues.apache.org/jira/browse/SPARK-6523

You can easily calculate them yourself:

import numpy as np
from pyspark.mllib.stat import Statistics

# features is the RDD of unscaled feature vectors, e.g. parsedData from the question
summary = Statistics.colStats(features)
mean = summary.mean()
std = np.sqrt(summary.variance())

These are the same mean and std that your scaler uses. You can verify this by reaching into the private Java model:

print(scaler_model._java_model.std())
print(scaler_model._java_model.mean())

Answer 3

To rephrase your question, you want to find the intercept I and coefficients C_1 and C_2 that solve the equation: Y = I + C_1 * x_1 + C_2 * x_2 (where x_1 and x_2 are unscaled).

Let i be the intercept that mllib returns. Likewise let c_1 and c_2 be the coefficients (or weights) that mllib returns.

Let m_1 be the mean of the unscaled x_1 and m_2 the mean of the unscaled x_2.

Let s_1 be the standard deviation of the unscaled x_1 and s_2 the standard deviation of the unscaled x_2.

The scaled model predicts Y = i + c_1 * (x_1 - m_1) / s_1 + c_2 * (x_2 - m_2) / s_2. Expanding and collecting terms gives C_1 = (c_1 / s_1), C_2 = (c_2 / s_2), and

I = i - c_1 * m_1 / s_1 - c_2 * m_2 / s_2

This can easily be extended to 3 input variables:

C_3 = (c_3 / s_3) and I = i - c_1 * m_1 / s_1 - c_2 * m_2 / s_2 - c_3 * m_3 / s_3
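
In code, a sketch of this back-transformation could look like the following. It assumes model is the model trained on the scaled data and mean/std are numpy arrays of per-feature statistics computed as in the previous answer (features only; if the label was standardized too, as in the question's code, that has to be undone separately):

import numpy as np

c = np.array(model.weights)     # coefficients in the scaled space
i = model.intercept             # intercept in the scaled space

C = c / std                     # coefficients in the original space
I = i - np.sum(c * mean / std)  # intercept in the original space

# Sanity check: for any unscaled feature vector x,
# model.predict((x - mean) / std) should equal I + np.dot(C, x)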