11.03.2020

unsw_streaming_analysis

In [1]:
import findspark
findspark.init('/home/bigdata/Documents/spark-3.0.0')
In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import os
In [3]:
config = (SparkConf().setAppName("UNSW").set("spark.executer.memory", "2g"))

sc = SparkContext(conf=config)
ssc = StreamingContext(sc, 2)
ssc.checkpoint("checkpoint")
In [4]:
unsw_list = list()
entries = os.listdir('UNSW_data')
for entry in entries:
    print(entry)
    path = "UNSW_data/" + entry
    unsw_list += [sc.textFile(path, 4)]
UNSW-NB15-41.csv
UNSW-NB15-8.csv
UNSW-NB15-43.csv
UNSW-NB15-36.csv
UNSW-NB15-39.csv
UNSW-NB15-3.csv
UNSW-NB15-24.csv
UNSW-NB15-1.csv
UNSW-NB15-14.csv
UNSW-NB15-17.csv
UNSW-NB15-31.csv
UNSW-NB15-23.csv
UNSW-NB15-32.csv
UNSW-NB15-16.csv
UNSW-NB15-34.csv
UNSW-NB15-42.csv
UNSW-NB15-12.csv
UNSW-NB15-15.csv
UNSW-NB15-25.csv
UNSW-NB15-27.csv
UNSW-NB15-45.csv
UNSW-NB15-21.csv
UNSW-NB15-37.csv
UNSW-NB15-46.csv
UNSW-NB15-26.csv
UNSW-NB15-20.csv
UNSW-NB15-4.csv
UNSW-NB15-7.csv
UNSW-NB15-29.csv
UNSW-NB15-9.csv
UNSW-NB15-44.csv
UNSW-NB15-38.csv
UNSW-NB15-13.csv
UNSW-NB15-6.csv
UNSW-NB15-49.csv
UNSW-NB15-48.csv
UNSW-NB15-2.csv
UNSW-NB15-47.csv
UNSW-NB15-5.csv
UNSW-NB15-30.csv
UNSW-NB15-10.csv
UNSW-NB15-18.csv
UNSW-NB15-40.csv
UNSW-NB15-28.csv
UNSW-NB15-35.csv
UNSW-NB15-33.csv
UNSW-NB15-50.csv
UNSW-NB15-22.csv
UNSW-NB15-19.csv
UNSW-NB15-11.csv
In [5]:
unsw_list[49].take(2)
Out[5]:
['"175.45.176.1","1043","149.171.126.18","53","udp","INT","3.0000001E-6","114","0","254","0","0","0","dns","1.52E+8","0","2","0","0","0","0","0","57","0","0","0","0","0","1424237372","1424237372","0.003","0","0","0","0","0","2","0","0","0","31","31","20","21","20","20","31","Generic","1"',
 '"175.45.176.1","1043","149.171.126.18","53","udp","INT","3.0000001E-6","114","0","254","0","0","0","dns","1.52E+8","0","2","0","0","0","0","0","57","0","0","0","0","0","1424237372","1424237372","0.003","0","0","0","0","0","2","0","0","0","31","31","20","21","20","20","31","Generic","1"']
In [6]:
rdd_stream = ssc.queueStream(unsw_list)
In [ ]:
################################## solution (transformation and action) #########################################
In [ ]:
ssc.start()
In [ ]:
ssc.stop(stopGraceFully=True, stopSparkContext=True)
In [ ]: