# Source code for mmtfPyspark.ml.datasetClassifier
#!/usr/bin/env python
'''datasetClassifier.py

Runs binary and multi-class classifiers on a given dataset.
The dataset is read as a Parquet file. It must contain
a feature vector named "features" and a classification column.
The name of the classification column must be specified
on the command line.
'''
__author__ = "Mars (Shih-Cheng) Huang"
__maintainer__ = "Mars (Shih-Cheng) Huang"
__email__ = "marshuang80@gmail.com"
__version__ = "0.2.0"
__status__ = "Done"
from mmtfPyspark.ml import SparkMultiClassClassifier, datasetBalancer
from pyspark.sql import SparkSession
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, MultilayerPerceptronClassifier, RandomForestClassifier
import sys
import time
[docs]def main(argv):
# Name of prediction column
label = argv[1]
start = time.time()
spark = SparkSession.builder \
.master("local[*]") \
.appName("datasetClassifier") \
.getOrCreate()
data = spark.read.parquet(argv[0]).cache()
vector = data.first()["features"]
featureCount = len(vector)
print(f"Feature count : {featureCount}")
classCount = int(data.select(label).distinct().count())
print(f"Class count : {classCount}")
print(f"Dataset size (unbalanced) : {data.count()}")
data.groupby(label).count().show(classCount)
data = datasetBalancer.downsample(data, label, 1)
print(f"Dataset size (balanced) : {data.count()}")
data.groupby(label).count().show(classCount)
testFraction = 0.3
seed = 123
# DecisionTree
dtc = DecisionTreeClassifier()
mcc = SparkMultiClassClassifier(dtc, label, testFraction, seed)
matrics = mcc.fit(data)
for k,v in matrics.items(): print(f"{k}\t{v}")
# RandomForest
rfc = RandomForestClassifier()
mcc = SparkMultiClassClassifier(rfc, label, testFraction, seed)
matrics = mcc.fit(data)
for k,v in matrics.items(): print(f"{k}\t{v}")
# LogisticRegression
lr = LogisticRegression()
mcc = SparkMultiClassClassifier(lr, label, testFraction, seed)
matrics = mcc.fit(data)
for k,v in matrics.items(): print(f"{k}\t{v}")
# MultilayerPerceptronClassifier
layers = [featureCount, 10, classCount]
mpc = MultilayerPerceptronClassifier().setLayers(layers) \
.setBlockSize(128) \
.setSeed(1234) \
.setMaxIter(200)
mcc = SparkMultiClassClassifier(mpc, label, testFraction, seed)
matrics = mcc.fit(data)
for k,v in matrics.items(): print(f"{k}\t{v}")
end = time.time()
print("Time: %f sec." %(end-start))
if __name__ == "__main__":
if len(sys.argv) < 3:
raise Exception("python datasetClassifier.py <parquet file> <prediction column name>")
sys.exit()
main(sys.argv[1:])