I'm trying to train ALS with data in each batch from kafka using spark streaming and facing with below error.
I think it's because the rating
column is negative or something invalid like wrong data type, so I filtered and changed to double it but it doesn't work.
pyspark.sql.utils.StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "D:\DE\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 196, in call
raise e
File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 193, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "D:/Project/Final/Code/consumer.py", line 19, in train_data
model = als.fit(train)
File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\ml\base.py", line 161, in fit
return self._fit(dataset)
File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\ml\wrapper.py", line 335, in _fit
java_model = self._fit_java(dataset)
File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\ml\wrapper.py", line 332, in _fit_java
return self._java_obj.fit(dataset._jdf)
File "D:\DE\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.IllegalArgumentException: requirement failed: No ratings available from MapPartitionsRDD[55] at map at ALS.scala:699
my code
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER) \
.option("failOnDataLoss", "false")\
.option("subscribe", TOPIC_NAME) \
.option("startingOffsets","earliest")\
.load()
print('reading data from topic:',TOPIC_NAME,'...')
df_1 = df.selectExpr("CAST(value AS STRING)")
df_2 = df_1.select(regexp_replace('value','\\\\','').alias('value'))
data = df_2.withColumn('value',F.expr("substring(value,2,length(value)-1)"))
data2 = data.select(F.json_tuple(F.col('value'),
'event_id','event_time','event_type','product_id','category_id',
'category_code','brand','price','user_id','user_session'))\
.toDF('event_id','event_time','event_type','product_id','category_id',
'category_code','brand','price','user_id','user_session')\
.filter(F.col('brand').isNotNull())
l1_product = data2.select('product_id','category_id','category_code','brand','price').distinct()
l1_ratings = data2.withColumn('timestamp',F.to_utc_timestamp('event_time','+07:00')\
.cast(TimestampType()))\
.select('user_id','product_id','event_type','timestamp')\
.withWatermark('timestamp','10 minutes')\
.withColumn('score',F.when(F.col('event_type')=='remove_from_cart',-1)
.when(F.col('event_type')=='purchase',3)
.otherwise(1))\
.groupBy('user_id','product_id','timestamp').agg(F.sum('score').alias('rating'))\
.withColumn('rating',F.when(F.col('rating')>5,5)
.otherwise(F.col('rating'))).alias('rating')\
.withColumn('rating',F.col('rating').cast(DoubleType()))\
.withColumn('user_id',F.col('user_id').cast(LongType()))\
.withColumn('product_id',F.col('product_id').cast(LongType()))\
.filter('rating >= 0')
write = l1_ratings.writeStream\
.foreachBatch(train_data).start()
and the train_data
function
def train_data(df,epoch_id) -> None:
(train, test) = df.randomSplit([0.8, 0.2])
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="product_id", ratingCol="rating",
coldStartStrategy="drop", rank=10, seed=0)
model = als.fit(train)
userRecs = model.recommendForAllUsers(10)
userRecs.writeStream\
.trigger(processingTime='0 seconds') \
.outputMode('append') \
.option('truncate','false') \
.format('console') \
.start()
So, how can i fix it?