Running Spark programs on a cluster
1 Computing averages
We consider a collection of CSV files containing temperature measurements in the following format:
year,month,day,hours,minutes,seconds,temperature
You can find the files under the directory hdfs://sar01:9000/data/temperatures/
Here are the details for each file:
- File temperatures_86400.csv contains one measurement per day in the years 1980 - 2018.
- File temperatures_2880.csv contains one measurement every 2880 seconds in the years 1980 - 2018.
- File temperatures_86.csv contains one measurement every 86 seconds for the year 1980 alone.
- File temperatures_10.csv contains one measurement every 10 seconds for the years 1980 - 2018.
We intend to implement a Spark algorithm to generate pairs \((y, t_{avg})\), where \(y\) is the year and \(t_{avg}\) is the average temperature in the year.
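As a point of reference, each input line can be parsed into a (year, temperature) pair with a small helper like the following; the function name is illustrative and not part of the provided template:

```python
def parse_line(line):
    """Split 'year,month,day,hours,minutes,seconds,temperature'
    and keep only the year and the temperature."""
    fields = line.split(",")
    return (int(fields[0]), float(fields[6]))

# In a Spark job, such a parser is typically applied to every line of the RDD:
#   pairs = sc.textFile(input_path + "temperatures_86400.csv").map(parse_line)
```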
1.1 First implementation
Copy the file ~cpu_vialle/DCE-Spark/template_temperatures.py to your home directory by typing the following command:
cp ~cpu_vialle/DCE-Spark/template_temperatures.py ./avg_temperatures_slow.py
Open the file avg_temperatures_slow.py.
The file contains the implementation of the function avg_temperature_slow that:
- takes in an RDD, where each item is a line of the input text file;
- returns an RDD, where each item is a key-value pair (year, avg_temp).
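For illustration, a slow implementation typically groups all measurements of a year together before averaging them. A minimal sketch of that pattern (not necessarily identical to the template's code) is:

```python
def mean(values):
    """Average of an iterable of temperatures."""
    values = list(values)
    return sum(values) / len(values)

# groupByKey() ships every single measurement of a year across the network
# to one worker before the average is computed -- this is what makes the
# version slow:
#   avg = pairs.groupByKey().mapValues(mean)
```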
In the same file, locate the two variables input_path and output_path and write the following code:
input_path = "hdfs://sar01:9000/data/temperatures/"
output_path = "hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/"
Replace X with the number corresponding to your account!
Don’t forget the / at the end of the file paths!
Execute the following actions:
- Run the script avg_temperatures_slow.py by using temperatures_86400.csv as an input. To this end, use the following command:
spark-submit --master spark://sar01:7077 avg_temperatures_slow.py temperatures_86400.csv
- You should find the output of the program under the following folder:
hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/temperatures_86400.out
Type the following command to verify it:
hdfs dfs -ls hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/temperatures_86400.out
- If you want to read the result of the computation, you can execute the following command:
hdfs dfs -cat hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/temperatures_86400.out/*
Exercise 1.1
In the output of Spark on the command line, you should see a line similar to the following:
INFO DAGScheduler: Job 0 finished: runJob at SparkHadoopWriter.scala:78, took 3.478220 s
Note the execution time that you obtain.
Run the same script by using temperatures_2880.csv as an input. What is the execution time? Does it seem reasonable compared with the execution time that you observed before? Justify your answer.
Execute the same script by using temperatures_86.csv as an input. What is the execution time? How would you justify it, knowing that the files temperatures_2880.csv and temperatures_86.csv have similar sizes (11 MB the former, 9 MB the latter)?
1.2 Second implementation
We want to implement a better version of the program. You can draft your ideas on paper before you write any code.
When you’re ready, create a copy of avg_temperatures_slow.py and rename it avg_temperatures_fast.py with the following command:
cp ./avg_temperatures_slow.py ./avg_temperatures_fast.py
Exercise 1.2
Open the file and implement the function avg_temperature_fast.
NOTE. Remember to comment out the call to avg_temperature_slow and to uncomment the call to avg_temperature_fast at the end of the file.
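As a hint for the faster version, a common pattern is to aggregate partial (sum, count) pairs with reduceByKey instead of grouping all measurements of a year. The sketch below uses illustrative names and is only one possible approach:

```python
def to_sum_count(temp):
    """Turn one temperature into a partial (sum, count) aggregate."""
    return (temp, 1)

def merge(a, b):
    """Combine two partial (sum, count) aggregates."""
    return (a[0] + b[0], a[1] + b[1])

def finalize(agg):
    """Derive the average from the final (sum, count) aggregate."""
    total, count = agg
    return total / count

# reduceByKey combines partial aggregates locally on each worker before the
# shuffle, so only small (sum, count) pairs travel across the network:
#   avg = pairs.mapValues(to_sum_count).reduceByKey(merge).mapValues(finalize)
```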
Exercise 1.3
Run the script avg_temperatures_fast.py by using temperatures_86.csv as an input. What is the execution time? Compare it with the execution time obtained in the previous exercise and comment on the difference.
Run the same script by using temperatures_10.csv (3 GB!) as an input. Do you think that the program takes too long? Why?
3 Average and standard deviation
We use the same files as in the first question. Our objective is to write a Spark program that produces triples \((y, t_{\mu}, t_{\sigma})\), where \(y\) is the year, \(t_{\mu}\) is the average temperature in the year, and \(t_{\sigma}\) is its standard deviation.
We can express the standard deviation of \(n\) values \(x_1 \ldots x_n\) with the following formula:
\[ \sigma = \sqrt{\overline{x^2} - \overline{x}^2} = \sqrt{\frac{\sum_{i=1}^n (x_i)^2}{n} - \Bigg(\frac{\sum_{i=1}^n x_i}{n}\Bigg)^2} \]
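The formula suggests accumulating, per year, the sum, the sum of squares, and the count in a single pass. A possible sketch (illustrative names; the max() guards against tiny negative variances caused by floating-point rounding) is:

```python
import math

def to_stats(temp):
    """One measurement as a partial (sum, sum of squares, count) triple."""
    return (temp, temp * temp, 1)

def merge_stats(a, b):
    """Combine two partial triples component-wise."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

def finalize_stats(stats):
    """Compute (mean, standard deviation) from the final triple."""
    s, sq, n = stats
    mean = s / n
    variance = sq / n - mean * mean
    return (mean, math.sqrt(max(variance, 0.0)))

# In Spark the pipeline could look like:
#   out = pairs.mapValues(to_stats).reduceByKey(merge_stats).mapValues(finalize_stats)
```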
Type the following command to get a code template:
cp ~cpu_vialle/DCE-Spark/template_temperatures.py ./avg_stddev_temp.py
Exercise 3.1
Define a new function avg_stddev_temperature in the file avg_stddev_temp.py.
Execute the script and observe the results.