# Source code for mmtfPyspark.ml.sparkRegressor

#!/usr/bin/env python
'''sparkRegressor.py

Fits a regression model using an MLlib regression method and returns regression
metrics

'''
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from collections import OrderedDict

class SparkRegressor(object):
    '''Fits a regression model using an MLlib regression method and returns
    regression metrics

    Attributes
    ----------
    predictor
        MLlib regression estimator to fit (must support ``setLabelCol`` and
        ``setFeaturesCol``)
    label : str
        name of the column holding the regression target
    testFraction : float
        fraction of the data held out as the test set, strictly between
        0 and 1 [0.3]
    seed : int
        random seed used for the train/test split and the printed
        prediction sample [1]
    '''

    def __init__(self, predictor, label, testFraction=0.3, seed=1):
        self.predictor = predictor
        self.label = label
        self.testFraction = testFraction
        self.seed = seed

    def _predictor_name(self):
        '''Return the simple (unqualified) class name of the predictor.'''
        # Equivalent of Java's getClass().getSimpleName(); unlike parsing
        # str(predictor) (the estimator's uid), this does not depend on how
        # the uid was generated.
        return type(self.predictor).__name__

    def fit(self, data):
        '''Fit the predictor on a training split of ``data`` and evaluate it.

        ``data`` is randomly split into a training set and a test set
        (``testFraction`` of the rows, seeded by ``seed``). The predictor is
        trained inside a single-stage Pipeline on the training set. A random
        ~10% sample of the test-set predictions is printed, and the root mean
        squared error on the test set is returned.

        The dataset must contain at least the following two columns:

            <label> : the regression target (column name given by ``label``)
            features : feature vector

        Parameters
        ----------
        data : Dataset<Row>
            input data (presumably a pyspark DataFrame -- TODO confirm)

        Returns
        -------
        dict
            ordered mapping with keys "Method" (class name of the predictor)
            and "rmse" (test-set root mean squared error, as a string)

        Raises
        ------
        ValueError
            if ``testFraction`` is not strictly between 0 and 1 (either the
            training or the test set would be empty)
        '''
        if not 0.0 < self.testFraction < 1.0:
            raise ValueError(
                "testFraction must be strictly between 0 and 1, got "
                f"{self.testFraction!r}")

        # Hold out testFraction of the rows for evaluation
        trainingData, testData = data.randomSplit(
            [1.0 - self.testFraction, self.testFraction], self.seed)

        # Point the predictor at the label and feature columns
        self.predictor.setLabelCol(self.label).setFeaturesCol("features")

        # Wrap the predictor in a single-stage Pipeline and train it
        pipeline = Pipeline().setStages([self.predictor])
        model = pipeline.fit(trainingData)

        # Make predictions on the held-out test set
        predictions = model.transform(testData)

        methodName = self._predictor_name()

        # Display a random ~10% sample of predictions (at most 50 rows shown);
        # the first column is treated as a row identifier
        print(f"Sample predictions: {methodName}")
        primaryKey = predictions.columns[0]
        predictions.select(primaryKey, self.label, "prediction") \
            .sample(False, 0.1, self.seed) \
            .show(50)

        # Collect metrics
        metrics = OrderedDict()
        metrics["Method"] = methodName
        evaluator = RegressionEvaluator().setLabelCol(self.label) \
            .setPredictionCol("prediction") \
            .setMetricName("rmse")
        metrics["rmse"] = str(evaluator.evaluate(predictions))
        return metrics