java.io.FileNotFoundException error in Apache Spark even though my file exists


I'm new to Spark and working on a POC that downloads a file and then reads it. However, Spark keeps failing with an error saying the file doesn't exist:

java.io.FileNotFoundException: File file:/app/data-Feb-19-2023_131049.json does not exist

But when I print the path of the file, it shows that the file exists and the path is correct.

This is the output:

23/02/19 13:10:46 INFO BlockManagerMasterEndpoint: Registering block manager 10.14.142.21:37515 with 2.2 GiB RAM, BlockManagerId(1, 10.14.142.21, 37515, None)
FILE IS DOWNLOADED
['/app/data-Feb-19-2023_131049.json']
23/02/19 13:10:49 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/02/19 13:10:49 INFO SharedState: Warehouse path is 'file:/app/spark-warehouse'.
23/02/19 13:10:50 INFO InMemoryFileIndex: It took 39 ms to list leaf files for 1 paths.
23/02/19 13:10:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 206.6 KiB, free 1048.6 MiB)
23/02/19 13:10:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 35.8 KiB, free 1048.6 MiB)
23/02/19 13:10:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on experian-el-d41b428669cc1e8e-driver-svc.environments-quin-dev-1.svc:7079 (size: 35.8 KiB, free: 1048.8 MiB)
23/02/19 13:10:51 INFO SparkContext: Created broadcast 0 from json at <unknown>:0
23/02/19 13:10:51 INFO FileInputFormat: Total input files to process : 1
23/02/19 13:10:51 INFO FileInputFormat: Total input files to process : 1
23/02/19 13:10:51 INFO SparkContext: Starting job: json at <unknown>:0
23/02/19 13:10:51 INFO DAGScheduler: Got job 0 (json at <unknown>:0) with 1 output partitions
23/02/19 13:10:51 INFO DAGScheduler: Final stage: ResultStage 0 (json at <unknown>:0)
23/02/19 13:10:51 INFO DAGScheduler: Parents of final stage: List()
23/02/19 13:10:51 INFO DAGScheduler: Missing parents: List()
23/02/19 13:10:51 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at json at <unknown>:0), which has no missing parents
23/02/19 13:10:51 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.0 KiB, free 1048.6 MiB)
23/02/19 13:10:51 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.8 KiB, free 1048.5 MiB)
23/02/19 13:10:51 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on experian-el-d41b428669cc1e8e-driver-svc.environments-quin-dev-1.svc:7079 (size: 4.8 KiB, free: 1048.8 MiB)
23/02/19 13:10:51 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1513
23/02/19 13:10:51 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at json at <unknown>:0) (first 15 tasks are for partitions Vector(0))
23/02/19 13:10:51 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
23/02/19 13:10:51 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (10.14.142.21, executor 1, partition 0, PROCESS_LOCAL, 4602 bytes) taskResourceAssignments Map()
23/02/19 13:10:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.14.142.21:37515 (size: 4.8 KiB, free: 2.2 GiB)
23/02/19 13:10:52 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.14.142.21:37515 (size: 35.8 KiB, free: 2.2 GiB)
23/02/19 13:10:52 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.14.142.21 executor 1): java.io.FileNotFoundException: File file:/app/data-Feb-19-2023_131049.json does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createInputStream(CodecStreams.scala:40)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createInputStreamWithCloseResource(CodecStreams.scala:52)
    at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.dataToInputStream(JsonDataSource.scala:195)
    at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.createParser(JsonDataSource.scala:199)
    at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.$anonfun$infer$4(JsonDataSource.scala:165)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$3(JsonInferSchema.scala:86)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2763)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$2(JsonInferSchema.scala:86)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator.isEmpty(Iterator.scala:387)
    at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
    at scala.collection.TraversableOnce.reduceLeftOption(TraversableOnce.scala:249)
    at scala.collection.TraversableOnce.reduceLeftOption$(TraversableOnce.scala:248)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1431)
    at scala.collection.TraversableOnce.reduceOption(TraversableOnce.scala:256)
    at scala.collection.TraversableOnce.reduceOption$(TraversableOnce.scala:256)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$1(JsonInferSchema.scala:103)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

23/02/19 13:10:52 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1) (10.14.142.21, executor 1, partition 0, PROCESS_LOCAL, 4602 bytes) taskResourceAssignments Map()
23/02/19 13:10:52 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on 10.14.142.21, executor 1: java.io.FileNotFoundException (File file:/app/data-Feb-19-2023_131049.json does not exist) [duplicate 1]
23/02/19 13:10:52 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2) (10.14.142.21, executor 1, partition 0, PROCESS_LOCAL, 4602 bytes) taskResourceAssignments Map()
23/02/19 13:10:52 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on 10.14.142.21, executor 1: java.io.FileNotFoundException (File file:/app/data-Feb-19-2023_131049.json does not exist) [duplicate 2]
23/02/19 13:10:52 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3) (10.14.142.21, executor 1, partition 0, PROCESS_LOCAL, 4602 bytes) taskResourceAssignments Map()
23/02/19 13:10:52 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on 10.14.142.21, executor 1: java.io.FileNotFoundException (File file:/app/data-Feb-19-2023_131049.json does not exist) [duplicate 3]
23/02/19 13:10:52 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
23/02/19 13:10:52 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
23/02/19 13:10:52 INFO TaskSchedulerImpl: Cancelling stage 0
23/02/19 13:10:52 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
23/02/19 13:10:52 INFO DAGScheduler: ResultStage 0 (json at <unknown>:0) failed in 1.128 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.14.142.21 executor 1): java.io.FileNotFoundException: File file:/app/data-Feb-19-2023_131049.json does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createInputStream(CodecStreams.scala:40)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createInputStreamWithCloseResource(CodecStreams.scala:52)
    at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.dataToInputStream(JsonDataSource.scala:195)
    at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.createParser(JsonDataSource.scala:199)
    at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.$anonfun$infer$4(JsonDataSource.scala:165)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$3(JsonInferSchema.scala:86)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2763)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$2(JsonInferSchema.scala:86)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator.isEmpty(Iterator.scala:387)
    at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
    at scala.collection.TraversableOnce.reduceLeftOption(TraversableOnce.scala:249)
    at scala.collection.TraversableOnce.reduceLeftOption$(TraversableOnce.scala:248)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1431)
    at scala.collection.TraversableOnce.reduceOption(TraversableOnce.scala:256)
    at scala.collection.TraversableOnce.reduceOption$(TraversableOnce.scala:256)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$1(JsonInferSchema.scala:103)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

This is my code to download the file and print its path:

    def find_files(self, filename, search_path):
        result = []

        # Walking top-down from the root
        for root, dirs, files in os.walk(search_path):
            if filename in files:
                result.append(os.path.join(root, filename))
        return result

    def downloadData(self, access_token, data):
        headers = {
            'Content-Type': 'application/json',
            'Charset': 'UTF-8',
            'Authorization': f'Bearer {access_token}'
        }

        try:
            response = requests.post(self.kyc_url, data=json.dumps(data), headers=headers)
            response.raise_for_status()
            logger.debug("received kyc data")
            response_filename = ("data-" + time.strftime('%b-%d-%Y_%H%M%S', time.localtime()) + ".json")
            with open(response_filename, 'w', encoding='utf-8') as f:
                json.dump(response.json(), f, ensure_ascii=False, indent=4)
            print("FILE IS DOWNLOADED")
            print(self.find_files(response_filename, "/"))
        except requests.exceptions.HTTPError as err:
            logger.error("failed to fetch kyc data")
            raise SystemExit(err)
        return response_filename

This is my code to read the file and upload it to MinIO:

    def load(spark: SparkSession, json_file_path: str, destination_path: str) -> None:
        df = spark.read.option("multiline", "true").json(json_file_path)
        df.write.format("delta").save(f"s3a://{destination_path}")
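
For context, the s3a:// write to MinIO relies on the S3A connector being pointed at the MinIO endpoint. A minimal sketch of that session setup is below; the app name, endpoint, and credentials are placeholders, not my actual values:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("kyc-loader")  # placeholder name
    # Delta Lake support, matching the delta-core package declared in the manifest below
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3A settings for MinIO; endpoint and credentials are placeholders
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio.demo.svc:9000")
    .config("spark.hadoop.fs.s3a.access.key", "MINIO_ACCESS_KEY")
    .config("spark.hadoop.fs.s3a.secret.key", "MINIO_SECRET_KEY")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)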

I'm running Spark on Kubernetes with the Spark Operator.

This is my SparkApplication manifest:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: myApp
  namespace: demo
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "myImage"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/main.py
  sparkVersion: "3.3.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  timeToLiveSeconds: 86400
  deps:
    packages:
      - io.delta:delta-core_2.12:2.2.0
      - org.apache.hadoop:hadoop-aws:3.3.1
  driver:
    env:
      - name: NAMESPACE
        value: demo
    cores: 2
    coreLimit: "2000m"
    memory: "2048m"
    labels:
      version: 3.3.1
    serviceAccount: spark-driver
  executor:
    cores: 4
    instances: 1
    memory: "4096m"
    coreRequest: "500m"
    coreLimit: "4000m"
    labels:
      version: 3.3.1
  dynamicAllocation:
    enabled: false

Can someone please point out what I am doing wrong?

Thank you

1 Answer

Accepted answer:

If you are running in cluster mode, your input files need to be on a shared filesystem such as HDFS or S3, not on the local filesystem, because both the driver and the executors must be able to access them. Here the JSON is downloaded onto the driver pod's local disk, so the executor pod cannot see file:/app/data-Feb-19-2023_131049.json when the read task runs there.
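
As a minimal sketch of one way to apply this, assuming boto3 is available in your driver image and using placeholder bucket, endpoint, and credential values: upload the downloaded JSON to MinIO first, then pass the resulting s3a:// path to your load function instead of the local path.

import os
import boto3  # assumed to be installed in the driver image

def stage_to_s3(local_path: str, bucket: str) -> str:
    # Push the driver-local file to object storage so the executors can read it too.
    s3 = boto3.client(
        "s3",
        endpoint_url="http://minio.demo.svc:9000",  # hypothetical MinIO endpoint
        aws_access_key_id="MINIO_ACCESS_KEY",       # placeholder
        aws_secret_access_key="MINIO_SECRET_KEY",   # placeholder
    )
    key = os.path.basename(local_path)
    s3.upload_file(local_path, bucket, key)
    return f"s3a://{bucket}/{key}"

# Illustrative wiring with your existing functions (names are placeholders):
# local_file = downloader.downloadData(access_token, data)
# remote_path = stage_to_s3(local_file, "raw-data")
# load(spark, remote_path, "destination-bucket/kyc")

Alternatively, since the file already lives on the driver, you could parse it there with plain Python and build the DataFrame via spark.createDataFrame, which avoids staging it to shared storage for small payloads.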