Date: 11.03.2020

Data Preprocessing

Start the session with jupyter-notebook

In [5]:
!pip install -q findspark
In [8]:
import findspark
findspark.init('/home/bigdata/Documents/spark-3.0.0')

With findspark pointing at $SPARK_HOME, start the Spark session
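Later cells call sc.textFile and spark.createDataFrame without showing where `sc` and `spark` come from; a minimal sketch of how they are typically created once findspark has located the install (the master and appName values below are assumptions):

from pyspark.sql import SparkSession

# Create (or reuse) a local SparkSession; `spark` and its SparkContext `sc`
# are the handles used in the cells below.
spark = (SparkSession.builder
         .master("local[*]")
         .appName("UNSW-NB15 preprocessing")
         .getOrCreate())
sc = spark.sparkContext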

In [2]:
import urllib.request # used to download the dataset

urllib.request.urlretrieve("https://www.dropbox.com/s/4xqg32ih9xoh5jq/UNSW-NB15.csv?dl=1", "UNSW-NB15.csv")
Out[2]:
('UNSW-NB15.csv', <http.client.HTTPMessage at 0x7f7a5960b0d0>)
In [2]:
unsw_rdd = sc.textFile("./UNSW-NB15.csv")  # read the data into an RDD
In [3]:
def AddNormalLabel(line):
    """Split a CSV record and replace the attack_cat field (index 47) with a numeric code."""
    line = line.split(",")
    if line[47].strip() == "Fuzzers": # convert attack-category strings to numbers
        line[47] = '1'
    elif line[47].strip() == "Analysis":
        line[47] = '2'  
    elif (line[47].strip() == "Backdoors") or (line[47].strip() == "Backdoor"):
        line[47] = '3'   
    elif line[47].strip() == "DoS":
        line[47] = '4'   
    elif line[47].strip() == "Exploits":
        line[47] = '5'
    elif line[47].strip() == "Generic":
        line[47] = '6' 
    elif line[47].strip() == "Reconnaissance":
        line[47] = '7' 
    elif line[47].strip() == "Shellcode":
        line[47] = '8'
    elif line[47].strip() == 'Worms':
        line[47] = '9'
    elif line[47].strip() == "":
        line[47] = '10' 
    
    return line
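The same mapping can also be expressed as a dictionary lookup; a compact sketch with the same behaviour as AddNormalLabel above (unknown categories pass through unchanged):

ATTACK_CODES = {"Fuzzers": '1', "Analysis": '2', "Backdoors": '3', "Backdoor": '3',
                "DoS": '4', "Exploits": '5', "Generic": '6', "Reconnaissance": '7',
                "Shellcode": '8', "Worms": '9', "": '10'}

def add_normal_label(line):
    fields = line.split(",")
    fields[47] = ATTACK_CODES.get(fields[47].strip(), fields[47])  # '' marks normal traffic
    return fields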
In [4]:
unsw_update_rdd = unsw_rdd.map(AddNormalLabel)
unsw_update_rdd.take(2)
Out[4]:
[['59.166.0.3',
  '56716',
  '149.171.126.8',
  '143',
  'tcp',
  'FIN',
  '0.82546002',
  '7812',
  '16236',
  '31',
  '29',
  '30',
  '32',
  '-',
  '75090.25',
  '156111.73',
  '122',
  '126',
  '255',
  '255',
  '2751097753',
  '2748686736',
  '64',
  '129',
  '0',
  '0',
  '445.25928',
  '474.9451',
  '1421970774',
  '1421970775',
  '6.8190908',
  '6.599896',
  '5.9700001E-4',
  '4.6899999E-4',
  '0.000128',
  '0',
  '0',
  '0',
  '0',
  '0',
  '2',
  '7',
  '1',
  '4',
  '1',
  '1',
  '1',
  '10',
  '0'],
 ['59.166.0.0',
  '43467',
  '149.171.126.6',
  '49729',
  'tcp',
  'FIN',
  '0.101815',
  '4238',
  '65628',
  '31',
  '29',
  '7',
  '30',
  '-',
  '328438.84',
  '5087030.5',
  '72',
  '74',
  '255',
  '255',
  '961515433',
  '3225510659',
  '59',
  '887',
  '0',
  '0',
  '0',
  '91.579567',
  '1421970775',
  '1421970775',
  '1.429493',
  '1.387192',
  '0.00068',
  '5.4600002E-4',
  '0.000134',
  '0',
  '0',
  '0',
  '0',
  '0',
  '7',
  '4',
  '1',
  '6',
  '1',
  '1',
  '1',
  '10',
  '0']]
In [5]:
connections_rdd = unsw_update_rdd.map(lambda x:x[47].strip()).distinct()
Labels = connections_rdd.collect()
In [6]:
Labels
Out[6]:
['10', '8', '4', '9', '2', '5', '1', '7', '6', '3']
In [7]:
type(connections_rdd)
Out[7]:
pyspark.rdd.PipelinedRDD
In [8]:
Labels[0]
Out[8]:
'10'
In [9]:
features = ['proto','state','dur','sbytes','dbytes','sttl',
'dttl','sloss','dloss','service','Sload','Dload',
'Spkts','Dpkts','swin','dwin','stcpb','dtcpb','smeansz','dmeansz','trans_depth',
'res_bdy_len','Sjit','Djit','Sintpkt','Dintpkt','tcprtt','synack','ackdat',
'is_sm_ips_ports','ct_state_ttl','ct_flw_http_mthd','is_ftp_login','ct_ftp_cmd','ct_srv_src',
'ct_srv_dst','ct_dst_ltm','ct_src_ltm','ct_src_dport_ltm','ct_dst_sport_ltm','ct_dst_src_ltm','attack_cat']
In [9]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
In [10]:
features
Out[10]:
['proto',
 'state',
 'dur',
 'sbytes',
 'dbytes',
 'sttl',
 'dttl',
 'sloss',
 'dloss',
 'service',
 'Sload',
 'Dload',
 'Spkts',
 'Dpkts',
 'swin',
 'dwin',
 'stcpb',
 'dtcpb',
 'smeansz',
 'dmeansz',
 'trans_depth',
 'res_bdy_len',
 'Sjit',
 'Djit',
 'Sintpkt',
 'Dintpkt',
 'tcprtt',
 'synack',
 'ackdat',
 'is_sm_ips_ports',
 'ct_state_ttl',
 'ct_flw_http_mthd',
 'is_ftp_login',
 'ct_ftp_cmd',
 'ct_srv_src',
 'ct_srv_dst',
 'ct_dst_ltm',
 'ct_src_ltm',
 'ct_src_dport_ltm',
 'ct_dst_sport_ltm',
 'ct_dst_src_ltm',
 'attack_cat']
In [11]:
states_rdd = unsw_update_rdd.map(lambda x: x[5]).distinct()
In [14]:
states_rdd
Out[14]:
PythonRDD[12] at RDD at PythonRDD.scala:53
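Evaluating the RDD by name only prints its lineage; to see the distinct state values an action is needed, for example:

states_rdd.collect()   # or states_rdd.take(10) to cap how many values come back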
In [15]:
df_unsw = spark.createDataFrame(unsw_update_rdd, features)
In [16]:
type(df_unsw)
Out[16]:
pyspark.sql.dataframe.DataFrame
In [17]:
df_unsw.dtypes
Out[17]:
[('proto', 'string'),
 ('state', 'string'),
 ('dur', 'string'),
 ('sbytes', 'string'),
 ('dbytes', 'string'),
 ('sttl', 'string'),
 ('dttl', 'string'),
 ('sloss', 'string'),
 ('dloss', 'string'),
 ('service', 'string'),
 ('Sload', 'string'),
 ('Dload', 'string'),
 ('Spkts', 'string'),
 ('Dpkts', 'string'),
 ('swin', 'string'),
 ('dwin', 'string'),
 ('stcpb', 'string'),
 ('dtcpb', 'string'),
 ('smeansz', 'string'),
 ('dmeansz', 'string'),
 ('trans_depth', 'string'),
 ('res_bdy_len', 'string'),
 ('Sjit', 'string'),
 ('Djit', 'string'),
 ('Sintpkt', 'string'),
 ('Dintpkt', 'string'),
 ('tcprtt', 'string'),
 ('synack', 'string'),
 ('ackdat', 'string'),
 ('is_sm_ips_ports', 'string'),
 ('ct_state_ttl', 'string'),
 ('ct_flw_http_mthd', 'string'),
 ('is_ftp_login', 'string'),
 ('ct_ftp_cmd', 'string'),
 ('ct_srv_src', 'string'),
 ('ct_srv_dst', 'string'),
 ('ct_dst_ltm', 'string'),
 ('ct_src_ltm', 'string'),
 ('ct_src_dport_ltm', 'string'),
 ('ct_dst_sport_ltm', 'string'),
 ('ct_dst_src_ltm', 'string'),
 ('attack_cat', 'string'),
 ('_43', 'string'),
 ('_44', 'string'),
 ('_45', 'string'),
 ('_46', 'string'),
 ('_47', 'string'),
 ('_48', 'string'),
 ('_49', 'string')]
In [18]:
df_unsw.show(2)
+----------+-----+-------------+------+------+----+----------+-----+-----+-------+-----+-----+-----+-----+---------+---------+-----+-----+-------+-------+-----------+-----------+----+----+-------+-------+---------+---------+----------+---------------+------------+----------------+------------+------------+----------+----------+----------+----------+----------------+----------------+--------------+----------+---+---+---+---+---+---+---+
|     proto|state|          dur|sbytes|dbytes|sttl|      dttl|sloss|dloss|service|Sload|Dload|Spkts|Dpkts|     swin|     dwin|stcpb|dtcpb|smeansz|dmeansz|trans_depth|res_bdy_len|Sjit|Djit|Sintpkt|Dintpkt|   tcprtt|   synack|    ackdat|is_sm_ips_ports|ct_state_ttl|ct_flw_http_mthd|is_ftp_login|  ct_ftp_cmd|ct_srv_src|ct_srv_dst|ct_dst_ltm|ct_src_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|attack_cat|_43|_44|_45|_46|_47|_48|_49|
+----------+-----+-------------+------+------+----+----------+-----+-----+-------+-----+-----+-----+-----+---------+---------+-----+-----+-------+-------+-----------+-----------+----+----+-------+-------+---------+---------+----------+---------------+------------+----------------+------------+------------+----------+----------+----------+----------+----------------+----------------+--------------+----------+---+---+---+---+---+---+---+
|59.166.0.3|56716|149.171.126.8|   143|   tcp| FIN|0.82546002| 7812|16236|     31|   29|   30|   32|    -| 75090.25|156111.73|  122|  126|    255|    255| 2751097753| 2748686736|  64| 129|      0|      0|445.25928| 474.9451|1421970774|     1421970775|   6.8190908|        6.599896|5.9700001E-4|4.6899999E-4|  0.000128|         0|         0|         0|               0|               0|             2|         7|  1|  4|  1|  1|  1| 10|  0|
|59.166.0.0|43467|149.171.126.6| 49729|   tcp| FIN|  0.101815| 4238|65628|     31|   29|    7|   30|    -|328438.84|5087030.5|   72|   74|    255|    255|  961515433| 3225510659|  59| 887|      0|      0|        0|91.579567|1421970775|     1421970775|    1.429493|        1.387192|     0.00068|5.4600002E-4|  0.000134|         0|         0|         0|               0|               0|             7|         4|  1|  6|  1|  1|  1| 10|  0|
+----------+-----+-------------+------+------+----+----------+-----+-----+-------+-----+-----+-----+-----+---------+---------+-----+-----+-------+-------+-----------+-----------+----+----+-------+-------+---------+---------+----------+---------------+------------+----------------+------------+------------+----------+----------+----------+----------+----------------+----------------+--------------+----------+---+---+---+---+---+---+---+
only showing top 2 rows
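Note that `features` holds only 42 names for a file with 49 fields, so Spark pads the last seven columns as `_43` ... `_49` and every name is shifted; the `proto` column above is actually showing source IP addresses. A hedged sketch of a full 49-name header that would keep the columns aligned (names taken from the published UNSW-NB15 feature description, so verify against your copy of the file):

columns = ['srcip', 'sport', 'dstip', 'dsport', 'proto', 'state', 'dur', 'sbytes', 'dbytes',
           'sttl', 'dttl', 'sloss', 'dloss', 'service', 'Sload', 'Dload', 'Spkts', 'Dpkts',
           'swin', 'dwin', 'stcpb', 'dtcpb', 'smeansz', 'dmeansz', 'trans_depth', 'res_bdy_len',
           'Sjit', 'Djit', 'Stime', 'Ltime', 'Sintpkt', 'Dintpkt', 'tcprtt', 'synack', 'ackdat',
           'is_sm_ips_ports', 'ct_state_ttl', 'ct_flw_http_mthd', 'is_ftp_login', 'ct_ftp_cmd',
           'ct_srv_src', 'ct_srv_dst', 'ct_dst_ltm', 'ct_src_ltm', 'ct_src_dport_ltm',
           'ct_dst_sport_ltm', 'ct_dst_src_ltm', 'attack_cat', 'Label']

df_unsw_full = spark.createDataFrame(unsw_update_rdd, columns)  # 49 names for 49 fields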

In [19]:
df_unsw
Out[19]:
DataFrame[proto: string, state: string, dur: string, sbytes: string, dbytes: string, sttl: string, dttl: string, sloss: string, dloss: string, service: string, Sload: string, Dload: string, Spkts: string, Dpkts: string, swin: string, dwin: string, stcpb: string, dtcpb: string, smeansz: string, dmeansz: string, trans_depth: string, res_bdy_len: string, Sjit: string, Djit: string, Sintpkt: string, Dintpkt: string, tcprtt: string, synack: string, ackdat: string, is_sm_ips_ports: string, ct_state_ttl: string, ct_flw_http_mthd: string, is_ftp_login: string, ct_ftp_cmd: string, ct_srv_src: string, ct_srv_dst: string, ct_dst_ltm: string, ct_src_ltm: string, ct_src_dport_ltm: string, ct_dst_sport_ltm: string, ct_dst_src_ltm: string, attack_cat: string, _43: string, _44: string, _45: string, _46: string, _47: string, _48: string, _49: string]
In [20]:
df_unsw.select('sbytes').describe().show()
+-------+------------------+
|summary|            sbytes|
+-------+------------------+
|  count|           2539739|
|   mean|11235.096788685767|
| stddev|18438.200835771608|
|    min|                 0|
|    max|              9999|
+-------+------------------+
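Because every column was read as a string, describe() computes min/max lexicographically (hence max 9999 despite a mean above 11000). A small sketch of casting a column to a numeric type before summarising (the column choice is illustrative):

from pyspark.sql.functions import col

df_unsw.select(col('sbytes').cast('double').alias('sbytes')).describe().show()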

In [21]:
df_unsw.select('attack_cat').describe().show()
+-------+------------------+
|summary|        attack_cat|
+-------+------------------+
|  count|           2539739|
|   mean| 8.989882818667587|
| stddev|10.822813466163364|
|    min|                 1|
|    max|                 9|
+-------+------------------+

In [22]:
df = df_unsw.select('attack_cat')
In [23]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="attack_cat", outputCol="attack_catIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
+----------+---------------+
|attack_cat|attack_catIndex|
+----------+---------------+
|         7|            6.0|
|         4|            3.0|
|         5|            4.0|
|         4|            3.0|
|         7|            6.0|
|         6|            5.0|
|         2|            1.0|
|         6|            5.0|
|         5|            4.0|
|         1|            0.0|
|         3|            2.0|
|         6|            5.0|
|         6|            5.0|
|         7|            6.0|
|         7|            6.0|
|         5|            4.0|
|         3|            2.0|
|         5|            4.0|
|         3|            2.0|
|         2|            1.0|
+----------+---------------+
only showing top 20 rows

In [24]:
df
Out[24]:
DataFrame[attack_cat: string]
In [26]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCols=["attack_catIndex"],
                        outputCols=["categoryVec1"])
model = encoder.fit(indexed)
encoded = model.transform(indexed)
encoded.show()
+----------+---------------+--------------+
|attack_cat|attack_catIndex|  categoryVec1|
+----------+---------------+--------------+
|         7|            6.0|(65,[6],[1.0])|
|         4|            3.0|(65,[3],[1.0])|
|         5|            4.0|(65,[4],[1.0])|
|         4|            3.0|(65,[3],[1.0])|
|         7|            6.0|(65,[6],[1.0])|
|         6|            5.0|(65,[5],[1.0])|
|         2|            1.0|(65,[1],[1.0])|
|         6|            5.0|(65,[5],[1.0])|
|         5|            4.0|(65,[4],[1.0])|
|         1|            0.0|(65,[0],[1.0])|
|         3|            2.0|(65,[2],[1.0])|
|         6|            5.0|(65,[5],[1.0])|
|         6|            5.0|(65,[5],[1.0])|
|         7|            6.0|(65,[6],[1.0])|
|         7|            6.0|(65,[6],[1.0])|
|         5|            4.0|(65,[4],[1.0])|
|         3|            2.0|(65,[2],[1.0])|
|         5|            4.0|(65,[4],[1.0])|
|         3|            2.0|(65,[2],[1.0])|
|         2|            1.0|(65,[1],[1.0])|
+----------+---------------+--------------+
only showing top 20 rows

In [29]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["attack_catIndex", "categoryVec1"],
    outputCol="features")
In [33]:
output = assembler.transform(encoded)
print("Assembled columns 'attack_cat', 'attack_catIndex', 'categoryVec1' to vector column 'features'")
Assembled columns 'attack_cat', 'attack_catIndex', 'categoryVec1' to vector column 'features'
In [34]:
output.select("features").show(truncate=False)
+--------------------+
|features            |
+--------------------+
|(66,[0,7],[6.0,1.0])|
|(66,[0,4],[3.0,1.0])|
|(66,[0,5],[4.0,1.0])|
|(66,[0,4],[3.0,1.0])|
|(66,[0,7],[6.0,1.0])|
|(66,[0,6],[5.0,1.0])|
|(66,[0,2],[1.0,1.0])|
|(66,[0,6],[5.0,1.0])|
|(66,[0,5],[4.0,1.0])|
|(66,[1],[1.0])      |
|(66,[0,3],[2.0,1.0])|
|(66,[0,6],[5.0,1.0])|
|(66,[0,6],[5.0,1.0])|
|(66,[0,7],[6.0,1.0])|
|(66,[0,7],[6.0,1.0])|
|(66,[0,5],[4.0,1.0])|
|(66,[0,3],[2.0,1.0])|
|(66,[0,5],[4.0,1.0])|
|(66,[0,3],[2.0,1.0])|
|(66,[0,2],[1.0,1.0])|
+--------------------+
only showing top 20 rows

In [22]:
df.show()
+----------+
|attack_cat|
+----------+
|         7|
|         4|
|         5|
|         4|
|         7|
|         6|
|         2|
|         6|
|         5|
|         1|
|         3|
|         6|
|         6|
|         7|
|         7|
|         5|
|         3|
|         5|
|         3|
|         2|
+----------+
only showing top 20 rows

In [25]:
df.printSchema()
root
 |-- attack_cat: string (nullable = true)

In [26]:
print("total records",df.count())
total records 2539739
In [27]:
for t in df.dtypes:
    if t[1]=='string':
        print("column name",t[0])
        print(df.select(t[0]).distinct().show())
column name attack_cat
+----------+
|attack_cat|
+----------+
|         7|
|        51|
|        15|
|        54|
|        11|
|        29|
|        42|
|        64|
|         3|
|        30|
|        34|
|        59|
|         8|
|        22|
|        28|
|        16|
|        35|
|        52|
|        47|
|        43|
+----------+
only showing top 20 rows

None
In [23]:
import pandas as pd
pd.DataFrame(df.take(20), columns=df.columns).transpose()
Out[23]:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
attack_cat 7 4 5 4 7 6 2 6 5 1 3 6 6 7 7 5 3 5 3 2
In [24]:
train, test = df.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
Training Dataset Count: 1777130
Test Dataset Count: 762609
In [31]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'attack_cat', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
predictions.select('attack_cat', 'label', 'rawPrediction', 'prediction', 'probability').show(10)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/Documents/spark-3.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
     97         try:
---> 98             return f(*a, **kw)
     99         except py4j.protocol.Py4JJavaError as e:

~/Documents/spark-3.0.0/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o229.fit.
: java.lang.IllegalArgumentException: requirement failed: Column attack_cat must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually string.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:45)
	at org.apache.spark.ml.PredictorParams.validateAndTransformSchema(Predictor.scala:51)
	at org.apache.spark.ml.PredictorParams.validateAndTransformSchema$(Predictor.scala:46)
	at org.apache.spark.ml.classification.Classifier.org$apache$spark$ml$classification$ClassifierParams$$super$validateAndTransformSchema(Classifier.scala:75)
	at org.apache.spark.ml.classification.ClassifierParams.validateAndTransformSchema(Classifier.scala:42)
	at org.apache.spark.ml.classification.ClassifierParams.validateAndTransformSchema$(Classifier.scala:38)
	at org.apache.spark.ml.classification.ProbabilisticClassifier.org$apache$spark$ml$classification$ProbabilisticClassifierParams$$super$validateAndTransformSchema(ProbabilisticClassifier.scala:53)
	at org.apache.spark.ml.classification.ProbabilisticClassifierParams.validateAndTransformSchema(ProbabilisticClassifier.scala:37)
	at org.apache.spark.ml.classification.ProbabilisticClassifierParams.validateAndTransformSchema$(ProbabilisticClassifier.scala:33)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.org$apache$spark$ml$tree$DecisionTreeClassifierParams$$super$validateAndTransformSchema(DecisionTreeClassifier.scala:48)
	at org.apache.spark.ml.tree.DecisionTreeClassifierParams.validateAndTransformSchema(treeParams.scala:241)
	at org.apache.spark.ml.tree.DecisionTreeClassifierParams.validateAndTransformSchema$(treeParams.scala:237)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.validateAndTransformSchema(DecisionTreeClassifier.scala:48)
	at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:178)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:75)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:134)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:116)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


During handling of the above exception, another exception occurred:

IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-31-38d13812ea10> in <module>
      1 from pyspark.ml.classification import DecisionTreeClassifier
      2 dt = DecisionTreeClassifier(featuresCol = 'attack_cat', labelCol = 'label', maxDepth = 3)
----> 3 dtModel = dt.fit(train)
      4 predictions = dtModel.transform(test)
      5 predictions.select('attack_cat', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

~/Documents/spark-3.0.0/python/pyspark/ml/base.py in fit(self, dataset, params)
    129                 return self.copy(params)._fit(dataset)
    130             else:
--> 131                 return self._fit(dataset)
    132         else:
    133             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/Documents/spark-3.0.0/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    319 
    320     def _fit(self, dataset):
--> 321         java_model = self._fit_java(dataset)
    322         model = self._create_model(java_model)
    323         return self._copyValues(model)

~/Documents/spark-3.0.0/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    316         """
    317         self._transfer_params_to_java()
--> 318         return self._java_obj.fit(dataset._jdf)
    319 
    320     def _fit(self, dataset):

~/Documents/spark-3.0.0/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1284         answer = self.gateway_client.send_command(command)
   1285         return_value = get_return_value(
-> 1286             answer, self.gateway_client, self.target_id, self.name)
   1287 
   1288         for temp_arg in temp_args:

~/Documents/spark-3.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
    100             converted = convert_exception(e.java_exception)
    101             if not isinstance(converted, UnknownException):
--> 102                 raise converted
    103             else:
    104                 raise

IllegalArgumentException: requirement failed: Column attack_cat must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually string.
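The traceback states the problem: featuresCol must point at a vector-typed column, not a raw string column. A minimal sketch of satisfying that requirement using the `output` DataFrame built by the VectorAssembler above (this only fixes the schema error; a real model would assemble predictor columns rather than the label-derived vectors used so far):

# DecisionTreeClassifier expects a vector features column and a numeric label column.
dt = DecisionTreeClassifier(featuresCol='features', labelCol='attack_catIndex', maxDepth=3)
dtModel = dt.fit(output)              # `output` holds both 'features' and 'attack_catIndex'
dtModel.transform(output).select('attack_catIndex', 'prediction', 'probability').show(10)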
In [12]:
Labels[8]
Out[12]:
'6'
In [13]:
from pyspark.sql.functions import col
In [14]:
feature_columns = df_unsw.columns[:-5] 
feature_columns
Out[14]:
['proto',
 'state',
 'dur',
 'sbytes',
 'dbytes',
 'sttl',
 'dttl',
 'sloss',
 'dloss',
 'service',
 'Sload',
 'Dload',
 'Spkts',
 'Dpkts',
 'swin',
 'dwin',
 'stcpb',
 'dtcpb',
 'smeansz',
 'dmeansz',
 'trans_depth',
 'res_bdy_len',
 'Sjit',
 'Djit',
 'Sintpkt',
 'Dintpkt',
 'tcprtt',
 'synack',
 'ackdat',
 'is_sm_ips_ports',
 'ct_state_ttl',
 'ct_flw_http_mthd',
 'is_ftp_login',
 'ct_ftp_cmd',
 'ct_srv_src',
 'ct_srv_dst',
 'ct_dst_ltm',
 'ct_src_ltm',
 'ct_src_dport_ltm',
 'ct_dst_sport_ltm',
 'ct_dst_src_ltm',
 'attack_cat',
 '_43',
 '_44']
In [15]:
df_unsw.columns
Out[15]:
['proto',
 'state',
 'dur',
 'sbytes',
 'dbytes',
 'sttl',
 'dttl',
 'sloss',
 'dloss',
 'service',
 'Sload',
 'Dload',
 'Spkts',
 'Dpkts',
 'swin',
 'dwin',
 'stcpb',
 'dtcpb',
 'smeansz',
 'dmeansz',
 'trans_depth',
 'res_bdy_len',
 'Sjit',
 'Djit',
 'Sintpkt',
 'Dintpkt',
 'tcprtt',
 'synack',
 'ackdat',
 'is_sm_ips_ports',
 'ct_state_ttl',
 'ct_flw_http_mthd',
 'is_ftp_login',
 'ct_ftp_cmd',
 'ct_srv_src',
 'ct_srv_dst',
 'ct_dst_ltm',
 'ct_src_ltm',
 'ct_src_dport_ltm',
 'ct_dst_sport_ltm',
 'ct_dst_src_ltm',
 'attack_cat',
 '_43',
 '_44',
 '_45',
 '_46',
 '_47',
 '_48',
 '_49']
In [28]:
df_unsw.printSchema()
root
 |-- proto: string (nullable = true)
 |-- state: string (nullable = true)
 |-- dur: string (nullable = true)
 |-- sbytes: string (nullable = true)
 |-- dbytes: string (nullable = true)
 |-- sttl: string (nullable = true)
 |-- dttl: string (nullable = true)
 |-- sloss: string (nullable = true)
 |-- dloss: string (nullable = true)
 |-- service: string (nullable = true)
 |-- Sload: string (nullable = true)
 |-- Dload: string (nullable = true)
 |-- Spkts: string (nullable = true)
 |-- Dpkts: string (nullable = true)
 |-- swin: string (nullable = true)
 |-- dwin: string (nullable = true)
 |-- stcpb: string (nullable = true)
 |-- dtcpb: string (nullable = true)
 |-- smeansz: string (nullable = true)
 |-- dmeansz: string (nullable = true)
 |-- trans_depth: string (nullable = true)
 |-- res_bdy_len: string (nullable = true)
 |-- Sjit: string (nullable = true)
 |-- Djit: string (nullable = true)
 |-- Sintpkt: string (nullable = true)
 |-- Dintpkt: string (nullable = true)
 |-- tcprtt: string (nullable = true)
 |-- synack: string (nullable = true)
 |-- ackdat: string (nullable = true)
 |-- is_sm_ips_ports: string (nullable = true)
 |-- ct_state_ttl: string (nullable = true)
 |-- ct_flw_http_mthd: string (nullable = true)
 |-- is_ftp_login: string (nullable = true)
 |-- ct_ftp_cmd: string (nullable = true)
 |-- ct_srv_src: string (nullable = true)
 |-- ct_srv_dst: string (nullable = true)
 |-- ct_dst_ltm: string (nullable = true)
 |-- ct_src_ltm: string (nullable = true)
 |-- ct_src_dport_ltm: string (nullable = true)
 |-- ct_dst_sport_ltm: string (nullable = true)
 |-- ct_dst_src_ltm: string (nullable = true)
 |-- attack_cat: string (nullable = true)
 |-- _43: string (nullable = true)
 |-- _44: string (nullable = true)
 |-- _45: string (nullable = true)
 |-- _46: string (nullable = true)
 |-- _47: string (nullable = true)
 |-- _48: string (nullable = true)
 |-- _49: string (nullable = true)

In [33]:
df_unsw.select("attack_cat").show(truncate=False)
+----------+
|attack_cat|
+----------+
|7         |
|4         |
|5         |
|4         |
|7         |
|6         |
|2         |
|6         |
|5         |
|1         |
|3         |
|6         |
|6         |
|7         |
|7         |
|5         |
|3         |
|5         |
|3         |
|2         |
+----------+
only showing top 20 rows

In [36]:
attack_cat = df_unsw.select("attack_cat")
In [40]:
attack_cat.describe()
Out[40]:
DataFrame[summary: string, attack_cat: string]
In [41]:
type(attack_cat)
Out[41]:
pyspark.sql.dataframe.DataFrame
In [42]:
import pandas as pd
pd.DataFrame(df_unsw.take(5), columns=df_unsw.columns).transpose()
Out[42]:
0 1 2 3 4
proto 59.166.0.3 59.166.0.0 59.166.0.5 59.166.0.9 59.166.0.8
state 56716 43467 41289 43785 40691
dur 149.171.126.8 149.171.126.6 149.171.126.2 149.171.126.0 149.171.126.9
sbytes 143 49729 9574 6881 6881
dbytes tcp tcp tcp tcp tcp
sttl FIN FIN FIN FIN FIN
dttl 0.82546002 0.101815 4.4002999E-2 2.7908299 2.6335001
sloss 7812 4238 2750 10476 13350
dloss 16236 65628 29104 395734 548216
service 31 31 31 31 31
Sload 29 29 29 29 29
Dload 30 7 7 16 21
Spkts 32 30 17 143 197
Dpkts - - - - -
swin 75090.25 328438.84 488693.97 29863.518 40381.238
dwin 156111.73 5087030.5 5181101.5 1130840.8 1661560.6
stcpb 122 72 44 180 232
dtcpb 126 74 48 320 438
smeansz 255 255 255 255 255
dmeansz 255 255 255 255 255
trans_depth 2751097753 961515433 3291096757 3934392726 1518931
res_bdy_len 2748686736 3225510659 1191410228 3961690324 18267719
Sjit 64 59 63 58 58
Djit 129 887 606 1237 1252
Sintpkt 0 0 0 0 0
Dintpkt 0 0 0 0 0
tcprtt 445.25928 0 78.126968 2707.4927 718.33679
synack 474.9451 91.579567 62.206562 2018.976 500.57288
ackdat 1421970774 1421970775 1421970775 1421970772 1421970773
is_sm_ips_ports 1421970775 1421970775 1421970775 1421970775 1421970775
ct_state_ttl 6.8190908 1.429493 1.014977 15.589459 11.399026
ct_flw_http_mthd 6.599896 1.387192 0.92583001 8.7470121 6.0251832
is_ftp_login 5.9700001E-4 0.00068 0.00125 6.8400003E-4 0.000619
ct_ftp_cmd 4.6899999E-4 5.4600002E-4 0.000485 5.3199998E-4 0.000489
ct_srv_src 0.000128 0.000134 0.000765 1.5199999E-4 0.00013
ct_srv_dst 0 0 0 0 0
ct_dst_ltm 0 0 0 0 0
ct_src_ltm 0 0 0 0 0
ct_src_dport_ltm 0 0 0 0 0
ct_dst_sport_ltm 0 0 0 0 0
ct_dst_src_ltm 2 7 3 11 16
attack_cat 7 4 5 4 7
_43 1 1 3 3 7
_44 4 6 3 2 1
_45 1 1 1 1 1
_46 1 1 1 1 1
_47 1 1 1 1 1
_48 10 10 10 10 10
_49 0 0 0 0 0
In [30]:
train, test = df_unsw.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
Training Dataset Count: 1777130
Test Dataset Count: 762609

Make a feature vector column (one way to assemble it is sketched after the traceback below)

In [46]:
feature_columns = df_unsw.columns[:-5] 
feature_columns
Out[46]:
['proto',
 'state',
 'dur',
 'sbytes',
 'dbytes',
 'sttl',
 'dttl',
 'sloss',
 'dloss',
 'service',
 'Sload',
 'Dload',
 'Spkts',
 'Dpkts',
 'swin',
 'dwin',
 'stcpb',
 'dtcpb',
 'smeansz',
 'dmeansz',
 'trans_depth',
 'res_bdy_len',
 'Sjit',
 'Djit',
 'Sintpkt',
 'Dintpkt',
 'tcprtt',
 'synack',
 'ackdat',
 'is_sm_ips_ports',
 'ct_state_ttl',
 'ct_flw_http_mthd',
 'is_ftp_login',
 'ct_ftp_cmd',
 'ct_srv_src',
 'ct_srv_dst',
 'ct_dst_ltm',
 'ct_src_ltm',
 'ct_src_dport_ltm',
 'ct_dst_sport_ltm',
 'ct_dst_src_ltm',
 'attack_cat',
 '_43',
 '_44']
In [47]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'feature_column', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
predictions.select('attack_cat', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/Documents/spark-3.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
     97         try:
---> 98             return f(*a, **kw)
     99         except py4j.protocol.Py4JJavaError as e:

~/Documents/spark-3.0.0/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o473.fit.
: java.lang.IllegalArgumentException: feature_column does not exist. Available: proto, state, dur, sbytes, dbytes, sttl, dttl, sloss, dloss, service, Sload, Dload, Spkts, Dpkts, swin, dwin, stcpb, dtcpb, smeansz, dmeansz, trans_depth, res_bdy_len, Sjit, Djit, Sintpkt, Dintpkt, tcprtt, synack, ackdat, is_sm_ips_ports, ct_state_ttl, ct_flw_http_mthd, is_ftp_login, ct_ftp_cmd, ct_srv_src, ct_srv_dst, ct_dst_ltm, ct_src_ltm, ct_src_dport_ltm, ct_dst_sport_ltm, ct_dst_src_ltm, attack_cat, _43, _44, _45, _46, _47, _48, _49
	at org.apache.spark.sql.types.StructType.$anonfun$apply$1(StructType.scala:275)
	at scala.collection.MapLike.getOrElse(MapLike.scala:131)
	at scala.collection.MapLike.getOrElse$(MapLike.scala:129)
	at scala.collection.AbstractMap.getOrElse(Map.scala:63)
	at org.apache.spark.sql.types.StructType.apply(StructType.scala:274)
	at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
	at org.apache.spark.ml.PredictorParams.validateAndTransformSchema(Predictor.scala:51)
	at org.apache.spark.ml.PredictorParams.validateAndTransformSchema$(Predictor.scala:46)
	at org.apache.spark.ml.classification.Classifier.org$apache$spark$ml$classification$ClassifierParams$$super$validateAndTransformSchema(Classifier.scala:75)
	at org.apache.spark.ml.classification.ClassifierParams.validateAndTransformSchema(Classifier.scala:42)
	at org.apache.spark.ml.classification.ClassifierParams.validateAndTransformSchema$(Classifier.scala:38)
	at org.apache.spark.ml.classification.ProbabilisticClassifier.org$apache$spark$ml$classification$ProbabilisticClassifierParams$$super$validateAndTransformSchema(ProbabilisticClassifier.scala:53)
	at org.apache.spark.ml.classification.ProbabilisticClassifierParams.validateAndTransformSchema(ProbabilisticClassifier.scala:37)
	at org.apache.spark.ml.classification.ProbabilisticClassifierParams.validateAndTransformSchema$(ProbabilisticClassifier.scala:33)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.org$apache$spark$ml$tree$DecisionTreeClassifierParams$$super$validateAndTransformSchema(DecisionTreeClassifier.scala:48)
	at org.apache.spark.ml.tree.DecisionTreeClassifierParams.validateAndTransformSchema(treeParams.scala:241)
	at org.apache.spark.ml.tree.DecisionTreeClassifierParams.validateAndTransformSchema$(treeParams.scala:237)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.validateAndTransformSchema(DecisionTreeClassifier.scala:48)
	at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:178)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:75)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:134)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


During handling of the above exception, another exception occurred:

IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-47-5c2d2acbd1c2> in <module>
      1 from pyspark.ml.classification import DecisionTreeClassifier
      2 dt = DecisionTreeClassifier(featuresCol = 'feature_column', labelCol = 'label', maxDepth = 3)
----> 3 dtModel = dt.fit(train)
      4 predictions = dtModel.transform(test)
      5 predictions.select('attack_cat', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

~/Documents/spark-3.0.0/python/pyspark/ml/base.py in fit(self, dataset, params)
    129                 return self.copy(params)._fit(dataset)
    130             else:
--> 131                 return self._fit(dataset)
    132         else:
    133             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/Documents/spark-3.0.0/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    319 
    320     def _fit(self, dataset):
--> 321         java_model = self._fit_java(dataset)
    322         model = self._create_model(java_model)
    323         return self._copyValues(model)

~/Documents/spark-3.0.0/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    316         """
    317         self._transfer_params_to_java()
--> 318         return self._java_obj.fit(dataset._jdf)
    319 
    320     def _fit(self, dataset):

~/Documents/spark-3.0.0/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1284         answer = self.gateway_client.send_command(command)
   1285         return_value = get_return_value(
-> 1286             answer, self.gateway_client, self.target_id, self.name)
   1287 
   1288         for temp_arg in temp_args:

~/Documents/spark-3.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
    100             converted = convert_exception(e.java_exception)
    101             if not isinstance(converted, UnknownException):
--> 102                 raise converted
    103             else:
    104                 raise

IllegalArgumentException: feature_column does not exist. Available: proto, state, dur, sbytes, dbytes, sttl, dttl, sloss, dloss, service, Sload, Dload, Spkts, Dpkts, swin, dwin, stcpb, dtcpb, smeansz, dmeansz, trans_depth, res_bdy_len, Sjit, Djit, Sintpkt, Dintpkt, tcprtt, synack, ackdat, is_sm_ips_ports, ct_state_ttl, ct_flw_http_mthd, is_ftp_login, ct_ftp_cmd, ct_srv_src, ct_srv_dst, ct_dst_ltm, ct_src_ltm, ct_src_dport_ltm, ct_dst_sport_ltm, ct_dst_src_ltm, attack_cat, _43, _44, _45, _46, _47, _48, _49
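Here the error is simpler: no column named 'feature_column' exists yet, so it has to be built before fitting. A hedged end-to-end sketch under the assumption that the realigned df_unsw_full header sketched earlier is used (with the shifted 42-name header these selections would grab the wrong fields), with an illustrative subset of numeric columns:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql.functions import col

numeric_cols = ['sbytes', 'dbytes', 'sttl', 'dttl', 'Sload', 'Dload']   # illustrative subset
df_numeric = df_unsw_full.select(
    *[col(c).cast('double').alias(c) for c in numeric_cols], 'attack_cat')

label_indexer = StringIndexer(inputCol='attack_cat', outputCol='label', handleInvalid='keep')
assembler = VectorAssembler(inputCols=numeric_cols, outputCol='feature_column')
dt = DecisionTreeClassifier(featuresCol='feature_column', labelCol='label', maxDepth=3)

pipeline = Pipeline(stages=[label_indexer, assembler, dt])
train_df, test_df = df_numeric.randomSplit([0.7, 0.3], seed=2018)
dt_model = pipeline.fit(train_df)
dt_model.transform(test_df).select('attack_cat', 'label', 'prediction', 'probability').show(10)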
In [23]:
import numpy as np
In [22]:
from pyspark.mllib.stat import Statistics
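Statistics is imported but not used in this session; a hedged sketch of a typical use here, a Pearson correlation matrix over a few raw numeric fields (the indices 7, 8 and 11, meant to be sbytes, dbytes and sloss in the raw file, are assumptions):

numeric_rdd = unsw_update_rdd.map(lambda row: [float(row[7]), float(row[8]), float(row[11])])
print(Statistics.corr(numeric_rdd, method="pearson"))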
In [25]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
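rf is defined but not fitted in this session; a hedged sketch of how fitting and scoring would proceed once a DataFrame with an assembled 'features' vector and an indexed 'label' exists (train_assembled and test_assembled below are hypothetical names for such prepared splits):

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf_model = rf.fit(train_assembled)                    # train_assembled: hypothetical prepared split
rf_predictions = rf_model.transform(test_assembled)   # test_assembled: hypothetical prepared split

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',
                                              metricName='accuracy')
print("Test accuracy:", evaluator.evaluate(rf_predictions))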
In [ ]: