In [0]:
# Install Java 8, download Spark 2.4.4 (Hadoop 2.7 build), and install findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
In [0]:
import os
# Point JAVA_HOME and SPARK_HOME at the locations installed above
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]="/content/spark-2.4.4-bin-hadoop2.7"
In [0]:
import findspark
findspark.init()
In [0]:
from pyspark.sql import SparkSession
# Start a local SparkSession using 4 cores
spark = SparkSession.builder.master("local[4]").appName("example").enableHiveSupport().getOrCreate()
In [0]:
unsw_rdd = spark.sparkContext.textFile("./UNSW-NB15.csv")   # Load the raw CSV file into an RDD of text lines
In [8]:
unsw_rdd.count()
Out[8]:
2539739
In [9]:
unsw_rdd.take(2)
Out[9]:
['59.166.0.3,56716,149.171.126.8,143,tcp,FIN,0.82546002,7812,16236,31,29,30,32,-,75090.25,156111.73,122,126,255,255,2751097753,2748686736,64,129,0,0,445.25928,474.9451,1421970774,1421970775,6.8190908,6.599896,5.9700001E-4,4.6899999E-4,0.000128,0,0,0,0,0,2,7,1,4,1,1,1,,0',
 '59.166.0.0,43467,149.171.126.6,49729,tcp,FIN,0.101815,4238,65628,31,29,7,30,-,328438.84,5087030.5,72,74,255,255,961515433,3225510659,59,887,0,0,0,91.579567,1421970775,1421970775,1.429493,1.387192,0.00068,5.4600002E-4,0.000134,0,0,0,0,0,7,4,1,6,1,1,1,,0']
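As an aside, the same file could also be loaded directly as a DataFrame rather than a raw RDD. The cell below is only a sketch (not executed in this run; the rest of the notebook keeps working with the RDD) and assumes the CSV has no header row, so Spark would assign default column names _c0 through _c48.
In [ ]:
# Optional alternative: let Spark parse the CSV into a DataFrame directly
unsw_df = spark.read.csv("./UNSW-NB15.csv", header=False, inferSchema=True)
unsw_df.printSchema()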
In [0]:
def AddNormalLabel(line):
    # Split the CSV line into fields; an empty attack label (index 47) means normal traffic
    line = line.split(",")
    if line[47] == "":
        line[47] = 'normal'

    return line
In [11]:
unsw_update_rdd = unsw_rdd.map(AddNormalLabel) # replace empty attack labels (index 47) with 'normal'
unsw_update_rdd.take(2)   # check the changes for the first two lines/rows
Out[11]:
[['59.166.0.3',
  '56716',
  '149.171.126.8',
  '143',
  'tcp',
  'FIN',
  '0.82546002',
  '7812',
  '16236',
  '31',
  '29',
  '30',
  '32',
  '-',
  '75090.25',
  '156111.73',
  '122',
  '126',
  '255',
  '255',
  '2751097753',
  '2748686736',
  '64',
  '129',
  '0',
  '0',
  '445.25928',
  '474.9451',
  '1421970774',
  '1421970775',
  '6.8190908',
  '6.599896',
  '5.9700001E-4',
  '4.6899999E-4',
  '0.000128',
  '0',
  '0',
  '0',
  '0',
  '0',
  '2',
  '7',
  '1',
  '4',
  '1',
  '1',
  '1',
  'normal',
  '0'],
 ['59.166.0.0',
  '43467',
  '149.171.126.6',
  '49729',
  'tcp',
  'FIN',
  '0.101815',
  '4238',
  '65628',
  '31',
  '29',
  '7',
  '30',
  '-',
  '328438.84',
  '5087030.5',
  '72',
  '74',
  '255',
  '255',
  '961515433',
  '3225510659',
  '59',
  '887',
  '0',
  '0',
  '0',
  '91.579567',
  '1421970775',
  '1421970775',
  '1.429493',
  '1.387192',
  '0.00068',
  '5.4600002E-4',
  '0.000134',
  '0',
  '0',
  '0',
  '0',
  '0',
  '7',
  '4',
  '1',
  '6',
  '1',
  '1',
  '1',
  'normal',
  '0']]
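As a quick sanity check (a sketch, not part of the recorded run), the number of records still carrying an empty label should now be zero:
In [ ]:
# Verify that no record is left with an empty attack label after the map
unsw_update_rdd.filter(lambda x: x[47] == "").count()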
In [0]:
# print out the distinct connection labels (attack categories), stored at column 48
connections_rdd = unsw_update_rdd.map(lambda x: x[47].strip()).distinct() # index 47 holds the label
Labels = connections_rdd.collect()
In [13]:
Labels
Out[13]:
['Shellcode',
 'Backdoors',
 'Exploits',
 'Reconnaissance',
 'Analysis',
 'normal',
 'Generic',
 'Worms',
 'DoS',
 'Backdoor',
 'Fuzzers']
In [14]:
shellcode_rdd = unsw_update_rdd.filter(lambda line: 'Shellcode' in line)  # count the number of "Shellcode" connections
shellcode_rdd.count()
Out[14]:
223
In [15]:
DoS_rdd = unsw_update_rdd.filter(lambda line: 'DoS' in line)  # count the number of "DoS" connections
DoS_rdd.count()
Out[15]:
16353
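Note that 'DoS' in line tests membership across all 49 fields of the record, not just the label. A more explicit filter (a sketch with a hypothetical variable name, not used in the rest of the notebook) would match on the label column only:
In [ ]:
# Filter on the attack label (index 47) explicitly rather than scanning the whole record
dos_by_label_rdd = unsw_update_rdd.filter(lambda line: line[47].strip() == 'DoS')
dos_by_label_rdd.count()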
In [16]:
# print out the distinct connection "states", which are at column 6
states_rdd = unsw_update_rdd.map(lambda x: x[5]).distinct() # index 5 holds the state
States = states_rdd.collect()

States
Out[16]:
['CLO',
 'RST',
 'ECO',
 'MAS',
 'no',
 'INT',
 'FIN',
 'TST',
 'TXD',
 'CON',
 'REQ',
 'URN',
 'ACC',
 'URH',
 'PAR',
 'ECR']
In [0]:
def State_Count(states):
    # Count the records for each connection state (one filter + count job per state)
    count = []
    for state in states:
        All_rdd = unsw_update_rdd.filter(lambda x: state in x[5])
        count.append(All_rdd.count())
    return count
In [18]:
State_Count(States)
Out[18]:
[161, 528, 96, 7, 7, 490469, 1478689, 8, 5, 560588, 9043, 7, 43, 54, 26, 8]
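State_Count launches one full pass over the data per state (16 jobs here). A single-pass alternative, sketched below, tallies every state at once with countByValue; it uses exact matches on the state field, so the totals are expected to agree with the substring-based filter above for this dataset.
In [ ]:
# Count all connection states in one pass instead of filtering per state
state_counts = unsw_update_rdd.map(lambda x: x[5]).countByValue()
sorted(state_counts.items(), key=lambda kv: -kv[1])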
In [19]:
# print out the distinct "services", which are at column 14
services_rdd = unsw_update_rdd.map(lambda x: x[13]).distinct() # index 13 holds the service
services_rdd.collect()
Out[19]:
['ssh',
 'ftp-data',
 'ftp',
 'dns',
 'smtp',
 '-',
 'irc',
 'dhcp',
 'radius',
 'snmp',
 'pop3',
 'ssl',
 'http']
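The same one-pass idea extends to counting records per service. The sketch below (variable name chosen just for illustration) uses the classic pair-RDD pattern: map each record to (service, 1), then reduceByKey to sum the ones.
In [ ]:
# Count records per service (index 13) with map + reduceByKey
service_counts_rdd = unsw_update_rdd.map(lambda x: (x[13], 1)).reduceByKey(lambda a, b: a + b)
service_counts_rdd.collect()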
In [0]:
import pandas as pd

link = "https://www.dropbox.com/s/2b0tbw4muxg4yr5/data_sample.csv?dl=1"
data = pd.read_csv(link)
In [21]:
data.sample(10) 
Out[21]:
2018-01-01 00:01:01;read;country_7;2458151261;SEO;North America
1 2018-01-01 00:04:01;read;country_7;2458151263;...
432 2018-01-01 05:56:42;read;country_8;2458151694;...
254 2018-01-01 03:28:10;read;country_7;2458151516;...
812 2018-01-01 11:05:46;read;country_6;2458152074;...
843 2018-01-01 11:27:31;read;country_4;2458152105;...
1037 2018-01-01 13:59:15;read;country_2;2458152299;...
1674 2018-01-01 22:43:25;read;country_5;2458152936;...
999 2018-01-01 13:30:47;read;country_7;2458152261;...
525 2018-01-01 07:05:45;read;country_6;2458151787;...
765 2018-01-01 10:20:37;read;country_7;2458152027;...
In [22]:
data.count()   # counts non-null values per column; it reports 1794 here because the first record was consumed as the header row
Out[22]:
2018-01-01 00:01:01;read;country_7;2458151261;SEO;North America    1794
dtype: int64
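Since count() here reflects a frame whose header was taken from the first data record, a direct row count (a small sketch, not part of the recorded run) makes the same point:
In [ ]:
# len(data) gives the number of rows directly: 1794, one short of the
# 1795 records in the file, because the first record became the header
len(data)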
In [23]:
# Since the data has no header row, add column names so each attribute is easier to understand

Column_names = ['DateTime', 'Event', 'Country', 'User_ID', 'Source', 'Topic']
data = pd.read_csv(link, delimiter=';', names = Column_names)

data.sample(10)
Out[23]:
DateTime Event Country User_ID Source Topic
984 2018-01-01 13:11:40 read country_2 2458152245 AdWords Europe
1040 2018-01-01 13:59:49 read country_2 2458152301 Reddit Africa
470 2018-01-01 06:21:29 read country_7 2458151731 Reddit North America
355 2018-01-01 04:44:38 read country_4 2458151616 Reddit Asia
1704 2018-01-01 22:59:53 read country_2 2458152965 Reddit Australia
463 2018-01-01 06:15:14 read country_5 2458151724 AdWords Europe
1744 2018-01-01 23:24:53 read country_5 2458153005 Reddit Africa
624 2018-01-01 08:26:29 read country_2 2458151885 Reddit Asia
786 2018-01-01 10:38:23 read country_4 2458152047 AdWords Australia
804 2018-01-01 10:57:15 read country_7 2458152065 Reddit Asia
In [24]:
data.head()
Out[24]:
DateTime Event Country User_ID Source Topic
0 2018-01-01 00:01:01 read country_7 2458151261 SEO North America
1 2018-01-01 00:03:20 read country_7 2458151262 SEO South America
2 2018-01-01 00:04:01 read country_7 2458151263 AdWords Africa
3 2018-01-01 00:04:02 read country_7 2458151264 AdWords Europe
4 2018-01-01 00:05:03 read country_8 2458151265 Reddit North America
In [25]:
data.tail()
Out[25]:
DateTime Event Country User_ID Source Topic
1790 2018-01-01 23:57:14 read country_2 2458153051 AdWords North America
1791 2018-01-01 23:58:33 read country_8 2458153052 SEO Asia
1792 2018-01-01 23:59:36 read country_6 2458153053 Reddit Asia
1793 2018-01-01 23:59:36 read country_7 2458153054 AdWords Europe
1794 2018-01-01 23:59:38 read country_5 2458153055 Reddit Asia
In [26]:
# Select specific columns of your dataframe

data[['Country','Event','Source']]
Out[26]:
Country Event Source
0 country_7 read SEO
1 country_7 read SEO
2 country_7 read AdWords
3 country_7 read AdWords
4 country_8 read Reddit
... ... ... ...
1790 country_2 read AdWords
1791 country_8 read SEO
1792 country_6 read Reddit
1793 country_7 read AdWords
1794 country_5 read Reddit

1795 rows × 3 columns
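From here, simple pandas aggregations are a natural next step for exploring the sample. For instance (a sketch, not part of the recorded run), the number of events per traffic source:
In [ ]:
# Quick exploratory summary: how many events came from each traffic source
data['Source'].value_counts()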

In [0]: