We are using Flink 1.13.5 and trying to read ORC files from an AWS S3 location. We deploy our application on a self-managed Flink cluster. Below is the code we use for reading the ORC files:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.orc.OrcRowInputFormat
import org.apache.hadoop.conf.Configuration
import java.util.concurrent.TimeUnit
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Hadoop configuration handed to the ORC reader
val config = new Configuration
config.setInt("fs.s3.connection.maximum", 15000)
config.set("fs.s3.readahead.range", "1M")
val schema = "struct<empId:int,name:string,salary:double>"
// the path argument is a placeholder; readFile below supplies the actual directory
val orcRowInput = new OrcRowInputFormat("s3a://NotApplicable", schema, config)
val fileRead = env
  .readFile(orcRowInput, "s3a://bucket/folder/", FileProcessingMode.PROCESS_CONTINUOUSLY, TimeUnit.HOURS.toMillis(24))
  .name("s3a://bucket/folder/")
  .uid("s3a://bucket/folder/")
fileRead.print() // placeholder sink so the job graph has an operator to run
env.execute("Flink APP")
When we submit the above application as a JAR to our Flink 1.13.5 cluster, we get the following exception:
RUNNING to FAILED with failure cause:
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FileSystem
    at java.lang.ClassLoader.defineClass(Native Method)
We have the following dependency JARs bundled in our fat JAR itself:
- flink-orc_2.11 1.13.5
- hadoop-hdfs-client 3.3.6
- hadoop-client 3.3.6
- hadoop-common 3.3.6
- hadoop-aws 3.3.6
- flink-s3-fs-hadoop 1.13.5
- other Flink dependencies, all 1.13.5
Also, we have the flink-s3-fs-presto and flink-s3-fs-hadoop JARs on all nodes of our Flink cluster, in the following folder structure:
- /opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.13.5.jar
- /opt/flink/plugins/s3-fs-presto/flink-s3-fs-presto-1.13.5.jar
Hadoop is not installed on our Flink cluster (there is no /etc/hadoop/conf folder), and HADOOP_HOME is not set either. Do we need Hadoop installed on the Flink cluster in order to read ORC files from S3 using the above code? Or are we missing something? Kindly help us rectify this issue. Thanks in advance.
You can't include Hadoop in your fat JAR; it needs to be loaded prior to starting the Flink cluster. If you want to use Flink with Hadoop, you need to have a Flink setup that includes the Hadoop dependencies, rather than adding Hadoop as an application dependency. Flink will use the Hadoop dependencies specified by the HADOOP_CLASSPATH environment variable, which can be set in the following way:
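export HADOOP_CLASSPATH=`hadoop classpath`

(This is the command given in the documentation linked below; it assumes a hadoop binary is on the PATH of every node, and it expands to the classpath of that Hadoop installation.)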
This is documented at https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/project-configuration/#hadoop-dependencies
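In practice this also means the Hadoop artifacts should be removed from the fat JAR. As a minimal sketch, assuming an sbt build (the question does not show the actual build file), the Hadoop dependencies could be marked as provided so they are resolved from HADOOP_CLASSPATH at runtime instead of being bundled:

// build.sbt (hypothetical sketch; adjust to the real build)
val flinkVersion  = "1.13.5"
val hadoopVersion = "3.3.6"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-orc" % flinkVersion,
  // Hadoop comes from the cluster via HADOOP_CLASSPATH, not from the fat JAR
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided",
  "org.apache.hadoop" % "hadoop-aws"    % hadoopVersion % "provided"
)

Similarly, flink-s3-fs-hadoop belongs only in the plugins directory (where you already have it), not in the fat JAR: Flink's S3 filesystems are designed to be loaded as plugins with their own isolated classloader.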