# raw_data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(r".\diabetes.csv")  # alternative reader API
ds = spark.read.csv('diabetes.csv', inferSchema=True, header=True)
ds.show() # let me see a sample of the data
ds.count() # how many records are there in this dataset?
ds.columns # list all the headings
# a minimum value of 0 in these columns shows that there are data-entry errors in the dataset
ds.describe().select("Summary","Pregnancies","Glucose","BloodPressure").show()
ds.describe().select("Summary","SkinThickness","Insulin").show()
ds.describe().select("Summary","BMI","DiabetesPedigreeFunction","Age").show()
ds.printSchema() # another view of the data
# replace the zero values with NaN as part of the data-cleaning process
import numpy as np
from pyspark.sql.functions import when
ds = ds.withColumn("Glucose",when(ds.Glucose==0,np.nan).otherwise (ds.Glucose))
ds = ds.withColumn("BloodPressure",when(ds.BloodPressure==0,np.nan).otherwise(ds.BloodPressure))
ds = ds.withColumn("SkinThickness",when(ds.SkinThickness==0,np.nan).otherwise(ds.SkinThickness))
ds = ds.withColumn("BMI",when(ds.BMI==0,np.nan).otherwise(ds.BMI))
ds = ds.withColumn("Insulin",when(ds.Insulin==0,np.nan).otherwise(ds.Insulin))
ds.select("Insulin","Glucose","BloodPressure","SkinThickness","BMI").show(5) # check if the data is clean
# impute - replace the missing (NaN) values; Spark's Imputer uses the column mean by default (median is also available)
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"],
                  outputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"])
model = imputer.fit(ds)
ds = model.transform(ds)
ds.show(5) # did everything work as expected?
More research required on the choice of imputation strategy.
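As a hedged sketch (my addition, not from the lecture), the same Imputer can be switched to the median strategy, which is less sensitive to outliers; imputer_median is just an illustrative name.
imputer_median = Imputer(inputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"],
                         outputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"],
                         strategy="median")  # "mean" is the default
# ds = imputer_median.fit(ds).transform(ds)  # left commented out so it does not overwrite the mean-imputed columns above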
# combine all the features into one single feature vector.
cols = ds.columns
cols.remove("Outcome")
# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols,outputCol="features")
# Now let us use the transform method to transform our dataset
ds = assembler.transform(ds)
ds.select("features").show(truncate=False)
In machine learning and pattern recognition, a feature is an individual measurable property or characteristic of a phenomenon being observed. Choosing informative, discriminating and independent features is a crucial step for effective algorithms in pattern recognition, classification and regression. Features are usually numeric, but structural features such as strings and graphs are used in syntactic pattern recognition. The concept of "feature" is related to that of explanatory variable used in statistical techniques such as linear regression. [https://en.wikipedia.org/wiki/Feature_(machine_learning)]
# Standard Scaler - scaling as described in [https://www.w3schools.com/python/python_ml_scale.asp]
from pyspark.ml.feature import StandardScaler
standardscaler = StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
ds = standardscaler.fit(ds).transform(ds)
ds.select("features","Scaled_features").show(5)
StandardScaler is an Estimator which can be fit on a dataset to produce a StandardScalerModel; this amounts to computing summary statistics. The model can then transform a Vector column in a dataset to have unit standard deviation and/or zero mean features.
Note that if the standard deviation of a feature is zero, it will return default 0.0 value in the Vector for that feature. [https://spark.apache.org/docs/latest/ml-features#standardscaler]
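As a hedged sketch of the "zero mean" option mentioned there (my addition; the code above keeps the defaults withStd=True, withMean=False), the scaler can also centre each feature:
scaler_centered = StandardScaler(inputCol="features", outputCol="Centered_scaled_features",
                                 withMean=True, withStd=True)
# ds = scaler_centered.fit(ds).transform(ds)  # note: centring builds dense output, so take care with sparse input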
# Train, test split
train, test = ds.randomSplit([0.8, 0.2], seed=12345)
Much more info needed on how the split ratio and the seed affect the results.
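A quick sanity check (my addition) that randomSplit produced roughly an 80/20 split:
print('Training rows: {}, test rows: {}'.format(train.count(), test.count()))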
# check the class imbalance in the training data
train_size = float(train.select("Outcome").count())   # number of rows in the training set
numPositives = train.select("Outcome").where('Outcome == 1').count()
per_ones = (float(numPositives) / train_size) * 100
numNegatives = float(train_size - numPositives)
print('The number of ones is {}'.format(numPositives))
print('The percentage of ones is {}'.format(per_ones))
BalancingRatio = numNegatives / train_size
print('BalancingRatio = {}'.format(BalancingRatio))
# balance the classes: give each row a weight inversely related to its class frequency
train = train.withColumn("classWeights", when(train.Outcome == 1,BalancingRatio).otherwise(1-BalancingRatio))
train.select("classWeights").show(5)
More info needed on the class-weighting approach.
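One sanity check I added as a sketch: with these weights, the total weight of the positive rows and the total weight of the negative rows should come out equal (sql_sum is just an alias for the built-in sum function).
from pyspark.sql.functions import sum as sql_sum
train.groupBy("Outcome").agg(sql_sum("classWeights").alias("total_weight")).show()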
# Feature selection using ChiSqSelector
from pyspark.ml.feature import ChiSqSelector
css = ChiSqSelector(featuresCol='Scaled_features', outputCol='Aspect', labelCol='Outcome',
                    selectorType='fpr', fpr=0.05)  # selectorType='fpr' is needed for the fpr threshold to actually be applied
css_model = css.fit(train)          # fit the selector on the training data only
train = css_model.transform(train)
test = css_model.transform(test)    # reuse the fitted selector on the test set instead of refitting it there
test.select("Aspect").show(5,truncate=False)
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Outcome", featuresCol="Aspect",weightCol="classWeights",maxIter=10)
model = lr.fit(train)
predict_train = model.transform(train)
predict_test = model.transform(test)
predict_test.select("Outcome","prediction").show(10)
My results differ from the lecture notes; more info required.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="Outcome")
predict_test.select("Outcome","rawPrediction","prediction","probability").show(5)
import matplotlib.pyplot as plt
pr = model.summary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()
print('Training areaUnderROC: {}'.format(model.summary.areaUnderROC))
print('Training accuracy: {}'.format(model.summary.accuracy))
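The training summary also exposes the ROC curve as a DataFrame with columns FPR and TPR; a sketch of plotting it (my addition), mirroring the precision-recall plot above:
roc = model.summary.roc.toPandas()
plt.plot(roc['FPR'], roc['TPR'])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.show()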
Still to do: write up some conclusions from the metrics above to make sense of the analysis.