Colaborative Filtering : Hyperparameter Tuning Alternating Least Squares Algorithm
Collaborative filtering is commonly used for recommender systems. This can be formulated as a learning problem in which we are given the ratings that users have given certain items and are tasked with predicting their ratings for the rest of the items. A user-item matrix can be constructed as follows: Given n users and m items, an n . m matrix A in which the (u; i)th entry is r_ui { the rating for item i by user u}. The matrix A has many unobserved ratings which is represented as missing ratings.
The following packages are required for analysis in this post.They are loaded below.
%matplotlib inline
import pandas as pd
import gzip
import matplotlib.pyplot as plt
import os, random, time, shutil, requests
import numpy as np
# Hide warnings if there are any
import warnings
warnings.filterwarnings('ignore')
import seaborn as sns; sns.set()
from datetime import datetime, timedelta
from dateutil.parser import parse
from scipy import stats
from ast import literal_eval
import warnings; warnings.simplefilter('ignore')
import multiprocessing
from pyspark.sql.types import IntegerType, FloatType, LongType, DateType, ArrayType
from pyspark.sql.window import Window
from pyspark.sql.types import *
import datetime
import pyarrow as pa
from pyspark import SparkConf, Row
from pyspark.sql.functions import desc
from pyspark.sql.functions import col
from pyspark.sql.functions import *
import glob
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
pd.set_option('display.max_rows',500)
pd.set_option('display.max_columns',500)
pd.set_option('display.width',1000)
pd.set_option('display.max_columns',500)
path='/PythonRecommenderSystem'
# Check current working directory.
retval = os.getcwd()
print ("Current working directory %s" % retval)
# Now change the directory
os.chdir( path )
# Check current working directory.
retval = os.getcwd()
print("Directory changed successfully %s" % retval)
multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()-1)
import multiprocessing as mp
# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())
Current working directory /SparkPython2
Directory changed successfully /PythonRecommenderSystem
The Digital Music review dataset from the Amazon product dataset used in this work can be found here .This dataset contains didgital music reviews and metadata from Amazon, including 142.8 million reviews spanning May 1996 - July 2014. This dataset includes reviews (ratings, text, helpfulness votes), product metadata (descriptions, category information, price, brand, and image features), and links (also viewed/also bought graphs).The discription of the dataset is as follows:
- reviewerID - ID of the reviewer, e.g. A2SUAM1J3GNN3B
- asin - ID of the product, e.g. 0000013714
- reviewerName - name of the reviewer
- helpful - helpfulness rating of the review, e.g. 2/3
- reviewText - text of the review
- overall - rating of the product
- summary - summary of the review
- unixReviewTime - time of the review (unix time)
- reviewTime - time of the review (raw)
Import the data downloaded from the amazon site as below.
import json
Digital_Music = []
for line in open('reviews_Digital_Music.json', 'r'):
Digital_Music.append(json.loads(line))
df=pd.DataFrame(Digital_Music)
#we will randomly sample about 20% of our available data to
#reduce the computational time and cost
df=df.sample(frac=0.2, replace=False, random_state=1)
df.head(3)
asin | helpful | overall | reviewText | reviewTime | reviewerID | reviewerName | summary | unixReviewTime | |
---|---|---|---|---|---|---|---|---|---|
82875 | B00000581T | [2, 5] | 5.0 | This album is outstanding, from beginning to e... | 11 11, 2000 | ASSY7Q8GQCQC4 | HM | Outstanding | 973900800 |
612792 | B005X5LADS | [0, 0] | 5.0 | I heard a selection from this cd on Pandora an... | 08 22, 2013 | A33URADTU6RTSY | J. Malone | Truly relaxing music -- | 1377129600 |
573729 | B004UP3T2C | [0, 0] | 5.0 | All time hit! Takes you right back to a summer... | 11 10, 2012 | A1SNCP893TLCWT | GhostDust1863 | great song | 1352505600 |
check the class of each column variable
df.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 167201 entries, 82875 to 522564
Data columns (total 9 columns):
asin 167201 non-null object
helpful 167201 non-null object
overall 167201 non-null float64
reviewText 167201 non-null object
reviewTime 167201 non-null object
reviewerID 167201 non-null object
reviewerName 166737 non-null object
summary 167201 non-null object
unixReviewTime 167201 non-null int64
dtypes: float64(1), int64(1), object(7)
memory usage: 12.8+ MB
Convert the aisin column from string to numeric.
#df["asin"] = df["asin"].convert_objects(convert_numeric=True)
#df['asin']= df["asin"].astype(str).astype(float)
The summary statistics for the numeric columns from the dataset is shown below:
df.describe()
overall | unixReviewTime | |
---|---|---|
count | 167201.000000 | 1.672010e+05 |
mean | 4.539823 | 1.296173e+09 |
std | 0.960933 | 1.266856e+08 |
min | 1.000000 | 8.931168e+08 |
25% | 4.000000 | 1.245197e+09 |
50% | 5.000000 | 1.358294e+09 |
75% | 5.000000 | 1.381104e+09 |
max | 5.000000 | 1.406074e+09 |
df.shape
(167201, 9)
The columns which are to build the reccomender system are the reviewer/user ID, the ID of the product aisin and the item rating (overall). We can rename some of the columns to a lot recognizable and select the columns which are needed to build our reccomender system.
df.isnull().sum()
asin 0
helpful 0
overall 0
reviewText 0
reviewTime 0
reviewerID 0
reviewerName 464
summary 0
unixReviewTime 0
dtype: int64
df.rename(columns={'reviewerID': 'UserID', 'overall': 'rating','asin':'ItemID'}, inplace=True)
df.head(3)
ItemID | helpful | rating | reviewText | reviewTime | UserID | reviewerName | summary | unixReviewTime | |
---|---|---|---|---|---|---|---|---|---|
82875 | B00000581T | [2, 5] | 5.0 | This album is outstanding, from beginning to e... | 11 11, 2000 | ASSY7Q8GQCQC4 | HM | Outstanding | 973900800 |
612792 | B005X5LADS | [0, 0] | 5.0 | I heard a selection from this cd on Pandora an... | 08 22, 2013 | A33URADTU6RTSY | J. Malone | Truly relaxing music -- | 1377129600 |
573729 | B004UP3T2C | [0, 0] | 5.0 | All time hit! Takes you right back to a summer... | 11 10, 2012 | A1SNCP893TLCWT | GhostDust1863 | great song | 1352505600 |
These three columns are what is of interest in our collaborative filtering task.
dff=df[['UserID', 'ItemID', 'rating']]
dff.head()
UserID | ItemID | rating | |
---|---|---|---|
82875 | ASSY7Q8GQCQC4 | B00000581T | 5.0 |
612792 | A33URADTU6RTSY | B005X5LADS | 5.0 |
573729 | A1SNCP893TLCWT | B004UP3T2C | 5.0 |
470228 | A2WL6QTD4CZWD2 | B002P9WOMQ | 5.0 |
261494 | A11J6JC55R633N | B000TEEA1M | 5.0 |
We create a SparkSession then import the recommendation system and evaluation algorithms. SparkSessions were introduced in Spark 2.0.The SparkContext is no longer needed to start a session since the sparkSession creates a SparkContext,SpankConf and SparkContext.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ALSRecommenderSystem').getOrCreate()
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
We can create a Spark DataFrame below by converting the pandas dataframe to spark dataframe, alternative we could have imported the data straight as a Spark DataFrame with spark.read.csv(‘/file path’, inferSchema=True, header=True)
DF=spark.createDataFrame(dff)
DF.show(4)
+--------------+----------+------+
| UserID| ItemID|rating|
+--------------+----------+------+
| ASSY7Q8GQCQC4|B00000581T| 5.0|
|A33URADTU6RTSY|B005X5LADS| 5.0|
|A1SNCP893TLCWT|B004UP3T2C| 5.0|
|A2WL6QTD4CZWD2|B002P9WOMQ| 5.0|
+--------------+----------+------+
only showing top 4 rows
DF.describe() function produces the summary statistics from the data. There are about 157836 observations. The mean rating is about 4.3. The rating ranges from 1 to 5. The toPandas() function converts the summary statistic output into a pandas dataframe which is pretty.
DF.describe().toPandas()
summary | UserID | ItemID | rating | |
---|---|---|---|---|
0 | count | 167201 | 167201 | 167201 |
1 | mean | None | 5.963927389565217E9 | 4.539823326415512 |
2 | stddev | None | 9.090571950387818E8 | 0.9609327057388072 |
3 | min | A00167062N853P9T0TEJ5 | 5555991584 | 1.0 |
4 | max | AZZZF6NKJ4QYV | B00LWJDZHI | 5.0 |
DF.printSchema()
root
|-- UserID: string (nullable = true)
|-- ItemID: string (nullable = true)
|-- rating: double (nullable = true)
DF.select('UserID').distinct().show(5)
+--------------+
| UserID|
+--------------+
|A3A7F8AQMAVFGN|
|A1Z6W412RO5156|
|A14B6M7JU3JSMZ|
|A2Z9L0JK0XRS61|
|A33RII6U1H9EV9|
+--------------+
only showing top 5 rows
from pyspark.sql.functions import countDistinct, avg, stddev
DF.select(countDistinct('UserID')).show()
+----------------------+
|count(DISTINCT UserID)|
+----------------------+
| 130379|
+----------------------+
Convert string type columns to numeric
The ML library accepts only numeric input
#DF=DF.withColumn("itemId", DF["ItemID"].cast("int"))
#DF=DF.withColumn("userId", DF["userId"].cast("int"))
DF.printSchema()
root
|-- UserID: string (nullable = true)
|-- ItemID: string (nullable = true)
|-- rating: double (nullable = true)
convert categorical data to numerical data in Pyspark
Since the pyspark ML accepts only numeric input, we shall convert the UserID from string type to numeric. Each UserID will be converted to a unique numeric ID number.The Stringindexer function in Spark ML can be used for this converting many columns from string to numeric at the same time.
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
#indexer = StringIndexer(inputCol="UserID", outputCol="moduserID")
#DFF = indexer.fit(DF).transform(DF)
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(DF) for column in list(set(DF.columns)-set(['rating']))]
pipeline = Pipeline(stages=indexers)
DFF = pipeline.fit(DF).transform(DF)
DFF.show(5)
+--------------+----------+------+------------+------------+
| UserID| ItemID|rating|UserID_index|ItemID_index|
+--------------+----------+------+------------+------------+
| ASSY7Q8GQCQC4|B00000581T| 5.0| 45352.0| 8956.0|
|A33URADTU6RTSY|B005X5LADS| 5.0| 107082.0| 6347.0|
|A1SNCP893TLCWT|B004UP3T2C| 5.0| 127045.0| 5391.0|
|A2WL6QTD4CZWD2|B002P9WOMQ| 5.0| 87818.0| 31360.0|
|A11J6JC55R633N|B000TEEA1M| 5.0| 35016.0| 60263.0|
+--------------+----------+------+------------+------------+
only showing top 5 rows
#DFF=DFF.withColumn("moduserID", DFF["moduserID"].cast("int"))
#DFF=DFF.withColumn("itemId", DFF["itemId"].cast("int"))
DFF.printSchema()
root
|-- UserID: string (nullable = true)
|-- ItemID: string (nullable = true)
|-- rating: double (nullable = true)
|-- UserID_index: double (nullable = false)
|-- ItemID_index: double (nullable = false)
Select the three columns for our analysis and rename the columns with farmiliar column names
#DFF = DFF.drop('UserID')
#DFF= DFF.select(['moduserID', 'itemId', 'rating'])
DFF = DFF.select(col("UserID_index").alias("userId"),col("ItemID_index").alias("itemId"),col("rating"))
Training the Model and Making Predictions
To train the model and make predictions, we need a training and an evaluation set. Here the training set is 80% of randomly selected samples and the rest are for evaluation.
Note that because the split is random and there is no random state parameter as in Scikit-Learn these results will not be exactly reproducible.
DFF.show(3)
+--------+------+------+
| userId|itemId|rating|
+--------+------+------+
| 45352.0|8956.0| 5.0|
|107082.0|6347.0| 5.0|
|127045.0|5391.0| 5.0|
+--------+------+------+
only showing top 3 rows
Check for missing values
We can check for which columns have missing values in the spark dataframe as shown below. Handling missing data in machine learning is an important subject. The basic approach include deleting ,mean and median imputation for continuous and categorical variables respectively.
from pyspark.sql.functions import col,sum
DFF.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in DFF.columns)).show()
+------+------+------+
|userId|itemId|rating|
+------+------+------+
| 0| 0| 0|
+------+------+------+
Equivalently,
from pyspark.sql.functions import lit
rows = DFF.count()
summary = DFF.describe().filter(col("summary") == "count")
summary.select(*((lit(rows)-col(c)).alias(c) for c in DFF.columns)).show()
+------+------+------+
|userId|itemId|rating|
+------+------+------+
| 0.0| 0.0| 0.0|
+------+------+------+
Suppose the column itemId contained missing observations and we wanted to delete the rows with NA values based on this column, we would as follows :
DFF.na.drop(subset=["itemId"])
DataFrame[userId: double, itemId: double, rating: double]
DFF.describe().toPandas()
summary | userId | itemId | rating | |
---|---|---|---|---|
0 | count | 167201 | 167201 | 167201 |
1 | mean | 51996.47661198198 | 21262.852022416133 | 4.539823326415512 |
2 | stddev | 41561.27675663137 | 24994.03017898536 | 0.9609327057388072 |
3 | min | 0.0 | 0.0 | 1.0 |
4 | max | 130378.0 | 81077.0 | 5.0 |
The following line creates the alternating least squares model. Then the model is fitted with the training data and predictions are made on the test set. The alternating least squares is similar to regular least squares. Inalternating least squares, the objective function is minimized by holding other varaibles constant except one at each time. By holding all other variables constant except one, the minimization reduces to something similar to least squares.
Hyperparameter Tuning
The following parameters in the ALS are to be tuned. The paper Large- Scale Parallel Colaborative Filtering for the Netflix Price is available on this link here. For a detailed explanation of the model, reading through the paper would be appropriate.
numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
rank is the number of latent factors in the model (defaults to 10).
maxIter is the maximum number of iterations to run (defaults to 10).
regParam specifies the regularization parameter in ALS (defaults to 1.0).
implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
Explicit vs. implicit feedback
The standard approach to matrix factorization based collaborative filtering treats the entries in the user-item matrix as explicit preferences given by the user to the item, for example, users giving ratings to a product. Explicit preferencces are always not available.Inplicit preferences which can be inferred from numbers representing the strength in observations of user actions can be used instead.Example of inplicit feedback/preferences include views, clicks, purchases, likes, shares etc.
Scaling of the regularization parameter
The regularization parameter regParam is scaled in solving each least squares problem by the number of ratings the user generated in updating user factors, or the number of ratings the product received in updating product factors. This makes regParam less dependent on the scale of the dataset, allowing the best parameter learned from a sampled subset to be applied to the full dataset and expect similar performance.
Cold-start strategy
The cold-start problem arises in colaborative filtering prediction when we encouter users/items with no prior rating history in our training set. The default behavior of the spark ALS model is to assign NaN values as predictions in such situation.Spark allows users to set the coldStartStrategy parameter to “drop” in order to drop any rows in the DataFrame of predictions that contain NaN values. The evaluation metric will then be computed over the non-NaN data and will be valid.
Two standardizing Approaches
We demonstrate two standardization approaches, the min-max and normalization. The normalization approach is what we apply on the target variable ratings.
mean_rating, sttdev_rating = DFF.select(mean("rating"), stddev("rating")).first()
max_rating,min_rating = DFF.select(max("rating"), min("rating")).first()
DFF=DFF.withColumn("rating_Normalized", (col("rating") - mean_rating) / sttdev_rating)
DFF=DFF.withColumn("rating_minmax", (col("rating") - min_rating) /(max_rating-min_rating))
DFF.show(5)
+--------+-------+------+------------------+-------------+
| userId| itemId|rating| rating_Normalized|rating_minmax|
+--------+-------+------+------------------+-------------+
| 45352.0| 8956.0| 5.0|0.4788854316605702| 1.0|
|107082.0| 6347.0| 5.0|0.4788854316605702| 1.0|
|127045.0| 5391.0| 5.0|0.4788854316605702| 1.0|
| 87818.0|31360.0| 5.0|0.4788854316605702| 1.0|
| 35016.0|60263.0| 5.0|0.4788854316605702| 1.0|
+--------+-------+------+------------------+-------------+
only showing top 5 rows
DFF.select(max("rating")).first()[0]
5.0
train, test = DFF.randomSplit([0.8, 0.2], seed=12345)
ALSExplicit = ALS( implicitPrefs=False, userCol="userId", itemCol="itemId", ratingCol="rating_minmax",
coldStartStrategy="drop")
defaultModel = ALSExplicit.fit(train)
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 2 values for rank and 2 values for regParam, 2 values for maxIter,2 values for alpha
# this grid will have 2 x 2 x 2 x 2 = 16 parameter settings for CrossValidator to choose from.
paramMapExplicit = ParamGridBuilder() \
.addGrid(ALSExplicit.rank, [ 8, 12]) \
.addGrid(ALSExplicit.maxIter, [5,10]) \
.addGrid(ALSExplicit.regParam, [0.01,0.001]) \
.addGrid(ALSExplicit.alpha, [2.0,3.0]) \
.build()
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating_minmax")
# Run cross-validation, and choose the best set of parameters.
CVALSExplicit = CrossValidator(estimator=ALSExplicit,
estimatorParamMaps=paramMapExplicit,
evaluator=evaluatorR,
numFolds=5)
CVModelEXplicit = CVALSExplicit.fit(train)
# Make predictions on test documents. cvModel uses the best model found (cvModelInplicit).
predsExplicit = CVModelEXplicit.bestModel.transform(test)
predictions=predsExplicit.withColumn("predictnew", ((max_rating-min_rating)*col("prediction") + min_rating) )
predictions =predictions.select([c for c in predictions.columns if c not in {'rating_Normalized','rating_minmax','prediction'}])
predictions.show(5)
+------+------+------+-------------------+
|userId|itemId|rating| predictnew|
+------+------+------+-------------------+
|1562.0| 148.0| 4.0| 1.3738184571266174|
| 357.0| 148.0| 5.0| 1.4132280349731445|
|5678.0| 463.0| 4.0| 1.5864256620407104|
|2144.0| 463.0| 4.0| 1.6909311413764954|
|3369.0| 471.0| 5.0| 2.3148409128189087|
+------+------+------+-------------------+
only showing top 5 rows
We convert back normalized predictions back to its original interval between 1 and 5. The standardization to mean of 0 and standard deviation of 1 narrows the prediction prediction interval and thereby improves the quality of the predictions from the model.
# Evaluate the model by computing the RMSE on the test data
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="predictnew")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error is {}".format(rmse))
Root-mean-square error is 3.7312300283404176
# Generate top 10 digital music recommendations for each user
userRecs = CVModelEXplicit.bestModel.recommendForAllUsers(10)
# Generate top 10 user recommendations for each digital music
DigitalMusicRecs = CVModelEXplicit.bestModel.recommendForAllItems(10)
userRecs.show(10)
+------+--------------------+
|userId| recommendations|
+------+--------------------+
| 148|[[2788, 1.3300879...|
| 463|[[1289, 1.3111275...|
| 471|[[3575, 1.153979]...|
| 496|[[2079, 1.925576]...|
| 833|[[1572, 1.4387902...|
| 1088|[[1457, 1.3470203...|
| 1238|[[3472, 1.164058]...|
| 1342|[[1422, 1.4752936...|
| 1580|[[3077, 1.4021921...|
| 1591|[[1001, 1.54133],...|
+------+--------------------+
only showing top 10 rows
Single Prediction
A prediction for a single userId and itemId can be generated as follows:
d = {'userId': [0], 'itemId': [2875]}
df = pd.DataFrame(data=d)
df2=spark.createDataFrame(df)
single_pred=CVModelEXplicit.bestModel.transform(df2)
single_pred.withColumn("prediction", (sttdev_rating*col("prediction") + mean_rating) ).show()
+------+------+-----------------+
|userId|itemId| prediction|
+------+------+-----------------+
| 0| 2875|4.540941984998424|
+------+------+-----------------+
Save Models
#save to folder models which is created in the writing process
#folder models must not already be present
path="/PythonRecommenderSystem/models"
model = CVModelEXplicit
model.write().overwrite().save(path)
#load the model
sameModel = CVModelEXplicit.load(path)
sameModel
CrossValidatorModel_0ed894fbc20d
#stop the spark cluster here
pool.close()
spark.stop()