PySpark raising Py4JJavaError when trying to fit ALS model


I keep running into the same Py4JJavaError every time I try to fit an ALS model. How can I fix this? I'm using a cloud-hosted Jupyter notebook.

Py4JJavaError                             Traceback (most recent call last)
Cell In[20], line 4
      1 from pyspark.sql.functions import col
      3 #fit ALS model on training 
----> 4 model = als.fit(train_df)
      6 #generate preds on test
      7 predictions = model.transform(test_df)
File ~/anaconda3/lib/python3.11/site-packages/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203         return self.copy(params)._fit(dataset)
    204     else:
--> 205         return self._fit(dataset)
    206 else:
    207     raise TypeError(
    208         "Params must be either a param map or a list/tuple of param maps, "
    209         "but got %s." % type(params)
    210     )
Py4JJavaError: An error occurred while calling o586.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 98.0 failed 1 times, most recent failure: Lost task 0.0 in stage 98.0 (TID 245) (192.168.0.131 executor driver): java.lang.RuntimeException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)

I have tried re-casting all of the columns (e.g., making Yards Gained a float and confirming that every ID used is numeric), and I have tried various ALS parameters such as different maxIter values and nonnegative=True, but I keep running into the same problem. I have also verified my Java and Python versions and re-downloaded the Spark and Hadoop libraries several times. I am unsure how to fix this error -- any recommendations?
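
For reference, this is the kind of pre-fit sanity check I can think to run on the columns ALS actually touches -- a minimal sketch, assuming the column names from my notebook below and the train_df produced by my split function further down, so I'm not sure it would catch whatever is failing here:

from pyspark.sql import functions as F

# columns used by the ALS estimator (userCol, itemCol, ratingCol in my setup)
als_cols = ["SituationID", "PlayID", "Yards Gained"]

# 1) count nulls introduced by the casts/arithmetic that build the IDs
train_df.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c + "_nulls") for c in als_cols]
).show()

# 2) count NaNs in the float rating column
train_df.select(
    F.sum(F.isnan(F.col("Yards Gained")).cast("int")).alias("rating_nans")
).show()

# 3) as far as I understand, ALS casts userCol/itemCol to 32-bit integers,
#    so both IDs have to stay inside the integer range
train_df.select(
    F.min("SituationID"), F.max("SituationID"),
    F.min("PlayID"), F.max("PlayID")
).show()

My full notebook is below.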

# In[1]:

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, when, floor
from pyspark.ml.evaluation import RegressionEvaluator

# In[2]:

import os
import sys
os.environ['PYSPARK_DRIVER_PYTHON_OPTS']= "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable

# In[3]:

#create Spark session/app
spark = SparkSession.builder.appName("NFL Play Recommender").getOrCreate()

# In[4]:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")


# In[5]:


#df
data = spark.read.csv("ALL.csv", header=True, inferSchema=True)


# In[6]:


#convert TotalTime to whole seconds so it can be stored as an integer
data = data.withColumn("TotalTimeSeconds", 
                        (floor(col("TotalTime") * 60)))
#same with pass and rushing defense 
data = data.withColumn("ScaledPassD", (col("Pass D") * 10).cast("integer"))
data = data.withColumn("ScaledRushD", (col("Rush D") * 10).cast("integer"))


# In[7]:


#situationID
data = data.withColumn("SituationID",
                        col("Location Actual") + 
                        col("ToGo") * 10 + 
                        col("Down") * 100 +
                        col("Lead Num") * 1000 + 
                        col("TotalTimeSeconds") * 10000 +  
                        col("ScaledPassD") * 100 +  
                        col("ScaledRushD") * 1000 +  
                        when(col("RoofType Num") == 0, 0).when(col("RoofType") == 1, 1).otherwise(2) * 1000 +   
                        when(col("Surface Num") == 1, 1).otherwise(0) * 100  #
                       )
#data.head()


# In[8]:


from pyspark.sql import functions as F
data = data.withColumn('WeatherID', 
                   (F.col('Hourly Feels Like').cast('int') * 10000) +  #adjust scales
                   (F.col('Hourly Wind Gust').cast('int') * 1000) + 
                   (F.col('Hourly Wind Speed').cast('int') * 100) +
                   (F.col('Hourly Icon') * 10) + 
                   (F.col('Dark')))


# In[9]:


data.select("Play Type").distinct().show()


# In[10]:


data = data.withColumn('Play Type', when(col('Play Type') == 'Pass', 1).otherwise(2))
data.select("Play Type").distinct().show()


# In[11]:


data = data.withColumn("Yards Gained", col("Yards Gained").cast("float")) #convert to float
data.select("Yards Gained").show(5)


# In[12]:


from pyspark.sql import functions as F

def split_train_test_spark(df):
    train_frames = []  # empty lists
    test_frames = []
    
    for season in range(2013, 2023):  # iterate through each season
        season_data = df.filter(df['SeasonID'] == season)  # filter by season
        next_season_data = df.filter(df['SeasonID'] == season + 1)  # next season, used to locate this season's last game
        
    
        first_game_id = season_data.select('GameID').orderBy('GameID').first()[0]  # grab first GameID
        first_game_data = season_data.filter(season_data['GameID'] == first_game_id)  # add to test
        test_frames.append(first_game_data)
        
        #get this season's last game by finding the next season's first GameID
        if next_season_data.count() > 0:  # if not the last season
            next_first_game_id = next_season_data.select('GameID').orderBy('GameID').first()[0]
            last_game_data = season_data.filter(season_data['GameID'] == next_first_game_id - 1)
        else:  # if last season, take last game
            last_game_id = season_data.select('GameID').orderBy(F.desc('GameID')).first()[0]
            last_game_data = season_data.filter(season_data['GameID'] == last_game_id)
        
        # Add the last game data to the test set
        test_frames.append(last_game_data)
        
        # Every other game that is not first or last will go to the training set
        remaining_season_data = season_data.filter(
            (season_data['GameID'] != first_game_id) & 
            (season_data['GameID'] != last_game_data.select('GameID').first()[0])
        )
        train_frames.append(remaining_season_data)
        
    # Concatenate all frames for training and testing
    train_df = train_frames[0]
    for frame in train_frames[1:]:
        train_df = train_df.union(frame)
    
    test_df = test_frames[0]
    for frame in test_frames[1:]:
        test_df = test_df.union(frame)
    
    return train_df, test_df

#create data
train_df, test_df = split_train_test_spark(data)  

#use .count() to get row counts in Spark
print(train_df.count(), test_df.count())


# In[13]:

#ALS params
als = ALS(maxIter=10, regParam=0.1, userCol="SituationID", itemCol="PlayID", ratingCol="Yards Gained", coldStartStrategy="drop", nonnegative=False)


# In[18]:


#check data types of all columns used in ALS
als_columns = ['SituationID', 'PlayID', 'Yards Gained', 'Play Type']  # Add any other relevant columns
for col_name in als_columns:
    print(f"Data type of {col_name}: {data.schema[col_name].dataType}")


# In[20]:


from pyspark.sql.functions import col

#fit ALS model on training 
model = als.fit(train_df)