MapReduce
1 Computing averages
We are given a dataset that contains the average monthly temperature measurements over the course of some years. More precisely, the dataset is stored in a CSV file, where each row corresponds to a monthly measurement and the columns contain the following values: year, month, average temperature in the month.
1980,1,5
1980,2,2
1980,3,10
1980,4,14
1980,5,17
....
1981,1,2
1981,2,1
1981,3,3
1981,4,10
....
We intend to get the average monthly temperature for each year.
Exercise
Exercise 1.1 Write a MapReduce algorithm that generates key-value pairs \((year, average\_temperature)\).
Solution
def map(line):
values = line.split(",")
return (values[0], float(values[-1]))
def reduce(year, temperatures_in_year):
return (year, sum(temperatures_in_year) / len(temperatures_in_year))
Suppose now that we have a large CSV file stored in a distributed file system (e.g., HDFS), containing a series of measurements in the format “Year, Month, Day, Minute, Second, Temperature”. We can have up to one measurement per second in some years. Like before, we’d like to compute key-value pairs (year, average_temperature) by using a MapReduce algorithm.
Exercise
Exercise 1.2 What is the maximum number of measurements in a year?
Solution
Since we can have up to one measurement per second, the maximum number of measurements \(M_{max}\) for a certain year is given by the following formula:
\[ M_{max} = 365 \times 24 \times 60 \times 60 \approx 31.5 \times 10^6 \]
Exercise
Exercise 1.3 Considering the answer to the previous question, discuss the efficiency of the first implementation of the algorithm.
Solution
Since there might be up to 31 million values associated with a key, the bottleneck of the computation would be the shuffle operation, since we need to copy a high number of (key,value) pairs from the mappers to the reducers.
Also, a reducer might have to loop over a huge list of values in order to compute their average.
Exercise
Exercise 1.4 Based on the answer to the previous question, propose a better implementation to handle the CSV file.
Solution
Function map()
is the same as in the previous exercise.
def map(line):
values = line.split(",")
return (values[0], float(values[-1]))
Now we code the function combine()
which will apply some computation on key-value pairs returned by the Map task.
More precisely, after a Map task applies the function map()
on all
lines of a block, a local shuffle operation is executed to group all values associated with the same key.
The function combine()
takes in a couple, of which the key (a year)
is associated with the list of all its values (the year’s temperatures).
The function returns a couple, where the key still represents a year and the value is itself a couple;
the first element of this couple is the sum all of year’s temperatures,
the second element is the number of temperatures in the year.
Please note that all these considerations are applied to data within a single bloc.
Therefore, the shuffle and the combine did not move data across blocks.
The result is that from each block we send out to the reducers only a couple per year,
which significantly reduces the amount of data sent over the network.
def combine(year, temperatures_in_year):
return (year, (sum(temperatures_in_year), len(temperatures_in_year)))
The function reduce()
takes in a a couple,
where the key is a year and the value is a list of couples, as output by
the function combine()
.
def reduce(year, sum_len_tuples):
sum_temps = sum(s for (s, _) in sum_len_tuples)
nb_temps = sum(l for (_, l) in sum_len_tuples)
return (year, sum_temps/nb_temps)
2 Common friends in a social network
Consider a social network described by a graph encoded in a text file. Each line of the file is a list of identifiers separated by commas. For instance, the line \(A,B,C,D\) means that \(A\) is friend with \(B\), \(C\) and \(D\). An excerpt of the file looks like as follows:
A,B,C,D
B,A,D
C,A,D
D,A,B,C
...
We suppose that the friendship relation is symmetric: \((A, B)\) implies \((B, A)\).
We want to obtain the list of the common friends for each pair of individuals:
(A, B), [D]
(A, C), [D]
(A, D), [B, C]
(B, C), [D]
(B, D), [A]
(C, D), [A]
As an additional constraint, we want to represent a couple only once and avoid to represent the symmetric couple. In other words, if we output \((A, B)\), we don’t want to output \((B, A)\).
Exercise
Exercise 2.1 Propose a MapReduce implementation to find the common friends in a social network satisfying the given constraints.
Solution
The result of our algorithm is a collection of couples, where the first element represents a couple of individuals, and the second element is the common friends of the two individuals.
We infer that a couple of individuals is a proper choice for a key.
Now, the function map()
only takes in a single line.
The question we need to ask ourselves is: can we obtain a solution to our problem on
that single line? Otherwise stated, can we still get common friends between couple of individuals from
one single line?
Take the example of line: A,D,B,C. From this line, we know that A is a common friend of his friends : (B, D), (C, D) and (B, C).
This gives us an idea as to the implementation of the function map()
.
def map(line):
values = line.split(",")
return [((x, y), values[0]) for x in values[1:] for y in values[1:] if x < y]
Note that the condition x<y
guarantees that we never output summetrical couples.
The shuffle step will then associate the list of common friends to each couple of individuals.
This is exactly what we want, therefore there is no need for a
reduce()
function (identity function).
3 Computing average and standard deviation
We consider again the large CSV file with a series of measurements in the format “Year, Month, Day, Minute, Second, Temperature”. We now intend to generate a series of key-value pairs (year, (avg_temperature, std_deviation)).
We can express the standard deviation of \(n\) values \(x_i\) (\(1 \leq i \leq n\)) with two different equations.
The first equation is as follows:
\[ \sigma = \sqrt{\frac{\sum_{i=1}^n (x_i - \overline{x})^2}{n}} \]
The second equation is as follows:
\[ \sigma = \sqrt{\overline{x^2} - \overline{x}^2} = \sqrt{\frac{\sum_{i=1}^n (x_i)^2}{n} - \Bigg(\frac{\sum_{i=1}^n x_i}{n}\Bigg)^2} \]
Exercise
Exercise 3.1 Which equation of the standard deviation is more appropriate in a Map-Reduce algorithm? Why?
Solution
The second equation is more appropriate because it allows the computation of the sum of the elements and of the square of the elements step by step by using map and combine together.
Instead, if we use the first equation, we need first to compute the average and then use it to compute the variance.
Exercise
Solution
def map(line):
values = line.split(",")
return (values[0], float(values[-1]))
def combine(year, temperatures_in_year):
return (year, (sum(temperatures_in_year), sum(x**2 for x in temperatures_in_year), len(temperatures_in_year)))
def reduce(year, sum_len_tuples):
sum_temps = sum(s for (s, _, _) in sum_len_tuples)
sum_sq_temps = sum(sq for (_, sq, _) in sum_len_tuples)
nb_temps = sum(l for (_, _, l) in sum_len_tuples)
return (year, (sum_temps/nb_temps, math.sqrt(sum_sq_temps/nb_temps - (sum_temps/nb_temps)**2)))