Apache Spark — Programming with RDDs
Refer to this documentation to learn how to connect and interact with the cluster.
Assignment submission
This lab assignment will be evaluated.
You need to submit a .zip file containing the following files:

- The source code of the programs that you write.
- A PDF document with the answers to the questions that you find in this document.
Please send me the zip file by email.
The submission deadline is Thursday, May 20, 2021 8:00 AM.
1 Computing averages
We consider a collection of CSV files containing temperature measurements in the following format:
year,month,day,hours,minutes,seconds,temperature
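For instance, a hypothetical line `1980,6,15,12,30,0,21.5` would represent a temperature of 21.5 degrees recorded on June 15, 1980 at 12:30:00.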
You can find the files under the directory `hdfs://sar01:9000/data/temperatures/`.
Here are the details for each file:

- File `temperatures_86400.csv` contains one measurement per day in the years 1980-2018.
- File `temperatures_2880.csv` contains one measurement every 2880 seconds in the years 1980-2018.
- File `temperatures_86.csv` contains one measurement every 86 seconds for the year 1980 alone.
- File `temperatures_10.csv` contains one measurement every 10 seconds in the years 1980-2018.
We intend to implement a Spark algorithm to generate pairs \((y, t_{avg})\), where \(y\) is the year and \(t_{avg}\) is the average temperature in the year.
1.1 First implementation
Copy the file `~vialle/DCE-Spark/template_temperatures.py` to your home directory by typing the following command:

```
cp ~vialle/DCE-Spark/template_temperatures.py ./avg_temperatures_first.py
```
Open the file `avg_temperatures_first.py` and write the following function:

```python
def avg_temperature(theTextFile):
    """Return an RDD of (year, average temperature) pairs.

    Each input line has the format
    year,month,day,hours,minutes,seconds,temperature.
    """
    temperatures = theTextFile \
        .map(lambda line: line.split(",")) \
        .map(lambda term: (term[0], [float(term[6])])) \
        .reduceByKey(lambda x, y: x + y) \
        .mapValues(lambda lv: sum(lv) / len(lv))
    return temperatures
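```

For context, here is a minimal driver sketch showing how such a function might be called; the provided template already contains the equivalent setup and I/O code, so this is only to make the data flow explicit, not something you need to write:

```python
# Minimal driver sketch (hypothetical; the template already provides
# the equivalent SparkContext creation and file I/O).
from pyspark import SparkContext

sc = SparkContext(appName="AvgTemperatures")

input_path = "hdfs://sar01:9000/data/temperatures/"
output_path = "hdfs://sar01:9000/cpuecm1/cpuecm1_XX/"  # replace XX

rdd = sc.textFile(input_path + "temperatures_86400.csv")
avg_temperature(rdd).saveAsTextFile(output_path + "temperatures_86400.out")
```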
In the same file, locate the two variables `input_path` and `output_path`, and write the following code:

```python
input_path = "hdfs://sar01:9000/data/temperatures/"
output_path = "hdfs://sar01:9000/cpuecm1/cpuecm1_XX/"
```
Don't forget the `/` at the end of the file paths, and remember to replace `XX` with the number at the end of your username.
Exercise 1.1

- Run the script `avg_temperatures_first.py` by using `temperatures_86400.csv` as an input. To this end, use the following command:

  ```
  spark-submit --master spark://sar01:7077 avg_temperatures_first.py temperatures_86400.csv
  ```

  You should find the output of the program under the folder `hdfs://sar01:9000/cpuecm1/cpuecm1_XX/temperatures_86400.out`.
- What's the execution time? In the output of Spark on the command line, you should see a line similar to the following:

  ```
  INFO DAGScheduler: Job 0 finished: runJob at SparkHadoopWriter.scala:78, took 3.478220 s
  ```
- Run the same script by using `temperatures_2880.csv` as an input. What is the execution time? Does it seem reasonable compared with the execution time that you observed before? Justify your answer.
- Execute the same script by using `temperatures_86.csv` as an input. What is the execution time? How would you justify it, knowing that the files `temperatures_2880.csv` and `temperatures_86.csv` have similar sizes (11 MB the former, 9 MB the latter)?
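When justifying the execution times, keep in mind that two files of similar size can contain very different numbers of records. An optional probe like the following (a sketch to be run in the `pyspark` shell, where `sc` is the SparkContext the shell creates automatically) shows how many lines and partitions an input produces:

```python
# Optional probe (sketch): inspect an input file from the pyspark shell.
rdd = sc.textFile("hdfs://sar01:9000/data/temperatures/temperatures_86.csv")
print(rdd.count())             # number of measurement lines in the file
print(rdd.getNumPartitions())  # number of partitions Spark created for it
```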
1.2 Second implementation

Copy the file `~vialle/DCE-Spark/template_temperatures.py` to your working directory by typing the following command:

```
cp ~vialle/DCE-Spark/template_temperatures.py ./avg_temperatures_second.py
```
Exercise 1.2

Based on the observations made in the previous exercise, write an improved implementation of the function `avg_temperature`. One possible direction is sketched below.
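As a hint, here is a minimal sketch of one common improvement, not necessarily the expected answer: reduce with running (sum, count) pairs instead of concatenating per-year Python lists, so that `reduceByKey` does constant work per record and never materializes a list of all the temperatures of a year:

```python
def avg_temperature(theTextFile):
    """Return an RDD of (year, average temperature) pairs.

    Sketch of one possible optimization: each record carries a
    (sum, count) pair, so the intermediate values stay constant-size.
    """
    temperatures = theTextFile \
        .map(lambda line: line.split(",")) \
        .map(lambda term: (term[0], (float(term[6]), 1))) \
        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
        .mapValues(lambda sc: sc[0] / sc[1])
    return temperatures
```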
Exercise 1.3

- Run the script `avg_temperatures_second.py` by using `temperatures_86.csv` as an input. What's the execution time? Compare it with the execution time obtained in the previous exercise and comment on the difference.
- Run the same script by using `temperatures_10.csv` (3 GB!) as an input. Do you think that the program takes too long? Why?
3 Creating an inverted index

In the folder `hdfs://sar01:9000/data/bbc/` you'll find a collection of 50 articles obtained from the BBC website (2004-2005), organized into five subfolders: business, entertainment, politics, sport and technology.

We want to create an inverted index, which associates each word with the list of the files in which the word occurs. More specifically, for each word, the inverted index will have the list of the names of the files (path relative to the folder `/data/bbc`) that contain the word.
The inverted index:

- must not contain the same word twice;
- must not contain any stopwords (the list of stopwords is provided in the file `hdfs://sar01:9000/data/stopwords.txt`).

Moreover:

- Words in the inverted index must only contain letters.
- Words in the inverted index must be lowercase.
Exercise 3.1

Write a Spark program to create an inverted index and execute it on the input folder. You can use the template available at `~vialle/DCE-Spark/template_inverted_index.py`. One possible pipeline is sketched after this exercise.
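For reference, here is a minimal sketch of one possible pipeline, not a definitive solution. It assumes that the glob `bbc/*/*` matches the article files inside the five subfolders (an assumption about the folder layout) and uses `wholeTextFiles` to keep each file name alongside its content:

```python
# Sketch of one possible inverted-index pipeline (assumptions noted above).
import re

from pyspark import SparkContext

sc = SparkContext(appName="InvertedIndex")

# Broadcast the stopword list so every executor can filter locally.
stopwords = set(sc.textFile("hdfs://sar01:9000/data/stopwords.txt").collect())
sw = sc.broadcast(stopwords)

index = sc.wholeTextFiles("hdfs://sar01:9000/data/bbc/*/*") \
    .flatMap(lambda fc: [(w.lower(), fc[0].split("/data/bbc/")[-1])
                         for w in re.findall(r"[A-Za-z]+", fc[1])]) \
    .filter(lambda wf: wf[0] not in sw.value) \
    .distinct() \
    .groupByKey() \
    .mapValues(sorted)

index.saveAsTextFile("hdfs://sar01:9000/cpuecm1/cpuecm1_XX/inverted_index.out")  # replace XX
```

The regular expression keeps letter-only tokens, lowercasing handles the case requirement, `distinct()` removes duplicate (word, file) pairs, and `groupByKey` produces one entry per word.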