Big data — Lab assignment 1

Apache Spark — Programming with RDD


Refer to this documentation to learn how to connect and interact with the cluster.

Assignment submission

This lab assignment will be evaluated.

You need to submit a .zip file containing the following files:

  • Source code of the programs that you write.

  • A PDF document with the answers to the questions that you find in this document.

Please send me the zip file by email.

The submission deadline is Thursday, May 20, 2021 8:00 AM.

1 Computing averages

We consider a collection of CSV files containing temperature measurements in the following format:

year,month,day,hours,minutes,seconds,temperature

You can find the files under the directory hdfs://sar01:9000/data/temperatures/.

Here are the details for each file:

  • File temperatures_86400.csv contains one measurement per day in the years 1980 - 2018.
  • File temperatures_2880.csv contains one measurement every 2880 seconds in the years 1980 - 2018.
  • File temperatures_86.csv contains one measurement every 86 seconds for the year 1980 alone.
  • File temperatures_10.csv contains one measurement every 10 seconds for the years 1980 - 2018.

We intend to implement a Spark algorithm to generate pairs \((y, t_{avg})\), where \(y\) is the year and \(t_{avg}\) is the average temperature in the year.

1.1 First implementation

Copy the file ~vialle/DCE-Spark/template_temperatures.py to your home directory by typing the following command:

cp ~vialle/DCE-Spark/template_temperatures.py ./avg_temperatures_first.py

Open the file avg_temperatures_first.py and write the following function:

def avg_temperature(theTextFile):
    # Map each CSV line to a (year, [temperature]) pair, merge the per-year
    # lists with reduceByKey, then average each list with mapValues.
    temperatures = theTextFile \
        .map(lambda line: line.split(",")) \
        .map(lambda term: (term[0], [float(term[6])])) \
        .reduceByKey(lambda x, y: x + y) \
        .mapValues(lambda lv: sum(lv) / len(lv))
    return temperatures

In the same file, locate the two variables input_path and output_path, and write the following code:

input_path = "hdfs://sar01:9000/data/temperatures/"
output_path = "hdfs://sar01:9000/cpuecm1/cpuecm1_XX/"

Don’t forget the / at the end of the paths, and remember to replace XX with the number at the end of your username.
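
The template already takes care of reading the input file and saving the result; the sketch below is only meant to illustrate how these paths and the avg_temperature function fit together. The names sc (the SparkContext) and the use of sys.argv[1] for the input file name are assumptions for this sketch, not the actual template code.

import sys
from pyspark import SparkContext

# Hypothetical wiring (the real template may already create the SparkContext
# and handle the input/output logic):
sc = SparkContext(appName="avg_temperatures_first")

input_path = "hdfs://sar01:9000/data/temperatures/"
output_path = "hdfs://sar01:9000/cpuecm1/cpuecm1_XX/"    # replace XX as explained above

file_name = sys.argv[1]                                   # e.g. temperatures_86400.csv
text_file = sc.textFile(input_path + file_name)           # read the CSV lines from HDFS
result = avg_temperature(text_file)                       # RDD of (year, average) pairs
result.saveAsTextFile(output_path + file_name.replace(".csv", ".out"))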

Exercise 1.1

  • Run the script avg_temperatures_first.py by using temperatures_86400.csv as an input. To this end, use the following command:
spark-submit --master spark://sar01:7077 avg_temperatures_first.py temperatures_86400.csv

You should find the output of the program under the folder

hdfs://sar01:9000/cpuecm1/cpuecm1_XX/temperatures_86400.out

(a way to inspect this output on HDFS is sketched right after this exercise).

  • What’s the execution time?
    • In the output of Spark on the command line, you should see a line similar to the following:
INFO DAGScheduler: Job 0 finished: runJob at SparkHadoopWriter.scala:78, took 3.478220 s 
  • Run the same script by using temperatures_2880.csv as an input.

  • What is the execution time? Does it seem reasonable compared with the execution time that you observed before? Justify your answer.

  • Execute the same script by using temperatures_86.csv as an input.

  • What is the execution time? How would you justify it, knowing that the files temperatures_2880.csv and temperatures_86.csv have a similar size (11 MB the former, 9 MB the latter)?
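
If you want to inspect the output on HDFS from the command line, commands along the following lines can be used (a sketch; adapt the paths to your own username, and refer to the cluster documentation mentioned at the top of this document for the authoritative instructions):

# List the content of the output folder (Spark writes one part-* file per partition).
hdfs dfs -ls hdfs://sar01:9000/cpuecm1/cpuecm1_XX/temperatures_86400.out

# Print the computed (year, average) pairs.
hdfs dfs -cat hdfs://sar01:9000/cpuecm1/cpuecm1_XX/temperatures_86400.out/part-*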


1.2 Second implementation

Copy the file ~vialle/DCE-Spark/template_temperatures.py to your working directory by typing the following command:

cp ~vialle/DCE-Spark/template_temperatures.py ./avg_temperatures_second.py

Exercise 1.2 Based on the observations made in the previous exercise, write an improved implementation of the function avg_temperature.

Exercise 1.3

  • Run the script avg_temperatures_second.py by using temperatures_86.csv as an input.

  • What’s the execution time? Compare it with the execution time obtained in the previous exercise and comment on the difference.

  • Run the same script by using temperatures_10.csv (3 GB!) as an input. Do you think that the program takes too long? Why?

2 Common friends in a social network

Consider a social network described by a graph encoded in a text file. Each line of the file is a list of identifiers separated by commas. For instance, the line \(A,B,C,D\) means that \(A\) is friends with \(B\), \(C\) and \(D\). An excerpt of the file looks as follows:

A,B,C,D
B,A,D
C,A,D
D,A,B,C
...

We suppose that the friendship relation is symmetric: \((A, B)\) implies \((B, A)\).

We want to obtain the list of the common friends for each pair of individuals:

(A, B), [D]
(A, C), [D] 
(A, D), [B, C] 
(B, C), [D] 
(B, D), [A] 
(C, D), [A]

As an additional constraint, we want to represent each pair only once and avoid representing the symmetric pair. In other words, if we output \((A, B)\), we don’t want to also output \((B, A)\).
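
As a hint (not part of any provided template), one way to satisfy this constraint is to always build the key of a pair in a canonical order, so that \((A, B)\) and \((B, A)\) produce the same key. The helper name below is purely illustrative:

# Illustrative helper: order the two identifiers so that (A, B) and (B, A)
# yield the same key, e.g. the lexicographically smaller identifier first.
def make_pair(a, b):
    return (a, b) if a < b else (b, a)

# make_pair("B", "A") == make_pair("A", "B") == ("A", "B")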

We use the following input files available in folder hdfs://sar01:9000/data/sn/:

  • sn_tiny.csv. Small social network that you can use to test your implementation.

  • sn_10k_100k.csv. Social network with \(10^4\) individuals and \(10^5\) links.

  • sn_100k_100k.csv. Social network with \(10^5\) individuals and \(10^5\) links.

  • sn_1k_100k.csv. Social network with \(10^3\) individuals and \(10^5\) links.

  • sn_1m_1m.csv. Social network with \(10^6\) individuals and \(10^6\) links.

Exercise 2.1 Write an implementation in Spark. Test your implementation on file sn_tiny.csv.

Exercise 2.2 Run your implementation on the other files and write down the execution times. Comment on the execution times considering the file sizes, the number of nodes and links and the number of pairs \(((A, B), X)\) generated by the algorithm.

Exercise 2.3

  1. By using a MapReduce-style algorithm, write a Spark program to compute the minimum, maximum and average degree of a node in a given graph.

  2. Compute the minimum, maximum and average degree on all the given input files.

  3. Do these values confirm or invalidate the considerations that you made on the execution times of the algorithm in the first exercise? Justify your answer.

3 Creating an inverted index

In folder hdfs://sar01:9000/data/bbc/ you’ll find a collection of 50 articles obtained from the BBC website (2004-2005) organized into five subfolders: business, entertainment, politics, sport and technology.

We want to create an inverted index, which associates each word with the list of the files in which the word occurs. More specifically, for each word, the inverted index will contain the list of the names of the files (paths relative to the folder /data/bbc) that contain the word.

The inverted index:

  • must not contain the same word twice;

  • must not contain any stopwords (the list of stopwords is provided in the hdfs://sar01:9000/data/stopwords.txt file);

Moreover:

  • Words in the inverted index must only contain letters.

  • Words in the inverted index must be lowercase.
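
As a hint (the helper name and the regular expression below are illustrative assumptions, not part of the provided template), a token can be normalized and filtered along these lines:

import re

# Illustrative helper: lowercase the token and keep it only if it consists
# exclusively of letters; return None otherwise so it can be filtered out.
def normalize(token):
    token = token.lower()
    return token if re.fullmatch(r"[a-z]+", token) else None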

Exercise 3.1 Write a Spark program to create an inverted index and execute it on the input folder. You can use the template available at ~vialle/DCE-Spark/template_inverted_index.py.