11.03.2020

reduceByKeyAndWindow

In [1]:
import findspark

# Point findspark at the local Spark 3.0.0 installation.
# NOTE(review): presumably this has to run before the pyspark imports below
# so that pyspark can be located - confirm against the findspark docs.
findspark.init(spark_home='/home/bigdata/Documents/spark-3.0.0')
In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
In [3]:
# Spark configuration carrying the application name for this example.
config = SparkConf()
config.setAppName("reduce_by_key_window_example")

# One SparkContext for the notebook, wrapped in a StreamingContext that uses
# a batch duration of 1 (seconds).
sc = SparkContext(conf=config)
ssc = StreamingContext(sparkContext=sc, batchDuration=1)

# Checkpoint directory for the streaming context; written relative to the
# current working directory.
ssc.checkpoint(directory="checkpoint")
In [4]:
# DStream of text lines read from a TCP socket on localhost:5000.
lines = ssc.socketTextStream(hostname="localhost", port=5000)

Word count example

In [5]:
# Split every incoming line on single spaces and pair each word with a
# count of 1.
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))

# Count words over a 15-second window that slides every 5 seconds.
#   func       - adds the counts of the batches entering the window.
#   invFunc    - must undo func for the batches leaving the window, so it
#                subtracts; adding here would count departing batches a
#                second time and the totals would only ever grow.
#   filterFunc - drops (word, count) pairs whose count has fallen to 0, so
#                words disappear from the output once they leave the window.
pairs.reduceByKeyAndWindow(
    func=lambda x, y: x + y,
    invFunc=lambda x, y: x - y,
    windowDuration=15,
    slideDuration=5,
    filterFunc=lambda kv: kv[1] > 0,
).pprint()
In [6]:
# Start the streaming computation; the batches printed below are produced by
# the pprint() output registered on the windowed word counts.
ssc.start()
-------------------------------------------
Time: 2020-03-11 15:22:56
-------------------------------------------

-------------------------------------------
Time: 2020-03-11 15:23:01
-------------------------------------------

-------------------------------------------
Time: 2020-03-11 15:23:06
-------------------------------------------

-------------------------------------------
Time: 2020-03-11 15:23:11
-------------------------------------------
('spark', 1)
('great', 1)
('is', 1)

-------------------------------------------
Time: 2020-03-11 15:23:16
-------------------------------------------
('spark', 2)
('great', 1)
('and', 1)
('is', 1)
('love', 1)
('aws', 1)

-------------------------------------------
Time: 2020-03-11 15:23:21
-------------------------------------------
('money', 1)
('spark', 2)
('great', 1)
('and', 2)
('food', 1)
('is', 1)
('love', 2)
('aws', 1)

-------------------------------------------
Time: 2020-03-11 15:23:26
-------------------------------------------
('money', 1)
('spark', 1)
('and', 2)
('food', 1)
('love', 2)
('aws', 1)

-------------------------------------------
Time: 2020-03-11 15:23:31
-------------------------------------------
('money', 1)
('and', 1)
('food', 1)
('love', 1)

In [7]:
# Stop streaming gracefully and also shut down the underlying SparkContext.
ssc.stop(stopSparkContext=True, stopGraceFully=True)
-------------------------------------------
Time: 2020-03-11 15:23:36
-------------------------------------------

In [ ]:
# Actually call stop(): the bare attribute reference `ssc.stop` only
# evaluates to the bound method and never stops anything.
ssc.stop(stopSparkContext=True, stopGraceFully=True)
In [ ]: