Python notebook for linear regression

This is a Python notebook from an edX Machine Learning course. The goal is to learn the ins and outs of linear regression, and to use it to predict the electrical power output of a power plant.

# Power plant power output prediction - Linear Regression, Random Forest
# %sh pip install --upgrade spark_mooc_meta
### Above is for databricks_test_helper
# Dataset columns: 'AT' (ambient temperature), 'V' (exhaust vacuum), 'AP' (ambient pressure), 'RH' (relative humidity), 'PE' (power output -- the label)

# Load the power plant dataset (tab-delimited CSV) with schema inference.
powerPlantDF = sqlContext.read.format('com.databricks.spark.csv').options(delimiter='\t', header='true', inferschema='true').load("/databricks-datasets/power-plant/data")

# Alternative load: supply an explicit schema (customSchema, defined elsewhere)
# instead of inferring one -- avoids an extra pass over the data.
altPowerPlantDF = sqlContext.read.format('com.databricks.spark.csv').options(delimiter='\t', header='true', inferschema='false').load("/databricks-datasets/power-plant/data", schema=customSchema)

# Register the DataFrame as SQL table "power_plant", dropping any stale copy first.
sqlContext.sql("DROP TABLE IF EXISTS power_plant")
dbutils.fs.rm("dbfs:/user/hive/warehouse/power_plant", True)
sqlContext.registerDataFrameAsTable(powerPlantDF, "power_plant")

# We'll hold out 20% of our data for testing and leave 80% for training.
# Fixed seed makes the split reproducible across runs.
seed = 1800009193
(split20DF, split80DF) = datasetDF.randomSplit([0.2, 0.8], seed)

# Cache both splits so repeated passes don't re-read the source data.
testSetDF = split20DF.cache()
trainingSetDF = split80DF.cache()

# ***** LINEAR REGRESSION MODEL *****
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml import Pipeline

# Initialize our linear regression learner.
lr = LinearRegression()

# explainParams() dumps every tunable parameter with its documentation.
print(lr.explainParams())

# Configure the learner: predict "Predicted_PE" from label column "PE",
# with at most 100 iterations and regularization strength 0.1.
lr.setPredictionCol("Predicted_PE").setLabelCol("PE").setMaxIter(100).setRegParam(0.1)

# We use the spark.ml Pipeline API (familiar if you've used scikit-learn):
# vectorizer (defined elsewhere) assembles features, then lr fits on them.
lrPipeline = Pipeline()
lrPipeline.setStages([vectorizer, lr])

# Train on the full training set to inspect the fitted model.
lrModel = lrPipeline.fit(trainingSetDF)

# The fitted intercept (pipeline stage 1 is the LinearRegressionModel).
intercept = lrModel.stages[1].intercept

# The coefficients (i.e., weights), one per feature column.
weights = lrModel.stages[1].coefficients

# Feature column names (every column except the label "PE").
featuresNoLabel = [col for col in datasetDF.columns if col != "PE"]

# Pair each weight with its feature name, sorted by descending absolute weight.
# (sorted() over the zipped pairs also works on Python 3, where zip() returns
# an iterator with no .sort() method.)
coefficients = sorted(zip(weights, featuresNoLabel), key=lambda tup: abs(tup[0]), reverse=True)

# Assemble a human-readable equation, signing each term explicitly.
equation = "y = {intercept}".format(intercept=intercept)
for weight, name in coefficients:
    symbol = "+" if weight > 0 else "-"
    equation += " {} ({} * {})".format(symbol, abs(weight), name)

# Finally here is our equation
print("Linear Regression Equation: " + equation)

# Apply our LR model to the test data and predict power output.
predictionsAndLabelsDF = lrModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE")

# Now let's compute an evaluation metric for our test dataset.
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluator comparing the label column against the prediction column.
regEval = RegressionEvaluator(predictionCol="Predicted_PE", labelCol="PE", metricName="rmse")

# Run the evaluator on the DataFrame.
rmse = regEval.evaluate(predictionsAndLabelsDF)
print("Root Mean Squared Error: %.2f" % rmse)

# Second metric: r2 -- same evaluator, metric overridden per-call via a param map.
r2 = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})
print("r2: {0:.2f}".format(r2))

# Drop any previous copy of the evaluation table before recreating it.
sqlContext.sql("DROP TABLE IF EXISTS Power_Plant_RMSE_Evaluation")
dbutils.fs.rm("dbfs:/user/hive/warehouse/Power_Plant_RMSE_Evaluation", True)

# Compute each row's residual error and that residual expressed in RMSE units,
# then expose the result as a temp table for SQL analysis.
# NOTE: selectExpr("col_name_in_table col_name_in_dataframe") -> no 'AS' in
# between the two!
predictionsAndLabelsDF.selectExpr(
    "PE",
    "Predicted_PE",
    "PE - Predicted_PE Residual_Error",
    "(PE - Predicted_PE) / {} Within_RSME".format(rmse)
).registerTempTable("Power_Plant_RMSE_Evaluation")

%sql
-- Bucket each prediction by how many RMSE multiples it is from the label:
-- 1 = within +/-1 RMSE, 2 = within +/-2 RMSE, 3 = beyond that.
SELECT
  CASE WHEN Within_RSME <= 1.0 AND Within_RSME >= -1.0 THEN 1
       WHEN Within_RSME <= 2.0 AND Within_RSME >= -2.0 THEN 2
       ELSE 3
  END RSME_Multiple,
  COUNT(*) AS count
FROM Power_Plant_RMSE_Evaluation
GROUP BY
  CASE WHEN Within_RSME <= 1.0 AND Within_RSME >= -1.0 THEN 1
       WHEN Within_RSME <= 2.0 AND Within_RSME >= -2.0 THEN 2
       ELSE 3
  END

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Reuse the RegressionEvaluator, regEval: CrossValidator selects the model
# with the best Root Mean Squared Error.

# 3-fold cross validation over the linear regression pipeline.
crossval = CrossValidator(estimator=lrPipeline, evaluator=regEval, numFolds=3)

# Tune the regularization parameter over 0.01 .. 0.10.
regParam = [x / 100.0 for x in range(1, 11)]

# Build the parameter grid and hand it to the CrossValidator.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, regParam)
             .build())
crossval.setEstimatorParamMaps(paramGrid)

# Fit and keep only the best model found.
cvModel = crossval.fit(trainingSetDF).bestModel

# Use cvModel to score the held-out test set.
# (Assign to predictionsAndLabelsDF -- the evaluations below read that name;
# the original assigned a different variable, so they re-scored the old model.)
predictionsAndLabelsDF = cvModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE")

# RMSE of the cross-validated model on the test set.
rmseNew = regEval.evaluate(predictionsAndLabelsDF)

# r2 of the cross-validated model on the test set.
r2New = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("Original Root Mean Squared Error: {0:2.2f}".format(rmse))
print("New Root Mean Squared Error: {0:2.2f}".format(rmseNew))
print("Old r2: {0:2.2f}".format(r2))
print("New r2: {0:2.2f}".format(r2New))

# The Python API doesn't expose the chosen regParam directly, so reach
# through to the JVM version of the model.
print("Regularization parameter of the best model: {0:.2f}".format(cvModel.stages[-1]._java_obj.parent().getRegParam()))

# Decision tree regression: instantiate the learner and build its pipeline.
from pyspark.ml.regression import DecisionTreeRegressor

# Configure the tree: same label/prediction/feature columns as the LR model,
# with up to 100 bins for discretizing continuous features.
dt = DecisionTreeRegressor()
dt.setLabelCol("PE").setPredictionCol("Predicted_PE").setFeaturesCol("features").setMaxBins(100)

# Pipeline: vectorizer -> decision tree.
dtPipeline = Pipeline()
dtPipeline.setStages([vectorizer, dt])

# Point the existing CrossValidator (RegressionEvaluator regEval, 3 folds)
# at the decision tree pipeline.
crossval.setEstimator(dtPipeline)

# Parameter grid: try tree depths 2 and 3.
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [2, 3]).build()

# Hand the grid to the CrossValidator.
crossval.setEstimatorParamMaps(paramGrid)

# Fit and keep the best tree model.
dtModel = crossval.fit(trainingSetDF).bestModel

# Score the test set with the best decision tree model.
# (Assign to predictionsAndLabelsDF -- the evaluations below read that name;
# the original assigned a different variable, so they re-scored the old model.)
predictionsAndLabelsDF = dtModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE")

# RMSE of the tree model on the test set.
rmseDT = regEval.evaluate(predictionsAndLabelsDF)

# r2 of the tree model on the test set.
r2DT = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("LR Root Mean Squared Error: {0:.2f}".format(rmseNew))
print("DT Root Mean Squared Error: {0:.2f}".format(rmseDT))
print("LR r2: {0:.2f}".format(r2New))
print("DT r2: {0:.2f}".format(r2DT))

# Dump the learned tree as if-then clauses via the underlying JVM model.
# (print() call syntax works on both Python 2 and 3.)
print(dtModel.stages[-1]._java_obj.toDebugString())

# Random Forest (ensemble of decision trees).
from pyspark.ml.regression import RandomForestRegressor

# Configure the forest: fixed seed for reproducibility, 30 trees of depth <= 8.
# (Plain int literal -- the Python-2 'L' suffix is a SyntaxError on Python 3,
# and the value fits in an int on both versions.)
rf = RandomForestRegressor()
rf.setLabelCol("PE")\
  .setPredictionCol("Predicted_PE")\
  .setFeaturesCol("features")\
  .setSeed(100088121)\
  .setMaxDepth(8)\
  .setNumTrees(30)

# Pipeline: vectorizer -> random forest.
rfPipeline = Pipeline()
rfPipeline.setStages([vectorizer, rf])

# Point the existing CrossValidator (RegressionEvaluator regEval, 3 folds)
# at the random forest pipeline.
crossval.setEstimator(rfPipeline)

# Parameter grid: try 50 and 100 bins for continuous-feature discretization.
paramGrid = ParamGridBuilder().addGrid(rf.maxBins, [50, 100]).build()

# Hand the grid to the CrossValidator.
crossval.setEstimatorParamMaps(paramGrid)

# Fit and keep the best forest model.
rfModel = crossval.fit(trainingSetDF).bestModel

There you go! It's long but not complicated. Basically, you:

  1. Set up the environment
  2. Build ML pipeline
  3. Apply different algorithms: linear regression, decision tree, and random forest
  4. Cross validation
  5. Grid search over hyperparameters
  6. Predict test set labels.

The above are standard ML end-to-end process. Have fun!

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

w

Connecting to %s