How to connect to IBM COS (Cloud Object Store) using Spark; how to resolve "No FileSystem for scheme: cos"


I am trying to create a connection to IBM COS (Cloud Object Store) using Spark (Spark version = 2.4.4, Scala version = 2.11.12).

I am running it locally with the correct credentials, but I observe the following error: "No FileSystem for scheme: cos".

I am sharing the code snippet along with the error log. Can someone help me resolve this?

Thanks in advance!

Code Snippet:

import com.ibm.ibmos2spark.CloudObjectStorage
import org.apache.spark.sql.SparkSession

object CosConnection extends App{
  var credentials = scala.collection.mutable.HashMap[String, String](
      "endPoint"->"ENDPOINT",
      "accessKey"->"ACCESSKEY",
      "secretKey"->"SECRETKEY"
  )
  var bucketName = "FOO"
  var objectname = "xyz.csv"

  var configurationName = "softlayer_cos" 

  val spark = SparkSession
    .builder()
    .appName("Connect IBM COS")
    .master("local")
    .getOrCreate()


  spark.sparkContext.hadoopConfiguration.set("fs.stocator.scheme.list", "cos")
  spark.sparkContext.hadoopConfiguration.set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
  spark.sparkContext.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")

  var cos = new CloudObjectStorage(spark.sparkContext, credentials, configurationName=configurationName)

  var dfData1 = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load(cos.url(bucketName, objectname))

  dfData1.printSchema()
  dfData1.show(5,0)
}

ERROR:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: cos
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

There are 3 answers below.

Answer 1:

You have to set .config("spark.hadoop.fs.stocator.scheme.list", "cos") along with some other fs.cos... configurations.

Here's an end-to-end Python code example that works; it should be straightforward to convert into Scala:

from pyspark.sql import SparkSession

# Placeholders: fill in your own jar path, instance name, bucket, region, and credentials.
stocator_jar = '/path/to/stocator-1.1.2-SNAPSHOT-IBM-SDK.jar'
cos_instance_name = '<myCosInstanceName>'
bucket_name = '<bucketName>'
s3_region = '<region>'
cos_iam_api_key = '*******'
iam_service_id = 'crn:v1:bluemix:public:iam-identity::<****************>'

spark_builder = (
    SparkSession
        .builder
        .appName('test_app'))

# Put the Stocator jar on the driver and executor classpaths.
spark_builder.config('spark.driver.extraClassPath', stocator_jar)
spark_builder.config('spark.executor.extraClassPath', stocator_jar)

# COS credentials and endpoint for this instance.
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.api.key", cos_iam_api_key)
spark_builder.config(f"fs.cos.{cos_instance_name}.endpoint", f"s3.{s3_region}.cloud-object-storage.appdomain.cloud")
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.service.id", iam_service_id)

# Register the cos:// scheme with Stocator.
spark_builder.config("spark.hadoop.fs.stocator.scheme.list", "cos")
spark_builder.config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
spark_builder.config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
spark_builder.config("fs.stocator.cos.scheme", "cos")

spark_sess = spark_builder.getOrCreate()

dataset = spark_sess.range(1, 10)
dataset = dataset.withColumnRenamed('id', 'user_idx')

# Write a small CSV to the bucket to verify the connection.
dataset.repartition(1).write.csv(
    f'cos://{bucket_name}.{cos_instance_name}/test.csv',
    mode='overwrite',
    header=True)

spark_sess.stop()
print('done!')
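
For reference, here is a rough, untested Scala sketch of the same setup (the object name, instance name, region, and credentials are all placeholders, as in the Python version):

import org.apache.spark.sql.SparkSession

// Untested Scala sketch of the Python example above; all values are placeholders.
object CosWriteExample extends App {
  val cosInstanceName = "<myCosInstanceName>"
  val bucketName = "<bucketName>"
  val s3Region = "<region>"
  val cosIamApiKey = "*******"
  val iamServiceId = "crn:v1:bluemix:public:iam-identity::<****************>"

  val spark = SparkSession
    .builder()
    .appName("test_app")
    .config(s"fs.cos.${cosInstanceName}.iam.api.key", cosIamApiKey)
    .config(s"fs.cos.${cosInstanceName}.endpoint", s"s3.${s3Region}.cloud-object-storage.appdomain.cloud")
    .config(s"fs.cos.${cosInstanceName}.iam.service.id", iamServiceId)
    .config("spark.hadoop.fs.stocator.scheme.list", "cos")
    .config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
    .config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
    .config("fs.stocator.cos.scheme", "cos")
    .getOrCreate()

  // Write a small CSV to the bucket to verify the connection.
  val dataset = spark.range(1, 10).withColumnRenamed("id", "user_idx")
  dataset.repartition(1).write
    .mode("overwrite")
    .option("header", "true")
    .csv(s"cos://${bucketName}.${cosInstanceName}/test.csv")

  spark.stop()
  println("done!")
}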

Answer 2:

This issue was resolved by adding the following stocator dependency, matched to Spark version 2.4.4 and Scala version 2.11.12:

// https://mvnrepository.com/artifact/com.ibm.stocator/stocator
libraryDependencies += "com.ibm.stocator" % "stocator" % "1.0.24"

Make sure you have stocator-1.0.24-jar-with-dependencies.jar among the external libraries when you build the package.
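
For context, a minimal build.sbt pinning the matching versions might look like this (the project name is a placeholder):

name := "cos-connection"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // use % "provided" instead if you submit to a cluster that supplies Spark
  "org.apache.spark" %% "spark-sql" % "2.4.4",
  // https://mvnrepository.com/artifact/com.ibm.stocator/stocator
  "com.ibm.stocator" % "stocator" % "1.0.24"
)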

Also ensure you pass your endpoint as s3.us.cloud-object-storage.appdomain.cloud instead of https://s3.us.cloud-object-storage.appdomain.cloud.
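
With the credentials map from the question, that would look like this (access and secret keys are placeholders):

var credentials = scala.collection.mutable.HashMap[String, String](
    "endPoint"->"s3.us.cloud-object-storage.appdomain.cloud", // bare hostname, no https://
    "accessKey"->"ACCESSKEY",
    "secretKey"->"SECRETKEY"
)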

You can also build the stocator jar manually and include target/stocator-1.0.24-SNAPSHOT-IBM-SDK.jar on the classpath (if needed):

git clone https://github.com/SparkTC/stocator
cd stocator
git fetch
git checkout -b 1.0.24-ibm-sdk origin/1.0.24-ibm-sdk
mvn clean install -DskipTests
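
Once built, the jar can be put on the classpath at submit time, for example (the application jar and main class names here are hypothetical):

spark-submit \
  --jars target/stocator-1.0.24-SNAPSHOT-IBM-SDK.jar \
  --class CosConnection \
  target/scala-2.11/cos-connection_2.11-0.1.jar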

Answer 3:

I am using Spark version 2.4.5 and Scala version 2.11.12 on Windows 10, and I have already added both to the classpath in the environment variables.

  1. Command to start the Spark shell (open a command prompt and paste the command below):

    spark-shell --packages com.ibm.stocator:stocator:1.0.36

If you see the startup details printed, it means you have launched spark-shell successfully.

You can also check it in the browser using the URL shown in the command prompt, e.g. "Spark context Web UI available at http://localhost:4040" (the port may differ in your case).

  2. Set the configuration information in Scala (my COS location is us-east):

    sc.hadoopConfiguration.set("fs.stocator.scheme.list", "cos")
    sc.hadoopConfiguration.set("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
    sc.hadoopConfiguration.set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
    sc.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")
    sc.hadoopConfiguration.set("fs.cos.mycos.access.key", "your access key")
    sc.hadoopConfiguration.set("fs.cos.mycos.secret.key", "your secret key")
    sc.hadoopConfiguration.set("fs.cos.mycos.endpoint", "https://s3.us-east.cloud-object-storage.appdomain.cloud")
    
  3. Get the list of objects from the manifest file:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val cosContent = sqlContext.read.text("cos://someBucketName.mycos/someFile.mf")
    cosContent.show(false)
    


Or you can read data from a Parquet file like below:

    val event1 = sqlContext.read.parquet("cos://someBucketName.mycos/parquetDirectoryName/")
    event1.printSchema()
    event1.count()
    event1.show(false)
