Heart Disease Classification with PySpark (Python)


Heart Disease Prediction - A Classification Problem using PySpark

Objective:

  • The main objective of this small project was to implement a machine learning pipeline using PySpark, after taking the Udemy course Spark and Python for Big Data using PySpark.
  • All of the methods used here could be done fairly efficiently without Spark, using packages such as scikit-learn (see the sketch below).
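As a point of comparison, a minimal scikit-learn sketch of the same workflow might look like the following. This is illustrative only and not part of the original notebook: the local file path and the model parameters are assumptions, though the column names match the dataset.

# Hypothetical scikit-learn equivalent (illustrative sketch):
# one-hot encode the categorical columns, pass numeric columns through,
# then fit a logistic regression.
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

pdf = pd.read_csv('heart_2020.csv')  # assumed local copy of the same data
X, y = pdf.drop(columns=['HeartDisease']), pdf['HeartDisease']
categorical = X.select_dtypes(include='object').columns

clf = Pipeline([
    ('encode', ColumnTransformer(
        [('onehot', OneHotEncoder(handle_unknown='ignore'), categorical)],
        remainder='passthrough')),
    ('model', LogisticRegression(max_iter=1000)),
])
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
clf.fit(X_train, y_train)
print(clf.score(X_test, y_test))  # accuracy on the held-out split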

Topics and Methods Covered:

  • PySpark
  • Data Visualization
  • Exploratory Data Analysis
  • Sampling for Unbalanced Datasets
  • Classification Algorithms:
    • Logistic Regression
    • Random Forest Classifier
    • Naive Bayes
  • Evaluation of classification models

Objective of the Analysis:

  • Develop a model capable of detecting people who may have undetected heart disease, based on other health-related metrics.
#imports and setup
from pyspark.sql import SparkSession
from pyspark.ml.feature import (VectorAssembler, OneHotEncoder, StringIndexer)
from pyspark.ml import Pipeline
from pyspark.ml.classification import (LogisticRegression, RandomForestClassifier, NaiveBayes)
from pyspark.sql.functions import (col, explode, array, lit)
from pyspark.ml.evaluation import (BinaryClassificationEvaluator, MulticlassClassificationEvaluator)
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
 
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
 
spark = SparkSession.builder.appName('HeartDiseaseClassification').getOrCreate()

Reading Data

Data originates from a CDC telephone survey of American citizens. There is a significant volume of data: the survey covers around 400k individuals, and this cleaned dataset contains roughly 320k rows.

Source: https://www.kaggle.com/datasets/kamilpytlak/personal-key-indicators-of-heart-disease

# source file
display(dbutils.fs.ls("dbfs:/FileStore/tables/heart_2020.csv"))
 
path                                  | name           | size     | modificationTime
dbfs:/FileStore/tables/heart_2020.csv | heart_2020.csv | 25189554 | 1648329065000

df = spark.read.csv('dbfs:/FileStore/tables/heart_2020.csv',inferSchema=True,header=True)
display(df.head(5))
 
HeartDisease | BMI   | Smoking | AlcoholDrinking | Stroke | PhysicalHealth | MentalHealth | DiffWalking | Sex    | AgeCategory | Race  | Diabetic
No           | 16.6  | Yes     | No              | No     | 3              | 30           | No          | Female | 55-59       | White | Yes
No           | 20.34 | No      | No              | Yes    | 0              | 0            | No          | Female | 80 or older | White | No
No           | 26.58 | Yes     | No              | No     | 20             | 30           | No          | Male   | 65-69       | White | Yes
No           | 24.21 | No      | No              | No     | 0              | 0            | No          | Female | 75-79       | White | No
No           | 23.71 | No      | No              | No     | 28             | 0            | Yes         | Female | 40-44       | White | No

Showing all 5 rows (first 12 of 18 columns displayed).

#Schema of the table
df.printSchema()
root
 |-- HeartDisease: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoking: string (nullable = true)
 |-- AlcoholDrinking: string (nullable = true)
 |-- Stroke: string (nullable = true)
 |-- PhysicalHealth: double (nullable = true)
 |-- MentalHealth: double (nullable = true)
 |-- DiffWalking: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- AgeCategory: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Diabetic: string (nullable = true)
 |-- PhysicalActivity: string (nullable = true)
 |-- GenHealth: string (nullable = true)
 |-- SleepTime: double (nullable = true)
 |-- Asthma: string (nullable = true)
 |-- KidneyDisease: string (nullable = true)
 |-- SkinCancer: string (nullable = true)

Data Labels

The table, as seen above, contains only four numerical columns. Columns are classified into three categories:

  • label: the label of the data ('HeartDisease') used for classification;
  • numerical_cols: the names of the 4 numerical columns;
  • categorical_cols: the remaining nominal columns.
label = 'HeartDisease'
numerical_cols = ['BMI', 'PhysicalHealth','MentalHealth','SleepTime']
categorical_cols = list(set(df.columns) - set(numerical_cols) - set([label]))

Data Distribution

# stats of numerical variables
df.select(numerical_cols).describe().show()
+-------+-----------------+------------------+-----------------+-----------------+
|summary|              BMI|    PhysicalHealth|     MentalHealth|        SleepTime|
+-------+-----------------+------------------+-----------------+-----------------+
|  count|           319795|            319795|           319795|           319795|
|   mean|28.32539852092807|3.3717100017198516|3.898366140808956|7.097074688472302|
| stddev|6.356100200470741|  7.95085018257137|7.955235218943606|1.436007060964281|
|    min|            12.02|               0.0|              0.0|              1.0|
|    max|            94.85|              30.0|             30.0|             24.0|
+-------+-----------------+------------------+-----------------+-----------------+
# check the number of observations per label
df.groupBy(label).count().toPandas().plot.bar(x='HeartDisease', rot=0, title='Number of Observations per label')
[Bar plot: Number of Observations per label]

Preparing Data for Classification Models

Working with an Imbalanced Dataset: Oversampling the Minority Class

As can be seen above, this dataset is extremely imbalanced, which is common in disease-related datasets.

In this section, I will oversample the minority class to lessen the bias of the classification models.

# splitting data into train and test sets before oversampling,
# so that duplicated rows never leak into the test set
train_df, test_df = df.randomSplit([.7, .3])
# splitting train_df by class
major_df = train_df.filter(col(label) == 'No')
minor_df = train_df.filter(col(label) == 'Yes')
# ratio of majority to minority class counts
r = int(major_df.count() / minor_df.count())
 
# duplicate each minority row r times by exploding an r-element array
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in range(r)]))).drop('dummy')
 
# combine the oversampled minority rows with the majority rows
combined_train_df = major_df.unionAll(oversampled_df)
                                                           
combined_train_df.groupBy(label).count().toPandas().plot.bar(x='HeartDisease', rot=0, title='Number of Observations in Train subset after Oversampling')
[Bar plot: Number of Observations in Train subset after Oversampling]
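To make the duplication step above concrete, here is a tiny standalone illustration of the explode(array(...)) trick on toy data (illustrative only, not part of the original notebook):

# toy example: one minority row duplicated r times via explode
toy_df = spark.createDataFrame([('Yes', 1.0)], ['label', 'x'])
r_toy = 3
toy_df.withColumn('dummy', explode(array([lit(i) for i in range(r_toy)]))).drop('dummy').show()
# the single 'Yes' row now appears 3 times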

Processing Categorical Columns for Spark Pipeline

String columns cannot be used directly as input to Spark ML models. To address this, I'll apply an indexer to these columns, followed by an encoding step.

Once every column holds numerical values, I also need to assemble all of the features into a single 'features' vector column.

# Indexers for categorical columns
# (using `c` as the loop variable to avoid shadowing pyspark.sql.functions.col)
indexers = [StringIndexer(inputCol=c, outputCol=c+'_indexed') for c in categorical_cols]
# Encoders for categorical columns
encoders = [OneHotEncoder(inputCol=c+'_indexed', outputCol=c+'_encoded') for c in categorical_cols]
 
# Indexer for classification label:
label_indexer = StringIndexer(inputCol=label, outputCol=label+'_indexed')
# assemble all features into a single vector column, the input format Spark ML expects
assembler = VectorAssembler(inputCols=[c+'_encoded' for c in categorical_cols] + numerical_cols, outputCol='features')
# Creating the data processing pipeline
pipeline = Pipeline(stages=indexers + encoders + [label_indexer, assembler])
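To see what the indexing and encoding stages actually produce, here is a small toy example (illustrative, not from the original notebook). By default, StringIndexer assigns indices in order of descending frequency, and OneHotEncoder drops the last category:

# toy demonstration of StringIndexer followed by OneHotEncoder
toy = spark.createDataFrame([('Yes',), ('No',), ('Yes',)], ['Smoking'])
indexed = StringIndexer(inputCol='Smoking', outputCol='Smoking_indexed').fit(toy).transform(toy)
encoder_model = OneHotEncoder(inputCol='Smoking_indexed', outputCol='Smoking_encoded').fit(indexed)
encoder_model.transform(indexed).show()
# 'Yes' (most frequent) -> index 0.0 -> sparse vector (1,[0],[1.0]); 'No' -> 1.0 -> (1,[],[])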

Applying Classification Models

Models Implemented:

  • lr - Logistic Regression
  • rfc - Random Forest Classifier
  • nb - Naive Bayes
lr = LogisticRegression(featuresCol='features', labelCol=label+'_indexed')
rfc = RandomForestClassifier(featuresCol='features', labelCol=label+'_indexed', numTrees=100)
nb = NaiveBayes(featuresCol='features', labelCol=label+'_indexed')
# creating pipelines with machine learning models
pipeline_lr = Pipeline(stages=[pipeline, lr])
pipeline_rfc = Pipeline(stages=[pipeline, rfc])
pipeline_nb = Pipeline(stages=[pipeline, nb])
#fitting models with train subset
lr_fit = pipeline_lr.fit(combined_train_df)
rfc_fit = pipeline_rfc.fit(combined_train_df)
nb_fit = pipeline_nb.fit(combined_train_df)
# predictions for test subset
pred_lr = lr_fit.transform(test_df)
pred_rfc = rfc_fit.transform(test_df)
pred_nb = nb_fit.transform(test_df)
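Each fitted pipeline's transform appends rawPrediction, probability, and prediction columns to the test DataFrame. A quick sanity check might look like this (a sketch, not in the original notebook):

# inspect the columns added by the fitted logistic regression pipeline
pred_lr.select(label+'_indexed', 'rawPrediction', 'probability', 'prediction').show(5, truncate=False)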

Evaluating Results

Area Under Curve - AUC

A random classifier has an AUC of 0.5; the closer this metric is to 1, the better the model separates the data labels.

Regarding this metric, the Logistic Regression model outperforms the Random Forest Classifier, while the Naive Bayes Classifier performed worst.

# NOTE: this evaluates the hard 0/1 'prediction' column; the evaluator's
# default 'rawPrediction' column would give the conventional score-based AUC
pred_AUC = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol=label+'_indexed')
AUC_lr = pred_AUC.evaluate(pred_lr)
AUC_rfc = pred_AUC.evaluate(pred_rfc)
AUC_nb = pred_AUC.evaluate(pred_nb)
print(AUC_lr, AUC_rfc, AUC_nb)
0.7650009859431405 0.7205029592712221 0.6200260624247653
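For reference, the conventional AUC is computed from the model's raw scores rather than its hard 0/1 predictions. Since BinaryClassificationEvaluator defaults to the rawPrediction column, a sketch (assuming the fitted pipelines above are in scope) would be:

# AUC from raw scores; rawPredictionCol defaults to 'rawPrediction'
score_AUC = BinaryClassificationEvaluator(labelCol=label+'_indexed')
print(score_AUC.evaluate(pred_lr), score_AUC.evaluate(pred_rfc), score_AUC.evaluate(pred_nb))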

Accuracy - A Poor Evaluation Metric for Imbalanced Classification

Accuracy is a common metric used when evaluating classification problems. It is calculated by

\frac{TP + TN}{\textit{All Samples}}

Where TP = True Positives and TN = True Negatives

Note that for this particular case accuracy is not the best metric, because the negative label represents the vast majority of the observations.

As an extreme example, if I predicted that all observations would be negative for heart disease, the accuracy for this test subset would be 91.48 percent.

Looking at the results for these three models, Naive Bayes has the best accuracy while being the worst at identifying true positives. When analyzing these models, special emphasis should therefore be placed on the TP cases.

# calculating accuracy for all negative prediction mentioned above
acc_all_negative = test_df.filter(test_df[label]=='No').count() / test_df.count()
acc_all_negative
Out[52]: 0.9148274499199135
acc_evaluator = MulticlassClassificationEvaluator(labelCol=label+'_indexed', predictionCol="prediction", metricName="accuracy")
acc_lr = acc_evaluator.evaluate(pred_lr)
acc_rfc = acc_evaluator.evaluate(pred_rfc)
acc_nb = acc_evaluator.evaluate(pred_nb)
 
print('Logistic Regression accuracy: ', '{:.2f}'.format(acc_lr*100), '%', sep='')
print('Random Forest accuracy: ', '{:.2f}'.format(acc_rfc*100), '%', sep='')
print('Naive Bayes accuracy: ', '{:.2f}'.format(acc_nb*100), '%', sep='')
Logistic Regression accuracy: 76.02%
Random Forest accuracy: 74.45%
Naive Bayes accuracy: 81.55%

Confusion Matrices

def confusion_matrix(pred_df):
    # pair each prediction with its float-cast label, as MulticlassMetrics expects
    preds_labels = pred_df.select(['prediction', label+'_indexed']).withColumn(label+'_indexed', F.col(label+'_indexed').cast(FloatType())).orderBy('prediction')
    metrics = MulticlassMetrics(preds_labels.rdd.map(tuple))
    return metrics.confusionMatrix().toArray()
def confusion_matrix_plot(conf_mat, ax, title = 'Confusion Matrix'):
    names = ['True Negative','False Positive','False Negative','True Positive']
 
    number = ["{0:0.0f}".format(value) for value in conf_mat.flatten()]
 
    percent = ["{0:.2%}".format(value) for value in conf_mat.flatten()/np.sum(conf_mat)]
 
    labels = [f"{v1}\n\n{v2}\n\n{v3}" for v1, v2, v3 in zip(names, number, percent)]
 
    labels = np.asarray(labels).reshape(2,2)
 
    ax = sns.heatmap(conf_mat, annot=labels, fmt='', cmap='Blues', cbar=False, ax=ax)
 
    ax.set_title(title+'\n');
    ax.set_xlabel('\nPredicted Labels')
    ax.set_ylabel('Real Labels');
 
    ax.xaxis.set_ticklabels(['No','Yes'])
    ax.yaxis.set_ticklabels(['No','Yes'])
    
    return ax
conf_lr = confusion_matrix(pred_lr)
conf_rfc = confusion_matrix(pred_rfc)
conf_nb = confusion_matrix(pred_nb)
fig, (ax1, ax2, ax3) = plt.subplots(1,3, figsize=(20,5))
 
ax1 = confusion_matrix_plot(conf_lr, ax1,'Logistic Regression - Confusion Matrix')
ax2 = confusion_matrix_plot(conf_rfc, ax2,'Random Forest Classifier - Confusion Matrix')
ax3 = confusion_matrix_plot(conf_nb, ax3, 'Naive Bayes - Confusion Matrix')
 
plt.show()
 

Sensitivity Metric

Sensitivity is the True Positive Rate of the classification:

\frac{TP}{TP + FN}

where TP = True Positive and FN = False Negative.

It is a measure of how well the Positive label is predicted.

def sensitivity(conf_mat):
    # row 1 of the confusion matrix holds the true 'Yes' cases: [FN, TP]
    TP = conf_mat[1][1]
    FN = conf_mat[1][0]
    return TP / (TP + FN)
print('Logistic Regression sensitivity: ', (sensitivity(conf_lr)*100).round(2), '%', sep='')
print('Random Forest sensitivity: ', (sensitivity(conf_rfc)*100).round(2), '%', sep='')
print('Naive Bayes sensitivity: ', (sensitivity(conf_nb)*100).round(2), '%', sep='')
Logistic Regression sensitivity: 77.08%
Random Forest sensitivity: 69.15%
Naive Bayes sensitivity: 38.44%

Results

  • The best performing model was Logistic Regression;
  • The true positive rate (sensitivity) was 77%, meaning that 77 percent of patients with heart disease were correctly identified;
  • The model's false positive rate is high (equivalently, its specificity is low), although lowering this statistic is not the primary goal; a specificity sketch follows this list.
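For completeness, specificity (the true negative rate, TN / (TN + FP)) can be read off the same confusion matrices; a sketch mirroring the sensitivity helper above:

def specificity(conf_mat):
    # row 0 of the confusion matrix holds the true 'No' cases: [TN, FP]
    TN = conf_mat[0][0]
    FP = conf_mat[0][1]
    return TN / (TN + FP)

print('Logistic Regression specificity: ', (specificity(conf_lr)*100).round(2), '%', sep='')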

Overall, the Logistic Regression model yields useful results (sensitivity above 50%, i.e. better than a random guess). That said, 77 percent sensitivity is still fairly low for a classification model.