Big data for AI — Tutorial 4

Spark Structured Streaming


1 Introduction

The goal of this tutorial is to learn how to analyze streams of data with the Spark Structured Streaming API.

Documentation

In order to answer the questions and do the exercises, you might want to refer to the following documentation:

  • The Spark Structured Streaming programming guide.

  • The PySpark API documentation (in particular, the pyspark.sql.functions and pyspark.sql.streaming modules).

2 Warming up

  • Copy the file warmup.py to your home folder by typing the following command:

cp /usr/users/cpu-prof/cpu_quercini/structured_streaming/warmup.py .

Exercise

Exercise 2.1

Read the code and answer the following questions.

  1. Where does this program get its input from?

  2. What object type does the variable lines contain (instruction at Line 28)?

  3. Where does this program write its output?

  4. What is the output of this program?

  5. What is the option checkpointLocation intended for?

  6. What does the instruction streamingQuery.awaitTermination() do?

You can now verify your answers to the previous questions by executing the program.
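
For reference, a minimal socket-to-console query has the following general shape. This is a hedged sketch, not a copy of warmup.py: the host, port and checkpoint path are placeholders that you must adapt to your own setup.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("warmup_sketch").getOrCreate()

# Read a stream of text lines from a TCP socket (placeholder host and port).
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 55555) \
    .load()

# Write every micro-batch to the console; the checkpoint location lets Spark
# recover the state of the query after a failure or a restart.
streamingQuery = lines.writeStream \
    .format("console") \
    .option("checkpointLocation", "hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/checkpoint_dir") \
    .start()

# Block the driver until the query is stopped (e.g., with CTRL-C).
streamingQuery.awaitTermination()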

Activity

  1. Create a checkpoint directory for the first exercise (e.g., checkpoint_dir) under your home directory hdfs://sar01:9000/bdiaspark23/bdiaspark23_X in HDFS. The command to do so is as follows:

hdfs dfs -mkdir hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/checkpoint_dir

  2. Complete line 21 of file warmup.py. Choose a port number in the interval [49152, 65535].

  3. Complete line 25 of file warmup.py. Write the path to the checkpoint location that you created on step 1.

  4. Open a new terminal window and connect to the same kyle machine to which you’re connected in the first terminal window:

ssh kyleXX

  5. In the new terminal window, start a netcat server. Use the following command. Replace [port_number] with the port number that you chose on step 2. The command will hang waiting for some input. This is normal.

nc -lk [port_number]

  6. In the old terminal window, execute the Spark program with spark-submit. Wait until Spark displays the content of the first micro-batch. If the program stops with an error, remove all files generated in the checkpoint location with the following command and restart the Spark program.

hdfs dfs -rm -r hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/checkpoint_dir/*

  7. In the terminal window where the netcat server is running, write a few lines of text. Look at the terminal where the Spark program is running and observe the output.

Stop the program

  • When you’re done with your experiments, you can stop the Spark program by simply typing CTRL-C in the terminal where Spark is running.

  • Remove all files in the checkpoint directory.

hdfs dfs -rm -r hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/checkpoint_dir/*

  • Don’t stop the netcat server; you’ll need it in the next exercise.

3 Triggers

In a Structured Streaming program we can choose a trigger type. A trigger determines when the streaming source is checked for new data.
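
To illustrate the API only (with a placeholder interval of one minute, not the value asked for in the exercise below), a fixed-interval micro-batch trigger is set on the stream writer; lines is assumed to be the streaming DataFrame from the warm-up program.

# Hedged sketch: check the source and produce a micro-batch at most once every minute.
streamingQuery = lines.writeStream \
    .format("console") \
    .trigger(processingTime="1 minute") \
    .option("checkpointLocation", "hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/checkpoint_dir") \
    .start()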

By looking at the Structured Streaming programming guide, answer the following questions.

Exercise

Exercise 3.1

  1. What is the trigger type in the previous program?

  2. Modify the code of the previous program in order to set the Fixed interval micro-batches trigger type. Set an interval of 10 seconds.

  3. Run the program. Write many lines back to back on the netcat server. How is the behavior of this program different from before?

4 Checkpoint location and output mode

We’re now going to see the impact of the checkpoint location and the output modes on a streaming query.

Exercise

Exercise 4.1

  1. Look again at the Structured Streaming programming guide, and check the options for the output mode.

  2. What is the output mode of the previous program?

You’re now going to code a program that counts the number of occurrences of each word in a streaming data source. But first do the actions that you find in the following Activity.

Activity

  • Create a copy of the file warmup.py and rename it to wordcount.py by typing the following command:

cp warmup.py wordcount.py

  • Remove all files in the checkpoint location by typing the following command:

hdfs dfs -rm -r hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/checkpoint_dir/*

Exercise

Exercise 4.2

Before you code the program, you should first answer the following questions:

  1. Observe the output of a batch in the previous executions of the Spark programs. What is the type of the output?

  2. Given the type of the output, which Spark API are you going to use to code the program?

  3. Which operations do you need to execute to count the number of occurrences of each word across batches? For the moment, don’t worry about the specific functions you need to use, just think in abstract terms.

  4. Look at the Structured Streaming programming guide. Which output mode can’t you use? Why?

Exercise

Exercise 4.3

  1. Add to the file wordcount.py the instructions to count the number of occurrences of each word in the stream (a reference sketch follows this exercise). Where are you going to add the new instructions?

  2. Based on the answer to the previous exercises, set the appropriate output mode. You have two choices. Try one of them.

  3. Make sure that the netcat server is still running.

  4. Run the program with spark-submit. In the netcat terminal, write a few lines and observe the output of the Spark program.

  5. Stop the Spark program and run it again with no modifications. The first time you run it again, you will get a java.lang.IndexOutOfBoundsException. Don’t lose hope; run it once more.

  6. Write a few lines in the netcat terminal and observe the output of the Spark program. What can you say about the word counts?

  7. Stop the program and remove the files in the checkpoint location. Run the program again and write a few lines in the netcat terminal. What can you say about the word counts?

  8. Run the program again with a different output mode and observe the result.
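
For reference, the core word-count transformation from the Structured Streaming programming guide has roughly the following shape. This is a sketch, not the only possible solution: it assumes the streaming DataFrame is called lines and that its text column is named value, as with the socket source; the checkpoint path is a placeholder.

from pyspark.sql.functions import explode, split

# Split each line into words (one row per word), then count occurrences.
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()

# A streaming aggregation written to the console accepts the "complete" or
# "update" output modes, but not "append" (without watermarking).
streamingQuery = wordCounts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/checkpoint_dir") \
    .start()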

5 Window operations on event time

Netcat and checkpoint

Important actions to do!

  1. Stop the netcat server now.

  2. Remove all files from the checkpoint directory.

We’re now going to find out how to perform aggregations over a sliding event-time window.

A given data source generates two words every second for a certain amount of time. Each word is accompanied by a timestamp that indicates the exact moment when the word is generated. This timestamp is the event time.

After generating a word, the data source saves the word and its timestamp into a CSV file in a directory on HDFS. For convenience, we’ll refer to this directory as the source directory.

At any given moment, the source directory will contain zero or more CSV files, each containing exactly one line in the format word,timestamp (no whitespace before or after the comma).
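
As a hint, a file source with this layout is typically read by declaring the schema explicitly. The sketch below uses hypothetical column names (word and timestamp) and assumes the SparkSession is called spark.

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Each CSV file holds exactly one line: word,timestamp
schema = StructType([
    StructField("word", StringType()),
    StructField("timestamp", TimestampType())
])

# Stream in every new CSV file that appears in the source directory.
words = spark.readStream \
    .schema(schema) \
    .csv("hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/source_dir")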

Activity

  1. Create the source directory under your home directory in HDFS, by typing the following command:

hdfs dfs -mkdir hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/source_dir

  2. Copy to your home directory the data generator tempws_gen.py by typing the following command:

cp /usr/users/cpu-prof/cpu_quercini/structured_streaming/tempws_gen.py .

  3. Copy to your home directory the file win_events.py by typing the following command:

cp /usr/users/cpu-prof/cpu_quercini/structured_streaming/win_events.py .

You now need to complete the code in file win_events.py.

Exercise

Exercise 5.1

  1. Open the file win_events.py.

  2. Complete Line 19. Specify the path to the source directory that you created in the previous activity.

  3. Complete Line 31. Write the query to count the number of occurrences of each word within a 10-second window that slides every 5 seconds. Look at the documentation of the window function to learn how to do it (see also the sketch after this exercise).

  4. Complete Line 34. Specify the path to the checkpoint directory.

  5. Complete Line 40. Write the code that sends the output of the streaming query to the console.

    • Use triggers of 5 seconds.
    • Use the output mode update.
    • Don’t forget to specify the location of the checkpoint directory.
  6. Execute win_events.py with spark-submit.

    • If you get some errors, stop the program. Correct the errors in the code and then execute the program again.
    • If no error arises, the program should hang waiting for some input. Let the program wait and go on to the next exercise.
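
If you are unsure about the syntax, the window function from the programming guide is typically combined with a grouped count as in the sketch below. The column names word and timestamp and the paths are assumptions that must match your own schema and directories.

from pyspark.sql.functions import window

# Count each word over a 10-second window that slides every 5 seconds.
windowedCounts = words.groupBy(
    window(words.timestamp, "10 seconds", "5 seconds"),
    words.word
).count()

# Console sink with a 5-second trigger and the "update" output mode.
streamingQuery = windowedCounts.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .option("checkpointLocation", "hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/checkpoint_dir") \
    .start()

streamingQuery.awaitTermination()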

We now test the Spark program.

Activity

  1. Open a new terminal window and type the following command to connect to the same kyle machine where you’re connected in the first terminal window:

ssh kyleXX

  2. Execute the data generator program by typing the following command (remember to replace X with your account number!):

python3 tempws_gen.py -s hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/source_dir

  3. After launching the data generator, the generated words are shown in its terminal; you should also see some output in the terminal window where you launched the Spark program. Wait for the script tempws_gen.py to finish generating the data.

  4. If you need to rerun the Spark program and the data generator, make sure you delete all the files in the checkpoint location and the source directory.

We now want to analyze the output of the program.

Activity

  • Create a new folder on your own computer (not the computers in the cluster) and open it in a new Visual Studio Code window. For the sake of simplicity, let’s call this folder window_analysis.

  • The script tempws_gen.py has generated a file gen_words.csv in your home directory in the cluster. This file contains the list of all generated words along with their timestamps.

  • Open gen_words.csv and copy the whole content.

  • Create a new file gen_words.csv under window_analysis and paste the content that you copied on the previous step.

  • Copy the file timeline_visualization.py into your home folder in the cluster by typing the following command:

cp /usr/users/cpu-prof/cpu_quercini/structured_streaming/timeline_visualization.py .

  • Open the file timeline_visualization.py and copy all its content.

  • Create a new file timeline_visualization.py under window_analysis and paste the content that you copied on the previous step.

  • In the Visual Studio Code window where you opened window_analysis, open a terminal and run the following command. Warning: if you’re on macOS, type python3 instead of python.

python timeline_visualization.py -i gen_words.csv -ft [first timestamp] -lt [last timestamp] -it 5 --slide 5

where:

  • gen_words.csv is the file that you previously created.

  • Replace first timestamp with the timestamp of the left boundary of the first window (look at the output of your Spark program).

  • Replace last timestamp with the timestamp of the right boundary of the last window (look at the output of your Spark program).

  • The option -it is used to specify the interval between two consecutive triggers.

  • The option --slide is used to specify the window slide frequency.

The plot will show a grid line at each trigger and a blue line at each window boundary.

Exercise

Exercise 5.2

  1. Analyze the output of your Spark program and the timeline of the generated words.

  2. Describe how the counts are updated by the Spark program.

6 Late data and watermarking

We’re now going to learn how Structured Streaming handles late data in windowed aggregations.

Remove generated files

  • Remove all the files in the source directory.

  • Remove all the files in the checkpoint directory.

The data generator tempws_gen.py can generate a stream of words, some of which might be written to the source directory with some amount of delay. In other words, there is a gap between the event time (when the word is generated) and the processing time (when the word is written to the directory).

Good to know

To generate data with some delay you can use the following command (remember to replace the X with your account number!):

python3 tempws_gen.py -s hdfs://sar01:9000/bdiaspark23/bdiaspark23_X/source_dir --delay 0.8

In the previous command, the 0.8 indicates that the probability that an event arrives with some delay is 80%. The delay is between 5 and 15 seconds. You can adjust these values by using the appropriate options. To learn more, type the following command:

python3 tempws_gen.py --help

Spark uses a mechanism called watermarking to bound how late an event can arrive and still be counted. Spark keeps track of the maximum event time seen so far and subtracts a threshold (the watermark delay) from it; events whose event time falls below the resulting watermark may be discarded, and the state of the corresponding windows can be dropped.
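
As an illustration, a watermark is declared on the event-time column before the windowed aggregation. The 10-second threshold below is a placeholder, and the column names follow the same assumptions as in the earlier sketches (a streaming DataFrame words with columns word and timestamp).

from pyspark.sql.functions import window

# Hedged sketch: tolerate events that are at most 10 seconds late.
windowedCounts = words \
    .withWatermark("timestamp", "10 seconds") \
    .groupBy(
        window(words.timestamp, "10 seconds", "5 seconds"),
        words.word
    ).count()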

Exercise

Exercise 6.1

  1. Write a Spark program that does the same aggregation as in the previous exercise. Additionally, the program must use watermarking to handle late data. You can look at the programming guide to learn how to do it.

  2. Start the Spark program.

  3. Generate some data with delays using the program tempws_gen.py. Once the data generation stops, you can stop the Spark program.

  4. Visualize the generated words with the visualization tool. Late words have their delay indicated in parentheses.

  5. Observe the output of the Spark program and describe how the watermarking mechanism works on this example.