I'm trying to tune the parameters of an ALS matrix factorization model that uses implicit data. For this, I'm trying to use pyspark.ml.tuning.CrossValidator to run through a parameter grid and select the best model. I believe my problem is in the evaluator, but I can't figure it out.
I can get this to work for an explicit data model with a regression RMSE evaluator, as follows:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import rand
conf = (SparkConf()
        .setAppName("MovieLensALS")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
["user", "item", "rating"])
dfRatingsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])
alsExplicit = ALS()
defaultModel = alsExplicit.fit(dfRatings)
paramMapExplicit = (ParamGridBuilder()
                    .addGrid(alsExplicit.rank, [8, 12])
                    .addGrid(alsExplicit.maxIter, [10, 15])
                    .addGrid(alsExplicit.regParam, [1.0, 10.0])
                    .build())
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")
cvExplicit = CrossValidator(estimator=alsExplicit, estimatorParamMaps=paramMapExplicit, evaluator=evaluatorR)
cvModelExplicit = cvExplicit.fit(dfRatings)
predsExplicit = cvModelExplicit.bestModel.transform(dfRatingsTest)
predsExplicit.show()
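The search that `CrossValidator` runs here can be pictured in plain Python. This is a sketch of the idea, not Spark's implementation, and `build_grid` and `select_best` are hypothetical helpers. `ParamGridBuilder` expands the grid into the Cartesian product of the listed values (2 × 2 × 2 = 8 param maps here). Each map is fit and evaluated once per fold (`numFolds` defaults to 3), the fold metrics are combined per map, and the winning map is refit on the full data to produce `bestModel`. For RMSE, lower is better.

```python
from itertools import product
from statistics import mean


def build_grid(grid):
    """Expand {param: [values]} into a list of param maps (Cartesian product)."""
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]


def select_best(param_maps, fold_metrics, larger_is_better=False):
    """Average each map's per-fold metrics and return (best map, its average).

    fold_metrics[j] holds the metric of param_maps[j] on every fold.
    For RMSE, smaller is better, hence larger_is_better=False by default.
    """
    averages = [mean(metrics) for metrics in fold_metrics]
    pick = max if larger_is_better else min
    best = pick(range(len(param_maps)), key=lambda j: averages[j])
    return param_maps[best], averages[best]


explicit_grid = build_grid({"rank": [8, 12], "maxIter": [10, 15], "regParam": [1.0, 10.0]})
print(len(explicit_grid))  # 8 param maps; with 3 folds that is 24 fits plus 1 final refit
print(explicit_grid[0])    # {'rank': 8, 'maxIter': 10, 'regParam': 1.0}
```

Adding `alpha` with two values, as in the implicit grid below, doubles this to 16 param maps.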
When I try to do this for implicit data (let's say counts of views rather than ratings), I get an error that I can't quite figure out. Here's the code (very similar to the above):
dfCounts = sqlContext.createDataFrame([(0,0,0), (0,1,12), (0,2,3), (1,0,5), (1,1,9), (1,2,0), (2,0,0), (2,1,11), (2,2,25)],
["user", "item", "rating"])
dfCountsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])
alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCounts)
paramMapImplicit = (ParamGridBuilder()
                    .addGrid(alsImplicit.rank, [8, 12])
                    .addGrid(alsImplicit.maxIter, [10, 15])
                    .addGrid(alsImplicit.regParam, [1.0, 10.0])
                    .addGrid(alsImplicit.alpha, [2.0, 3.0])
                    .build())
evaluatorB = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="rating")
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")
cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR)
cvModel = cv.fit(dfCounts)
predsImplicit = cvModel.bestModel.transform(dfCountsTest)
predsImplicit.show()
I get an error when I use the RMSE evaluator. As I understand it, I should also be able to use the AUC metric of the binary classification evaluator, because the predictions of the implicit matrix factorization are a confidence matrix c_ui for predictions of a binary matrix p_ui, per this paper (Hu, Koren and Volinsky, "Collaborative Filtering for Implicit Feedback Datasets"), which the documentation for pyspark ALS cites.
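For reference, the transformation described in that paper fits in a few lines of plain Python. Under the paper's formulation (assumed here to match what Spark's implicit ALS does), each count r_ui becomes a binary preference p_ui = 1 if r_ui > 0, else 0, and a confidence weight c_ui = 1 + alpha * r_ui. The factors are fit so that the inner product of the user and item vectors approximates p_ui, with each squared error weighted by c_ui. The function name is hypothetical; the counts are the `dfCounts` rows above, and alpha = 2.0 is one of the grid values.

```python
def preference_and_confidence(r, alpha):
    """Map an implicit count r >= 0 to (p_ui, c_ui) as in Hu, Koren and Volinsky."""
    p = 1.0 if r > 0 else 0.0   # binary preference: was there any interaction?
    c = 1.0 + alpha * r         # confidence grows linearly with the count
    return p, c


# (user, item, count) rows from dfCounts
counts = [(0, 0, 0), (0, 1, 12), (0, 2, 3), (1, 0, 5), (1, 1, 9),
          (1, 2, 0), (2, 0, 0), (2, 1, 11), (2, 2, 25)]

for user, item, r in counts:
    p, c = preference_and_confidence(r, alpha=2.0)
    print(user, item, r, p, c)
```

If that formulation holds, the `rating` column the evaluator compares against still contains raw counts (0 to 25), while the model is trained toward p_ui values.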
Using either evaluator gives me an error, and I can't find any useful discussion of cross-validating implicit ALS models online. I'm looking through the CrossValidator source code to figure out what's wrong, but I'm having trouble. One thought: after the implicit data matrix r_ui is converted to the binary matrix p_ui and the confidence matrix c_ui, I'm not sure what the predicted c_ui matrix is compared against during evaluation.
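One way to make the evaluation target concrete is to binarize the held-out counts with the same rule (count > 0 → 1) and measure how well the predicted scores rank positives above negatives. The sketch below is a plain-Python AUC (the probability that a random positive outscores a random negative, with ties counting one half). It illustrates the metric only; it is not how Spark's `BinaryClassificationEvaluator` is implemented. The held-out counts and predicted scores are hypothetical values chosen just to exercise the function.

```python
import math


def auc(scores, labels):
    """Fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        return math.nan  # AUC is undefined unless both classes are present
    wins = 0.0
    for sp in pos:
        for sn in neg:
            if sp > sn:
                wins += 1.0
            elif sp == sn:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical held-out counts and predicted preference scores for one split
held_out_counts = [0, 12, 3, 0]
predicted = [0.1, 0.9, 0.4, 0.5]
labels = [1 if r > 0 else 0 for r in held_out_counts]  # binarize like p_ui
print(auc(predicted, labels))  # 0.75: 3 of the 4 positive/negative pairs are ordered correctly
```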
Here is the error:
Traceback (most recent call last):
  File "<ipython-input-16-6c43b997005e>", line 1, in <module>
    cvModel = cv.fit(dfCounts)
  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 69, in fit
    return self._fit(dataset)
  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\tuning.py", line 239, in _fit
    model = est.fit(train, epm[j])
  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 67, in fit
    return self.copy(params)._fit(dataset)
  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)
  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)
  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
etc.......
UPDATE
I tried scaling the input to the range 0 to 1 and using an RMSE evaluator. This seems to work well until I put it inside the CrossValidator.
The following code works: I get predictions, and I get an RMSE value from my evaluator.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
conf = (SparkConf()
        .setAppName("ALSPractice")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# Users 0, 1, 2, 3 - Items 0, 1, 2, 3, 4, 5 - Ratings 0.0-5.0
dfCounts2 = sqlContext.createDataFrame([(0,0,5.0), (0,1,5.0), (0,3,0.0), (0,4,0.0),
(1,0,5.0), (1,2,4.0), (1,3,0.0), (1,4,0.0),
(2,0,0.0), (2,2,0.0), (2,3,5.0), (2,4,5.0),
(3,0,0.0), (3,1,0.0), (3,3,4.0) ],
["user", "item", "rating"])
dfCountsTest2 = sqlContext.createDataFrame([(0,0), (0,1), (0,2), (0,3), (0,4),
(1,0), (1,1), (1,2), (1,3), (1,4),
(2,0), (2,1), (2,2), (2,3), (2,4),
(3,0), (3,1), (3,2), (3,3), (3,4)], ["user", "item"])
# Normalize rating data to [0,1] range based on max rating
colmax = dfCounts2.select(F.max('rating')).collect()[0][0]
normalize = F.udf(lambda x: x / colmax, FloatType())
dfCountsNorm = dfCounts2.withColumn('ratingNorm', normalize(F.col('rating')))
# Train on the normalized column (ratingCol defaults to "rating")
alsImplicit = ALS(implicitPrefs=True, ratingCol="ratingNorm")
defaultModelImplicit = alsImplicit.fit(dfCountsNorm)
preds = defaultModelImplicit.transform(dfCountsTest2)
evaluatorR2 = RegressionEvaluator(metricName="rmse", labelCol="ratingNorm")
print(evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsNorm)))
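Assuming the ALS estimator is trained on `ratingNorm` and that Spark's confidence weight follows the paper's c_ui = 1 + alpha * r_ui, dividing every value by the column maximum m (here 5.0) changes less than it might seem. For m > 0, p_ui is unchanged, and 1 + alpha * (r/m) equals 1 + (alpha/m) * r. So a normalized fit with a given alpha should be equivalent to a raw-count fit with alpha/m. The helpers below are hypothetical and only check that identity on the `dfCounts2` ratings.

```python
def preference(r):
    # Binary preference p_ui: 1 if any interaction was observed
    return 1.0 if r > 0 else 0.0


def confidence(r, alpha):
    # Confidence weight c_ui = 1 + alpha * r_ui (Hu, Koren and Volinsky)
    return 1.0 + alpha * r


def scaling_is_alpha_rescaling(ratings, alpha, tol=1e-12):
    """True if normalizing by max(ratings) gives the same (p, c) as raw ratings with alpha / max."""
    m = max(ratings)
    return all(
        preference(r / m) == preference(r)
        and abs(confidence(r / m, alpha) - confidence(r, alpha / m)) <= tol
        for r in ratings
    )


# The rating column of dfCounts2
ratings = [5.0, 5.0, 0.0, 0.0, 5.0, 4.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 0.0, 0.0, 4.0]
print(scaling_is_alpha_rescaling(ratings, alpha=2.0))  # True
print(scaling_is_alpha_rescaling(ratings, alpha=3.0))  # True
```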
What I don't understand is why the following doesn't work. I'm using the same estimator and the same evaluator, and fitting the same data. Why does this work on its own but not inside the CrossValidator?
paramMapImplicit = (ParamGridBuilder()
                    .addGrid(alsImplicit.rank, [8, 12])
                    .addGrid(alsImplicit.maxIter, [10, 15])
                    .addGrid(alsImplicit.regParam, [1.0, 10.0])
                    .addGrid(alsImplicit.alpha, [2.0, 3.0])
                    .build())
cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR2)
cvModel = cv.fit(dfCountsNorm)
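One difference between the direct fit and `CrossValidator` is the data each model sees. Every param map is fit on only part of the rows (`numFolds` defaults to 3) and evaluated on the held-out rest. With 15 rows, a random split can easily hold out every rating of some user or item; item 1, for example, appears in only two rows. In Spark 1.x, `ALSModel.transform` returns NaN predictions for users or items it never saw during training, and a single NaN prediction makes the RMSE NaN. The plain-Python sketch below mimics a random 3-way split of `dfCounts2` and lists such held-out rows. The helpers and the seed are hypothetical, and Spark's own fold assignment will differ.

```python
import random


def assign_folds(rows, num_folds=3, seed=0):
    """Randomly assign each row to one of num_folds folds (illustrative, not Spark's code)."""
    rng = random.Random(seed)
    folds = [[] for _ in range(num_folds)]
    for row in rows:
        folds[rng.randrange(num_folds)].append(row)
    return folds


def cold_start_rows(train, validation):
    """Validation rows whose user or item never appears in the training rows."""
    seen_users = {u for u, _, _ in train}
    seen_items = {i for _, i, _ in train}
    return [(u, i, r) for u, i, r in validation
            if u not in seen_users or i not in seen_items]


# (user, item, rating) rows of dfCounts2
rows = [(0, 0, 5.0), (0, 1, 5.0), (0, 3, 0.0), (0, 4, 0.0),
        (1, 0, 5.0), (1, 2, 4.0), (1, 3, 0.0), (1, 4, 0.0),
        (2, 0, 0.0), (2, 2, 0.0), (2, 3, 5.0), (2, 4, 5.0),
        (3, 0, 0.0), (3, 1, 0.0), (3, 3, 4.0)]

folds = assign_folds(rows)
for k, validation in enumerate(folds):
    train = [row for j, fold in enumerate(folds) if j != k for row in fold]
    print(f"fold {k}: {len(train)} train rows, {len(validation)} validation rows, "
          f"unseen in training: {cold_start_rows(train, validation)}")
```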