Spark Streaming Kinesis consumer return empty data


I am trying to consume a Kinesis stream with the Spark Streaming library, org.apache.spark.streaming.kinesis.KinesisUtils. I can verify that the stream has data in it using a Python script. However, when I try to write a consumer in Scala, I keep getting empty data. Here's my code:

def getKinesisData = {

    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val streamName = "myAwesomeStream"
    val regionName = "us-west-2"  // region matching the endpoint above; referenced by createStream below
    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null, "No AWS credentials found.")

    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)

    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size    
    val numStreams = numShards
    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval

    val sparkConfig = new SparkConf().setAppName("myAwesomeApp").setMaster("local")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    val kinesisStreams = (0 until numStreams).map { i =>
      println(i)
      KinesisUtils.createStream(ssc, "myAwesomeApp", streamName, endpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2
      )
    }

    val unionStreams = ssc.union(kinesisStreams)

    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))

    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()
  }

I got this code as an example from GitHub, and I don't really care about the unions, flat-mapping, and word counting in the later part of the code. I just need to know how I can get the actual data from the stream.
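For reference, the simplest way to see the raw records is to side-step the word-count logic and print each decoded record directly. A minimal sketch, assuming the producer writes UTF-8 text; note also that the method above never starts the streaming context, which a receiver needs before it can fetch anything:

```scala
import java.nio.charset.StandardCharsets

// Decode and dump the raw records instead of word-counting them.
unionStreams.foreachRDD { rdd =>
  rdd.collect().foreach { bytes =>
    println(new String(bytes, StandardCharsets.UTF_8))
  }
}

// Without these two calls the job builds the DStream graph and
// immediately exits, so no receiver ever runs.
ssc.start()
ssc.awaitTermination()
```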

UPDATE: It prints the following on the console while I run it

16/12/16 14:57:01 INFO SparkContext: Running Spark version 2.0.0
16/12/16 14:57:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/16 14:57:02 INFO SecurityManager: Changing view acls to: 
16/12/16 14:57:02 INFO SecurityManager: Changing modify acls to: 
16/12/16 14:57:02 INFO SecurityManager: Changing view acls groups to: 
16/12/16 14:57:02 INFO SecurityManager: Changing modify acls groups to: 
16/12/16 14:57:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(username); groups with view permissions: Set(); users  with modify permissions: Set(username); groups with modify permissions: Set()
16/12/16 14:57:02 INFO Utils: Successfully started service 'sparkDriver' on port 54774.
16/12/16 14:57:02 INFO SparkEnv: Registering MapOutputTracker
16/12/16 14:57:02 INFO SparkEnv: Registering BlockManagerMaster
16/12/16 14:57:02 INFO DiskBlockManager: Created local directory at 
16/12/16 14:57:02 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
16/12/16 14:57:02 INFO SparkEnv: Registering OutputCommitCoordinator
16/12/16 14:57:02 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/12/16 14:57:02 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://<I masked this IP address and port>
16/12/16 14:57:03 INFO Executor: Starting executor ID driver on host localhost
16/12/16 14:57:03 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54775.
16/12/16 14:57:03 INFO NettyBlockTransferService: Server created on <I masked this IP address and port>
16/12/16 14:57:03 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, <I masked this IP address and port>)
16/12/16 14:57:03 INFO BlockManagerMasterEndpoint: Registering block manager <I masked this IP address and port> with 2004.6 MB RAM, BlockManagerId(driver, <I masked this IP address and port>)
16/12/16 14:57:03 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, <I masked this IP address and port>)
16/12/16 14:57:03 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.

0 <-- printing shard 
1 <-- printing shard
#### PRINTING kinesisStreams ###### 
Vector(org.apache.spark.streaming.kinesis.KinesisInputDStream@2650f79,    org.apache.spark.streaming.kinesis.KinesisInputDStream@75fc1992)
#### PRINTING unionStreams ######
()
#### words######
org.apache.spark.streaming.dstream.FlatMappedDStream@6fd12c5
#### PRINTING wordCounts######
org.apache.spark.streaming.dstream.ShuffledDStream@790a251b

16/12/16 14:57:03 INFO SparkContext: Invoking stop() from shutdown hook
16/12/16 14:57:03 INFO SparkUI: Stopped Spark web UI at http://<I masked this IP address and port>
16/12/16 14:57:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/12/16 14:57:03 INFO MemoryStore: MemoryStore cleared
16/12/16 14:57:03 INFO BlockManager: BlockManager stopped
16/12/16 14:57:03 INFO BlockManagerMaster: BlockManagerMaster stopped
16/12/16 14:57:03 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/12/16 14:57:03 INFO SparkContext: Successfully stopped SparkContext
16/12/16 14:57:03 INFO ShutdownHookManager: Shutdown hook called
16/12/16 14:57:03 INFO ShutdownHookManager: Deleting directory 
2
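Note the WARN near the end of the log: with receiver-based sources, a plain `local` master gives its single thread to the receiver and leaves none for processing batches. A sketch of the fix (the thread count is illustrative; it just needs to exceed the number of receivers):

```scala
// "local[n]" with n > number of receivers (or "local[*]") leaves
// cores free to process batches alongside the Kinesis receivers.
val sparkConfig = new SparkConf()
  .setAppName("myAwesomeApp")
  .setMaster("local[4]")
```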

2 Answers

BEST ANSWER

The problem was with version 1.5.2 of the Spark library, which does not work well with Kinesis.
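Keeping the Kinesis connector in lockstep with Spark itself avoids this class of mismatch; a hedged sbt sketch (version numbers are illustrative, not a recommendation):

```scala
// build.sbt -- keep both artifacts on the same Spark version
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"             % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.0.0"
)
```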


Hope this helps someone hitting this issue. If you are facing it, it may not be a real error.

The Kinesis integration uses the Receiver API, which runs in a different thread from both the driver and the executors. There is an initial lag during which everything appears to have started, but the Kinesis receiver is still running some setup procedures before it actually downloads data from Kinesis.

Solution: wait. In my case, data appeared on the Spark side after 40-50 seconds.
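That waiting can be made explicit rather than manual; a sketch, assuming the streaming context from the question (the 60-second figure is an assumption based on the 40-50 second lag observed above):

```scala
// Give the Kinesis receiver time to finish its setup (lease-table
// creation, shard discovery) before concluding the stream is empty.
ssc.start()
ssc.awaitTerminationOrTimeout(60 * 1000L)  // timeout in milliseconds
```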