Heart Disease Prediction - A Classification Problem using PySpark

# Imports and setup
from pyspark.sql import SparkSession
from pyspark.ml.feature import (VectorAssembler, OneHotEncoder, StringIndexer)
from pyspark.ml import Pipeline
from pyspark.ml.classification import (LogisticRegression, RandomForestClassifier, NaiveBayes)
from pyspark.sql.functions import (col, explode, array, lit)
from pyspark.ml.evaluation import (BinaryClassificationEvaluator, MulticlassClassificationEvaluator)
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
spark = SparkSession.builder.appName('HeartDiseaseClassification').getOrCreate()
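# NOTE: df is used below but is not created in this excerpt. A minimal sketch, assuming the
# heart_2020_cleaned.csv file (Kaggle "Personal Key Indicators of Heart Disease" dataset)
# sits in the working directory; adjust the path to wherever the data actually lives.
df = spark.read.csv('heart_2020_cleaned.csv', header=True, inferSchema=True)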
# Schema of the DataFrame
df.printSchema()
root
|-- HeartDisease: string (nullable = true)
|-- BMI: double (nullable = true)
|-- Smoking: string (nullable = true)
|-- AlcoholDrinking: string (nullable = true)
|-- Stroke: string (nullable = true)
|-- PhysicalHealth: double (nullable = true)
|-- MentalHealth: double (nullable = true)
|-- DiffWalking: string (nullable = true)
|-- Sex: string (nullable = true)
|-- AgeCategory: string (nullable = true)
|-- Race: string (nullable = true)
|-- Diabetic: string (nullable = true)
|-- PhysicalActivity: string (nullable = true)
|-- GenHealth: string (nullable = true)
|-- SleepTime: double (nullable = true)
|-- Asthma: string (nullable = true)
|-- KidneyDisease: string (nullable = true)
|-- SkinCancer: string (nullable = true)
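# NOTE: numerical_cols, categorical_cols, and label are referenced below but not defined in
# this excerpt; a plausible definition based on the schema above (the exact lists are assumptions):
label = 'HeartDisease'
numerical_cols = ['BMI', 'PhysicalHealth', 'MentalHealth', 'SleepTime']
# treat every remaining string column as categorical
categorical_cols = [c for c, t in df.dtypes if t == 'string' and c != label]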
# Summary statistics of the numerical variables
df.select(numerical_cols).describe().show()
+-------+-----------------+------------------+-----------------+-----------------+
|summary| BMI| PhysicalHealth| MentalHealth| SleepTime|
+-------+-----------------+------------------+-----------------+-----------------+
| count| 319795| 319795| 319795| 319795|
| mean|28.32539852092807|3.3717100017198516|3.898366140808956|7.097074688472302|
| stddev|6.356100200470741| 7.95085018257137|7.955235218943606|1.436007060964281|
| min| 12.02| 0.0| 0.0| 1.0|
| max| 94.85| 30.0| 30.0| 24.0|
+-------+-----------------+------------------+-----------------+-----------------+
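# NOTE: the train_df used for oversampling below comes from a train/test split that is not
# shown in this excerpt; a minimal sketch (the 70/30 ratio and the seed are assumptions):
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)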
# splitting the training set by class
major_df = train_df.filter(col(label) == 'No')
minor_df = train_df.filter(col(label) == 'Yes')
# ratio of the number of observations in the majority vs. the minority class
r = int(major_df.count()/minor_df.count())
# duplicate the minority rows
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in range(r)]))).drop('dummy')
# combine the oversampled minority rows with the original majority rows
combined_train_df = major_df.unionAll(oversampled_df)
combined_train_df.groupBy(label).count().toPandas().plot.bar(x='HeartDisease', rot=0, title='Number of Observations in Train subset after Oversampling')
[Bar chart: per-class observation counts in the train subset after oversampling]
# Indexers for categorical columns
indexers = [StringIndexer(inputCol=c, outputCol=c+'_indexed') for c in categorical_cols]
# Encoders for categorical columns
encoders = [OneHotEncoder(inputCol=c+'_indexed', outputCol=c+'_encoded') for c in categorical_cols]
# Indexer for classification label:
label_indexer = StringIndexer(inputCol=label, outputCol=label+'_indexed')
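# NOTE: the feature assembly, model fitting, and acc_evaluator used below are not shown in
# this excerpt. The following is a minimal sketch, assuming each model is trained on the
# oversampled training set and scored on the held-out test_df with default hyperparameters
# (the helper name fit_and_predict is hypothetical).
assembler = VectorAssembler(
    inputCols=[c + '_encoded' for c in categorical_cols] + numerical_cols,
    outputCol='features')

lr = LogisticRegression(featuresCol='features', labelCol=label+'_indexed')
rfc = RandomForestClassifier(featuresCol='features', labelCol=label+'_indexed')
nb = NaiveBayes(featuresCol='features', labelCol=label+'_indexed')

def fit_and_predict(classifier):
    # index + encode the categoricals, index the label, assemble the feature vector, fit the model
    pipeline = Pipeline(stages=indexers + encoders + [label_indexer, assembler, classifier])
    model = pipeline.fit(combined_train_df)
    return model.transform(test_df)

pred_lr = fit_and_predict(lr)
pred_rfc = fit_and_predict(rfc)
pred_nb = fit_and_predict(nb)

acc_evaluator = MulticlassClassificationEvaluator(
    labelCol=label+'_indexed', predictionCol='prediction', metricName='accuracy')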
acc_lr = acc_evaluator.evaluate(pred_lr)
acc_rfc = acc_evaluator.evaluate(pred_rfc)
acc_nb = acc_evaluator.evaluate(pred_nb)
print('Logistic Regression accuracy: ', '{:.2f}'.format(acc_lr*100), '%', sep='')
print('Random Forest accuracy: ', '{:.2f}'.format(acc_rfc*100), '%', sep='')
print('Naive Bayes accuracy: ', '{:.2f}'.format(acc_nb*100), '%', sep='')
Logistic Regression accuracy: 76.02%
Random Forest accuracy: 74.45%
Naive Bayes accuracy: 81.55%
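# NOTE: BinaryClassificationEvaluator is imported above but not used in this excerpt; a sketch
# of how the area under the ROC curve could also be reported for, e.g., the logistic regression:
auc_evaluator = BinaryClassificationEvaluator(
    labelCol=label+'_indexed', rawPredictionCol='rawPrediction', metricName='areaUnderROC')
print('Logistic Regression AUC: ', '{:.2f}'.format(auc_evaluator.evaluate(pred_lr)), sep='')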
def confusion_matrix(pred_df):
    # Cast the indexed label to float so MulticlassMetrics accepts the (prediction, label) pairs
    preds_labels = pred_df.select(['prediction', label+'_indexed']) \
        .withColumn(label+'_indexed', F.col(label+'_indexed').cast(FloatType())) \
        .orderBy('prediction')
    metrics = MulticlassMetrics(preds_labels.rdd.map(tuple))
    return metrics.confusionMatrix().toArray()
def confusion_matrix_plot(conf_mat, ax, title='Confusion Matrix'):
    # Annotate each cell with its name, raw count, and share of all observations
    names = ['True Negative', 'False Positive', 'False Negative', 'True Positive']
    number = ["{0:0.0f}".format(value) for value in conf_mat.flatten()]
    percent = ["{0:.2%}".format(value) for value in conf_mat.flatten()/np.sum(conf_mat)]
    labels = [f"{v1}\n\n{v2}\n\n{v3}" for v1, v2, v3 in zip(names, number, percent)]
    labels = np.asarray(labels).reshape(2, 2)
    ax = sns.heatmap(conf_mat, annot=labels, fmt='', cmap='Blues', cbar=False, ax=ax)
    ax.set_title(title + '\n')
    ax.set_xlabel('\nPredicted Labels')
    ax.set_ylabel('Real Labels')
    ax.xaxis.set_ticklabels(['No', 'Yes'])
    ax.yaxis.set_ticklabels(['No', 'Yes'])
    return ax
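# NOTE: the conf_lr/conf_rfc/conf_nb matrices and the sensitivity helper used below are not
# defined in this excerpt; a minimal sketch consistent with the confusion_matrix layout above
# (rows = real labels, columns = predicted labels, index 0 = 'No', index 1 = 'Yes'):
conf_lr = confusion_matrix(pred_lr)
conf_rfc = confusion_matrix(pred_rfc)
conf_nb = confusion_matrix(pred_nb)

def sensitivity(conf_mat):
    # sensitivity (recall of the positive 'Yes' class) = TP / (TP + FN)
    tn, fp, fn, tp = conf_mat.flatten()
    return tp / (tp + fn)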
fig, (ax1, ax2, ax3) = plt.subplots(1,3, figsize=(20,5))
ax1 = confusion_matrix_plot(conf_lr, ax1,'Logistic Regression - Confusion Matrix')
ax2 = confusion_matrix_plot(conf_rfc, ax2,'Random Forest Classifier - Confusion Matrix')
ax3 = confusion_matrix_plot(conf_nb, ax3, 'Naive Bayes - Confusion Matrix')
plt.show()
print('Logistic Regression sensitivity: ', (sensitivity(conf_lr)*100).round(2), '%', sep='')
print('Random Forest sensitivity: ', (sensitivity(conf_rfc)*100).round(2), '%', sep='')
print('Naive Bayes sensitivity: ', (sensitivity(conf_nb)*100).round(2), '%', sep='')
Logistic Regression sensitivity: 77.08%
Random Forest sensitivity: 69.15%
Naive Bayes sensitivity: 38.44%