11.03.2020

countByValueAndWindow: counting words from a socket stream over a 15-second window that slides every 5 seconds

In [1]:
import os

import findspark

# Make `pyspark` importable by pointing findspark at a local Spark installation.
# Prefer the SPARK_HOME environment variable so the notebook runs on machines
# other than the one it was written on; fall back to the original install path
# when SPARK_HOME is unset or empty.
findspark.init(os.environ.get("SPARK_HOME") or "/home/bigdata/Documents/spark-3.0.0")
In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
In [3]:
# Spark driver configuration for this example (application name only).
config = SparkConf()
config.setAppName("count_by_value_window_example")
sc = SparkContext(conf=config)

# Streaming context that cuts the input into one micro-batch per second.
# Checkpointing is switched on because countByValueAndWindow keeps windowed
# state with an inverse (subtracting) reduce, which Spark requires to be
# checkpointed; the data goes to the relative directory "checkpoint".
ssc = StreamingContext(sc, batchDuration=1)
ssc.checkpoint("checkpoint")
In [4]:
# Input DStream: newline-delimited text read from a TCP server on
# localhost:5000 (for example one started with `nc -lk 5000`); every received
# line becomes one record.
lines = ssc.socketTextStream(hostname="localhost", port=5000)
In [5]:
# Break each line into words. str.split() with no argument splits on any run
# of whitespace and discards empty strings; split(" ") would turn repeated,
# leading or trailing spaces into '' tokens that then get counted as a "word".
words = lines.flatMap(lambda line: line.split())

# Count each distinct word over a sliding window and print a sample of the
# counts each time the window slides. Both durations are in seconds and must be
# multiples of the StreamingContext batch interval; this operation needs
# checkpointing enabled on the StreamingContext.
WINDOW_DURATION = 15  # seconds of data covered by each count
SLIDE_DURATION = 5    # seconds between successive window evaluations
words.countByValueAndWindow(WINDOW_DURATION, SLIDE_DURATION).pprint()
In [6]:
# Start receiving and processing the stream. start() does not block; batches
# keep being processed (and the windowed counts printed) until ssc.stop() is called.
ssc.start()
-------------------------------------------
Time: 2020-03-11 15:31:17
-------------------------------------------

-------------------------------------------
Time: 2020-03-11 15:31:22
-------------------------------------------

-------------------------------------------
Time: 2020-03-11 15:31:27
-------------------------------------------

-------------------------------------------
Time: 2020-03-11 15:31:32
-------------------------------------------
('i', 1)
('spark', 1)
('123', 1)
('love', 1)

-------------------------------------------
Time: 2020-03-11 15:31:37
-------------------------------------------
('i', 1)
('spark', 1)
('123', 1)
('love', 1)

-------------------------------------------
Time: 2020-03-11 15:31:42
-------------------------------------------
('i', 2)
('spark', 2)
('123', 1)
('love', 2)
('456', 1)

-------------------------------------------
Time: 2020-03-11 15:31:47
-------------------------------------------
('i', 1)
('spark', 2)
('456', 1)
('love', 1)
('aws', 1)

-------------------------------------------
Time: 2020-03-11 15:31:52
-------------------------------------------
('i', 1)
('spark', 2)
('456', 1)
('love', 1)
('aws', 1)

-------------------------------------------
Time: 2020-03-11 15:31:57
-------------------------------------------
('spark', 1)
('aws', 1)

-------------------------------------------
Time: 2020-03-11 15:32:02
-------------------------------------------

-------------------------------------------
Time: 2020-03-11 15:32:07
-------------------------------------------

In [9]:
# Shut the stream down gracefully (finish processing data already received)
# and stop the underlying SparkContext as well.
ssc.stop(stopSparkContext=True, stopGraceFully=True)
In [ ]: