Running Spark programs on a cluster
1 Computing averages
We consider a collection of CSV files containing temperature measurements in the following format:
year,month,day,hours,minutes,seconds,temperature
You can find the files under the directory hdfs://sar01:9000/data/temperatures/
Here are the details for each file:
- File temperatures_86400.csv contains one measurement per day in the years 1980 - 2018.
- File temperatures_2880.csv contains one measurement every 2880 seconds in the years 1980 - 2018.
- File temperatures_86.csv contains one measurement every 86 seconds for the year 1980 alone.
- File temperatures_10.csv contains one measurement every 10 seconds for the years 1980 - 2018.
We intend to implement a Spark algorithm to generate pairs \((y, t_{avg})\), where \(y\) is the year and \(t_{avg}\) is the average temperature in the year.
1.1 First implementation
Copy the file ~cpu_vialle/DCE-Spark/template_temperatures.py
to your home directory by typing
the following command:
cp ~cpu_vialle/DCE-Spark/template_temperatures.py ./avg_temperatures_slow.py
Open the file avg_temperatures_slow.py.
The file contains the implementation of the function avg_temperature_slow that:
- takes in an RDD, where each item is a line of the input text file;
- returns an RDD, where each item is a key-value pair (year, avg_temp).
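For orientation, a function with the shape described above might look like the following minimal sketch (an assumption for illustration; the actual code in the template may differ). It splits each CSV line, keys the temperature by year, and groups all of a year's measurements before averaging them, which forces every single measurement through the shuffle:

def avg_temperature_slow(lines):
    # Each line has the form: year,month,day,hours,minutes,seconds,temperature
    pairs = lines.map(lambda line: line.split(",")) \
                 .map(lambda fields: (fields[0], float(fields[6])))

    # groupByKey sends every individual measurement across the network
    # before any averaging takes place, which makes this version slow.
    def mean(values):
        values = list(values)
        return sum(values) / len(values)

    return pairs.groupByKey().mapValues(mean)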
In the same file, locate the two variables input_path and output_path
and write the following code:
input_path = "hdfs://sar01:9000/data/temperatures/"
output_path = "hdfs://sar01:9000/bdiaspark25/bdiaspark25_X/"
Replace X with the number corresponding to your account!
Don’t forget the / at the end of the file paths!
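For context, the template presumably combines these paths with the file name passed on the command line, which is why the output ends up in a folder named after the input file. A rough sketch of that wiring, under the assumption that the template follows the usual textFile/saveAsTextFile pattern (the real template may be organized differently):

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    data_file = sys.argv[1]                      # e.g. temperatures_86400.csv
    sc = SparkContext(appName="AvgTemperatures")
    lines = sc.textFile(input_path + data_file)     # input_path as set above
    result = avg_temperature_slow(lines)            # function provided by the template
    # Output folder named after the input file, e.g. temperatures_86400.out
    result.saveAsTextFile(output_path + data_file.replace(".csv", ".out"))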
Execute the following actions:
- Run the script avg_temperatures_slow.py by using temperatures_86400.csv as an input. To this end, use the following command:
spark-submit --master spark://sar01:7077 avg_temperatures_slow.py temperatures_86400.csv
- You should find the output of the program under the following folder:
hdfs://sar01:9000/bdiaspark25/bdiaspark25_X/temperatures_86400.out
Type the following command to verify it:
hdfs dfs -ls hdfs://sar01:9000/bdiaspark25/bdiaspark25_X/temperatures_86400.out
- If you want to read the result of the computation, you can execute the following command:
hdfs dfs -cat hdfs://sar01:9000/bdiaspark25/bdiaspark25_X/temperatures_86400.out/*
Exercise 1.1
In the command-line output of Spark, you should see a line similar to the following:
INFO DAGScheduler: Job 0 finished: runJob at SparkHadoopWriter.scala:78, took 3.478220 s
Note the execution time that you obtained.
- Run the same script by using temperatures_2880.csv as an input. What is the execution time? Does it seem reasonable compared with the execution time that you observed before? Justify your answer.
- Execute the same script by using temperatures_86.csv as an input. What is the execution time? How would you justify it, knowing that the files temperatures_2880.csv and temperatures_86.csv have a similar size (11 MB the former, 9 MB the latter)?
1.2 Second implementation
We want to implement a better version of the program. You can draft your ideas on paper before you write any code.
When you’re ready, create a copy of avg_temperatures_slow.py and rename it as avg_temperatures_fast.py, with the following command:
cp ./avg_temperatures_slow.py ./avg_temperatures_fast.py
Exercise 1.2
Open the file and implement the function avg_temperature_fast.
NOTE. Remember to comment out the call to avg_temperature_slow and to uncomment the call to avg_temperature_fast at the end of the file.
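One possible direction for the faster version, given here as a sketch rather than the intended solution, is to replace the grouping of all measurements with reduceByKey over partial (sum, count) pairs; the partial results are combined locally on each worker before the shuffle, so only one small record per year crosses the network:

def avg_temperature_fast(lines):
    # (year, (temperature, 1)) for every measurement
    pairs = lines.map(lambda line: line.split(",")) \
                 .map(lambda fields: (fields[0], (float(fields[6]), 1)))

    # Add the partial sums and counts per year; reduceByKey pre-aggregates
    # on each partition before shuffling.
    sums = pairs.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

    return sums.mapValues(lambda s: s[0] / s[1])   # (year, avg_temp)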
Exercise 1.3
- Run the script avg_temperatures_fast.py by using temperatures_86.csv as an input. What is the execution time? Compare it with the execution time obtained in the previous exercise and comment on the difference.
- Run the same script by using temperatures_10.csv (3 GB!) as an input. Do you think that the program takes too long? Why?
3 Average and standard deviation
We use the same files as in the first question. Our objective is to write a Spark program that produces triples \((y, t_{\mu}, t_{\sigma})\), where \(y\), \(t_{\mu}\) and \(t_{\sigma}\) are the year, the average temperature in the year and the standard deviation respectively.
We can express the standard deviation of \(n\) values \(x_1 \ldots x_n\) with the following formula:
\[ \sigma = \sqrt{\overline{x^2} - \overline{x}^2} = \sqrt{\frac{\sum_{i=1}^n (x_i)^2}{n} - \Bigg(\frac{\sum_{i=1}^n x_i}{n}\Bigg)^2} \]
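The formula lends itself to a single pass over the data: for every year it is enough to accumulate the sum of the temperatures, the sum of their squares, and the number of measurements. A minimal sketch along these lines (an illustration under that reading of the formula, not necessarily the expected solution):

import math

def avg_stddev_temperature(lines):
    # (year, (x, x^2, 1)) for every measurement x
    measurements = lines.map(lambda line: line.split(",")) \
                        .map(lambda f: (f[0], (float(f[6]), float(f[6]) ** 2, 1)))

    # Per-year accumulators: (sum of x, sum of x^2, n)
    sums = measurements.reduceByKey(
        lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))

    # Turn the accumulators into (year, mean, standard deviation) triples
    return sums.map(lambda kv: (kv[0],
                                kv[1][0] / kv[1][2],
                                math.sqrt(kv[1][1] / kv[1][2]
                                          - (kv[1][0] / kv[1][2]) ** 2)))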
Type the following command to get a code template:
cp ~cpu_vialle/DCE-Spark/template_temperatures.py ./avg_stddev_temp.py
Exercise 3.1
Define a new function avg_stddev_temperature in the file avg_stddev_temp.py. Execute the script and observe the results.