pyspark ml training ALS: No ratings available from MapPartitionsRDD

222 Views Asked by At

I'm trying to train ALS with data in each batch from kafka using spark streaming and facing with below error.
I think it's because the rating column is negative or something invalid like wrong data type, so I filtered and changed to double it but it doesn't work.

pyspark.sql.utils.StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "D:\DE\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 196, in call
    raise e
  File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "D:/Project/Final/Code/consumer.py", line 19, in train_data
    model = als.fit(train)
  File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\ml\base.py", line 161, in fit
    return self._fit(dataset)
  File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\ml\wrapper.py", line 335, in _fit
    java_model = self._fit_java(dataset)
  File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\ml\wrapper.py", line 332, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "D:\DE\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "D:\DE\Spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: requirement failed: No ratings available from MapPartitionsRDD[55] at map at ALS.scala:699

my code

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER) \
    .option("failOnDataLoss", "false")\
    .option("subscribe", TOPIC_NAME) \
    .option("startingOffsets","earliest")\
    .load()
  print('reading data from topic:',TOPIC_NAME,'...')

  df_1 = df.selectExpr("CAST(value AS STRING)")
  df_2 = df_1.select(regexp_replace('value','\\\\','').alias('value'))
  data = df_2.withColumn('value',F.expr("substring(value,2,length(value)-1)"))


  data2 = data.select(F.json_tuple(F.col('value'),
                    'event_id','event_time','event_type','product_id','category_id',
                    'category_code','brand','price','user_id','user_session'))\
              .toDF('event_id','event_time','event_type','product_id','category_id',
                    'category_code','brand','price','user_id','user_session')\
              .filter(F.col('brand').isNotNull())
  l1_product = data2.select('product_id','category_id','category_code','brand','price').distinct()
  l1_ratings = data2.withColumn('timestamp',F.to_utc_timestamp('event_time','+07:00')\
                                             .cast(TimestampType()))\
                    .select('user_id','product_id','event_type','timestamp')\
                    .withWatermark('timestamp','10 minutes')\
                    .withColumn('score',F.when(F.col('event_type')=='remove_from_cart',-1)
                                          .when(F.col('event_type')=='purchase',3)
                                          .otherwise(1))\
                    .groupBy('user_id','product_id','timestamp').agg(F.sum('score').alias('rating'))\
                    .withColumn('rating',F.when(F.col('rating')>5,5)
                                          .otherwise(F.col('rating'))).alias('rating')\
                    .withColumn('rating',F.col('rating').cast(DoubleType()))\
                    .withColumn('user_id',F.col('user_id').cast(LongType()))\
                    .withColumn('product_id',F.col('product_id').cast(LongType()))\
                    .filter('rating >= 0')
            
  write = l1_ratings.writeStream\
          .foreachBatch(train_data).start()

and the train_data function

def train_data(df,epoch_id) -> None:
  (train, test) = df.randomSplit([0.8, 0.2])
  als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="product_id", ratingCol="rating",
          coldStartStrategy="drop", rank=10, seed=0)
  model = als.fit(train)  

  userRecs = model.recommendForAllUsers(10)
  userRecs.writeStream\
            .trigger(processingTime='0 seconds') \
            .outputMode('append') \
            .option('truncate','false') \
            .format('console') \
            .start()   

So, how can i fix it?

0

There are 0 best solutions below