import urllib.request # download the data
urllib.request.urlretrieve("https://www.dropbox.com/s/4xqg32ih9xoh5jq/UNSW-NB15.csv?dl=1", "UNSW-NB15.csv")
# set up the environment on Google Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]="/content/spark-2.4.4-bin-hadoop2.7"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").appName("example").enableHiveSupport().getOrCreate()
unsw_rdd = spark.sparkContext.textFile("./UNSW-NB15.csv") # load the data into a Spark RDD
unsw_rdd.count() #count the number of records
unsw_rdd.take(2) # print 2 records
def AddNormalLabel(line): # replace empty attack_cat values with the word 'normal'
    line = line.split(",")
    if line[47] == "":
        line[47] = 'normal'
    return line
unsw_update_rdd = unsw_rdd.map(AddNormalLabel) # apply the function and store the result in a new RDD
unsw_update_rdd.take(2) # get an example of the function working
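# The updated RDD is reused by several actions below (distinct, filters, counts),
# so caching it avoids re-reading and re-parsing the CSV on every pass.
# A minimal sketch, assuming the dataset fits comfortably in memory:
unsw_update_rdd.cache()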
# print the distinct connection types in column 48 (the attack_cat field)
connections_rdd = unsw_update_rdd.map(lambda x: x[47].strip()).distinct() # use 0-based index 47 to get the labels
labels = connections_rdd.collect()
labels # print all the distinct connection labels
shellcode_rdd = unsw_update_rdd.filter(lambda line: line[47].strip() == 'Shellcode') # count the number of Shellcode connections
shellcode_rdd.count()
DoS_rdd = unsw_update_rdd.filter(lambda line: line[47].strip() == 'DoS') # count the number of DoS connections
DoS_rdd.count()
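# Filtering and counting one label at a time launches a separate Spark job per label.
# As a sketch of a single-pass alternative (not part of the original walkthrough),
# countByValue() returns the record count for every attack category at once:
label_counts = unsw_update_rdd.map(lambda x: x[47].strip()).countByValue()
print(dict(label_counts))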
# print the distinct protocol states, stored in column 6
states_rdd = unsw_update_rdd.map(lambda x: x[5]).distinct()
States = states_rdd.collect()
States
def State_Count(line):
    # count the number of records for each protocol state in the given list
    count = []
    for state in line:
        All_rdd = unsw_update_rdd.filter(lambda x: state in x[5])
        count.append(All_rdd.count())
    return count
State_Count(States)
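# State_Count runs one filter-and-count job per state. A sketch of a single-pass
# alternative that pairs each state with its count using reduceByKey:
state_counts = (unsw_update_rdd.map(lambda x: (x[5], 1))
                               .reduceByKey(lambda a, b: a + b)
                               .collect())
state_counts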
services_rdd = unsw_update_rdd.map(lambda x: x[13]).distinct() # get the distinct services in column 14
services_rdd.collect()
# summary statistics based on the label
import numpy as np
def Summary_with_Labels(line):
    # keep just the numeric and logical values
    symbolic_indexes = [0,2,4,5,13,47]
    clean_line = [item for i,item in enumerate(line) if i not in symbolic_indexes]
    return (line[47], np.array([float(x) for x in clean_line]))
label_vector_data = unsw_update_rdd.map(Summary_with_Labels)
label_vector_data.take(2) # use the label as a key and the data as value
# use a filter on the 'label_vector_data' rdd to get 'normal' connections
normal_label_data = label_vector_data.filter(lambda x: x[0]=="normal")
# Spark's MLlib provides column summary statistics for an RDD[Vector] through the colStats() function in Statistics.
# colStats() returns an instance of MultivariateStatisticalSummary, which contains
# the column-wise max, min, mean, variance, and number of nonzeros, as well as the total count.
from pyspark.mllib.stat import Statistics
from math import sqrt
# collect and visualise results for 'normal' connections based on the total duration ('dur'),
# which sits at index 2 of the cleaned feature vector once the symbolic columns are removed
normal_summary = Statistics.colStats(normal_label_data.values())
print("Duration Statistics for label: {}".format("normal"))
print("Mean: {}".format(normal_summary.mean()[7],3))
print("St. deviation: {}".format(round(sqrt(normal_summary.variance()[7]),3)))
print("Max value: {}".format(round(normal_summary.max()[7],3)))
print("Min value: {}".format(round(normal_summary.min()[7],3)))
print("Total value count: {}".format(normal_summary.count()))
print("Number of non-zero values: {}".format(normal_summary.numNonzeros()[0]))
# create a dataframe
features = ['srcip','sport','dstip','dsport','proto','state','dur','sbytes','dbytes','sttl','dttl','sloss','dloss','service',
'Sload','Dload','Spkts','Dpkts','swin','dwin','stcpb','dtcpb','smeansz','dmeansz','trans_depth','res_bdy_len',
'Sjit','Djit','Stime','Ltime','Sintpkt','Dintpkt','tcprtt','synack','ackdat','is_sm_ips_ports','ct_state_ttl',
'ct_flw_http_mthd','is_ftp_login','ct_ftp_cmd','ct_srv_src','ct_srv_dst','ct_dst_ltm','ct_src_ltm',
'ct_src_dport_ltm','ct_dst_sport_ltm','ct_dst_src_ltm','attack_cat','Label']
df_unsw = spark.createDataFrame(unsw_update_rdd, features)
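# An alternative way to build the DataFrame (a sketch, assuming the same local CSV
# with no header row) is to let Spark parse and type the columns directly; note that
# this path keeps the empty attack_cat values as nulls instead of 'normal':
df_unsw_alt = spark.read.csv("./UNSW-NB15.csv", header=False, inferSchema=True).toDF(*features)
df_unsw_alt.printSchema()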
# show descriptive statistics on 'sbytes' column
df_unsw.select('sbytes').describe().show()
df_unsw.select(['sloss', 'dloss']).describe().show()
# cast the selected feature columns from string to float
from pyspark.sql.functions import col
Desired_features = ['dur','sloss','dloss','swin','dwin']
df_unsw_cast = df_unsw.select(*(col(c).cast("float").alias(c) for c in Desired_features))
df_unsw_cast.show() # check the data
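# Casting a string column to float produces null for any value that does not parse,
# so it is worth checking how many nulls the cast introduced (a sketch using the
# cast DataFrame above):
from pyspark.sql.functions import count, when
df_unsw_cast.select([count(when(col(c).isNull(), c)).alias(c) for c in Desired_features]).show()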
# calculate the approximate quantiles of a numerical column in a Spark DataFrame
Q1 = df_unsw_cast.approxQuantile('sloss', [0.25], 0)
Q2 = df_unsw_cast.approxQuantile('sloss', [0.5], 0)
Q3 = df_unsw_cast.approxQuantile('sloss', [0.75], 0)
print([Q1,Q2,Q3])
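# The quartiles can be turned into a simple outlier rule. A sketch using Tukey's
# 1.5*IQR fences on 'sloss' (not part of the original analysis):
iqr = Q3[0] - Q1[0]
lower_fence = Q1[0] - 1.5 * iqr
upper_fence = Q3[0] + 1.5 * iqr
print("IQR: {}, fences: [{}, {}]".format(iqr, lower_fence, upper_fence))
df_unsw_cast.filter((col('sloss') < lower_fence) | (col('sloss') > upper_fence)).count() # number of outliers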
# calculate the mode
df_unsw_cast.groupBy("sloss").count().orderBy("count", ascending=False).first()[0]
# calculate the mode of every column in the DataFrame, returning a list of modes
[df_unsw_cast.groupBy(i).count().orderBy("count", ascending=False).first()[0] for i in df_unsw_cast.columns]
# pair each column name with its mode (a 2D list)
[[i, df_unsw_cast.groupBy(i).count().orderBy("count", ascending=False).first()[0]] for i in df_unsw_cast.columns]
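# A small reusable helper (a sketch) that returns both the mode and its frequency
# for any column of a DataFrame:
def column_mode(df, column):
    row = df.groupBy(column).count().orderBy("count", ascending=False).first()
    return (row[0], row[1])

column_mode(df_unsw_cast, 'dwin')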
from pyspark.sql import functions as f
# calculate the skewness
df_unsw_cast.select(f.skewness(df_unsw_cast['sloss']),f.skewness(df_unsw_cast['dloss'])).show()
# calculate the kurtosis
df_unsw_cast.select(f.kurtosis(df_unsw_cast['sloss']),f.kurtosis(df_unsw_cast['dloss'])).show()
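# As a compact alternative (a sketch), the skewness and kurtosis of every cast column
# can be computed in a single pass over the data:
df_unsw_cast.select([f.skewness(c).alias('skew_' + c) for c in Desired_features] +
                    [f.kurtosis(c).alias('kurt_' + c) for c in Desired_features]).show()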