Building a Recommender System in Spark: The Alternating Least Squares Algorithm

A common challenge for online companies, for example in retail marketing, is recommending new products to users in a personalized setting (e.g. Amazon's product recommender system and eBay recommendations). Often the interest is not only in recommending new products but also in recommending products that a shopper has purchased in the past, together with a targeted discount coupon, with the goal of increasing sales. A common approach to this problem is matrix factorization.

This can be formulated as a learning problem in which we are given the ratings that users have assigned to certain items and are tasked with predicting their ratings for the remaining items. A user-item matrix can be constructed as follows: given n users and m items, build an n x m matrix A in which the (u, i)-th entry is r_ui, the rating for item i by user u. The matrix A has many unobserved entries, which are represented as missing ratings.
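Matrix factorization approximates A by a product of two low-rank factor matrices. Writing x_u for the latent factor vector of user u and y_i for the latent factor vector of item i (both of some chosen rank k), the standard regularized objective minimized by ALS in the explicit-feedback setting is

\min_{\{x_u\},\{y_i\}} \sum_{(u,i)\ \mathrm{observed}} \left( r_{ui} - x_u^{\top} y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)

where lambda is the regularization strength (regParam in Spark's ALS implementation).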

We load the required packages below.

%matplotlib inline
import pandas as pd
import gzip
import os, random, time, shutil, requests
import matplotlib.pyplot as plt
import numpy as np
# Hide warnings if there are any
import warnings
warnings.filterwarnings('ignore')
import seaborn as sns; sns.set()
from datetime import datetime, timedelta
from dateutil.parser import parse
from scipy import stats
from ast import literal_eval
from sklearn.model_selection import train_test_split
from numpy import linalg as LA
import multiprocessing
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommender').getOrCreate()
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

path = '/Users/nanaakwasiabayieboateng/PythonRecommenderSystem'


# Check current working directory.
retval = os.getcwd()
print ("Current working directory %s" % retval)

# Now change the directory
os.chdir( path )

# Check current working directory.
retval = os.getcwd()

print("Directory changed successfully %s" % retval)

# list files in current directory
#files = os.listdir(path)
#print(files)
Current working directory /Users/nanaakwasiabayieboateng/SparkPython2
Directory changed successfully /Users/nanaakwasiabayieboateng/PythonRecommenderSystem

The Musical Instruments reviews dataset used in this work comes from the Amazon product dataset and can be found here. The full collection contains product reviews and metadata from Amazon, including 142.8 million reviews spanning May 1996 - July 2014: reviews (ratings, text, helpfulness votes), product metadata (descriptions, category information, price, brand, and image features), and links (also viewed/also bought graphs). The description of the fields is as follows:

reviewerID - ID of the reviewer, e.g. A2SUAM1J3GNN3B
asin - ID of the product, e.g. 0000013714
reviewerName - name of the reviewer
helpful - helpfulness rating of the review, e.g. 2/3
reviewText - text of the review
overall - rating of the product
summary - summary of the review
unixReviewTime - time of the review (unix time)
reviewTime - time of the review (raw)

Import the data downloaded from the Amazon site as below.

import json

Musical_Instruments = []
for line in open('reviews_Musical_Instruments.json', 'r'):
    Musical_Instruments.append(json.loads(line))
df=pd.DataFrame(Musical_Instruments)
df.head(3)
asin helpful overall reviewText reviewTime reviewerID reviewerName summary unixReviewTime
0 0006428320 [0, 0] 3.0 The portfolio is fine except for the fact that... 03 11, 2014 A1YS9MDZP93857 John Taylor Parts missing 1394496000
1 0014072149 [0, 0] 5.0 If you are a serious violin student on a budge... 06 6, 2013 A3TS466QBAWB9D Silver Pencil Perform it with a friend, today! 1370476800
2 0041291905 [0, 0] 5.0 This is and excellent edition and perfectly tr... 10 14, 2013 A3BUDYITWUSIS7 joyce gabriel cornett Vivalldi's Four Seasons 1381708800
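
The file above was read after decompressing it. Since gzip is already imported, the compressed download can also be parsed directly without unzipping first; a minimal sketch, assuming the file is named reviews_Musical_Instruments.json.gz:

import gzip
import json
import pandas as pd

# read the gzipped review file line by line
records = []
with gzip.open('reviews_Musical_Instruments.json.gz', 'rt') as f:
    for line in f:
        records.append(json.loads(line))
df = pd.DataFrame(records)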

Check the class of each column variable.

df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500176 entries, 0 to 500175
Data columns (total 9 columns):
asin              500176 non-null object
helpful           500176 non-null object
overall           500176 non-null float64
reviewText        500176 non-null object
reviewTime        500176 non-null object
reviewerID        500176 non-null object
reviewerName      497590 non-null object
summary           500176 non-null object
unixReviewTime    500176 non-null int64
dtypes: float64(1), int64(1), object(7)
memory usage: 34.3+ MB
df.dtypes
asin               object
helpful            object
overall           float64
reviewText         object
reviewTime         object
reviewerID         object
reviewerName       object
summary            object
unixReviewTime      int64
dtype: object

Alternatively, if we convert the variable unixReviewTime, which is in Unix time format (the number of seconds since January 01 1970 UTC), into a standard date format, we can easily use pandas to extract the day, month and year from the date.


#convert unix time in seconds to date-time
df["Date_Time"] = pd.to_datetime(df["unixReviewTime"], unit='s')
df['year'] = df['Date_Time'].dt.year
df['month'] = df['Date_Time'].dt.month
df['day'] = df['Date_Time'].dt.day
df.head(3)
asin helpful overall reviewText reviewTime reviewerID reviewerName summary unixReviewTime Date_Time year month day
0 0006428320 [0, 0] 3.0 The portfolio is fine except for the fact that... 03 11, 2014 A1YS9MDZP93857 John Taylor Parts missing 1394496000 2014-03-11 2014 3 11
1 0014072149 [0, 0] 5.0 If you are a serious violin student on a budge... 06 6, 2013 A3TS466QBAWB9D Silver Pencil Perform it with a friend, today! 1370476800 2013-06-06 2013 6 6
2 0041291905 [0, 0] 5.0 This is and excellent edition and perfectly tr... 10 14, 2013 A3BUDYITWUSIS7 joyce gabriel cornett Vivalldi's Four Seasons 1381708800 2013-10-14 2013 10 14

Convert the asin column from string to numeric.

df["asin"] = df["asin"].convert_objects(convert_numeric=True)
#df['asin']=  df["asin"].astype(str).astype(float)

The summary statistics for the numeric columns of the dataset are shown below:

df.describe()
asin overall unixReviewTime year month day
count 2.114000e+03 500176.000000 5.001760e+05 500176.000000 500176.000000 500176.000000
mean 1.472931e+09 4.244350 1.344561e+09 2012.151987 6.001763 15.543731
std 1.336889e+09 1.203374 6.487853e+07 2.095034 3.623264 8.794805
min 6.428320e+06 1.000000 8.934624e+08 1998.000000 1.000000 1.000000
25% 7.678510e+08 4.000000 1.325030e+09 2011.000000 3.000000 8.000000
50% 1.417030e+09 5.000000 1.364342e+09 2013.000000 6.000000 15.000000
75% 1.417030e+09 5.000000 1.388189e+09 2013.000000 9.000000 23.000000
max 9.868239e+09 5.000000 1.406074e+09 2014.000000 12.000000 31.000000
df.shape
(500176, 13)

The columns needed to build the recommender system are the reviewer/user ID (reviewerID), the product ID (asin) and the item rating (overall). We rename some of the columns to more recognizable names and select only those needed for the recommender system.

df.isnull().sum()
asin              498062
helpful                0
overall                0
reviewText             0
reviewTime             0
reviewerID             0
reviewerName        2586
summary                0
unixReviewTime         0
Date_Time              0
year                   0
month                  0
day                    0
dtype: int64
df.rename(columns={'reviewerID': 'UserID', 'overall': 'rating','asin':'ItemID'}, inplace=True)
df.head(3)
ItemID helpful rating reviewText reviewTime UserID reviewerName summary unixReviewTime Date_Time year month day
0 6428320.0 [0, 0] 3.0 The portfolio is fine except for the fact that... 03 11, 2014 A1YS9MDZP93857 John Taylor Parts missing 1394496000 2014-03-11 2014 3 11
1 14072149.0 [0, 0] 5.0 If you are a serious violin student on a budge... 06 6, 2013 A3TS466QBAWB9D Silver Pencil Perform it with a friend, today! 1370476800 2013-06-06 2013 6 6
2 41291905.0 [0, 0] 5.0 This is and excellent edition and perfectly tr... 10 14, 2013 A3BUDYITWUSIS7 joyce gabriel cornett Vivalldi's Four Seasons 1381708800 2013-10-14 2013 10 14
dff=df[['UserID', 'ItemID', 'rating']]
dff.head()
UserID ItemID rating
0 A1YS9MDZP93857 6428320.0 3.0
1 A3TS466QBAWB9D 14072149.0 5.0
2 A3BUDYITWUSIS7 41291905.0 5.0
3 A19K10Z0D2NTZK 41913574.0 5.0
4 A14X336IB4JD89 201891859.0 1.0

We create a SparkSession and then import the recommendation and evaluation algorithms. SparkSession was introduced in Spark 2.0; a SparkContext no longer needs to be created explicitly to start a session, since the SparkSession encapsulates the SparkContext, SparkConf and SQLContext.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ALSRecommenderSystem').getOrCreate()
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

We create a Spark DataFrame below by converting the pandas DataFrame to a Spark DataFrame; alternatively, we could have imported the data directly as a Spark DataFrame with spark.read.csv('/file path', inferSchema=True, header=True).

DF=spark.createDataFrame(dff)
DF.show(4)
+--------------+-----------+------+
|        UserID|     ItemID|rating|
+--------------+-----------+------+
|A1YS9MDZP93857|  6428320.0|   3.0|
|A3TS466QBAWB9D|1.4072149E7|   5.0|
|A3BUDYITWUSIS7|4.1291905E7|   5.0|
|A19K10Z0D2NTZK|4.1913574E7|   5.0|
+--------------+-----------+------+
only showing top 4 rows

The DF.describe() function produces the summary statistics of the data. There are 500176 observations. The mean rating is about 4.2 and ratings range from 1 to 5. The toPandas() function converts the summary statistics output into a pandas DataFrame, which displays more cleanly.

DF.describe().toPandas()
summary UserID ItemID rating
0 count 500176 500176 500176
1 mean None NaN 4.244349988803941
2 stddev None NaN 1.2033741013944728
3 min A0002382258OFJJ2UYNTR 6428320.0 1.0
4 max AZZZTAPYKI9RD NaN 5.0
DF.printSchema()
root
 |-- UserID: string (nullable = true)
 |-- ItemID: double (nullable = true)
 |-- rating: double (nullable = true)
DF.select('UserID').distinct().show(5)
+--------------+
|        UserID|
+--------------+
| A1KP0BRI68RCN|
|A3W0PTHD219LXE|
|A165QTRGCJVCZZ|
|A140XH16IKR4B0|
| AT9557I2MAJKU|
+--------------+
only showing top 5 rows
from pyspark.sql.functions import countDistinct, avg, stddev

DF.select(countDistinct('UserID')).show()
+----------------------+
|count(DISTINCT UserID)|
+----------------------+
|                339231|
+----------------------+

Convert string type columns to numeric

The ML library accepts only numeric input for the user and item ID columns.

DF=DF.withColumn("itemId", DF["ItemID"].cast("int"))
#DF=DF.withColumn("userId", DF["userId"].cast("int"))
DF.printSchema()
root
 |-- UserID: string (nullable = true)
 |-- itemId: integer (nullable = true)
 |-- rating: double (nullable = true)

Convert categorical data to numerical data in PySpark

Since PySpark ML accepts only numeric input, we convert UserID from string to numeric with StringIndexer. Each UserID is mapped to a unique numeric ID.

from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="UserID", outputCol="moduserID")
DFF = indexer.fit(DF).transform(DF)
DFF=DFF.withColumn("moduserID", DFF["moduserID"].cast("int"))
DFF=DFF.withColumn("itemId", DFF["itemId"].cast("int"))
DFF.printSchema()
root
 |-- UserID: string (nullable = true)
 |-- itemId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- moduserID: integer (nullable = true)
DFF.show(4)
+--------------+--------+------+---------+
|        UserID|  itemId|rating|moduserID|
+--------------+--------+------+---------+
|A1YS9MDZP93857| 6428320|   3.0|     5092|
|A3TS466QBAWB9D|14072149|   5.0|    15198|
|A3BUDYITWUSIS7|41291905|   5.0|    50865|
|A19K10Z0D2NTZK|41913574|   5.0|   309908|
+--------------+--------+------+---------+
only showing top 4 rows

Rename the columns with more familiar column names.

#DFF = DFF.drop('UserID')
DFF= DFF.select(['moduserID', 'itemId', 'rating'])
DFF = DFF.select(col("moduserID").alias("userId"),col("itemId").alias("itemId"),col("rating").alias("rating"))

Training the Model and Making Predictions

To train the model and make predictions, we need a training set and an evaluation set. Here the training set is 75% of randomly selected samples and the remaining 25% are held out for evaluation.

Note that because the split is random, these results will not be exactly reproducible unless a seed is passed to randomSplit (the analogue of Scikit-Learn's random_state parameter).

DFF.show(3)
+------+--------+------+
|userId|  itemId|rating|
+------+--------+------+
|  5092| 6428320|   3.0|
| 15198|14072149|   5.0|
| 50865|41291905|   5.0|
+------+--------+------+
only showing top 3 rows
DFF.printSchema()
root
 |-- userId: integer (nullable = true)
 |-- itemId: integer (nullable = true)
 |-- rating: double (nullable = true)

Check for missing values

We can check which columns of the Spark DataFrame have missing values as shown below. Handling missing data is an important subject in machine learning. Basic approaches include deleting rows, imputing continuous variables with the mean or median, and imputing categorical variables with the mode.

from pyspark.sql.functions import col,sum
DFF.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in DFF.columns)).show()
+------+------+------+
|userId|itemId|rating|
+------+------+------+
|     0|     0|     0|
+------+------+------+

Equivalently,

from pyspark.sql.functions import lit

rows = DFF.count()
summary = DFF.describe().filter(col("summary") == "count")
summary.select(*((lit(rows)-col(c)).alias(c) for c in DFF.columns)).show()
+------+------+------+
|userId|itemId|rating|
+------+------+------+
|   0.0|   0.0|   0.0|
+------+------+------+

Suppose the column itemId contained missing observations and we wanted to delete the rows with NA values based on this column; we would proceed as follows:

DFF.na.drop(subset=["itemId"])
DataFrame[userId: int, itemId: int, rating: double]
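
Alternatively, missing values in a numeric column could be imputed rather than dropped. A minimal sketch using pyspark.ml.feature.Imputer (available in Spark 2.2+), assuming itemId actually contained nulls:

from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col

# Imputer expects float/double input columns, so cast first
DFF_dbl = DFF.withColumn("itemId", col("itemId").cast("double"))

# replace nulls in itemId with the column median ('mean' is the default strategy)
imputer = Imputer(inputCols=["itemId"], outputCols=["itemId_imputed"], strategy="median")
DFF_imputed = imputer.fit(DFF_dbl).transform(DFF_dbl)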
DFF.describe().toPandas()
summary userId itemId rating
0 count 500176 500176 500176
1 mean 121011.79597581651 5211427.368528278 4.244349988803941
2 stddev 107725.35618017272 8.37320292264631E7 1.2033741013944728
3 min 0 0 1.0
4 max 339230 2147483647 5.0

Split the dataframe into 75% for training and 25% as the test set.

training, test = DFF.randomSplit([0.75,0.25])
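
If reproducibility is desired, randomSplit also accepts a seed argument; the value below is arbitrary:

training, test = DFF.randomSplit([0.75, 0.25], seed=42)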

The following line creates the alternating least squares model. The model is then fitted to the training data and predictions are made on the test set. Alternating least squares is similar to regular least squares: the objective function is minimized by holding all of the factors fixed except one set at a time. With the other factors held constant, each update reduces to an ordinary (regularized) least squares problem, and the algorithm alternates between updating the user factors and the item factors.
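
To make the alternation concrete, here is a small illustrative NumPy sketch of the explicit-feedback ALS updates. This is not Spark's implementation; the rank, regularization and iteration count are arbitrary choices for the toy example.

import numpy as np

def als_numpy(R, mask, rank=5, reg=0.1, iters=10):
    """Toy ALS on a small dense ratings matrix R.
    mask[u, i] = 1 where R[u, i] is observed, 0 otherwise."""
    n_users, n_items = R.shape
    rng = np.random.default_rng(0)
    X = rng.normal(scale=0.1, size=(n_users, rank))   # user factors
    Y = rng.normal(scale=0.1, size=(n_items, rank))   # item factors
    I = np.eye(rank)
    for _ in range(iters):
        # hold the item factors Y fixed and solve a regularized least squares problem per user
        for u in range(n_users):
            Yu = Y[mask[u] == 1]                       # items rated by user u
            X[u] = np.linalg.solve(Yu.T @ Yu + reg * I, Yu.T @ R[u, mask[u] == 1])
        # hold the user factors X fixed and solve per item
        for i in range(n_items):
            Xi = X[mask[:, i] == 1]                    # users who rated item i
            Y[i] = np.linalg.solve(Xi.T @ Xi + reg * I, Xi.T @ R[mask[:, i] == 1, i])
    return X, Y

# tiny example: 3 users x 4 items, 0 marks an unobserved rating
R = np.array([[5., 3., 0., 1.],
              [4., 0., 0., 1.],
              [1., 1., 0., 5.]])
mask = (R > 0).astype(int)
X, Y = als_numpy(R, mask, rank=2)
print(np.round(X @ Y.T, 2))   # predicted ratings for every user-item pair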

als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='itemId', ratingCol='rating')

model = als.fit(training)

predictions = model.transform(test)

To prevent the mean squared error evaluation from producing NaN, we drop any NaN values present in the predictions; these arise for users or items that appear in the test set but not in the training set.

predictions.describe().toPandas()
summary userId itemId rating rating_scaled prediction
0 count 51493 51493 51493 51493 51493
1 mean 20876.33086050531 1690542.568349096 4.351873070126037 0.08935133405110446 0.08461853259996296
2 stddev 20011.651527771326 4.695574783695837E7 1.0662019165643304 0.8860103564874956 0.7040405588438029
3 min 0 0 1.0 -2.696044384738196 -2.6432934
4 max 68314 2147483647 5.0 0.6279427239795259 0.62812227
predictions = predictions.na.drop()
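
On Spark 2.2 and later, the same effect can be requested when constructing the estimator, so that cold-start (NaN) predictions are dropped automatically; a minimal sketch:

als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='itemId',
          ratingCol='rating', coldStartStrategy='drop')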

Evaluation

If the ratings were standardized before training (as the rating_scaled column above suggests), the predictions would need to be transformed back to the original scale before evaluation: multiply by the standard deviation used in scaling and add back the mean that was subtracted.


evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating')
rmse = evaluator.evaluate(predictions)
rmse
1.436062196037978
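
Finally, the fitted model can be used to generate top-N recommendations. A minimal sketch using the ALSModel methods available in Spark 2.2+ (the choice of 10 recommendations is arbitrary):

# top 10 item recommendations for every user
user_recs = model.recommendForAllUsers(10)
user_recs.show(5, truncate=False)

# top 10 user recommendations for every item
item_recs = model.recommendForAllItems(10)
item_recs.show(5, truncate=False)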