I keep running into the same Py4JJavaError every time I try to fit an ALS model -- how can I fix it? I'm using a cloud-hosted Jupyter notebook.
Py4JJavaError Traceback (most recent call last)
Cell In[20], line 4
1 from pyspark.sql.functions import col
3 #fit ALS model on training
----> 4 model = als.fit(train_df)
6 #generate preds on test
7 predictions = model.transform(test_df)
File ~/anaconda3/lib/python3.11/site-packages/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
203 return self.copy(params)._fit(dataset)
204 else:
--> 205 return self._fit(dataset)
206 else:
207 raise TypeError(
208 "Params must be either a param map or a list/tuple of param maps, "
209 "but got %s." % type(params)
210 )
Py4JJavaError: An error occurred while calling o586.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 98.0 failed 1 times, most recent failure: Lost task 0.0 in stage 98.0 (TID 245) (192.168.0.131 executor driver): java.lang.RuntimeException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
I have tried recasting all of the columns (e.g., making Yards Gained a float and confirming that every ID column is numeric) and trying various ALS parameters (different maxIter values, nonnegative=True), and I keep running into this problem. I have also verified my Java and Python versions and re-downloaded the Spark and Hadoop libraries several times. I am unsure how to fix this error -- any recommendations?
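For reference, this is roughly how I recast the ALS columns before fitting (a minimal sketch using my own column names, so treat them as placeholders):

from pyspark.sql.functions import col

data = data.withColumn("Yards Gained", col("Yards Gained").cast("float"))  #rating column
data = data.withColumn("SituationID", col("SituationID").cast("int"))  #user column
data = data.withColumn("PlayID", col("PlayID").cast("int"))  #item column

The full notebook is below.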
# In[1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, when, floor
from pyspark.ml.evaluation import RegressionEvaluator
# In[2]:
import os
import sys
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable
# In[3]:
#create spark sesh/app
spark = SparkSession.builder.appName("NFL Play Recommender").getOrCreate()
# In[4]:
#enable Arrow-based transfers between the JVM and Python
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# In[5]:
#load the data and infer the schema
data = spark.read.csv("ALL.csv", header=True, inferSchema=True)
# In[6]:
#convert TotalTime to whole seconds so it becomes an integer
data = data.withColumn("TotalTimeSeconds",
(floor(col("TotalTime") * 60)))
#likewise, scale the pass and rush defense ratings to integers
data = data.withColumn("ScaledPassD", (col("Pass D") * 10).cast("integer"))
data = data.withColumn("ScaledRushD", (col("Rush D") * 10).cast("integer"))
# In[7]:
#build a composite SituationID from the game-situation features
data = data.withColumn("SituationID",
col("Location Actual") +
col("ToGo") * 10 +
col("Down") * 100 +
col("Lead Num") * 1000 +
col("TotalTimeSeconds") * 10000 +
col("ScaledPassD") * 100 +
col("ScaledRushD") * 1000 +
when(col("RoofType Num") == 0, 0).when(col("RoofType") == 1, 1).otherwise(2) * 1000 +
when(col("Surface Num") == 1, 1).otherwise(0) * 100 #
)
#data.head()
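As I understand it, Spark's ALS requires the user/item ids to fit in integer range, so a quick sanity check (just a sketch using the column name above) is to confirm SituationID never goes null or outside 32-bit range:

from pyspark.sql import functions as F

#confirm SituationID is never null and stays within 32-bit integer range
data.select(
    F.count(F.when(F.col("SituationID").isNull(), 1)).alias("null_ids"),
    F.min("SituationID").alias("min_id"),
    F.max("SituationID").alias("max_id"),
).show()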
# In[8]:
from pyspark.sql import functions as F
data = data.withColumn('WeatherID',
(F.col('Hourly Feels Like').cast('int') * 10000) + #adjust scales
(F.col('Hourly Wind Gust').cast('int') * 1000) +
(F.col('Hourly Wind Speed').cast('int') * 100) +
(F.col('Hourly Icon') * 10) +
(F.col('Dark')))
# In[9]:
#inspect the distinct play types
data.select("Play Type").distinct().show()
# In[10]:
#encode Play Type numerically: Pass = 1, everything else = 2
data = data.withColumn('Play Type', when(col('Play Type') == 'Pass', 1).otherwise(2))
data.select("Play Type").distinct().show()
# In[11]:
data = data.withColumn("Yards Gained", col("Yards Gained").cast("float")) #convert to float
data.select("Yards Gained").show(5)
# In[12]:
from pyspark.sql import functions as F
def split_train_test_spark(df):
    train_frames = []  #per-season training frames
    test_frames = []   #per-season test frames
    for season in range(2013, 2023):  #iterate through each season
        season_data = df.filter(df['SeasonID'] == season)  #filter by season
        next_season_data = df.filter(df['SeasonID'] == season + 1)  #used to backtrack from the next season
        first_game_id = season_data.select('GameID').orderBy('GameID').first()[0]  #grab the first GameID
        first_game_data = season_data.filter(season_data['GameID'] == first_game_id)  #add to test
        test_frames.append(first_game_data)
        #get this season's last game by finding the next season's first game
        if next_season_data.count() > 0:  #if not the last season
            next_first_game_id = next_season_data.select('GameID').orderBy('GameID').first()[0]
            last_game_data = season_data.filter(season_data['GameID'] == next_first_game_id - 1)
        else:  #if it is the last season, take the last game
            last_game_id = season_data.select('GameID').orderBy(F.desc('GameID')).first()[0]
            last_game_data = season_data.filter(season_data['GameID'] == last_game_id)
        #add the last game data to the test set
        test_frames.append(last_game_data)
        #every other game that is not first or last goes to the training set
        remaining_season_data = season_data.filter(
            (season_data['GameID'] != first_game_id) &
            (season_data['GameID'] != last_game_data.select('GameID').first()[0])
        )
        train_frames.append(remaining_season_data)
    #concatenate all frames for training and testing
    train_df = train_frames[0]
    for frame in train_frames[1:]:
        train_df = train_df.union(frame)
    test_df = test_frames[0]
    for frame in test_frames[1:]:
        test_df = test_df.union(frame)
    return train_df, test_df
#build the train/test split
train_df, test_df = split_train_test_spark(data)
#use .count() to get the number of rows in each Spark DataFrame
print(train_df.count(), test_df.count())
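To double-check the split, a quick sanity check (sketch) that no GameID ends up in both sets:

#train and test should not share any GameID
print(train_df.select("GameID").intersect(test_df.select("GameID")).count())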
# In[13]:
#ALS params
als = ALS(maxIter=10, regParam=0.1, userCol="SituationID", itemCol="PlayID", ratingCol="Yards Gained", coldStartStrategy="drop", nonnegative=False)
# In[18]:
#check data types of all columns used in ALS
als_columns = ['SituationID', 'PlayID', 'Yards Gained', 'Play Type'] # Add any other relevant columns
for col_name in als_columns:
    print(f"Data type of {col_name}: {data.schema[col_name].dataType}")
# In[20]:
from pyspark.sql.functions import col
#fit ALS model on training
model = als.fit(train_df)