Spark MLlib
In this tutorial, you’ll learn how to use the Spark MLlib library to train, test and evaluate machine learning models. We’ll be using a dataset of Airbnb accommodations in the San Francisco area. The dataset is available in HDFS at the following path:
hdfs://sar01:9000/prof/cpu_quercini/ml/sf-airbnb-clean.parquet
Dataset source
The dataset has been obtained from this GitHub repository.
The goal of the exercise is to predict the price per night of an apartment given all the features in the dataset.
Changing the Python interpreter of the executors
The default Python interpreter used by the executors in the cluster does not have the package `numpy` installed. In order to use one that has `numpy`, you need to run the following command in the terminal:
export PYSPARK_PYTHON=/usr/bin/python3
1 Obtaining a training and test set
In order to build and evaluate a machine learning model, we need to split our dataset into a training and test set.
Activity
- Copy the file `train_test.py` to your home folder in the cluster by typing the following command:
cp /usr/users/prof/cpu_quercini/mllib/train_test.py .
- Complete Line 9 by specifying your directory in HDFS, which is as follows (replace X with your account number):
hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X
Exercise
Exercise 1.1
- Complete Line 23: write the code to read the input file, which is stored in HDFS as a Parquet file, into the DataFrame `airbnb_df`.
- From Line 26, add the instructions necessary to print the schema of `airbnb_df` and the first 5 rows. Which option should you use to nicely see the values in the columns?
- Execute the code with `spark-submit` and verify that the data is correctly loaded (a sketch of the relevant calls follows this exercise).
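For illustration, here is a minimal, hedged sketch of these steps, assuming a SparkSession named `spark` (the structure and variable names in `train_test.py` may differ):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("train_test").getOrCreate()

# Read the Parquet file from HDFS into a DataFrame.
airbnb_df = spark.read.parquet(
    "hdfs://sar01:9000/prof/cpu_quercini/ml/sf-airbnb-clean.parquet"
)

# Print the schema (column names and types).
airbnb_df.printSchema()

# Show the first 5 rows; truncate=False prevents long values from being cut off.
airbnb_df.show(5, truncate=False)
```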
We now split `airbnb_df` into two separate DataFrames, `train_df` and `test_df`, containing the training and test instances respectively.
Exercise
Exercise 1.2
- Uncomment Line 32 and write the code to split the dataset into a training and test set. 80% of the instances must be taken as training instances, while the remaining 20% will be used as test instances. Looking at the DataFrame API documentation, which function are you going to use? (A sketch is given after this exercise.)
- From Line 36, write the code to print the number of instances in the training and test set.
- Execute the code with `spark-submit`. You should have 5780 training instances and 1366 test instances.
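A sketch of the split, assuming `airbnb_df` has already been loaded; the seed value is a placeholder (use whatever value `train_test.py` specifies, since the exact 5780/1366 counts depend on it):

```python
# 80% training instances, 20% test instances; fixing a seed makes the
# split reproducible across runs.
train_df, test_df = airbnb_df.randomSplit([0.8, 0.2], seed=42)

print("Training instances:", train_df.count())
print("Test instances:", test_df.count())
```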
It is time to save the training and test sets to HDFS. This way, we can load them whenever we need them to build a machine learning model for this problem.
Exercise
Exercise 1.3
From Line 47, write the code to write the training and test sets to two Parquet files in HDFS (a sketch follows below).
The paths to the two files are already available in the variables `training_set_file` and `test_set_file`, defined at Lines 40 and 43 respectively.
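A sketch of the write step; the paths below are hypothetical placeholders, and in the script you should reuse the `training_set_file` and `test_set_file` variables instead:

```python
# Hypothetical paths: in train_test.py, reuse the variables defined at
# Lines 40 and 43 rather than hard-coding values like these.
training_set_file = "hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/train_set.parquet"
test_set_file = "hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/test_set.parquet"

# mode("overwrite") replaces the files if they already exist.
train_df.write.mode("overwrite").parquet(training_set_file)
test_df.write.mode("overwrite").parquet(test_set_file)
```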
2 Preparing the features
In both the training and test sets, the features correspond to DataFrame columns. Most of the machine learning algorithms in Spark need all the features to be gathered in one single vector column. To do so, we need to use a transformer.
Good to know
Transformers take in a DataFrame and return a new DataFrame that has the same columns as the input DataFrame and additional columns that are specified as an argument to the transformer.
Copy the file `train_test_model.py` to your home folder in the cluster by typing the following command:
cp /usr/users/prof/cpu_quercini/mllib/train_test_model.py .
Exercise
Exercise 2.1
- Implement the function `read_training_set` at Line 22. The function reads into a Spark DataFrame the training set Parquet file that you generated in the previous section.
- Implement the function `read_test_function` at Line 39. The function reads into a Spark DataFrame the test set Parquet file that you generated in the previous section.
- Modify the variables at Lines 105 and 106 so that they point to the two HDFS Parquet files that contain the training and test sets.
- Uncomment Lines 109 and 110, so as to print the number of training and test instances.
- Execute the file `train_test_model.py` with `spark-submit`. Check that the number of training and test instances corresponds to what you got in the first section.
It is now time to learn how to use a transformer to put all the necessary features into a vector.
For the time being, we’ll only be using the feature `bedrooms` to predict the price.
Exercise
Exercise 2.2
- Implement the function `get_vector` at Line 55. The comments in the file describe what the function is supposed to do. You’ll need to use a VectorAssembler object to implement the function (a sketch of its usage follows this exercise).
- Uncomment Lines 117 and 118 to call the function `get_vector` and display the result.
- Execute the file `train_test_model.py` with `spark-submit`. Observe the content of the DataFrame `train_vect`. It should have a column named `features` containing a vector with one value (the value of the feature `bedrooms`).
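A sketch of what `get_vector` might do with a `VectorAssembler`, assuming a training DataFrame `train_df` with the columns of the Airbnb dataset:

```python
from pyspark.ml.feature import VectorAssembler

# Assemble the single feature "bedrooms" into a vector column "features".
vec_assembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")

# transform() returns a new DataFrame with the additional "features"
# column; the input DataFrame is left unchanged.
train_vect = vec_assembler.transform(train_df)
train_vect.select("bedrooms", "features", "price").show(5, truncate=False)
```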
It is now time to train a linear regression model on the given training set and the selected features.
Exercise
Exercise 2.3
- Implement the function `train_linear_regression_model` at Line 79. The comments in the file describe what the function is supposed to do. Looking at the documentation, try to identify the object that you need to use to create a linear regressor and the function that you need to invoke to train it (see the sketch below).
- Uncomment Lines 121, 124, 125 and 126 to call the function `train_linear_regression_model` and display the coefficients learned by the linear regression model.
- Execute the file `train_test_model.py` with `spark-submit`.
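A hedged sketch of the training step, assuming the assembled DataFrame `train_vect` from the previous sketch:

```python
from pyspark.ml.regression import LinearRegression

# LinearRegression is an estimator: fit() trains it and returns a
# LinearRegressionModel.
lr = LinearRegression(featuresCol="features", labelCol="price")
lr_model = lr.fit(train_vect)

# The learned regression line: price ≈ coefficients[0] * bedrooms + intercept.
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)
```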
We can now use the trained model to make some predictions.
The model returned by the function that you implemented in the previous exercise is an object of type `LinearRegressionModel`. Looking at the documentation, we learn that there is a function `predict` that allows us to make a prediction given a single instance. If we want to make a prediction on the whole test set, we should use the function `transform`. The function takes in a DataFrame with the test set and returns the same DataFrame with an additional column `prediction` that contains the predicted value.
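A sketch of predicting on the whole test set, reusing the `vec_assembler` and `lr_model` names from the sketches above (not necessarily the names used in `train_test_model.py`):

```python
# The test set must go through the same VectorAssembler as the training
# set, so that it also has a "features" column.
test_vect = vec_assembler.transform(test_df)

# transform() adds a "prediction" column with the predicted price.
predictions = lr_model.transform(test_vect)
predictions.select("bedrooms", "price", "prediction").show(5, truncate=False)
```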
Exercise
Exercise 2.4
- Look at Lines 130 and 131. Which DataFrame do we need to pass to the function `transform`? Is `test_df` a good choice? Why?
- Complete Line 130 and uncomment Lines 130, 131 and 132.
- Execute the file `train_test_model.py` with `spark-submit` and observe the result. Is the predicted value the one that you expect, given that you know the formula of the regression line?
3 Pipelines
In the previous section we learned that a training and test set need to go through the same transformation steps in order to be fed to a machine learning model. When we have few transformations, it is easy to remember the ones that we applied to a training set, so as to apply them to the test set too. However, when we need to apply a series of transformations, the order of which is important, it is easy to make mistakes (applying different sets of transformations to the training and test instances, which leads to meaningless predictions).
A good practice is to use the Pipeline API.
A pipeline is composed of stages; each stage is either a transformer or an estimator. A transformer is an object on which we call the function `transform` to obtain a new DataFrame from an input DataFrame. An estimator is an object on which we call the function `fit` to learn a model on a given DataFrame; the learned model is itself a transformer.
When the function `fit` is called on a pipeline, the training set goes through all the transformers and estimators in the order in which they are declared in the pipeline; at last, the estimator specified in the last stage is trained on the training set. The model returned by applying the function `fit` on the pipeline is itself a transformer. If we invoke the function `transform` of that model on the test set, we obtain a DataFrame that contains a column named `prediction`. Implicitly, all the transformations in the pipeline are applied to the test set too, before the predictions are made.
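As an illustration of the API, a pipeline reproducing the single-feature example of the previous section might look as follows (assuming `train_df` and `test_df` are already loaded; `pipeline_example.py` has its own structure):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Stage 1: a transformer that assembles the feature vector.
vec_assembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
# Stage 2: an estimator that trains the regression model.
lr = LinearRegression(featuresCol="features", labelCol="price")

pipeline = Pipeline(stages=[vec_assembler, lr])

# fit() runs the training set through every stage and trains the final
# estimator; the returned PipelineModel is itself a transformer.
pipeline_model = pipeline.fit(train_df)

# transform() replays the same stages on the test set and adds a
# "prediction" column.
predictions = pipeline_model.transform(test_df)
predictions.select("bedrooms", "price", "prediction").show(5, truncate=False)
```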
Copy the file `pipeline_example.py` to your home folder in the cluster by typing the following command:
cp /usr/users/prof/cpu_quercini/mllib/pipeline_example.py .
Exercise
Exercise 3.1
- Look at the documentation and write the code from Line 34 to create a pipeline that does the same operations as in the file `train_test_model.py`, i.e., train and test a linear regression model on the given training and test sets.
- Don’t forget to specify the paths to the training and test set files on HDFS at Lines 27 and 28.
3.1 From categorical to numerical features
Many machine learning models do not handle categorical values: they need all features to be numerical. Linear regression is one such example.
Activity
- Copy the file `onehot_playground.py` to your home directory in the cluster by typing the following command:
cp /usr/users/prof/cpu_quercini/mllib/onehot_playground.py .
- Change Lines 27 and 28 to write the paths to the files with the training and test sets.
First, let’s find out which features are categorical in our dataset.
Exercise
Exercise 3.2
- Complete the function `get_categorical_columns` (one possible approach is sketched after this exercise).
- In order to get an idea as to how to obtain the categorical columns, execute the code with `spark-submit`; this executes the instruction at Line 86.
- Once you’re done with the implementation, execute the file with `spark-submit` and observe the output.
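One possible way to find the categorical columns is to look for the string-typed fields in the DataFrame schema; this is only a sketch, and the handout file may expect a different approach:

```python
from pyspark.sql.types import StringType

def get_categorical_columns(df):
    """Return the names of the columns whose type is string."""
    return [field.name for field in df.schema.fields
            if isinstance(field.dataType, StringType)]
```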
One example of a categorical feature in our dataset is `property_type`, which takes values such as Apartment, House, Condominium…
Each value of a categorical feature is also referred to as a category.
One way to turn this feature into a numerical one would be to assign a number to each category (e.g., Apartment corresponds to 0, House to 1….). However, this implicitly introduces an order among the categories: the category House would be worth twice as much as the category Apartment; this would inevitably bias the trained model.
A commonly used method is one-hot encoding. Let’s find out how it works.
Let’s focus only on the feature `property_type`.
Activity
- Uncomment Lines 40, 41 and 42. These lines instantiate an estimator called `StringIndexer`. This estimator associates numeric indexes with the values of `property_type`. The indexes will be stored in another column named `property_type_index`.
- Uncomment Line 46. The `StringIndexer` is applied to the training set to learn how to associate indexes with the values of `property_type`. The result of the instruction is a new transformer.
- Uncomment Line 49. The transformer learned at Line 46 is used to transform the training set into a new DataFrame. This DataFrame contains an additional column called `property_type_index`.
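The essence of these lines, sketched outside the handout file (the variable names are illustrative):

```python
from pyspark.ml.feature import StringIndexer

# Estimator: learns a mapping from each distinct property_type value to
# a numeric index.
indexer = StringIndexer(inputCol="property_type",
                        outputCol="property_type_index")

# fit() returns a StringIndexerModel, which is a transformer.
indexer_model = indexer.fit(train_df)

# transform() adds the property_type_index column.
train_indexed = indexer_model.transform(train_df)
train_indexed.select("property_type", "property_type_index").show(5)
```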
Exercise
Exercise 3.3
- Complete the function `count_property_types`. Follow the instructions in the file.
- Once the function is complete, uncomment Line 53 to call the function.
- Execute the file with `spark-submit` and observe the output. Can you guess how the indexes are assigned to the categories of the feature `property_type`?
It is time to find out how one-hot encoding works.
Activity
- Uncomment Lines 57, 58, 61, 64 and 67. These lines instantiate an estimator called `OneHotEncoder`. This estimator uses the indexes in `property_type_index` and creates a new column `property_type_ohe`.
- The estimator is trained on the training set and a new transformer is obtained (Line 61).
- The transformer is used on the training set to transform it into a new DataFrame, where a new column `property_type_ohe` exists.
- Line 67 prints selected columns of this transformed training set.
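Again as an illustrative sketch (reusing `train_indexed` from the previous sketch):

```python
from pyspark.ml.feature import OneHotEncoder

# Estimator: turns the numeric index into a sparse one-hot vector.
ohe = OneHotEncoder(inputCols=["property_type_index"],
                    outputCols=["property_type_ohe"])

# fit() returns a OneHotEncoderModel (a transformer).
ohe_model = ohe.fit(train_indexed)

# transform() adds the property_type_ohe column.
train_ohe = ohe_model.transform(train_indexed)
train_ohe.select("property_type", "property_type_index",
                 "property_type_ohe").show(5, truncate=False)
```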
Exercise
Exercise 3.4
- Execute the file with `spark-submit`.
- Can you understand how one-hot encoding works?
Now, you have all the ingredients to create a full pipeline to train and test a linear regression model by using all features.
Exercise
Exercise 3.5
Create a new file `full_pipeline.py` in which you:
- Select the categorical features.
- Set up a `StringIndexer` and a `OneHotEncoder` to apply one-hot encoding to all categorical features.
- Obtain the numeric features.
- Set up a `VectorAssembler` to put the one-hot encoded features and the numerical ones into a single vector.
- Set up a linear regression model that takes in all the features and the variable to predict (`price`).
- Mix all these ingredients in a `Pipeline`.
- Use the `Pipeline` to train and test the model.

A possible skeleton is sketched below.
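This skeleton is only a sketch under stated assumptions: the HDFS paths are placeholders to replace with your own files, the label is `price`, and `handleInvalid="skip"` simply drops test rows with categories unseen during training (you may prefer another policy):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("full_pipeline").getOrCreate()

# Hypothetical paths: replace with your own HDFS Parquet files.
train_df = spark.read.parquet("hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/train_set.parquet")
test_df = spark.read.parquet("hdfs://sar01:9000/bdiaspark2024/bdiaspark2024_X/test_set.parquet")

# Categorical (string) columns and numeric columns; the label "price"
# must not be used as a feature.
cat_cols = [f.name for f in train_df.schema.fields
            if isinstance(f.dataType, StringType)]
num_cols = [f.name for f in train_df.schema.fields
            if not isinstance(f.dataType, StringType) and f.name != "price"]

index_cols = [c + "_index" for c in cat_cols]
ohe_cols = [c + "_ohe" for c in cat_cols]

# One StringIndexer and one OneHotEncoder handle all categorical columns;
# handleInvalid="skip" drops rows with categories unseen at training time.
indexer = StringIndexer(inputCols=cat_cols, outputCols=index_cols,
                        handleInvalid="skip")
ohe = OneHotEncoder(inputCols=index_cols, outputCols=ohe_cols)

# Put the one-hot encoded features and the numeric ones into one vector.
assembler = VectorAssembler(inputCols=ohe_cols + num_cols,
                            outputCol="features")

lr = LinearRegression(featuresCol="features", labelCol="price")

# The pipeline chains all the stages; fit() trains everything in order.
pipeline = Pipeline(stages=[indexer, ohe, assembler, lr])
pipeline_model = pipeline.fit(train_df)

# The same stages are replayed on the test set before predicting.
predictions = pipeline_model.transform(test_df)
predictions.select("price", "prediction").show(5, truncate=False)
```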
4 Evaluating a model
In the previous sections we visually looked at the predictions to get a vague idea of how our estimator performs. In order to quantify the quality of our estimator, we need some evaluation measures.
Exercise
Exercise 4.1
Look at the documentation and play with the code to do model selection based on a grid search over the parameters (a sketch to get started is given below).
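As a starting point, here is a hedged sketch of a grid search with cross-validation, reusing the `pipeline`, `lr`, `train_df` and `test_df` names from the sketch of Exercise 3.5 (the metric and the grid values are arbitrary choices):

```python
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Evaluator: root mean squared error between "price" and "prediction".
evaluator = RegressionEvaluator(labelCol="price",
                                predictionCol="prediction",
                                metricName="rmse")

# Grid of hyper-parameters to try for the linear regression stage.
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.1, 1.0])
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
              .build())

# 3-fold cross-validation over the grid; the best model is refit on the
# whole training set.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=3)
cv_model = cv.fit(train_df)

# Evaluate the best model on the held-out test set.
rmse = evaluator.evaluate(cv_model.transform(test_df))
print("Test RMSE of the best model:", rmse)
```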