Big data for AI — Lab assignment 2

Spark DataFrames and Spark SQL


1 Number of partitions in a RDD

We consider a set of CSV files that contain temperature measurements over several years. Each line has the following content: year,month,day,hour,minute,second,temperature.

These files are stored at the following location: hdfs://sar01:9000/data/temperatures/.

Here is the detail for each file:

  • File temperatures_86400.csv contains one measurement per day between 1980 and 2018.
  • File temperatures_2880.csv contains one measurement every 2880 seconds between 1980 and 2018.
  • File temperatures_86.csv contains one measurement every 86 seconds for the year 1980 alone.
  • File temperatures_10.csv contains one measurement every 10 seconds between 1980 - 2018.

Get the file avg_temperatures_rdd.py by typing the following command:

cp ~cpu_quercini/spark-sql-templates/avg_temperatures_rdd.py .

This file contains an efficient implementation of the function that computes the average yearly temperature, as we have seen in a previous tutorial.

1.1 Performances and number of partitions

  • Line 70. Replace sarXX with sar01.

  • Line 80. Replace sarXX with sar01. Replace YOUR_DIRECTORY with bdiaspark2024/bdiaspark2024_XX/ (where XX corresponds to your username number).

  • Observe the instructions at line 98 and line 104. They get and show the number of partitions of the input RDD text_file and the output RDD temperatures respectively.

Exercise

Exercise 1.1 Complete Table 1. Run the program on file temperatures_10.csv. Write down the execution time and the number of partitions of both RDDs text_file and temperatures.

Reminder. The command to run the program is as follows:

spark-submit --master spark://sar01:7077 avg_temperatures_rdd.py temperatures_10.csv

File Execution time
(sec)
Number of partitions
(text_file)
Number of partitions
(temperatures)
temperatures_86400.csv
temperatures_2880.csv
temperatures_86.csv
temperatures_10.csv
Table 1. Execution time and partition numbers with RDD.

1.2 Analysis of Spark’s operation

Exercise

Exercise 1.2

  • Could you understand how Spark determines the number of partitions of the RDD text_file by looking at the size of the input files?
    HINT. If you divide the size of the file temperatures_10.csv by the number of partitions of the RDD text_file, which value do you obtain? What does this value represent?

  • List the files that are stored in the output folder temperatures_10.rdd.out under your HDFS folder. What do you notice? Is there any relation with respect to the number of partitions?

Good to know

In order to list the content of the folder in HDFS, you can use the following command:

hdfs dfs -ls hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_XX/temperatures_10.rdd.out

2 Using the DataFrame API to compute the average temperatures

You’re now going to implement a Spark program to compute the average temperatures by using the Spark DataFrame API.

Go through the following steps:

  • Copy the code template avg_temperatures_df.py to your home folder by executing the following command:

cp ~cpu_quercini/spark-sql-templates/avg_temperatures_df.py .

  • The result of the computation will be stored in the folder temperatures_*.df.out under your HDFS folder bdiaspark2024/bdiaspark2024_XX.

Exercise

Exercise 2.1

  • Line 78. Replace sarXX with sar01.

  • Line 89. Replace sarXX with sar01.

  • Line 106. Complete the instruction to read from the input CSV file. Please note that the input CSV files do not have headers. Don’t use schema inference, just specify your schema manually. As a reminder, the columns are: year, month, day, hour, minute, second, temperature.

  • Line 55. Complete the definition of function avg_temperature_df.

  • Execute your code on all the input CSV files and complete Table 2.

File Execution time RDD
(sec)
Execution time DataFrame
(sec)
temperatures_86400.csv Exercise 1.1
temperatures_2880.csv Exercise 1.1
temperatures_86.csv Exercise 1.1
temperatures_10.csv Exercise 1.1
Table 2. RDDs vs. DataFrames.

Exercise

Exercise 2.2

Compare the execution times with the ones obtained with the implementation using the RDDs. What do you observe? How do you explain the differences?

2.1 Caching a DataFrame

You’re now going to discover the advantages of caching a DataFrame.

  • Uncomment the last two lines in file avg_temperatures_df.py

  • Remove the file temperatures_10.df.out by typing the following command:

hdfs dfs -rm -r hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_XX/temperatures_10.df.out

Exercise

Exercise 2.3 Execute the code on file temperatures_10.csv. What is the execution time of each action? Can you explain in detail what is going on here?

  • Remove files temperatures_10.df.out and temperatures_10.df.out.bis.

Exercise

Exercise 2.4 Cache the DataFrame df_avg and execute the code again on file temperatures_10.csv.

  • Where should you add the cache instruction?

  • What is the execution time of each action?

  • Can you explain in detail what is going on here?

3 Computing averages with SQL

You’re now going to implement the computation of the yearly average temperatures by using SQL on Spark DataFrames.

3.1 Using a view

A first option to query a DataFrame with SQL is to create a view.

  • Copy the file avg_temperatures_sql_view.py to your home folder by typing the following command:

cp ~cpu_quercini/spark-sql-templates/avg_temperatures_sql_view.py .

  • Complete the code in lines 90, 101, 118.

  • Implement the function avg_temperature_sql (line 57).

Exercise

Exercise 3.1

  • Execute the code on all CSV files and complete Table 3.

  • What can you tell about the the running times ? Do you find significant differences between using SQL on a view and DataFrame functions?

