I am using Apache Bahir's AMQSource connector to read from ActiveMQ, but when I run the Flink job to consume the data, no output is generated.
For example, the queue the connector listens to contains 4 messages, but when I run the Flink job none of them is consumed.
val brokerURL = "tcp://localhost:61616"
val destinationName = "TEST.FOO"
val filePath = "C:\\output" + System.currentTimeMillis + ".csv"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new MemoryStateBackend(1000, false))
val config = new AMQSourceConfig.AMQSourceConfigBuilder[String]()
  .setConnectionFactory(new ActiveMQConnectionFactory(brokerURL))
  .setDestinationName(destinationName)
  .setDeserializationSchema(new SimpleStringSchema)
  .setDestinationType(DestinationType.QUEUE)
  .setRunningChecker(new RunningChecker)
  .build
val amqSource = new AMQSource[String](config)
val stream = env.addSource(amqSource)
stream.map(/*Some MapFunction*/)
stream.writeAsText(filePath)
stream.print
env.execute
AMQSource expects each message to be a JMS bytes message (see the run method in AMQSource.class), so messages sent as text messages are not consumed.
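To illustrate the kind of check involved, here is a simplified sketch. It is not the connector's actual source: `PayloadCheck` and `payloadOf` are hypothetical names, and the real run method may differ in detail. It only shows how a consumer that handles `BytesMessage` exclusively ends up ignoring a `TextMessage`.

```scala
import javax.jms.{BytesMessage, Message}

object PayloadCheck {
  // Hypothetical helper (not part of Bahir): returns the raw payload only
  // when the JMS message is a BytesMessage, and None for any other type.
  def payloadOf(message: Message): Option[Array[Byte]] = message match {
    case bytesMessage: BytesMessage =>
      // Copy the message body into a byte array for the deserialization schema
      val bytes = new Array[Byte](bytesMessage.getBodyLength.toInt)
      bytesMessage.readBytes(bytes)
      Some(bytes)
    case _ =>
      // TextMessage, MapMessage, etc. carry no payload this consumer can use
      None
  }
}
```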
So when you produce data to ActiveMQ, do not send it as a text message.
Send it as a bytes message instead.
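A minimal producer sketch, assuming the broker URL and queue name from the question and a plain JMS producer on the sending side. The object name `BytesProducer` and the payload "hello" are made up for illustration. The bytes are UTF-8 encoded on the assumption that SimpleStringSchema decodes with its default charset.

```scala
import java.nio.charset.StandardCharsets
import javax.jms.Session
import org.apache.activemq.ActiveMQConnectionFactory

object BytesProducer {
  def main(args: Array[String]): Unit = {
    // Same broker and queue as in the question
    val connection = new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection()
    connection.start()
    try {
      val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
      val producer = session.createProducer(session.createQueue("TEST.FOO"))

      // Not: producer.send(session.createTextMessage("hello"))
      val message = session.createBytesMessage()
      message.writeBytes("hello".getBytes(StandardCharsets.UTF_8))
      producer.send(message)
    } finally {
      // Closing the connection also closes its sessions and producers
      connection.close()
    }
  }
}
```

If the messages come from a tool you do not control (for example the ActiveMQ web console, which typically sends text messages), the producer side has to be changed or replaced for the source to pick them up.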