Below is a sample code snippet used to fetch data from HBase. It worked fine with Spark 3.1.2, but after upgrading to Spark 3.2.1 the returned RDD no longer contains any values, and no exception is thrown.
def getInfo(sc: SparkContext, startDate: String, cachingValue: Int, sparkLoggerParams: SparkLoggerParams, zkIP: String, zkPort: String): RDD[String] = {
  val scan = new Scan
  scan.addFamily(Bytes.toBytes("family"))
  scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("time"))
  val rdd = getHbaseConfiguredRDDFromScan(sc, zkIP, zkPort, "myTable", scan, cachingValue, sparkLoggerParams)
  // Extract the row key of each Result as a String
  val output: RDD[String] = rdd.map { row =>
    Bytes.toString(row._2.getRow)
  }
  output
}
def getHbaseConfiguredRDDFromScan(sc: SparkContext, zkIP: String, zkPort: String, tableName: String,
                                  scan: Scan, cachingValue: Int, sparkLoggerParams: SparkLoggerParams): NewHadoopRDD[ImmutableBytesWritable, Result] = {
  scan.setCaching(cachingValue)
  // Serialize the Scan so it can be passed to TableInputFormat through the Hadoop configuration
  val scanString = Base64.getEncoder.encodeToString(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(scan).toByteArray)
  val hbaseContext = new SparkHBaseContext(zkIP, zkPort)
  val hbaseConfig = hbaseContext.getConfiguration()
  hbaseConfig.set(TableInputFormat.INPUT_TABLE, tableName)
  hbaseConfig.set(TableInputFormat.SCAN, scanString)
  sc.newAPIHadoopRDD(
    hbaseConfig,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result]
  ).asInstanceOf[NewHadoopRDD[ImmutableBytesWritable, Result]]
}
Also, if we fetch using a Scan directly through the HBase client API, without going through newAPIHadoopRDD, it works. For example:
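This is roughly what that direct fetch looks like (a minimal sketch only; the ZooKeeper quorum/port values are placeholders, and the table, column family and qualifier are the same ones used above):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk-host")            // placeholder ZooKeeper host
conf.set("hbase.zookeeper.property.clientPort", "2181")  // placeholder client port

val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("myTable"))
val scan = new Scan().addColumn(Bytes.toBytes("family"), Bytes.toBytes("time"))
val scanner = table.getScanner(scan)
// Collect the row keys eagerly before closing the scanner
val rowKeys = scanner.iterator().asScala.map(r => Bytes.toString(r.getRow)).toList
scanner.close()
table.close()
connection.close()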
Software versions:
- Spark: 3.2.1, prebuilt with user-provided Apache Hadoop
- Scala: 2.12.10
- HBase: 2.4.9
- Hadoop: 2.10.1
I found the solution to this one. See the upgrade guide from Spark 3.1.x to Spark 3.2.x: https://spark.apache.org/docs/latest/core-migration-guide.html
Since Spark 3.2, spark.hadoopRDD.ignoreEmptySplits is set to true by default, so Spark no longer creates partitions for input splits that report a length of zero. HBase's TableInputFormat can report its splits that way, which is why the RDD returned by newAPIHadoopRDD comes back empty. Setting the property back to false restores the pre-3.2 behaviour.
It can be set like this on spark-submit:
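spark-submit \
  --conf spark.hadoopRDD.ignoreEmptySplits=false \
  <your usual class/jar and other arguments>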
Alternatively, you can set it globally in $SPARK_HOME/conf/spark-defaults.conf so it applies to every Spark application:
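spark.hadoopRDD.ignoreEmptySplits   false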