I am running several experiments in notebooks on a Spark Dataproc cluster. Many of the functions stay the same from experiment to experiment (such as data preprocessing), but when I realize after a few experiments that some of the preprocessing needs to be tweaked, I have to go back and make the change in every duplicated notebook. I would prefer to keep those functions in a script so they can be shared by all notebooks and changed in just one place. I found a workaround for importing local modules here, and it generally works. Except: if my utils.py file (where all of my shared functions live) has a Spark import such as from pyspark.sql import functions as F, the imported function can't seem to find that import when it runs.
Example code:
utils.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def create_spark_df(table_name):
    # Create (or reuse) the SparkSession
    spark = SparkSession.builder.getOrCreate()
    return spark.read.format("bigquery").option("table", table_name).load()

def get_filtered_customers_table():
    customers = create_spark_df('customers_table_name')
    return (
        customers
        .where(F.col('email').isNotNull())
    )
Notebook cell for the workaround
from google.cloud import storage

def get_module():
    client = storage.Client()
    bucket = client.get_bucket('gcs_bucket_name')
    # define the path to your python files at prefix
    blobs = list(client.list_blobs(bucket, prefix='notebooks/jupyter/path_to_utils'))
    for blob in blobs[1:]:  # skip 1st element since this is the top-level directory
        name = blob.name.split('/')[-1]  # get the filename only
        blob.download_to_filename(name)  # download the python files to the local Dataproc cluster

def run_module(name, args=None):
    get_module()
    import utils
    if name == 'filtered_customers':
        return utils.get_filtered_customers_table()
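For what it's worth, my understanding of the workaround is that get_module just copies the .py files from GCS into the notebook's current working directory (which Jupyter has on sys.path), so the plain import utils inside run_module should pick up the freshly downloaded file. A quick sanity check along these lines (a sketch; it only assumes the download lands in the current directory):

import os

get_module()
print('utils.py' in os.listdir('.'))  # expect True if utils.py was downloaded next to the notebook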
Notebook cell that calls the module
filtered_customers = run_module('filtered_customers')
Although I have from pyspark.sql import functions as F in both the notebook and the utils.py file, I still get this error: NameError: name 'F' is not defined.
Any ideas why this would be happening?
I've tried googling every way I can think of, and I've added the pyspark import to the specific function and to the notebook cell. No dice. I'm really hoping to figure out if there is a way to solve this; maybe I'm just out of luck. I should also note that the create_spark_df function (which does no filtering) works just fine, and it also relies on an import in utils.py (from pyspark.sql import SparkSession), so I'm not sure why the import for functions specifically is causing an issue. I also tried it without the alias. At this point, I'm out of ideas.
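To be concrete, this is roughly what that attempt looked like in utils.py (a sketch; the table name is a placeholder), with the import repeated inside the failing function:

def get_filtered_customers_table():
    # also tried re-importing here, with and without the alias; same NameError
    from pyspark.sql import functions as F
    customers = create_spark_df('customers_table_name')
    return customers.where(F.col('email').isNotNull())

Meanwhile the unfiltered call, utils.create_spark_df('customers_table_name'), returns a DataFrame without any problem.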