File Execution time RDD
(sec)
Execution time DataFrame
(sec)
Execution time SQL view
(sec)
temperatures_86400.csv Exercise 1.1 Exercise 2.1
temperatures_2880.csv Exercise 1.1 Exercise 2.1
temperatures_86.csv Exercise 1.1 Exercise 2.1
temperatures_10.csv Exercise 1.1 Exercise 2.1
Table 3. RDDs vs DataFrames with views.

3.2 Using a table

A second option to query a DataFrame with SQL is to create a table.

  • Copy the file avg_temperatures_sql_table.py to your home directory by typing the following command:

cp ~cpu_quercini/spark-sql-templates/avg_temperatures_sql_table.py .

  • Complete lines 50, 112, 123 and 140.

  • Implement the function avg_temperature_sql (line 76).

Exercise

Exercise 3.2

  • Execute the code on all files.

  • Complete Table 4.

  • Compare the execution times with the ones that you obtained. Discuss the results.

Good to know

When you test the code for the first time, it’s possible that your program will return errors (e.g. due to syntax problems). In this case, it’s imperative to delete any files that may have been created in the spark-warehouse directory, as well as the metastore_db directory, which you’ll find in your local working directory (not on HDFS).

File Execution time RDD
(sec)
Execution time DataFrame
(sec)
Execution time SQL view
(sec)
Execution time SQL table
(sec)
temperatures_86400.csv 3.12 Exercise 2.1 Exercise 3.1
temperatures_2880.csv 3.67 Exercise 2.1 Exercise 3.1
temperatures_86.csv 4.59 Exercise 2.1 Exercise 3.1
temperatures_10.csv Exercise 1.1 Exercise 2.1 Exercise 3.1
Table 4. RDDs vs DataFrames with views and tables.
  • Remove the file temperatures_10.sql.table.out

  • Execute the code again on file temperatures_10.csv.

Exercise

Exercise 3.3 What is the execution time that you obtain now?

4 Using the DataFrame API on large files

We now consider the files stored under hdfs://sar01:9000/data/sales/.

These files contain tabular data related to the sale of products in a chain of stores. We consider two tables: store_sales and customer. In the first table we find information about each sale, such as the identifier of the product sold, the identifier of the buyer, the quantity of purchased product and the price paid by the customer. For this table, we have 4 files, which only differ in size:

  • store_sales_1_4.100.dat: contains 9.5 GiB of data.

  • store_sales_1_4.200.dat: contains 19 GiB of data.

  • store_sales_1_4.400.dat: contains 38 GiB of data.

  • store_sales_1_4.800.dat: contains 77 GiB of data.

In table customer we find data about customers, such as first and last names and birth dates. We only have one file for this table:

  • customer_10000.dat: contains 8.3 GiB of data.

We want to test the performances of the DataFrame API on the following queries (WARNING. you must write a code that uses DataFrame functions, not SQL!):

  • Query Q1: returns the number of clients. This corresponds to the following SQL query:
SELECT count(*) 
FROM customer

Query Q2: returns the price of the most expensive product. This corresponds to the following SQL query:

SELECT max(ss_list_price) 
FROM store_sales

Query Q3: returns the amount of money spent by each client. This corresponds to the following SQL query:

SELECT ss_customer_sk, SUM(ss_net_paid_inc_tax) as amountSpent 
FROM store_sales
GROUP BY ss_customer_sk

Query Q4: Query Q3 + sort the result so that the client that spent the most money appears on the top. This corresponds to the following SQL query:

SELECT ss_customer_sk, SUM(ss_net_paid_inc_tax) as amountSpent 
FROM store_sales
GROUP BY ss_customer_sk
ORDER BY amountSpent DESC

Query Q5: Query Q4 + join with the table customer to get the first and last name of the customers. This corresponds to the following SQL query:

SELECT c.c_first_name, c.c_last_name, SUM(ss_net_paid_inc_tax) as amountSpent 
FROM store_sales s JOIN customer c ON s.ss_customer_sk = c.c_customer_sk 
GROUP BY ss_customer_sk
ORDER BY amountSpent DESC

4.1 Development of the code using the DataFrame API.

  • Copy the file dataframe_api_benchmark.py to your home directory by typing the following command:
cp ~cpu_quercini/spark-sql-templates/dataframe_api_benchmark.py .
  • Modify the code by following the instructions in the file.

  • Execute the code on file store_sales_100.dat (the smallest one) to test that your code is bug-free.

  • Once you’re sure that your code is correct, uncomment lines 82 and 83. This will cache the two DataFrames.

  • Execute the code on all files store-sales_*.dat

Good to know

Each query is executed 5 times to have a correct estimate of the execution time. You’ll see that the execution times fluctuate on the first iterations and they stabilize in the later iterations. When you write down the execution times, only consider the execution times obtained at the last iteration.

Exercise

Exercise 4.1

  • Complete Table 5 and write down the execution time of each query for each file.

  • Why the execution time of the queries Q1 and Q2 is large at the iteration 0?

  • Do you think that the difference between the execution times of the queries is reasonable?

  • Do you think that the augmentation of the execution times is reasonable given the size of the input files?

File / query Read
(sec)
Query Q1
(sec)
Query Q2
(sec)
Query Q3
(sec)
Query Q4
(sec)
Query Q5
(sec)
store_sales_1_4.100.dat
store_sales_1_4.200.dat
store_sales_1_4.400.dat
store_sales_1_4.800.dat
Table 5. Execution times of the queries on the sales dataset.