I have a raw dataset that contains multiple CSV files in Palantir Foundry. I want to achieve two things:
- Parse CSV files into a dataset.
- Add a new column that contains each file name.
Any suggestions/directions?
I am new to the platform but have bit of understanding of PySpark. I am able to manage parsing files but not adding a column that contain file name.
Below is the code I am using. But it gives only one file name.
from transforms.api import transform, Input, Output, incremental
from transforms.verbs.dataframes import
sanitize_schema_for_parquet
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField,
StringType, IntegerType, TimestampType
@incremental()
@transform(
output=Output("rid"),
raw=Input("rid")
)
def read_csv(ctx, raw, output):
filesystem = raw.filesystem()
hadoop_path = filesystem.hadoop_path
files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
file_name = list(raw.filesystem().ls(glob='*.csv'))[0].path
csv_schema = StructType([
StructField("SamaccountName", StringType(), True),
StructField("DisplayName", StringType(), True),
StructField("Alias", StringType(), True),
StructField("PrimarysmtpAddress", StringType(), True),
StructField("TotalMBXSize", StringType(), True),
StructField("UserMailboxSize", StringType(), True),
StructField("TotalDeletedItemSize", StringType(), True),
StructField("ItemCount", IntegerType(), True),
StructField("LocatedOnDatabase", StringType(), True),
StructField("CreationDateTime", TimestampType(), True),
StructField("MailboxType", StringType(), True),
StructField("RecipientTypeDetails", StringType(), True),
StructField("RetentionPolicy", StringType(), True),
StructField("CustomAttribute3", StringType(), True),
StructField("CustomAttribute7", StringType(), True)
])
df = (
ctx
.spark_session
.read
# .option("encoding", "UTF-8")
.option("header", True)
.schema(csv_schema)
.option("delimiter", ";")
.option("inferSchema", True)
.csv(files)
).withColumn("file_name", F.lit(file_name))
df = df.withColumn(
"Date", F.to_date(
F.regexp_extract(df.file_name, r'Outlookstats_(\d{8})_\d{6}\.csv', 1), 'yyyyMMdd'))
output.write_dataframe(sanitize_schema_for_parquet(df))
You may use input_file_name SQL function while reading the files along with withColumn method. Please refer the below code snippet: