!pip install -q findspark
import findspark
findspark.init('/home/bigdata/Documents/spark-3.0.0')
import random
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Spark Streaming queue demo: feed a queue of pre-built RDDs into a DStream
# (one RDD consumed per 1-second batch), bucket each number by its last
# digit, and print the per-batch counts.
sc = SparkContext(appName="PythonStreamingQueue")
ssc = StreamingContext(sc, 1)  # batch interval: 1 second

# Build four RDDs, each with 50 distinct random ints from [1, 999],
# split across 3 partitions. (Original had the loop body unindented,
# which is an IndentationError; also dropped the redundant identity
# comprehension — random.sample already returns a list.)
rddQueue = [sc.parallelize(random.sample(range(1, 1000), 50), 3) for _ in range(4)]

# Sanity peek at the first batch's data (notebook-style inspection).
rddQueue[0].collect()

rdd_Stream = ssc.queueStream(rddQueue)
# Key each value by its last digit, then count occurrences per batch.
mapped_rdd_Stream = rdd_Stream.map(lambda x: (x % 10, 1))
reduced_rdd_Stream = mapped_rdd_Stream.reduceByKey(lambda x, y: x + y)
reduced_rdd_Stream.pprint()

ssc.start()
try:
    time.sleep(5)  # let ~4 one-second batches drain through the queue
finally:
    # Always tear down the streaming and Spark contexts, even if the
    # sleep is interrupted. (stopGraceFully is PySpark's actual spelling.)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)