Apache Spark — Structured Streaming
Introduction
The goal of this lab assignment is to learn how to analyze streams of data with the Spark Structured Streaming API. Refer to this documentation to learn how to connect and interact with the cluster.
Documentation
In order to answer the questions and do the exercises, you might want to refer to the following documentation:
1 Warming up
Consider the following program.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
port_number = COMPLETE HERE
checkpoint_location = COMPLETE HERE
spark = (SparkSession.builder.appName("Structured Streaming - exo1").getOrCreate())
lines = (spark\
.readStream.format("socket")\
.option("host", "localhost")\
.option("port", port_number)\
.load())
streamingQuery = lines.writeStream\
.option("checkpointLocation", checkpoint_location)\
.format("console").start()
streamingQuery.awaitTermination()
Exercise
Exercise 1.1
1. Where does this program get its input from?
2. What object type does the variable lines contain?
3. Where does this program write its output?
4. What is the output of this program?
5. What is the option checkpointLocation intended for?
6. What does the instruction streamingQuery.awaitTermination() do?
Solution
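1. The input is a TCP socket (on localhost, at the port port_number), where another program generates some data.
2. lines is a DataFrame (more precisely, a streaming DataFrame).
3. To the console.
4. It just copies the input to the output.
5. It is the directory where Spark saves the progress of the streaming query, so that the query can resume from where it left off after a restart or a failure.
6. The instruction blocks the program until the streaming query terminates, either because it is stopped or because of an error. Without it, the program would exit right after starting the query.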
You can now verify your answers to the previous questions by executing the program.
Activity
1. Connect to the cluster, if you haven’t done so yet. Refer to this documentation.
2. After running the command srun ..., you should be connected to a machine on the cluster Kyle. Note the name of this machine (you should see it at the terminal prompt).
3. Create a checkpoint directory for the first exercise (e.g., checkpoint_exo1) under your home directory hdfs://sar01:9000/cpuecm1/cpuecm1_X in HDFS.
4. Copy and paste the code into a Python file (e.g., exo1.py) that you’ll save into your home directory in the local filesystem of the cluster machine.
   - Change the value of the variable checkpoint_location so that it points to the directory that you created at point 3.
   - Change the value of the variable port_number to any value in the range [49152, 65535].
5. Open a new terminal window, connect to phome.metz.supelec.fr and then to the same machine that you noted at point 2.
6. In the new terminal, start a netcat server listening on the port number that you selected at point 4. Use the following command:
   nc -lk port_number
7. Run the Python code with the command spark-submit. Wait until Spark does not display any more messages on screen.
   - In case the program stops because of an error, read the box “What to do in case of errors” below.
8. In the netcat terminal, write a few lines of text. Look at the terminal where the Spark program is running and observe the output.
What to do in case of errors
If any error arises, remove all files from the checkpoint directory before running spark-submit again.
Stop the program
When you’re done with your experiments, you can stop the Spark program by simply typing CTRL-C in the terminal where Spark is running.
Don’t stop the netcat server, you’ll need it in the next exercise.
Remove all files from the checkpoint location.
2 Triggering policy
In a Structured Streaming program we can choose a triggering policy.
Exercise
Exercise 2.1
1. What is a triggering policy?
2. What is the triggering policy in the previous program?
3. Modify the code of the previous program in order to set the Fixed interval micro-batch triggering policy.
4. Run the program. How is the behaviour of this program different from before?
Solution
1. It is a policy that dictates the timing of streaming data processing.
2. No triggering policy is specified, so the default is used: as soon as the previous micro-batch finishes processing, the next one is read in.
3. Here is the code. We need to specify a trigger.
streamingQuery = lines.writeStream\
    .trigger(processingTime = "15 seconds")\
    .format("console").start()
4. New data is checked at the specified interval. Within that interval (15 seconds) we may write many lines, so the program can get multiple lines in a single micro-batch (unlike before, when processing was triggered as soon as data was available).
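The effect of a fixed-interval trigger can be illustrated with a plain-Python sketch that does not involve Spark at all. The function name group_into_micro_batches and the arrival times are hypothetical. The model is deliberately simplified: it assumes that triggers fire at exact multiples of the interval and that each trigger picks up every line that arrived since the previous one.

```python
import math
from collections import defaultdict


def group_into_micro_batches(arrivals, interval):
    """Group (arrival_time_in_seconds, line) pairs by the trigger that picks them up.

    Simplified model: triggers fire at interval, 2 * interval, ... and each
    trigger processes the lines that arrived since the previous trigger.
    """
    batches = defaultdict(list)
    for arrival_time, line in arrivals:
        # First trigger that fires at or after the arrival time.
        trigger_time = math.ceil(arrival_time / interval) * interval
        batches[trigger_time].append(line)
    return dict(sorted(batches.items()))


# Hypothetical arrival times, in seconds since the query started.
arrivals = [(2, "hello"), (9, "spark"), (14, "streaming"), (21, "again")]
print(group_into_micro_batches(arrivals, interval=15))
# {15: ['hello', 'spark', 'streaming'], 30: ['again']}
```

With a 15-second interval, the three lines typed in the first 15 seconds land in the same micro-batch, which matches what you should observe on the console.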
3 Checkpoint location and output mode
We’re now going to see the impact of the checkpoint location and the output modes on a streaming query.
Exercise
Exercise 3.1
1. What is an output mode and what are the available options?
2. What is the output mode of the previous program?
Solution
1. The output mode tells Spark how the output is presented. There are several options: append (only the new rows added to the Result Table since the last trigger are visible in the output), complete (the whole Result Table is visible after each trigger) and update (only the rows that were updated since the last trigger are visible in the output).
2. In the previous program we didn’t specify any output mode, so the default mode (append) is selected.
We’re now going to write a new streaming query.
Exercise
Exercise 3.2
1. Create a new checkpoint location in HDFS. You may also keep the same directory as before; in this case, make sure you remove all files from that directory.
2. Write a new program that reads streaming text from a TCP socket and counts the number of occurrences of each word.
3. Which output mode are you going to choose and why?
4. Run the program. Write a few lines on the netcat server and observe the output.
5. Stop the program and run it again with no modifications. Write a few lines in the netcat terminal and observe the output. What can you say about the word counts?
6. Stop the program and remove the files in the checkpoint location. Run the program again and write a few lines in the netcat terminal. What can you say about the word counts?
7. Play with the different output modes and observe how the output changes.
Solution
The new program is as follows:
lines = (spark\
.readStream.format("socket")\
.option("host", "localhost")\
.option("port", port_number)\
.load())
lines = lines.select(F.explode(F.split(lines.value, " "))\
.alias("word"))\
.groupBy("word").count()
streamingQuery = lines.writeStream\
.trigger(processingTime = "15 seconds")\
.option("checkpointLocation", checkpoint_location)\
.outputMode("update")\
.format("console")\
.start()
streamingQuery.awaitTermination()
Append mode doesn’t work here, because an aggregation may modify rows that are already in the Result Table, whereas append mode only outputs rows that will never change again. So the only options left are update and complete. We choose update in order to output only the counts that changed since the last trigger.
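The difference between update and complete for a running word count can be sketched in plain Python, without Spark. The function run_batches and the sample batches are hypothetical; the sketch only mimics what a console sink would show after each micro-batch. Append mode is left out on purpose, since it is not applicable to this kind of aggregation.

```python
from collections import Counter


def run_batches(batches, mode):
    """Return what a console-like sink would show after each micro-batch.

    batches: list of micro-batches, each one a list of words.
    mode: "complete" or "update".
    """
    result_table = Counter()  # running word counts, i.e. the Result Table
    outputs = []
    for batch in batches:
        changed = Counter(batch)
        result_table.update(changed)
        if mode == "complete":
            # The whole Result Table is shown after each trigger.
            shown = dict(result_table)
        elif mode == "update":
            # Only the rows whose count changed in this trigger are shown.
            shown = {word: result_table[word] for word in changed}
        else:
            raise ValueError(f"unsupported mode: {mode}")
        outputs.append(shown)
    return outputs


batches = [["a", "b", "a"], ["b", "c"]]
print(run_batches(batches, "complete"))
# [{'a': 2, 'b': 1}, {'a': 2, 'b': 2, 'c': 1}]
print(run_batches(batches, "update"))
# [{'a': 2, 'b': 1}, {'b': 2, 'c': 1}]
```

In the second micro-batch, the word a is unchanged: it still appears in complete mode but not in update mode.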
4 Window operations on event time
Netcat and checkpoint
You can stop the netcat server now.
Remember to create a new checkpoint location for this exercise. Alternatively, you can also use the same directory as in the previous exercises, but you should remove all its files.
We’re now going to find out how to perform aggregations over a sliding event-time window.
A given data source generates some words for a certain time interval. Each word is accompanied by a timestamp that indicates the exact moment when the word is generated. This timestamp is the event time.
After generating a word, the data source saves the word and its timestamp into a CSV file in a directory on HDFS. For convenience, we’ll refer to this directory as the source directory.
Activity
Create the source directory under your home directory hdfs://sar01:9000/cpuecm1/cpuecm1_X in HDFS.
At any given moment, the source directory contains zero or more CSV files, each containing exactly one line in the format word,timestamp (with no whitespace before or after the comma).
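To make the file layout concrete, here is a minimal sketch that writes one such single-line CSV file. It is not the data generator used in this lab. The file name, the function write_word_file and the timestamp format (YYYY-MM-DD HH:MM:SS) are assumptions, and the file is written to a local temporary directory rather than to HDFS.

```python
import csv
import tempfile
from datetime import datetime
from pathlib import Path


def write_word_file(directory, word, event_time, index):
    """Write one CSV file containing the single line word,timestamp."""
    path = Path(directory) / f"word_{index}.csv"  # hypothetical file name
    with open(path, "w", newline="") as f:
        # Assumed timestamp format: "YYYY-MM-DD HH:MM:SS".
        writer = csv.writer(f, lineterminator="\n")
        writer.writerow([word, event_time.strftime("%Y-%m-%d %H:%M:%S")])
    return path


# Local temporary directory standing in for the source directory.
with tempfile.TemporaryDirectory() as source_directory:
    path = write_word_file(source_directory, "spark", datetime(2024, 1, 1, 12, 0, 7), 0)
    print(path.read_text(), end="")
    # spark,2024-01-01 12:00:07
```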
Exercise
Exercise 4.1
Write a Spark program that:
1. Reads the stream of data from the source directory.
2. Counts the number of occurrences of each word within 10-second windows that slide every 5 seconds.
3. Prints the output counts to the console. Use triggers of 5 seconds.
Solution
The new program is as follows:
source_directory = "hdfs://...."
words = (spark
    .readStream.format("csv")
    .schema("word STRING, timestamp TIMESTAMP")
    .load(source_directory))
# Count the words in 10-second windows that slide every 5 seconds.
windowedCount = words.groupBy(F.window(words.timestamp,
    "10 seconds", "5 seconds"), words.word).count()
# Add the time of the current trigger to each output row.
windowedQuery = windowedCount\
    .withColumn("trigger_timestamp", F.current_timestamp())\
    .writeStream\
    .trigger(processingTime="5 seconds")\
    .option("checkpointLocation", checkpoint_location)\
    .outputMode("update")\
    .format("console")\
    .option("truncate", False)\
    .start()
windowedQuery.awaitTermination()
We now test the new Spark program.
Data source and timeline visualization
We provide two Python programs for this exercise: a data generator and a tool for visualizing words in a timeline. Instructions to get and run these two programs are given in the activity below.
The data generator is our data source. It generates two words every second for a certain amount of time. Each word is saved in a separate CSV file in the source directory. The generator also saves the list of all generated words to a summary CSV file.
The visualization tool takes as its input the summary CSV file written by the data generator and visualizes the words on a timeline.
Activity
1. Copy to your home directory in the local filesystem the data generator that you find at the following path:
   /usr/users/cpu-prof/cpu_quercini/structured-streaming/tempws_gen.py
2. Start your Spark program. When running it for the first time, you might get some errors. Correct your code accordingly.
3. In another terminal, run the Python script tempws_gen.py. Use the following command to learn how to run this program:
   python3 tempws_gen.py --help
   For this exercise, do not introduce any delay (keep the default values of the parameters --delay, --mindelay, --maxdelay).
4. After launching the data generator, you should see some output in the terminal where you launched the Spark program. Wait for the script tempws_gen.py to finish generating data. The output might be a bit overwhelming. Scroll up to identify the results of each micro-batch.
5. If you need to rerun the Spark program and the data generator, make sure you delete all the files in the checkpoint location and the source directory.
We now want to analyze the output of the program.
1. The script tempws_gen.py has generated a file gen_words.csv in your home directory. This file contains the list of all generated words with their respective timestamps. Download the file to your computer.
2. Download to your computer the visualization tool that you find at the following path:
   /usr/users/cpu-prof/cpu_quercini/structured-streaming/timeline_visualization.py
Visualization tool
Use the following command to learn how to run the visualization tool:
python timeline_visualization.py --help
The visualization tool displays a vertical blue bar at each trigger. For this purpose, you’ll need to pass the tool the timestamps of the first and last trigger and the interval (in seconds) between two consecutive triggers.
You can get the timestamps of the first and last trigger from the output of Spark. More specifically, for each micro-batch, Spark outputs the progress details of the streaming query; look at the timestamps associated with the first and last micro-batch.
Exercise
Exercise 4.2
1. Analyze the output of your Spark program and the timeline of the generated words.
2. Describe how the counts are updated by the Spark program.
Solution
Unlike the previous exercise, here the occurrences of each word are counted within time windows of 10 seconds that slide every 5 seconds. Each word is associated with an event time, and it is this event time (not the arrival time) that determines the windows in which the word is counted. Window boundaries are aligned on multiples of the slide interval rather than on the first trigger, so windows start at, say, 12:00:00, 12:00:05, 12:00:10 and so on. If a word has event time 12:00:07, its count is updated in two windows, 12:00:00 - 12:00:10 and 12:00:05 - 12:00:15.
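The assignment of an event time to its sliding windows can be sketched in plain Python, without Spark. The function sliding_windows and the sample date are hypothetical. The sketch assumes that windows are aligned on the Unix epoch (which, to our understanding, is what Spark does when no start offset is given) and that each window includes its start and excludes its end.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time, window, slide):
    """Return the [start, end) windows that contain event_time.

    Assumption: window starts are multiples of slide counted from the epoch.
    """
    # Start of the latest window that begins at or before event_time.
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    # Walk backwards while the window still covers event_time.
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


event_time = datetime(2024, 1, 1, 12, 0, 7, tzinfo=timezone.utc)
for start, end in sliding_windows(event_time, timedelta(seconds=10), timedelta(seconds=5)):
    print(start.strftime("%H:%M:%S"), "-", end.strftime("%H:%M:%S"))
# 12:00:00 - 12:00:10
# 12:00:05 - 12:00:15
```

Because the window length is twice the slide interval, every event time falls into exactly two windows, which is why each generated word updates two counts.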