Databricks: store df row by row - each row into a JSON file


I currently have a df of around 44,000 rows. My goal is to store each row as a separate JSON file in a blob container (let's name it output), meaning I will end up with 44,000 JSON files, each containing one row of the df.

I'm currently using the below piece of code:

# my function to write:
def write_output_file(file_name, storageName, output_df):
  # Setting the final file location
  outputName = "output-" + file_name.split('.')[0]
  output_container_path = f"wasbs://[email protected]"
  output_blob_folder = output_container_path #+ "/" + outputName + "_Final"
  
  # Write the result
  output_df.write.mode("overwrite").option("ignoreNullFields", "false").option("maxRecordsPerFile", 1).json(output_blob_folder)

  return output_blob_folder

# command calling this function:
chunk_size = 5000
num_chunks = final_df.count() // chunk_size + 1

# Split final_df into num_chunks roughly equal parts (randomSplit sizes are approximate)
dfs = final_df.randomSplit([1.0] * num_chunks)

# Write each chunk to output file
for i, df in enumerate(dfs):
    write_output_file(fileName, storageName, df)

My problem is that I end up with only one chunk's worth of output, i.e. around 5,000 JSON files in the blob container.
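If it helps, the behaviour looks as though each loop iteration overwrites what the previous one wrote, since every chunk targets the same folder with mode("overwrite"). A minimal sketch of what I mean (storage account name left as a placeholder):

# Every chunk is written to the SAME folder in overwrite mode,
# so the files from chunk i are deleted before chunk i+1 is written.
target = "wasbs://output@<storageName>.blob.core.windows.net"  # same target every call

for i, df in enumerate(dfs):
    (df.write
       .mode("overwrite")                  # wipes the folder first
       .option("ignoreNullFields", "false")
       .option("maxRecordsPerFile", 1)
       .json(target))

# => only the last chunk's ~5,000 JSON files remain in the container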


1 Answer

Answered by vagitus:

I think you need to use the foreach function. I also modified your code a bit:

import json

def write_output_file(row):
  # Create a dictionary from the PySpark Row object
  row_dict = row.asDict()

  # Build the target path. Plain open() cannot write to a wasbs:// URL directly,
  # so this assumes the "output" container is mounted, e.g. at /dbfs/mnt/output/
  output_blob_path = "/dbfs/mnt/output/"
  file_name = f"output-{row_dict['<your_column_name_here>']}.json"

  # Serialize the row as JSON and write it to its own file
  json_content = json.dumps(row_dict, ensure_ascii=False)
  with open(output_blob_path + file_name, "w", encoding="utf-8") as f:
      f.write(json_content)

# Apply write_output_file function to each row of the DataFrame
final_df.foreach(write_output_file)
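
Since foreach calls the function once per row on the executors, a related option is foreachPartition, which lets you do any shared setup once per partition rather than once per row. A minimal sketch, assuming the same mounted output path and placeholder column name as above:

import json

def write_partition(rows):
    # Hypothetical variant: receives an iterator of Rows for one partition
    output_dir = "/dbfs/mnt/output/"  # assumed mount point for the "output" container
    for row in rows:
        row_dict = row.asDict()
        file_name = f"output-{row_dict['<your_column_name_here>']}.json"
        with open(output_dir + file_name, "w", encoding="utf-8") as f:
            f.write(json.dumps(row_dict, ensure_ascii=False))

# One call per partition instead of one per row
final_df.foreachPartition(write_partition)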