How to use Twitter Heron with Storm Flux


I am trying to migrate a project from Apache Storm to Twitter Heron. After a lot of struggle, I was able to get rid of most of the errors, for example by using className: "org.apache.storm.kafka.ZkHosts" instead of className: "storm.kafka.ZkHosts". However, I am stuck at submitting the topology. I use Flux to submit the topology to Storm.

I am getting a NullPointerException when a CuratorFramework object is created in ZkState. On further digging, I found an issue on GitHub which says that this happens when the ZooKeeper configuration is not set.

After further debugging, I found that the cause is that I am missing the following configuration keys, which are required at ZkState.java:46:

storm.zookeeper.session.timeout
storm.zookeeper.connection.timeout
storm.zookeeper.retry.times
storm.zookeeper.retry.interval

I have managed to identify the issue, but I am not sure where to add these keys in my config. Can someone please tell me where they should go? Thank you.

My Flux Config

name: "My_Topology"
components:
  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "localhost:2181"

  - id: "SpoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts" # brokerHosts
      - "my-topic"  # topic
      - "/my-zkRoot" # zkRoot
      - "my-id" # spoutId
    properties: 
      - name: "zkServers"
        value: ["localhost"]
      - name: "zkPort"
        value: 2181
      - name: "zkRoot"
        value: "/my-zkRoot"
      - name: "retryInitialDelayMs"
        value: 2000
      - name: "retryDelayMultiplier"
        value: 2

config:
  topology.workers: 5
  topology.testing.always.try.serialize: true

spouts:
  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "SpoutConfig"
bolts:
  - id: "my-bolt"
    className: "com.example.sample.MyBolt"
    parallelism: 1

streams:
  - name: "kafka_spout --> my_bolt"
    from: "kafka-spout"
    to: "my-bolt"
    grouping:
      type: SHUFFLE

There are 2 best solutions below

Best answer

You can add these keys to the config section of your Flux YAML file:

config:
  topology.workers: 5
  topology.testing.always.try.serialize: true
  storm.zookeeper.session.timeout: 30000
  storm.zookeeper.connection.timeout: 30000
  storm.zookeeper.retry.times: 5
  storm.zookeeper.retry.interval: 2000
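
If you want to catch a missing key before submitting, a small pre-flight check along these lines may help. This is only a sketch: the class ZkConfigCheck and its method missingKeys are hypothetical names made up for illustration and are not part of Storm, Heron, or Flux. It assumes the topology config is available as a plain Map, and it only checks for the four keys listed in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Storm, Heron, or Flux.
public class ZkConfigCheck {

    // The four ZooKeeper keys named in the question.
    public static final List<String> REQUIRED_KEYS = Arrays.asList(
            "storm.zookeeper.session.timeout",
            "storm.zookeeper.connection.timeout",
            "storm.zookeeper.retry.times",
            "storm.zookeeper.retry.interval");

    // Returns the required keys that are absent from (or null in) the config.
    public static List<String> missingKeys(Map<String, Object> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (conf.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Mirrors the config section from the question, before the fix.
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.workers", 5);
        conf.put("topology.testing.always.try.serialize", true);
        System.out.println("Missing before fix: " + missingKeys(conf));

        // Values taken from the config section above.
        conf.put("storm.zookeeper.session.timeout", 30000);
        conf.put("storm.zookeeper.connection.timeout", 30000);
        conf.put("storm.zookeeper.retry.times", 5);
        conf.put("storm.zookeeper.retry.interval", 2000);
        System.out.println("Missing after fix: " + missingKeys(conf));
    }
}
```

Failing early with the list of missing keys is easier to diagnose than a NullPointerException raised later from inside the spout.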
Second answer

We have natively integrated Storm Flux into Heron to make this easier. With Heron ECO, you can write topologies against two different APIs:

  • Native Heron API
  • Storm API

For additional information, please take a look at the ECO documentation here

https://apache.github.io/incubator-heron/docs/developers/java/eco-api/