11.03.2020

Output Operations on DStreams

In [1]:
import os

import findspark

# Make the local Spark installation importable as `pyspark`.
# SPARK_HOME wins when it is set (so the notebook also runs on other machines);
# otherwise fall back to the original local installation path.
findspark.init(os.environ.get("SPARK_HOME") or '/home/bigdata/Documents/spark-3.0.0')
In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
In [3]:
# Spark configuration: only the application name is set here, so the master URL
# comes from the launch environment.
# NOTE(review): a socket receiver occupies one core, so a local master needs at
# least two threads (e.g. local[2]) for batches to be processed -- confirm.
config = SparkConf().setAppName("output_operation_example")

# getOrCreate() reuses an already-running SparkContext instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-executed.
sc = SparkContext.getOrCreate(conf=config)

# Streaming context with a batch interval of 1 second.
ssc = StreamingContext(sc, 1)

# Windowed counting further down requires a checkpoint directory.
ssc.checkpoint("checkpoint")
In [4]:
# Input DStream: text lines read from a TCP socket on localhost:5000.
lines = ssc.socketTextStream(hostname="localhost", port=5000)
In [5]:
# Break every incoming line on single spaces and flatten the pieces into one
# DStream of individual tokens.
words = lines.flatMap(lambda text: text.split(" "))
In [6]:
# Count occurrences of each distinct token over a 15-second window that
# slides forward every 5 seconds.
counter = words.countByValueAndWindow(windowDuration=15, slideDuration=5)

# Output operation: write the counts of every window to text files whose
# names start with "output/counter".
counter.saveAsTextFiles(prefix="output/counter")
In [ ]:
# Start the streaming computation defined in the cells above.
ssc.start()
In [ ]:
# Shut the stream down gracefully and stop the underlying SparkContext as well.
ssc.stop(stopSparkContext=True, stopGraceFully=True)
In [ ]: