# Source code for mmtfPyspark.ml.sparkMultiClassClassifier
#!/usr/bin/env python
'''sparkMultiClassClassifier.py
Fits a multi-class classification model using mllib classification method and
returns classification metrics.
'''
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from collections import OrderedDict
import time
class SparkMultiClassClassifier(object):
    '''Fits a multi-class classification model using an MLlib classification
    method and returns classification metrics.

    Attributes
    ----------
    predictor
        type of multi-class classifier
    label : str
        classification label
    testFraction : float
        test set fraction [0.3]
    seed : int
        random seed
    '''

    def __init__(self, predictor, label, testFraction=0.3, seed=1):
        self.predictor = predictor
        self.label = label
        self.testFraction = testFraction
        self.seed = seed

    def fit(self, data):
        '''Fit the classifier on a random training split and evaluate it on
        the held-out test split.

        Dataset must at least contain the following two columns:
            label: the class labels
            features: feature vector

        Parameters
        ----------
        data : Dataset<Row>
            input data

        Returns
        -------
        dict
            map with metrics
        '''
        start = time.time()
        classCount = int(data.select(self.label).distinct().count())

        # Map string class labels onto numeric indices for the classifier.
        labelIndexer = StringIndexer().setInputCol(self.label) \
                                      .setOutputCol("indexedLabel") \
                                      .fit(data)

        # Split the data into training and test sets
        # (testFraction held out for testing).
        splits = data.randomSplit(
            [1.0 - self.testFraction, self.testFraction], self.seed)
        trainingData = splits[0]
        testData = splits[1]

        # Report per-class sample counts for each split.
        labels = labelIndexer.labels
        print("\n Class\tTrain\tTest")
        for l in labels:
            print("%s\t%i\t%i" % (
                l,
                trainingData.filter(trainingData[self.label] == l).count(),
                testData.filter(testData[self.label] == l).count()))

        # Set input columns
        self.predictor.setLabelCol("indexedLabel").setFeaturesCol("features")

        # Convert indexed labels back to original labels
        labelConverter = IndexToString().setInputCol("prediction") \
                                        .setOutputCol("predictedLabel") \
                                        .setLabels(labelIndexer.labels)

        # Chain indexer, classifier and label converter in a Pipeline
        pipeline = Pipeline().setStages(
            [labelIndexer, self.predictor, labelConverter])

        # Train model. This also runs the indexers
        model = pipeline.fit(trainingData)

        # Make predictions
        predictions = model.transform(testData).cache()

        # Display some sample predictions
        # TODO predictor.getClass().getSimpleName()
        print(f"\nSample predictions: {str(self.predictor).split('_')[0]}")
        predictions.sample(False, 0.1, self.seed).show(5)

        # Rename columns so self.label refers to the numeric indexed label,
        # which the RDD-based metrics classes below require.
        predictions = predictions.withColumnRenamed(self.label, "stringLabel")
        predictions = predictions.withColumnRenamed("indexedLabel", self.label)

        # Collect metrics
        pred = predictions.select("prediction", self.label)
        metrics = OrderedDict()
        metrics["Method"] = str(self.predictor).split('_')[0]

        # AUC is only defined for binary classification.
        if classCount == 2:
            b = BinaryClassificationMetrics(pred.rdd)
            metrics["AUC"] = str(b.areaUnderROC)

        m = MulticlassMetrics(pred.rdd)
        metrics["F"] = str(m.weightedFMeasure())
        metrics["Accuracy"] = str(m.accuracy)
        metrics["Precision"] = str(m.weightedPrecision)
        metrics["Recall"] = str(m.weightedRecall)
        metrics["False Positive Rate"] = str(m.weightedFalsePositiveRate)
        metrics["True Positive Rate"] = str(m.weightedTruePositiveRate)
        metrics[""] = f"\nConfusion Matrix\n{labels}\n{m.confusionMatrix()}"

        end = time.time()
        print(f"Total time taken: {end-start}\n")

        return metrics