Spark DataFrames and Spark SQL
1 Number of partitions in an RDD
We consider a set of CSV files that contain temperature measurements over several years.
Each line has the following content: `year,month,day,hour,minute,second,temperature`.
These files are stored at the following location: `hdfs://sar01:9000/data/temperatures/`.
Here are the details for each file:
- File `temperatures_86400.csv` contains one measurement per day between 1980 and 2018.
- File `temperatures_2880.csv` contains one measurement every 2880 seconds between 1980 and 2018.
- File `temperatures_86.csv` contains one measurement every 86 seconds for the year 1980 alone.
- File `temperatures_10.csv` contains one measurement every 10 seconds between 1980 and 2018.
Get the file `avg_temperatures_rdd.py` by typing the following command:
cp ~cpu_quercini/spark-sql-templates/avg_temperatures_rdd.py .
This file contains an efficient implementation of the function that computes the average yearly temperature, as we have seen in a previous tutorial.
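As a reminder of the idea behind that implementation, here is a minimal sketch of the efficient approach: sums and counts are aggregated per year with `reduceByKey` (which combines values locally before shuffling) rather than `groupByKey`. Variable and path names below are illustrative and not necessarily those of the template.

```python
from pyspark import SparkContext

sc = SparkContext(appName="avg_temperatures_rdd_sketch")

# Each line of the input file is "year,month,day,hour,minute,second,temperature".
text_file = sc.textFile("hdfs://sar01:9000/data/temperatures/temperatures_10.csv")

# Map each line to (year, (temperature, 1)) and combine sums and counts per year.
# reduceByKey aggregates locally on each partition before shuffling, which is
# what makes this implementation efficient.
temperatures = (
    text_file
    .map(lambda line: line.split(","))
    .map(lambda fields: (fields[0], (float(fields[6]), 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])  # (year, average)
)
```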
1.1 Performance and number of partitions
- Line 70. Replace `sarXX` with `sar01`.
- Line 80. Replace `sarXX` with `sar01`. Replace `YOUR_DIRECTORY` with `bdiaspark2024/bdiaspark2024_XX/` (where XX corresponds to your username number).
- Observe the instructions at line 98 and line 104. They get and show the number of partitions of the input RDD `text_file` and the output RDD `temperatures`, respectively (a sketch of what they might look like is given after this list).
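For reference, those two instructions presumably resemble the following sketch (`text_file` and `temperatures` are the RDD names used in the template):

```python
# Number of partitions of the input RDD (typically one partition per HDFS block).
print("Partitions of text_file:", text_file.getNumPartitions())

# Number of partitions of the output RDD produced by the aggregation.
print("Partitions of temperatures:", temperatures.getNumPartitions())
```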
Exercise
Exercise 1.1
Complete Table 1. Run the program on file `temperatures_10.csv`. Write down the execution time and the number of partitions of both RDDs `text_file` and `temperatures`.
Reminder. The command to run the program is as follows:
spark-submit --master spark://sar01:7077 avg_temperatures_rdd.py temperatures_10.csv
Table 1: Execution time and number of partitions for each input file.

| File | Execution time (sec) | Number of partitions (`text_file`) | Number of partitions (`temperatures`) |
|---|---|---|---|
| temperatures_86400.csv | | | |
| temperatures_2880.csv | | | |
| temperatures_86.csv | | | |
| temperatures_10.csv | | | |
1.2 Analysis of Spark’s operation
Exercise
Exercise 1.2
Can you work out how Spark determines the number of partitions of the RDD `text_file` by looking at the size of the input files?
HINT. If you divide the size of the file `temperatures_10.csv` by the number of partitions of the RDD `text_file`, which value do you obtain? What does this value represent?
List the files that are stored in the output folder `temperatures_10.rdd.out` under your HDFS folder. What do you notice? Is there any relation to the number of partitions?
Good to know
To list the contents of a folder in HDFS, you can use the following command:
hdfs dfs -ls hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_XX/temperatures_10.rdd.out
2 Using the DataFrame API to compute the average temperatures
You’re now going to implement a Spark program to compute the average temperatures by using the Spark DataFrame API.
Go through the following steps:
- Copy the code template `avg_temperatures_df.py` to your home folder by executing the following command:
cp ~cpu_quercini/spark-sql-templates/avg_temperatures_df.py .
- The result of the computation will be stored in the folder `temperatures_*.df.out` under your HDFS folder `bdiaspark2024/bdiaspark2024_XX`.
Exercise
Exercise 2.1
- Line 78. Replace `sarXX` with `sar01`.
- Line 89. Replace `sarXX` with `sar01`.
- Line 106. Complete the instruction that reads the input CSV file. Please note that the input CSV files do not have headers. Don't use schema inference; specify your schema manually. As a reminder, the columns are: `year`, `month`, `day`, `hour`, `minute`, `second`, `temperature`.
- Line 55. Complete the definition of the function `avg_temperature_df` (a sketch of these two steps is given right after this exercise).
- Execute your code on all the input CSV files and complete Table 2.
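Here is a minimal sketch of what the two completed steps might look like. `spark` denotes the SparkSession created in the template and `input_path` is a placeholder for the HDFS path of the CSV file; the actual variable names in the template may differ.

```python
from pyspark.sql.functions import avg
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# Manual schema: the CSV files have no header, so every column is declared
# explicitly instead of relying on schema inference.
schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("day", IntegerType(), True),
    StructField("hour", IntegerType(), True),
    StructField("minute", IntegerType(), True),
    StructField("second", IntegerType(), True),
    StructField("temperature", FloatType(), True),
])

# Reading step (line 106): no header, no schema inference.
df_temperatures = spark.read.csv(input_path, schema=schema, header=False)

# Aggregation step (line 55): average temperature per year.
def avg_temperature_df(df):
    """Return a DataFrame with the average temperature for each year."""
    return df.groupBy("year").agg(avg("temperature").alias("avg_temperature"))
```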
Table 2: Execution times of the RDD and DataFrame implementations.

| File | Execution time RDD (sec) | Execution time DataFrame (sec) |
|---|---|---|
| temperatures_86400.csv | Exercise 1.1 | |
| temperatures_2880.csv | Exercise 1.1 | |
| temperatures_86.csv | Exercise 1.1 | |
| temperatures_10.csv | Exercise 1.1 | |
Exercise
Exercise 2.2
Compare the execution times with the ones obtained with the RDD implementation. What do you observe? How do you explain the differences?
2.1 Caching a DataFrame
You’re now going to discover the advantages of caching a DataFrame.
- Uncomment the last two lines in the file `avg_temperatures_df.py`.
- Remove the file `temperatures_10.df.out` by typing the following command:
hdfs dfs -rm -r hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_XX/temperatures_10.df.out
Exercise
Exercise 2.3
Execute the code on file `temperatures_10.csv`.
What is the execution time of each action?
Can you explain in detail what is going on here?
- Remove the files `temperatures_10.df.out` and `temperatures_10.df.out.bis`.
Exercise
Exercise 2.4
Cache the DataFrame `df_avg` and execute the code again on file `temperatures_10.csv` (a sketch of one possible placement of the cache call is given after the questions below).
Where should you add the cache instruction?
What is the execution time of each action?
Can you explain in detail what is going on here?
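As a hint, here is a minimal sketch of one possible placement of the cache call (`df_avg` is the DataFrame named above; the output paths and other variable names are illustrative placeholders):

```python
# Cache df_avg before the first action so that the second action reuses the
# materialized result instead of recomputing the whole aggregation.
df_avg = avg_temperature_df(df_temperatures)   # transformation only, nothing runs yet
df_avg.cache()                                 # cache() is lazy as well

# First action: triggers the computation and populates the cache.
df_avg.write.csv(output_path)

# Second action: served from the cached partitions, so it should be much faster.
df_avg.write.csv(output_path + ".bis")
```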
3 Computing averages with SQL
You’re now going to implement the computation of the yearly average temperatures by using SQL on Spark DataFrames.
3.1 Using a view
A first option to query a DataFrame with SQL is to create a view.
- Copy the file `avg_temperatures_sql_view.py` to your home folder by typing the following command:
cp ~cpu_quercini/spark-sql-templates/avg_temperatures_sql_view.py .
- Complete the code in lines 90, 101 and 118.
- Implement the function `avg_temperature_sql` (line 57). A sketch of the view-based approach is given after this list.
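As an illustration, here is a minimal sketch of the view-based approach, assuming the function receives the SparkSession and the DataFrame read from the CSV file (the template's actual signature may differ):

```python
def avg_temperature_sql(spark, df):
    """Compute the average temperature per year by querying a temporary view."""
    # Register the DataFrame as a temporary view so it can be referenced in SQL.
    df.createOrReplaceTempView("temperatures")
    return spark.sql("""
        SELECT year, AVG(temperature) AS avg_temperature
        FROM temperatures
        GROUP BY year
    """)
```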
Exercise
Exercise 3.1
Execute the code on all CSV files and complete Table 3.
What can you tell about the running times? Do you find significant differences between using SQL on a view and using DataFrame functions?
Table 3: Execution times of the RDD, DataFrame and SQL-view implementations.

| File | Execution time RDD (sec) | Execution time DataFrame (sec) | Execution time SQL view (sec) |
|---|---|---|---|
| temperatures_86400.csv | Exercise 1.1 | Exercise 2.1 | |
| temperatures_2880.csv | Exercise 1.1 | Exercise 2.1 | |
| temperatures_86.csv | Exercise 1.1 | Exercise 2.1 | |
| temperatures_10.csv | Exercise 1.1 | Exercise 2.1 | |
3.2 Using a table
A second option to query a DataFrame with SQL is to create a table.
- Copy the file `avg_temperatures_sql_table.py` to your home directory by typing the following command:
cp ~cpu_quercini/spark-sql-templates/avg_temperatures_sql_table.py .
- Complete lines 50, 112, 123 and 140.
- Implement the function `avg_temperature_sql` (line 76). A sketch of the table-based approach is given after this list.
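As an illustration, here is a minimal sketch of the table-based approach, under the same assumptions as the previous sketch. Unlike a temporary view, the table is persisted in the `spark-warehouse` directory and registered in the local metastore (hence the clean-up advice in the box below).

```python
def avg_temperature_sql(spark, df):
    """Compute the average temperature per year by querying a persistent table."""
    # Persist the DataFrame as a table: its files go under spark-warehouse/ and
    # its metadata is recorded in the local metastore (metastore_db/).
    df.write.mode("overwrite").saveAsTable("temperatures_table")
    return spark.sql("""
        SELECT year, AVG(temperature) AS avg_temperature
        FROM temperatures_table
        GROUP BY year
    """)
```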
Exercise
Exercise 3.2
Execute the code on all files.
Complete Table 4.
Compare the execution times with those you obtained in the previous exercises. Discuss the results.
Good to know
When you test the code for the first time, it's possible that your program will return errors (e.g. due to syntax problems). In this case, it's imperative to delete any files that may have been created in the `spark-warehouse` directory, as well as the `metastore_db` directory, which you'll find in your local working directory (not on HDFS).
Table 4: Execution times of the RDD, DataFrame, SQL-view and SQL-table implementations.

| File | Execution time RDD (sec) | Execution time DataFrame (sec) | Execution time SQL view (sec) | Execution time SQL table (sec) |
|---|---|---|---|---|
| temperatures_86400.csv | 3.12 | Exercise 2.1 | Exercise 3.1 | |
| temperatures_2880.csv | 3.67 | Exercise 2.1 | Exercise 3.1 | |
| temperatures_86.csv | 4.59 | Exercise 2.1 | Exercise 3.1 | |
| temperatures_10.csv | Exercise 1.1 | Exercise 2.1 | Exercise 3.1 | |
Remove the file `temperatures_10.sql.table.out` and execute the code again on file `temperatures_10.csv`.
Exercise
Exercise 3.3
What is the execution time that you obtain now?
4 Using the DataFrame API on large files
We now consider the files stored under `hdfs://sar01:9000/data/sales/`.
These files contain tabular data related to the sale of products in a chain of stores.
We consider two tables: `store_sales` and `customer`.
In the first table we find information about each sale, such as the identifier of the product sold, the identifier of the buyer, the quantity of purchased product and the price paid by the customer.
For this table, we have 4 files, which only differ in size:
- `store_sales_1_4.100.dat`: contains 9.5 GiB of data.
- `store_sales_1_4.200.dat`: contains 19 GiB of data.
- `store_sales_1_4.400.dat`: contains 38 GiB of data.
- `store_sales_1_4.800.dat`: contains 77 GiB of data.
In the table `customer` we find data about customers, such as first and last names and birth dates.
We only have one file for this table:
- `customer_10000.dat`: contains 8.3 GiB of data.
We want to test the performance of the DataFrame API on the following queries (WARNING: you must write code that uses DataFrame functions, not SQL!):
- Query Q1: returns the number of clients. This corresponds to the following SQL query:
SELECT count(*)
FROM customer
- Query Q2: returns the price of the most expensive product. This corresponds to the following SQL query:
SELECT max(ss_list_price)
FROM store_sales
- Query Q3: returns the amount of money spent by each client. This corresponds to the following SQL query:
SELECT ss_customer_sk, SUM(ss_net_paid_inc_tax) AS amountSpent
FROM store_sales
GROUP BY ss_customer_sk
- Query Q4: Query Q3 + sort the result so that the client who spent the most money appears at the top. This corresponds to the following SQL query:
SELECT ss_customer_sk, SUM(ss_net_paid_inc_tax) AS amountSpent
FROM store_sales
GROUP BY ss_customer_sk
ORDER BY amountSpent DESC
- Query Q5: Query Q4 + join with the table `customer` to get the first and last names of the customers. This corresponds to the following SQL query:
SELECT c.c_first_name, c.c_last_name, SUM(ss_net_paid_inc_tax) AS amountSpent
FROM store_sales s JOIN customer c ON s.ss_customer_sk = c.c_customer_sk
GROUP BY s.ss_customer_sk, c.c_first_name, c.c_last_name
ORDER BY amountSpent DESC
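For reference, here is one possible mapping of the five SQL queries to DataFrame functions. This is only a sketch: `df_sales` and `df_customer` stand for the DataFrames read from the `store_sales` and `customer` files, and the variable names are illustrative, not those of the template.

```python
from pyspark.sql import functions as F

# Q1: number of clients.
q1 = df_customer.count()

# Q2: price of the most expensive product.
q2 = df_sales.agg(F.max("ss_list_price"))

# Q3: amount of money spent by each client.
q3 = (df_sales
      .groupBy("ss_customer_sk")
      .agg(F.sum("ss_net_paid_inc_tax").alias("amountSpent")))

# Q4: Q3 sorted so that the client who spent the most appears first.
q4 = q3.orderBy(F.desc("amountSpent"))

# Q5: Q4 joined with customer to retrieve the clients' first and last names.
q5 = (q4
      .join(df_customer, q4["ss_customer_sk"] == df_customer["c_customer_sk"])
      .select("c_first_name", "c_last_name", "amountSpent")
      .orderBy(F.desc("amountSpent")))
```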
4.1 Development of the code using the DataFrame API
- Copy the file `dataframe_api_benchmark.py` to your home directory by typing the following command:
cp ~cpu_quercini/spark-sql-templates/dataframe_api_benchmark.py .
- Modify the code by following the instructions in the file.
- Execute the code on file `store_sales_1_4.100.dat` (the smallest one) to check that your code is bug-free.
- Once you're sure that your code is correct, uncomment lines 82 and 83. This will cache the two DataFrames.
- Execute the code on all files `store_sales_1_4.*.dat`.
Good to know
Each query is executed 5 times in order to obtain a reliable estimate of the execution time. You'll see that the execution times fluctuate during the first iterations and stabilize in the later ones. When you write down the execution times, only consider those obtained at the last iteration.
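As an illustration of this measurement protocol, the timing presumably resembles the following simplified sketch (the template may time the queries differently; `df_result` stands for one of the query DataFrames defined above):

```python
import time

# Run the same query five times: the first iterations include the cost of reading
# and caching the data, while the last ones reflect the steady-state performance.
for i in range(5):
    start = time.time()
    df_result.count()   # count() is an action, so it forces the query to execute
    print(f"iteration {i}: {time.time() - start:.2f} s")
```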
Exercise
Exercise 4.1
Complete Table 5 and write down the execution time of each query for each file.
Why is the execution time of queries Q1 and Q2 large at iteration 0?
Do you think that the differences between the execution times of the queries are reasonable?
Do you think that the increase in execution times is reasonable given the size of the input files?
Table 5: Execution times of the queries on the `store_sales` files.

| File / query | Read (sec) | Query Q1 (sec) | Query Q2 (sec) | Query Q3 (sec) | Query Q4 (sec) | Query Q5 (sec) |
|---|---|---|---|---|---|---|
| store_sales_1_4.100.dat | | | | | | |
| store_sales_1_4.200.dat | | | | | | |
| store_sales_1_4.400.dat | | | | | | |
| store_sales_1_4.800.dat | | | | | | |