Descriptive Statistics on Big Data

In [0]:
import urllib.request # download the data

urllib.request.urlretrieve("https://www.dropbox.com/s/4xqg32ih9xoh5jq/UNSW-NB15.csv?dl=1", "UNSW-NB15.csv")
Out[0]:
('UNSW-NB15.csv', <http.client.HTTPMessage at 0x7f7bf2b7a4a8>)
In [ ]:
# set up the environment on Google Colab
In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
In [0]:
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]="/content/spark-2.4.4-bin-hadoop2.7"
In [0]:
import findspark
findspark.init()
In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").appName("example").enableHiveSupport().getOrCreate()
In [0]:
unsw_rdd = spark.sparkContext.textFile("./UNSW-NB15.csv") # load the data into a Spark RDD
In [0]:
unsw_rdd.count() # count the number of records
Out[0]:
2539739
In [0]:
unsw_rdd.take(2) # print 2 records
Out[0]:
['59.166.0.3,56716,149.171.126.8,143,tcp,FIN,0.82546002,7812,16236,31,29,30,32,-,75090.25,156111.73,122,126,255,255,2751097753,2748686736,64,129,0,0,445.25928,474.9451,1421970774,1421970775,6.8190908,6.599896,5.9700001E-4,4.6899999E-4,0.000128,0,0,0,0,0,2,7,1,4,1,1,1,,0',
 '59.166.0.0,43467,149.171.126.6,49729,tcp,FIN,0.101815,4238,65628,31,29,7,30,-,328438.84,5087030.5,72,74,255,255,961515433,3225510659,59,887,0,0,0,91.579567,1421970775,1421970775,1.429493,1.387192,0.00068,5.4600002E-4,0.000134,0,0,0,0,0,7,4,1,6,1,1,1,,0']
In [0]:
def AddNormalLabel(line): # split the CSV line and label records with an empty attack_cat field (index 47) as 'normal'
    line = line.split(",")
    if line[47] == "":
        line[47] = 'normal'
        
    return line
In [0]:
unsw_update_rdd = unsw_rdd.map(AddNormalLabel) # apply the function and store the result in a new RDD
unsw_update_rdd.take(2) # show an example of the function at work
Out[0]:
[['59.166.0.3',
  '56716',
  '149.171.126.8',
  '143',
  'tcp',
  'FIN',
  '0.82546002',
  '7812',
  '16236',
  '31',
  '29',
  '30',
  '32',
  '-',
  '75090.25',
  '156111.73',
  '122',
  '126',
  '255',
  '255',
  '2751097753',
  '2748686736',
  '64',
  '129',
  '0',
  '0',
  '445.25928',
  '474.9451',
  '1421970774',
  '1421970775',
  '6.8190908',
  '6.599896',
  '5.9700001E-4',
  '4.6899999E-4',
  '0.000128',
  '0',
  '0',
  '0',
  '0',
  '0',
  '2',
  '7',
  '1',
  '4',
  '1',
  '1',
  '1',
  'normal',
  '0'],
 ['59.166.0.0',
  '43467',
  '149.171.126.6',
  '49729',
  'tcp',
  'FIN',
  '0.101815',
  '4238',
  '65628',
  '31',
  '29',
  '7',
  '30',
  '-',
  '328438.84',
  '5087030.5',
  '72',
  '74',
  '255',
  '255',
  '961515433',
  '3225510659',
  '59',
  '887',
  '0',
  '0',
  '0',
  '91.579567',
  '1421970775',
  '1421970775',
  '1.429493',
  '1.387192',
  '0.00068',
  '5.4600002E-4',
  '0.000134',
  '0',
  '0',
  '0',
  '0',
  '0',
  '7',
  '4',
  '1',
  '6',
  '1',
  '1',
  '1',
  'normal',
  '0']]
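In [ ]:
# Optional aside (a sketch, not part of the original notebook): a variant of
# AddNormalLabel that also strips stray whitespace from every field while
# filling the empty attack_cat value, so later lookups would not need .strip().
def AddNormalLabelStripped(line):
    fields = [f.strip() for f in line.split(",")]
    if fields[47] == "":
        fields[47] = "normal"
    return fields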
In [0]:
# list the attack categories (column 48, index 47)
connections_rdd = unsw_update_rdd.map(lambda x: x[47].strip()).distinct() # strip whitespace and keep the distinct values
labels = connections_rdd.collect()
In [0]:
labels # all the distinct attack-category labels; note the raw data contains both 'Backdoor' and 'Backdoors'
Out[0]:
['Shellcode',
 'Backdoors',
 'Exploits',
 'Reconnaissance',
 'Analysis',
 'normal',
 'Generic',
 'Worms',
 'DoS',
 'Backdoor',
 'Fuzzers']
In [0]:
shellcode_rdd = unsw_update_rdd.filter(lambda line: 'Shellcode' in line) # count the number of Shellcode connections
shellcode_rdd.count()
Out[0]:
223
In [0]:
DoS_rdd = unsw_update_rdd.filter(lambda line: 'DoS' in line) # count the number of DoS connections
DoS_rdd.count()
Out[0]:
16353
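In [ ]:
# Optional aside (a sketch, not part of the original notebook): instead of one
# filter() + count() per category, a single map/reduceByKey pass returns the
# count of every attack category at once.
label_counts = (unsw_update_rdd
                .map(lambda x: (x[47].strip(), 1))
                .reduceByKey(lambda a, b: a + b)
                .collect())
sorted(label_counts, key=lambda kv: -kv[1])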
In [0]:
# list the distinct protocol states (column 6, index 5)
states_rdd = unsw_update_rdd.map(lambda x: x[5]).distinct()
States = states_rdd.collect()
In [0]:
States
Out[0]:
['CLO',
 'RST',
 'ECO',
 'MAS',
 'no',
 'INT',
 'FIN',
 'TST',
 'TXD',
 'CON',
 'REQ',
 'URN',
 'ACC',
 'URH',
 'PAR',
 'ECR']
In [0]:
def State_Count(states):
    # for each state, filter the full RDD and count the matches (one Spark job per state)
    count = []
    for state in states:
        All_rdd = unsw_update_rdd.filter(lambda x: state in x[5])
        count.append(All_rdd.count())
    return count
In [0]:
State_Count(States)
Out[0]:
[161, 528, 96, 7, 7, 490469, 1478689, 8, 5, 560588, 9043, 7, 43, 54, 26, 8]
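In [ ]:
# Optional aside (a sketch, not part of the original notebook): State_Count
# launches one Spark job per state; countByValue() tallies every state value
# in a single pass and returns a dict on the driver.
state_counts = unsw_update_rdd.map(lambda x: x[5]).countByValue()
[state_counts[s] for s in States]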
In [0]:
services_rdd = unsw_update_rdd.map(lambda x: x[13]).distinct() # list the distinct service types (column 14, index 13)
services_rdd.collect()
Out[0]:
['ssh',
 'ftp-data',
 'ftp',
 'dns',
 'smtp',
 '-',
 'irc',
 'dhcp',
 'radius',
 'snmp',
 'pop3',
 'ssl',
 'http']
In [0]:
# build (label, numeric feature vector) pairs for per-label summary statistics
import numpy as np

def Summary_with_Labels(line):
    # drop the symbolic fields (srcip, dstip, proto, state, service, attack_cat) and keep only the numeric values
    symbolic_indexes = [0,2,4,5,13,47]
    clean_line = [item for i,item in enumerate(line) if i not in symbolic_indexes]
    return (line[47], np.array([float(x) for x in clean_line]))
In [0]:
label_vector_data = unsw_update_rdd.map(Summary_with_Labels)
In [0]:
label_vector_data.take(2) # each element is a (label, numeric vector) pair
Out[0]:
[('normal',
  array([5.67160000e+04, 1.43000000e+02, 8.25460020e-01, 7.81200000e+03,
         1.62360000e+04, 3.10000000e+01, 2.90000000e+01, 3.00000000e+01,
         3.20000000e+01, 7.50902500e+04, 1.56111730e+05, 1.22000000e+02,
         1.26000000e+02, 2.55000000e+02, 2.55000000e+02, 2.75109775e+09,
         2.74868674e+09, 6.40000000e+01, 1.29000000e+02, 0.00000000e+00,
         0.00000000e+00, 4.45259280e+02, 4.74945100e+02, 1.42197077e+09,
         1.42197078e+09, 6.81909080e+00, 6.59989600e+00, 5.97000010e-04,
         4.68999990e-04, 1.28000000e-04, 0.00000000e+00, 0.00000000e+00,
         0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.00000000e+00,
         7.00000000e+00, 1.00000000e+00, 4.00000000e+00, 1.00000000e+00,
         1.00000000e+00, 1.00000000e+00, 0.00000000e+00])),
 ('normal',
  array([4.34670000e+04, 4.97290000e+04, 1.01815000e-01, 4.23800000e+03,
         6.56280000e+04, 3.10000000e+01, 2.90000000e+01, 7.00000000e+00,
         3.00000000e+01, 3.28438840e+05, 5.08703050e+06, 7.20000000e+01,
         7.40000000e+01, 2.55000000e+02, 2.55000000e+02, 9.61515433e+08,
         3.22551066e+09, 5.90000000e+01, 8.87000000e+02, 0.00000000e+00,
         0.00000000e+00, 0.00000000e+00, 9.15795670e+01, 1.42197078e+09,
         1.42197078e+09, 1.42949300e+00, 1.38719200e+00, 6.80000000e-04,
         5.46000020e-04, 1.34000000e-04, 0.00000000e+00, 0.00000000e+00,
         0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 7.00000000e+00,
         4.00000000e+00, 1.00000000e+00, 6.00000000e+00, 1.00000000e+00,
         1.00000000e+00, 1.00000000e+00, 0.00000000e+00]))]
In [0]:
# use a filter on the 'label_vector_data' rdd to get 'normal' connections
normal_label_data = label_vector_data.filter(lambda x: x[0]=="normal")
In [0]:
# Spark's MLlib provides column summary statistics for an RDD of vectors through colStats() in pyspark.mllib.stat
# colStats() returns a MultivariateStatisticalSummary, which contains the column-wise
# max, min, mean, variance and number of nonzeros, as well as the total count
from pyspark.mllib.stat import Statistics
from math import sqrt
In [0]:
# summary statistics for 'normal' connections; index 7 of the cleaned vector is 'sloss'
# (source packets retransmitted or dropped)

normal_summary = Statistics.colStats(normal_label_data.values())
print("'sloss' statistics for label: {}".format("normal"))
print("Mean: {}".format(normal_summary.mean()[7]))
print("St. deviation: {}".format(round(sqrt(normal_summary.variance()[7]),3)))
print("Max value: {}".format(round(normal_summary.max()[7],3)))
print("Min value: {}".format(round(normal_summary.min()[7],3)))
print("Total value count: {}".format(normal_summary.count()))
print("Number of non-zero values (first column, 'sport'): {}".format(normal_summary.numNonzeros()[0]))
'sloss' statistics for label: normal
Mean: 5.5354611495562756
St. deviation: 8.848
Max value: 560.0
Min value: 0.0
Total value count: 2218456
Number of non-zero values (first column, 'sport'): 2203800.0
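In [ ]:
# Optional aside (a sketch, not part of the original notebook): the same
# colStats() pattern can be repeated for every label collected earlier,
# e.g. printing the per-label count and mean of 'sloss' (index 7 of the
# cleaned vector). This launches one Spark job per label.
for label in labels:
    summary = Statistics.colStats(
        label_vector_data.filter(lambda x, l=label: x[0] == l).values())
    print("{}: count={}, mean sloss={:.3f}".format(
        label, summary.count(), summary.mean()[7]))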
In [0]:
# create a DataFrame, naming the columns with the UNSW-NB15 feature names

features = ['srcip','sport','dstip','dsport','proto','state','dur','sbytes','dbytes','sttl','dttl','sloss','dloss','service',
            'Sload','Dload','Spkts','Dpkts','swin','dwin','stcpb','dtcpb','smeansz','dmeansz','trans_depth','res_bdy_len',
            'Sjit','Djit','Stime','Ltime','Sintpkt','Dintpkt','tcprtt','synack','ackdat','is_sm_ips_ports','ct_state_ttl',
            'ct_flw_http_mthd','is_ftp_login','ct_ftp_cmd','ct_srv_src','ct_srv_dst','ct_dst_ltm','ct_src_ltm',
            'ct_src_dport_ltm','ct_dst_sport_ltm','ct_dst_src_ltm','attack_cat','Label']

df_unsw = spark.createDataFrame(unsw_update_rdd, features)
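In [ ]:
# Optional aside (a sketch, not part of the original notebook): reading the CSV
# with spark.read and inferSchema=True lets Spark type the numeric-looking
# columns on load, which would avoid the string-to-float casts used below.
# Columns with mixed content (e.g. hexadecimal port values) may still come out
# as strings.
df_inferred = (spark.read
               .csv("./UNSW-NB15.csv", header=False, inferSchema=True)
               .toDF(*features))
df_inferred.printSchema()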
In [0]:
# show descriptive statistics for the 'sbytes' column (still a string column at this point)
df_unsw.select('sbytes').describe().show()
+-------+------------------+
|summary|            sbytes|
+-------+------------------+
|  count|           2539739|
|   mean| 4340.072263330996|
| stddev|56409.398122862396|
|    min|                 0|
|    max|              9999|
+-------+------------------+

In [0]:
df_unsw.select(['sloss', 'dloss']).describe().show()
+-------+------------------+------------------+
|summary|             sloss|             dloss|
+-------+------------------+------------------+
|  count|           2539739|           2539739|
|   mean| 5.164547223159545|16.331423819534212|
| stddev|22.518368387797217| 56.59789035320461|
|    min|                 0|                 0|
|    max|               999|               999|
+-------+------------------+------------------+
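
In [ ]:
# Optional aside (a sketch, not part of the original notebook): the columns
# above are still strings, so describe() reports lexicographic min/max
# (e.g. '999' sorts above '1000'). Casting to a numeric type first gives the
# true numeric range.
from pyspark.sql.functions import col
df_unsw.select(col('sbytes').cast('long'),
               col('sloss').cast('long'),
               col('dloss').cast('long')).describe().show()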

In [0]:
# cast the selected feature columns from string to float
from pyspark.sql.functions import col

Desired_features = ['dur','sloss','dloss','swin','dwin']
df_unsw_cast = df_unsw.select(*(col(c).cast("float").alias(c) for c in Desired_features))
In [0]:
df_unsw_cast.show() # check the data
+--------+-----+-----+-----+-----+
|     dur|sloss|dloss| swin| dwin|
+--------+-----+-----+-----+-----+
| 0.82546| 30.0| 32.0|255.0|255.0|
|0.101815|  7.0| 30.0|255.0|255.0|
|0.044003|  7.0| 17.0|255.0|255.0|
| 2.79083| 16.0|143.0|255.0|255.0|
|  2.6335| 21.0|197.0|255.0|255.0|
|0.115048|  6.0|  6.0|255.0|255.0|
|0.003362|  0.0|  0.0|  0.0|  0.0|
|0.453052|  1.0|  4.0|255.0|255.0|
|0.001088|  0.0|  0.0|  0.0|  0.0|
| 9.69E-4|  0.0|  0.0|  0.0|  0.0|
|0.001063|  0.0|  0.0|  0.0|  0.0|
|0.265011|  4.0|  1.0|255.0|255.0|
|0.517128|  4.0|  4.0|255.0|255.0|
|0.005339|  0.0|  0.0|  0.0|  0.0|
|0.001739|  0.0|  0.0|  0.0|  0.0|
|0.001018|  0.0|  0.0|  0.0|  0.0|
|0.001044|  0.0|  0.0|  0.0|  0.0|
| 9.87E-4|  0.0|  0.0|  0.0|  0.0|
|2.254712| 11.0| 15.0|255.0|255.0|
|0.051221|  7.0| 17.0|255.0|255.0|
+--------+-----+-----+-----+-----+
only showing top 20 rows

In [0]:
# calculate the quartiles of 'sloss' with approxQuantile (relativeError = 0 returns the exact quantiles)
Q1 = df_unsw_cast.approxQuantile('sloss', [0.25], 0)
Q2 = df_unsw_cast.approxQuantile('sloss', [0.5], 0)
Q3 = df_unsw_cast.approxQuantile('sloss', [0.75], 0)

print([Q1,Q2,Q3])
[[0.0], [3.0], [7.0]]
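In [ ]:
# Optional aside (a sketch, not part of the original notebook): approxQuantile
# also accepts a list of probabilities, so the three quartiles can be computed
# in one call; the interquartile range then follows directly.
q1, q2, q3 = df_unsw_cast.approxQuantile('sloss', [0.25, 0.5, 0.75], 0)
print(q1, q2, q3, "IQR:", q3 - q1)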
In [0]:
# calculate the mode
df_unsw_cast.groupBy("sloss").count().orderBy("count", ascending=False).first()[0]
Out[0]:
0.0
In [0]:
# build a list of the modes of all columns in the dataframe (one groupBy job per column)
[df_unsw_cast.groupBy(i).count().orderBy("count", ascending=False).first()[0] for i in df_unsw_cast.columns]
Out[0]:
[9.000000318337698e-06, 0.0, 0.0, 255.0, 255.0]
In [47]:
# pair each column name with its mode, as a list of [column, mode] pairs
[[i, df_unsw_cast.groupBy(i).count().orderBy("count", ascending=False).first()[0]] for i in df_unsw_cast.columns]
Out[47]:
[['dur', 9.000000318337698e-06],
 ['sloss', 0.0],
 ['dloss', 0.0],
 ['swin', 255.0],
 ['dwin', 255.0]]
In [0]:
from pyspark.sql import functions as f
In [52]:
# calculate the skewness
df_unsw_cast.select(f.skewness(df_unsw_cast['sloss']),f.skewness(df_unsw_cast['dloss'])).show()
+------------------+-----------------+
|   skewness(sloss)|  skewness(dloss)|
+------------------+-----------------+
|118.73321194469554|9.428047440525845|
+------------------+-----------------+

In [53]:
# calculate the kurtosis
df_unsw_cast.select(f.kurtosis(df_unsw_cast['sloss']),f.kurtosis(df_unsw_cast['dloss'])).show()
+------------------+------------------+
|   kurtosis(sloss)|   kurtosis(dloss)|
+------------------+------------------+
|18651.336100274766|321.61059597972456|
+------------------+------------------+
