Big data for AI — Tutorial 2

Running Spark programs on a cluster


1 Computing averages

We consider a collection of CSV files containing temperature measurements in the following format:

year,month,day,hours,minutes,seconds,temperature

You can find the files in the directory hdfs://sar01:9000/data/temperatures/

Here are the details for each file:

  • File temperatures_86400.csv contains one measurement per day in the years 1980 - 2018.
  • File temperatures_2880.csv contains one measurement every 2880 seconds in the years 1980 - 2018.
  • File temperatures_86.csv contains one measurement every 86 seconds for the year 1980 alone.
  • File temperatures_10.csv contains one measurement every 10 seconds for the years 1980 - 2018.

We intend to implement a Spark algorithm to generate pairs \((y, t_{avg})\), where \(y\) is the year and \(t_{avg}\) is the average temperature in the year.
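To make the target output concrete, here is a minimal plain-Python sketch (no Spark involved) that parses lines in the format above and computes the average per year. The function names and the three sample lines are made up for illustration, and the sketch assumes that the year is an integer in the first field and the temperature a decimal number in the last field.

```python
from collections import defaultdict
from statistics import mean

def parse_line(line):
    """Turn 'year,month,day,hours,minutes,seconds,temperature' into (year, temperature)."""
    fields = line.strip().split(",")
    return int(fields[0]), float(fields[6])

def reference_averages(lines):
    """Group the temperatures by year, then average each group (fine for tiny inputs only)."""
    temps_by_year = defaultdict(list)
    for line in lines:
        year, temp = parse_line(line)
        temps_by_year[year].append(temp)
    return {year: mean(temps) for year, temps in temps_by_year.items()}

# Hypothetical sample lines, not taken from the real files.
sample = ["1980,1,1,0,0,0,10.0", "1980,1,2,0,0,0,14.0", "1981,1,1,0,0,0,8.0"]
print(reference_averages(sample))  # {1980: 12.0, 1981: 8.0}
```

A reference like this can be handy for checking the output of the Spark program on a few lines of input.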

1.1 First implementation

Copy the file ~cpu_vialle/DCE-Spark/template_temperatures.py to your home directory by typing the following command:

cp ~cpu_vialle/DCE-Spark/template_temperatures.py ./avg_temperatures_slow.py

Open the file avg_temperatures_slow.py. The file contains the implementation of the function avg_temperature_slow that:

  • takes in an RDD, where each item is a line of the input text file;

  • returns an RDD, where each item is a key-value pair (year, avg_temp).

In the same file, locate the two variables input_path and output_path and set them as follows:

input_path = "hdfs://sar01:9000/data/temperatures/"
output_path = "hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/"

Replace X with the number corresponding to your account! Don’t forget the / at the end of the file paths!

Execute the following actions:

  • Run the script avg_temperatures_slow.py by using temperatures_86400.csv as an input. To do so, use the following command:

spark-submit --master spark://sar01:7077 avg_temperatures_slow.py temperatures_86400.csv

  • You should find the output of the program under the following folder:

hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/temperatures_86400.out

Type the following command to verify it:

hdfs dfs -ls hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/temperatures_86400.out

  • If you want to read the result of the computation, you can execute the following command:

hdfs dfs -cat hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/temperatures_86400.out/*

Exercise 1.1

In the output that Spark prints on the command line, you should see a line similar to the following:

   INFO DAGScheduler: Job 0 finished: runJob at SparkHadoopWriter.scala:78, took 3.478220 s 
   
  • Note the execution time that you obtained.

  • Run the same script by using temperatures_2880.csv as an input.

  • What is the execution time? Does it seem reasonable compared with the execution time that you observed before? Justify your answer.

  • Execute the same script by using temperatures_86.csv as an input.

  • What is the execution time? How would you justify it, knowing that the files temperatures_2880.csv and temperatures_86.csv have a similar size (11 MB the former, 9 MB the latter)?

1.2 Second implementation

We want to implement a better version of the program. You can draft your ideas on paper before you write any code.

When you’re ready, create a copy of avg_temperatures_slow.py named avg_temperatures_fast.py with the following command:

cp ./avg_temperatures_slow.py ./avg_temperatures_fast.py

Exercise 1.2

Open the file avg_temperatures_fast.py and implement the function avg_temperature_fast.

NOTE. Remember to comment out the call to avg_temperature_slow and to uncomment the call to avg_temperature_fast at the end of the file.
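One common way to compute per-key averages, given here only as a possible direction and not as the expected solution, is to carry a (sum, count) pair per year and merge pairs with an associative function, so that partial results can be combined in any order. The sketch below shows the idea in plain Python; reduce_by_key is a hypothetical stand-in for the RDD method reduceByKey, and the sample lines are made up.

```python
def to_pair(line):
    """Map a CSV line to (year, (temperature, 1))."""
    fields = line.strip().split(",")
    return int(fields[0]), (float(fields[6]), 1)

def combine(a, b):
    """Merge two (sum, count) pairs; associative and commutative."""
    return (a[0] + b[0], a[1] + b[1])

def reduce_by_key(pairs, func):
    """Plain-Python stand-in for RDD.reduceByKey (hypothetical helper)."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc

# Hypothetical sample lines, not taken from the real files.
lines = ["1980,1,1,0,0,0,10.0", "1980,1,2,0,0,0,14.0", "1981,1,1,0,0,0,8.0"]
sums = reduce_by_key(map(to_pair, lines), combine)
averages = {year: total / count for year, (total, count) in sums.items()}
print(averages)  # {1980: 12.0, 1981: 8.0}
```

Note that averaging averages would give wrong results when groups have different sizes, which is why the count is carried along with the sum until the very end.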

Exercise 1.3

  • Run the script avg_temperatures_fast.py by using temperatures_86.csv as an input.

  • What’s the execution time? Compare it with the execution time obtained in the previous exercise and comment on the difference.

  • Run the same script by using temperatures_10.csv (3 GB!) as an input. Do you think that the program takes too long? Why?

2 Common friends in a social network

Consider a social network described by a graph encoded in a text file. Each line of the file is a list of identifiers separated by commas. For instance, the line \(A,B,C,D\) means that \(A\) is friends with \(B\), \(C\) and \(D\). An excerpt of the file looks as follows:

B,A,D
A,B,C,D
D,A,B,C
C,A,D
...

We suppose that the friendship relation is symmetric: \((A, B)\) implies \((B, A)\).

We want to obtain the list of the common friends for each pair of individuals:

(B, C), [A, D] 
(A, D), [B, C] 
(C, D), [A]
(A, C), [D] 
(B, D), [A] 
(A, B), [D]

As an additional constraint, we want to output each pair only once and never its symmetric pair. In other words, if we output \((A, B)\), we don’t want to output \((B, A)\).
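The expected output above can be reproduced on the four-line excerpt with a minimal plain-Python sketch. It assumes one possible strategy: every line X,F1,F2,... emits one record ((Fi, Fj), X) for each pair of friends of X, and sorting the friends first guarantees that a pair and its symmetric pair share the same key. The function names are made up for illustration.

```python
from itertools import combinations

def pairs_from_line(line):
    """For a line 'X,F1,F2,...', emit ((Fi, Fj), X) for every pair of friends with Fi < Fj."""
    person, *friends = line.strip().split(",")
    return [((a, b), person) for a, b in combinations(sorted(friends), 2)]

def common_friends(lines):
    """Collect, for each ordered pair, the people who list both members as friends."""
    result = {}
    for line in lines:
        for pair, person in pairs_from_line(line):
            result.setdefault(pair, []).append(person)
    return {pair: sorted(people) for pair, people in result.items()}

network = ["B,A,D", "A,B,C,D", "D,A,B,C", "C,A,D"]
for pair, friends in sorted(common_friends(network).items()):
    print(pair, friends)
```

In a Spark version, the per-line step would typically be a flatMap, and the grouping step is where groupByKey or reduceByKey comes in.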

We use the following input files, available in the folder hdfs://sar01:9000/data/social-network/:

  • sn_tiny.csv. A small social network that you can use to test your implementation.

  • sn_10k_100k.csv. Social network with \(10^4\) individuals and \(10^5\) links.

  • sn_100k_100k.csv. Social network with \(10^5\) individuals and \(10^5\) links.

  • sn_1k_100k.csv. Social network with \(10^3\) individuals and \(10^5\) links.

  • sn_1m_1m.csv. Social network with \(10^6\) individuals and \(10^6\) links.

2.1 Implementation

Get the code template with the following command:

cp ~cpu_vialle/DCE-Spark/template_common_friends.py .

Exercise 2.1

  • Write an implementation that uses a groupByKey.

  • Write an implementation that uses a reduceByKey.

  • Test both implementations on file sn_tiny.csv.

2.2 Tests and performance measures

Exercise 2.2

  • Run both implementations on the other files.

  • Fill in a table with the name and size of each file and the measured running times of both implementations.

2.3 Minimum, maximum and average degree

Exercise 2.3

  1. Add a function to the file template_common_friends.py that returns a tuple containing the minimum, the maximum and the average degree of a node in the social network. You must use RDD operations to do so: don’t collect() the content of the RDDs and then compute the values on Python lists.

  2. Execute the function for all the given input files.

  3. Complete the table that you created in the previous exercise by adding the minimum, maximum and average number of friends.
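A minimal sketch of the degree statistics, assuming that every node appears as the first identifier of exactly one line, so that its degree is the number of remaining identifiers on that line. Each degree is turned into a (min, max, sum, count) tuple and the tuples are merged with an associative function; functools.reduce plays here the role that the RDD method reduce would play after a map. The function names are made up for illustration.

```python
from functools import reduce

def degree(line):
    """Number of friends listed on a line 'X,F1,F2,...'."""
    return len(line.strip().split(",")) - 1

def to_stats(d):
    """Wrap one degree as a (min, max, sum, count) tuple."""
    return (d, d, d, 1)

def merge(a, b):
    """Merge two (min, max, sum, count) tuples; associative and commutative."""
    return (min(a[0], b[0]), max(a[1], b[1]), a[2] + b[2], a[3] + b[3])

def degree_stats(lines):
    """Return (minimum, maximum, average) degree in a single pass."""
    lo, hi, total, count = reduce(merge, (to_stats(degree(line)) for line in lines))
    return lo, hi, total / count

network = ["B,A,D", "A,B,C,D", "D,A,B,C", "C,A,D"]
print(degree_stats(network))  # (2, 3, 2.5)
```

Because merge only ever sees two small tuples, nothing has to be collected on the driver except the final result.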

2.4 Performance analysis

Exercise 2.4

  • Suppose that each node has a number of friends equal to the average number of friends. Compute (with pencil and paper; there is no need to write code for that) the number of intermediate pairs \(((A, B), X)\) generated by your code.

  • Complete the table by writing down the number of intermediate pairs for each file.

  • Plot three graphs, where the y-axis has the program running times and the x-axis has: the number of intermediate pairs, the average degree and the file size respectively.

  • Which graph best predicts how the running time evolves?
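As a sanity check for the pencil-and-paper computation, here is a small sketch under one assumption about the implementation: each line emits one intermediate pair per unordered pair of friends, so a node of degree \(d\) produces \(d(d-1)/2\) pairs. If your code emits pairs differently, the formula changes accordingly. The function names are made up for illustration.

```python
def estimated_pairs(num_nodes, avg_degree):
    """Estimate assuming every node has exactly the average degree."""
    return num_nodes * avg_degree * (avg_degree - 1) / 2

def exact_pairs(degrees):
    """Exact count when the degree of every node is known."""
    return sum(d * (d - 1) // 2 for d in degrees)

# Four-line excerpt from Section 2: degrees 2, 3, 3, 2, average 2.5.
print(estimated_pairs(4, 2.5))    # 7.5
print(exact_pairs([2, 3, 3, 2]))  # 8
```

The estimate (7.5) and the exact count (8) differ because the count grows quadratically with the degree, so nodes with more friends than average weigh more than the average suggests.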

3 Average and standard deviation

We use the same files as in Section 1. Our objective is to write a Spark program that produces triples \((y, t_{\mu}, t_{\sigma})\), where \(y\), \(t_{\mu}\) and \(t_{\sigma}\) are the year, the average temperature in the year and the standard deviation of the temperature in the year respectively.

We can express the standard deviation of \(n\) values \(x_1 \ldots x_n\) with the following formula:

\[ \sigma = \sqrt{\overline{x^2} - \overline{x}^2} = \sqrt{\frac{\sum_{i=1}^n (x_i)^2}{n} - \Bigg(\frac{\sum_{i=1}^n x_i}{n}\Bigg)^2} \]
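The formula only needs three running totals: the number of values \(n\), the sum of the \(x_i\) and the sum of the \(x_i^2\). The plain-Python sketch below, given as one possible reading of the formula and not as the expected solution, merges such triples with an associative function, which is the shape of computation that a per-key reduction can distribute. The function names and the sample values are made up for illustration.

```python
from functools import reduce
from math import sqrt

def to_moments(x):
    """Wrap one value as (count, sum, sum of squares)."""
    return (1, x, x * x)

def merge(a, b):
    """Merge two (count, sum, sum of squares) triples; associative and commutative."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

def mean_and_stddev(values):
    """Apply sigma = sqrt(mean(x^2) - mean(x)^2) to the merged totals."""
    n, total, total_sq = reduce(merge, map(to_moments, values))
    mean = total / n
    # Clamp at zero: rounding errors can make the difference slightly negative.
    variance = max(total_sq / n - mean * mean, 0.0)
    return mean, sqrt(variance)

print(mean_and_stddev([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]))  # (5.0, 2.0)
```

In a Spark version, the same triples could be keyed by year so that each year gets its own totals.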

Type the following command to get a code template:

cp ~cpu_vialle/DCE-Spark/template_temperatures.py ./avg_stddev_temp.py

Exercise 3.1

  • Define a new function avg_stddev_temperature in file avg_stddev_temp.py.

  • Execute the script and observe the results.