How to read only n rows of large CSV file on HDFS using spark-csv package?

44.2k Views Asked by At

I have a big distributed file on HDFS and each time I use sqlContext with spark-csv package, it first loads the entire file which takes quite some time.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")

now as I just want to do some quick check at times, all I need is few/ any n rows of the entire file.

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)

but all these run after the file load is done. Can't I just restrict the number of rows while reading the file itself ? I am referring to n_rows equivalent of pandas in spark-csv, like:

pd_df = pandas.read_csv("file_path", nrows=20)

Or it might be the case that spark does not actually load the file, the first step, but in this case, why is my file load step taking too much time then?

I want

df.count()

to give me only n and not all rows, is it possible ?

7

There are 7 best solutions below

5
On BEST ANSWER

My understanding is that reading just a few lines is not supported by spark-csv module directly, and as a workaround you could just read the file as a text file, take as many lines as you want and save it to some temporary location. With the lines saved, you could use spark-csv to read the lines, including inferSchema option (that you may want to use given you are in exploration mode).

val numberOfLines = ...
spark.
  read.
  text("myfile.csv").
  limit(numberOfLines).
  write.
  text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
  read.
  option("inferSchema", true). // <-- you are in exploration mode, aren't you?
  csv(s"myfile-$numberOfLines.csv")
1
On

Not inferring schema and using limit(n) worked for me, in all aspects.

f_schema = StructType([
StructField("col1",LongType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",DoubleType(),True)
...
])

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)

Note: If we use inferschema='true', its again the same time, and maybe hence the same old thing.

But if we dun have idea of the schema, Jacek Laskowski solutions works well too. :)

9
On

You can use limit(n).

sqlContext.format('com.databricks.spark.csv') \
          .options(header='true', inferschema='true').load("file_path").limit(20)

This will just load 20 rows.

1
On

The solution given by Jacek Laskowski works well. Presenting an in-memory variation below.

I recently ran into this problem. I was using databricks and had a huge csv directory (200 files of 200MB each)

I originally had

val df = spark.read.format("csv")
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.load("dbfs:/huge/csv/files/in/this/directory/")

display(df)

which took a lot of time (10+ minutes), but then I change it to below and it ran instantly (2 seconds)

val lines = spark.read.text("dbfs:/huge/csv/files/in/this/directory/").as[String].take(1000)

val df = spark.read
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.csv(spark.createDataset(lines))

display(df)

Inferring schema for text formats is hard and it can be done this way for the csv and json (but not if it's a multi-line json) formats.

0
On

Since I didn't see that solution in the answers, the pure SQL-approach is working for me:

df = spark.sql("SELECT * FROM csv.`/path/to/file` LIMIT 10000")

If there is no header the columns will be named _c0, _c1, etc. No schema required.

0
On

May be this would be helpful who is working in java. Applying limit will not help to reduce the time. You have to collect the n rows from the file.

        DataFrameReader frameReader = spark
          .read()
          .format("csv")
          .option("inferSchema", "true");
    //set framereader options, delimiters etc

    List<String> dataset = spark.read().textFile(filePath).limit(MAX_FILE_READ_SIZE).collectAsList();
    return frameReader.csv(spark.createDataset(dataset, Encoders.STRING()));
2
On

Since PySpark 2.3 you can simply load data as text, limit, and apply csv reader on the result:

(spark
  .read
  .options(inferSchema="true", header="true")
  .csv(
      spark.read.text("/path/to/file")
          .limit(20)                   # Apply limit
          .rdd.flatMap(lambda x: x)))  # Convert to RDD[str]

Scala counterpart is available since Spark 2.2:

spark
  .read
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .csv(spark.read.text("/path/to/file").limit(20).as[String])

In Spark 3.0.0 or later one can also apply limit and use from_csv function, but it requires a schema, so it probably won't fit your requirements.