How to add the file name as a column to a parsed dataset in Palantir Foundry?


I have a raw dataset that contains multiple CSV files in Palantir Foundry. I want to achieve two things:

  1. Parse CSV files into a dataset.
  2. Add a new column that contains each file name.

Any suggestions/directions?

I am new to the platform but have some understanding of PySpark. I can manage parsing the files, but not adding a column that contains each file name.

Below is the code I am using, but it stamps every row with the same single file name.

from transforms.api import transform, Input, Output, incremental
from transforms.verbs.dataframes import sanitize_schema_for_parquet
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType


@incremental()
@transform(
    output=Output("rid"),
    raw=Input("rid")
)
def read_csv(ctx, raw, output):
    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    # Note: this captures only the first CSV's name, and F.lit() below
    # stamps that single value onto every row.
    file_name = list(raw.filesystem().ls(glob='*.csv'))[0].path
    csv_schema = StructType([
        StructField("SamaccountName", StringType(), True),
        StructField("DisplayName", StringType(), True),
        StructField("Alias", StringType(), True),
        StructField("PrimarysmtpAddress", StringType(), True),
        StructField("TotalMBXSize", StringType(), True),
        StructField("UserMailboxSize", StringType(), True),
        StructField("TotalDeletedItemSize", StringType(), True),
        StructField("ItemCount", IntegerType(), True),
        StructField("LocatedOnDatabase", StringType(), True),
        StructField("CreationDateTime", TimestampType(), True),
        StructField("MailboxType", StringType(), True),
        StructField("RecipientTypeDetails", StringType(), True),
        StructField("RetentionPolicy", StringType(), True),
        StructField("CustomAttribute3", StringType(), True),
        StructField("CustomAttribute7", StringType(), True)
    ])
    df = (
        ctx
        .spark_session
        .read
        # .option("encoding", "UTF-8")
        .option("header", True)
        .schema(csv_schema)
        .option("delimiter", ";")
        .option("inferSchema", True)
        .csv(files)
    ).withColumn("file_name", F.lit(file_name))
    df = df.withColumn(
        "Date", F.to_date(
            F.regexp_extract(df.file_name, r'Outlookstats_(\d{8})_\d{6}\.csv', 1), 'yyyyMMdd'))
    output.write_dataframe(sanitize_schema_for_parquet(df))

There are 2 answers below.


In Foundry, you can use input_file_name() from pyspark.sql.functions. I have tested this in Foundry and it successfully creates a new column containing each row's source file name. Your code should look like the following:

from transforms.api import transform, Input, Output, incremental
from transforms.verbs.dataframes import sanitize_schema_for_parquet
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType


@incremental()
@transform(
 output=Output("rid"),
 raw=Input("rid")
)
def read_csv(ctx, raw, output):
    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    csv_schema = StructType([
        StructField("SamaccountName", StringType(), True),
        StructField("DisplayName", StringType(), True),
        StructField("Alias", StringType(), True),
        StructField("PrimarysmtpAddress", StringType(), True),
        StructField("TotalMBXSize", StringType(), True),
        StructField("UserMailboxSize", StringType(), True),
        StructField("TotalDeletedItemSize", StringType(), True),
        StructField("ItemCount", IntegerType(), True),
        StructField("LocatedOnDatabase", StringType(), True),
        StructField("CreationDateTime", TimestampType(), True),
        StructField("MailboxType", StringType(), True),
        StructField("RecipientTypeDetails", StringType(), True),
        StructField("RetentionPolicy", StringType(), True),
        StructField("CustomAttribute3", StringType(), True),
        StructField("CustomAttribute7", StringType(), True)
    ])
    df = (
        ctx
        .spark_session
        .read
        .option("header", True)
        .schema(csv_schema)
        .option("delimiter", ";")
        .option("inferSchema", True)
        .csv(files)
        # input_file_name() returns the full Hadoop path for each row;
        # the regex keeps only the part after "files/", i.e. the file name.
        .withColumn("file_name", F.regexp_extract(F.input_file_name(), r"files\/(.*)", 1))
    )
    df = df.withColumn(
        "Date", F.to_date(
            F.regexp_extract(df.file_name, r'Outlookstats_(\d{8})_\d{6}\.csv', 1), 'yyyyMMdd'))
    output.write_dataframe(sanitize_schema_for_parquet(df))

The additional regexp_extract() strips the dataset RID and path prefix from the full Hadoop path, leaving you with just the file name you expect. At first glance, the file names might all look the same, but remember that there are likely multiple rows per CSV. Be sure to check View Stats on the file_name column in the dataset preview after you build.
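
If the regex ever proves brittle, a regex-free alternative is to split the full path on "/" and keep the last segment. This is only a sketch using standard PySpark array functions, not anything Foundry-specific:

# A regex-free sketch: input_file_name() returns the full URI, so
# splitting on "/" and taking the last array element (element_at with
# a negative index) yields just the base file name.
df = df.withColumn(
    "file_name",
    F.element_at(F.split(F.input_file_name(), "/"), -1),
)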


You may use the input_file_name SQL function while reading the files, along with the withColumn method. Please refer to the code snippet below:

from pyspark.sql.functions import input_file_name

df = spark.read.csv(input_path) \
    .withColumn("file_name", input_file_name())