Streaming Data

In [1]:
!pip install -q findspark
In [2]:
import os

import findspark

# Locate the Spark installation: honour SPARK_HOME when it is set, otherwise fall
# back to the path this notebook was originally developed against.
SPARK_HOME = os.environ.get("SPARK_HOME", "/home/bigdata/Documents/spark-3.0.0")
findspark.init(SPARK_HOME)
In [3]:
import random
import time

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
In [12]:
# getOrCreate reuses a SparkContext that is still running (e.g. when this cell is
# re-executed); calling SparkContext(...) a second time raises ValueError instead.
sc = SparkContext.getOrCreate(SparkConf().setAppName("PythonStreamingQueue"))

# Micro-batch interval in seconds for the streaming context.
BATCH_INTERVAL_S = 1
ssc = StreamingContext(sc, BATCH_INTERVAL_S)
In [13]:
# Build the queue of RDDs that queueStream will replay; each RDD holds distinct
# random integers drawn from [1, 999].
SEED = 42            # fixed seed so the sampled data (and counts) reproduce on re-run
NUM_BATCHES = 4      # number of RDDs placed in the queue
BATCH_SIZE = 50      # distinct integers sampled per RDD
NUM_PARTITIONS = 3   # partitions per RDD

rng = random.Random(SEED)  # private generator: does not disturb the global random state
rddQueue = [
    sc.parallelize(rng.sample(range(1, 1000), BATCH_SIZE), NUM_PARTITIONS)
    for _ in range(NUM_BATCHES)
]
In [14]:
# Peek at the first queued RDD on the driver; collect() returns every element as a list.
first_batch_rdd = rddQueue[0]
first_batch_rdd.collect()
Out[14]:
[294,
 886,
 912,
 263,
 422,
 380,
 991,
 902,
 882,
 18,
 745,
 612,
 131,
 406,
 179,
 519,
 66,
 825,
 542,
 370,
 948,
 155,
 357,
 522,
 133,
 350,
 594,
 649,
 14,
 172,
 506,
 551,
 828,
 262,
 307,
 441,
 701,
 686,
 792,
 347,
 983,
 195,
 304,
 192,
 298,
 662,
 946,
 412,
 43,
 485]
In [15]:
# Wrap the RDD list as a DStream; oneAtATime=True makes each batch interval
# consume a single RDD from the queue (this is also the default).
rdd_Stream = ssc.queueStream(rdds=rddQueue, oneAtATime=True)
In [16]:
def _digit_key(value):
    """Pair a number with a count of 1, keyed by its last decimal digit."""
    return (value % 10, 1)


def _sum_counts(left, right):
    """Merge two partial counts that share the same key."""
    return left + right


# Per micro-batch: key every value by its last digit, total the counts per digit,
# and print the first elements of each resulting batch.
mapped_rdd_Stream = rdd_Stream.map(_digit_key)
reduced_rdd_Stream = mapped_rdd_Stream.reduceByKey(_sum_counts)
reduced_rdd_Stream.pprint()
In [17]:
# Run the streaming job for a fixed wall-clock window, then shut everything down.
RUN_DURATION_S = 5  # seconds to let the queued batches be processed

ssc.start()
try:
    # Unlike time.sleep, this re-raises any error reported by the streaming job,
    # so a failing batch surfaces in the notebook instead of passing silently.
    ssc.awaitTerminationOrTimeout(RUN_DURATION_S)
finally:
    # Always stop, even if a batch failed or the cell was interrupted.
    # stopGraceFully=True waits for already-received data to finish processing;
    # stopSparkContext=True also stops `sc`.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
-------------------------------------------
Time: 2020-03-04 14:39:40
-------------------------------------------
(4, 4)
(0, 3)
(8, 4)
(1, 5)
(5, 5)
(9, 3)
(6, 6)
(2, 13)
(3, 4)
(7, 3)

-------------------------------------------
Time: 2020-03-04 14:39:41
-------------------------------------------
(0, 6)
(8, 5)
(4, 6)
(1, 5)
(5, 8)
(9, 5)
(6, 5)
(2, 1)
(7, 7)
(3, 2)

-------------------------------------------
Time: 2020-03-04 14:39:42
-------------------------------------------
(4, 5)
(8, 8)
(0, 8)
(1, 3)
(9, 3)
(5, 4)
(2, 2)
(6, 6)
(3, 3)
(7, 8)

-------------------------------------------
Time: 2020-03-04 14:39:43
-------------------------------------------
(0, 5)
(4, 4)
(8, 5)
(9, 8)
(5, 3)
(1, 5)
(6, 8)
(2, 4)
(7, 3)
(3, 5)

-------------------------------------------
Time: 2020-03-04 14:39:44
-------------------------------------------

-------------------------------------------
Time: 2020-03-04 14:39:45
-------------------------------------------

-------------------------------------------
Time: 2020-03-04 14:39:46
-------------------------------------------

In [ ]: