This project was created to participate in the Outbrain Click Prediction competition hosted on Kaggle. It builds a collaborative filtering system using Spark. The task in the competition is to predict the likelihood of a user clicking certain ads or documents based on historical data.
- Rank the recommendations in each display_id group by decreasing predicted likelihood of being clicked.
- Outbrain provides a relational dataset covering historical page views, documents, user behavior, and ad details.
- uuid: a unique id for each user.
- document_id: a unique id for each document (a web page with content, e.g. a news article); on each document, a set of ads is displayed.
- ad_id: a unique id for each ad.
- clicked: (1 if clicked, 0 otherwise).
- campaign_id: a unique id for each campaign. Each ad belongs to a campaign run by an advertiser (advertiser_id).
- display_id: each context (the set of ads shown to a user on a page) is given a display_id. In each such set, the user clicked on at least one recommendation.
table name | attributes | details |
---|---|---|
page_views | uuid, document_id, timestamp ... | a log of users visiting documents |
click_train | display_id, ad_id, clicked | training data |
click_test | display_id, ad_id | test data |
promoted_content | ad_id, document_id, campaign_id, advertiser_id | details on every ad |
documents_meta | document_id, publisher_id, ... | details on every document |
events | display_id, uuid, document_id, timestamp ... | information on the display_id context; covers both the train and test sets |
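As a rough sketch of how these tables fit together (not the project's actual pipeline), the snippet below joins click_train with events on display_id to attach a uuid to every shown ad. The HDFS paths and column positions here are assumptions.

from pyspark import SparkContext

sc = SparkContext(appName="join_sketch", master="local")

# events: display_id, uuid, document_id, timestamp, ... (column order assumed)
events = sc.textFile("hdfs:///tmp/spark_data/events.csv") \
    .map(lambda l: l.split(',')) \
    .map(lambda f: (f[0], f[1]))                 # (display_id, uuid)

# click_train: display_id, ad_id, clicked
clicks = sc.textFile("hdfs:///tmp/spark_data/click_train.csv") \
    .map(lambda l: l.split(',')) \
    .map(lambda f: (f[0], (f[1], f[2])))         # (display_id, (ad_id, clicked))

# join on display_id, keeping one (uuid, ad_id, clicked) record per shown ad
user_ad_clicks = events.join(clicks) \
    .map(lambda kv: (kv[1][0], kv[1][1][0], kv[1][1][1]))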
- Spark is especially useful and fast for data science work because, instead of writing intermediate data flows to HDFS the way Hadoop MapReduce does, it keeps intermediate results in memory through a data model called resilient distributed datasets (RDDs), which avoids reloading data between iterations. Many data analysis and machine learning algorithms are iterative, repeatedly running the same computation with slightly changing parameters (e.g. gradient descent), so this in-memory model gives Spark a large speed advantage in big data applications.
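A rough illustration of why this matters (not project code; the file path and column layout are made up): the parsed RDD is cached once, so every pass of an iterative loop reads from memory instead of HDFS. It reuses the sc from the previous sketch.

# parse once and keep the result in memory; without cache() every iteration
# below would re-read and re-parse the file from HDFS
points = sc.textFile("hdfs:///tmp/spark_data/some_points.csv") \
    .map(lambda l: [float(x) for x in l.split(',')]) \
    .cache()

w = 0.0
for i in range(10):               # a gradient-descent style loop
    grad = points.map(lambda p: (p[0] * w - p[1]) * p[0]).mean()
    w -= 0.1 * grad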
Collaborative filtering, the technique behind many recommendation engines, is widely used in e-commerce to generate recommendations for customers from user-behavior data. The approach used for this job is known as a user-based recommender:
find users that are similar to the target user, and use their collective ratings to make recommendations for the target user.
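As a toy illustration of the user-based idea only (the project itself relies on the matrix factorization approach described next), the sketch below scores ads for a target user from the clicks of similar users; all ids and values are made up.

import numpy as np

# toy click matrix: rows = users, columns = ads, 1 = clicked
R = np.array([[1, 0, 1, 0],
              [1, 1, 1, 0],
              [0, 0, 1, 1]], dtype=float)

def cosine(u, v):
    return np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v) + 1e-9)

target = 0
sims = np.array([cosine(R[target], R[j]) for j in range(R.shape[0])])
sims[target] = 0.0                            # ignore self-similarity

# score each ad by the similarity-weighted clicks of the other users
scores = sims.dot(R) / (sims.sum() + 1e-9)
ranked_ads = np.argsort(-scores)              # most likely clicks first
print(ranked_ads)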
The idea behind matrix factorization is to find the latent features underlying the interactions between two different entities by finding a linear-algebra representation of those features. In this case, it is to find the underlying features shared by users who click the same ads.
In this problem, there is a set of all users U and a set of all ads D. A rating matrix R is a matrix of dimension |U| x |D| where R(i,j) is a binary indicator of whether the i-th customer clicked the j-th ad. The elements of R where the customer did not click the ad are filled with zeros. The goal is to factorize R into two matrices, P of size |U| x k and Q of size |D| x k, so that P*Q^T approximates R. Here, k is the number of latent features: each row of P represents the association between a user and the latent features, and each row of Q represents the association between an ad and the latent features. The ultimate goal is to come up with a pair P and Q whose product approximates the existing observations of the users' behavior; at the same time, the latent features let the recommendation engine provide estimates for the unobserved entries R(i,j).
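A tiny, self-contained sketch of these definitions (all ids and sizes are made up):

import numpy as np

# observed clicks as (user index, ad index) pairs
clicks = [(0, 2), (0, 3), (1, 0), (2, 2)]
num_users, num_ads = 3, 4                     # |U| and |D|

# R: |U| x |D| binary matrix, zeros where no click was observed
R = np.zeros((num_users, num_ads))
for u, d in clicks:
    R[u, d] = 1.0

k = 2                                         # number of latent features
P = np.random.rand(num_users, k)              # user-feature associations
Q = np.random.rand(num_ads, k)                # ad-feature associations
R_hat = P.dot(Q.T)                            # |U| x |D| approximation of R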
The optimization of P and Q can start from a random initialization and use standard optimization techniques such as stochastic gradient descent to reach a reasonably good local optimum. The method used here, Alternating Least Squares (ALS), instead alternates between fixing one matrix and solving a least-squares problem for the other. ALS is already implemented in the mllib module of the pyspark library.
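A hand-rolled sketch of those alternating updates on the toy matrix above (illustration only; the project uses mllib's implementation). The small ridge term reg anticipates the regularization discussed below.

import numpy as np

np.random.seed(0)
num_users, num_ads, k, reg = 3, 4, 2, 0.1
R = np.zeros((num_users, num_ads))
for u, d in [(0, 2), (0, 3), (1, 0), (2, 2)]:
    R[u, d] = 1.0

P = np.random.rand(num_users, k)
Q = np.random.rand(num_ads, k)
I = np.eye(k)

for _ in range(10):
    # fix Q, solve (Q^T Q + reg*I) p_u = Q^T r_u for all users at once
    P = np.linalg.solve(Q.T.dot(Q) + reg * I, Q.T.dot(R.T)).T
    # fix P, solve (P^T P + reg*I) q_d = P^T r_d for all ads at once
    Q = np.linalg.solve(P.T.dot(P) + reg * I, P.T.dot(R)).T

print(P.dot(Q.T))   # close to R on the observed entries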
Even though ALS is already implemented in mllib, it is worth mentioning that optimizing P and Q is a high-dimensional non-convex problem, which means that in many situations (especially for big datasets) reaching the global optimum is not guaranteed. In fact, the global optimum is almost never reached in practice.
It is also important to point out that as the number of latent features k increases, the model can easily overfit. Just like regression techniques such as Lasso and Ridge, ALS can be penalized for model complexity, so the optimization target becomes a combination of the sum of squared errors and a regularization term (regularization parameter times the magnitude of the factor matrices).
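In pyspark's mllib this regularization shows up as the lambda_ parameter of ALS.train; since the clicks here are binary, ALS.trainImplicit, which treats the values as implicit feedback, is another option. A hedged sketch, assuming the ratings RDD built in the code further below:

from pyspark.mllib.recommendation import ALS

# ratings: the RDD of Rating(user, ad, clicked) objects built below
model = ALS.train(ratings, rank=10, iterations=10, lambda_=0.1)
# implicit-feedback variant for binary click data (alpha weights observed clicks):
# model = ALS.trainImplicit(ratings, rank=10, iterations=10, lambda_=0.1, alpha=40.0)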
The provided csv files were first loaded into a MySQL database on the Hortonworks sandbox. All queries used are stored in the folder "queries". The uuid values (user ids) were converted from varchar(20) into int values because the default Rating class from the pyspark library only accepts user ids as int. After this preparation was done, I used sqoop to import the data set into HDFS as follows:
sqoop import --connect jdbc:mysql://sandbox.hortonworks.com:3306/spark_data --username root --table training_subset --target-dir /tmp/spark_data/training_subset --driver com.mysql.jdbc.Driver --as-textfile -m 1
Due to the limited storage space of my virtual machine sandbox, I was not able to import the full training set, only a subset of it.
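The uuid-to-int conversion mentioned above was done with SQL queries; purely as an alternative sketch, the same mapping could also be built inside Spark with zipWithIndex. The events.csv path and column order here are assumptions, and sc is a SparkContext like the one created in the code below.

# build a (uuid, consecutive integer id) mapping directly from the raw events file
uuid_to_int = sc.textFile("hdfs:///tmp/spark_data/events.csv") \
    .map(lambda l: l.split(',')[1]) \
    .distinct() \
    .zipWithIndex()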
It is important to point out that Spark does not execute any transformations until an action is triggered. In this particular case, the dataset is left untouched by all the "map" transformations and is only processed when the ALS model is actually trained:
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

conf = SparkConf().setAppName("py_rec").setMaster("local")
sc = SparkContext(conf = conf)

# load the sqoop-imported subset from HDFS; nothing is read yet (lazy evaluation)
data = sc.textFile("hdfs:///tmp/spark_data/training_subset/part-m-00000")

# split each csv line and build Rating(user, product, rating) objects:
# the integer user id is at index 4, the ad_id at index 2, the clicked flag at index 3
ratings = data.map(lambda l: l.split(','))\
    .map(lambda l: Rating(int(l[4]), int(l[2]), float(l[3])))

# training the ALS model is the action that triggers the work above;
# rank is the number of latent features k
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
After training, I reused the training set to test the validity of the model and calculate the mean squared error (MSE). The resulting MSE is very close to 0, which is expected, since P*Q^T is by construction meant to approximate the original R.
# reuse the (user, ad) pairs from the training set as test input
testdata = ratings.map(lambda p: (p[0], p[1]))
# predict a score for every pair and key the result by (user, ad)
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
# join the actual clicks with the predicted scores and average the squared differences
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
To complete the competition, I plan to resize the sandbox's storage so the full training set fits in the model. I expect the training time to grow considerably as the sizes of U and D increase.