Azure Event Hubs receive not working on Linux


I am working on a project that reads device events from Azure IoT Hub. I used the code provided in link. The project uses Spring Boot. The code worked fine on my Windows machine, but when I deployed it to my server (a Linux machine), the Azure Event Hubs receive stopped working. I also tried the code on a local Linux machine and hit the same issue: the application throws the error below and does not receive any new events. I have also opened ports 5671 and 9352.

Error:

    "Operation not allowed after the com.microsoft.azure.servicebus.MessageReceiver instance is Closed."

Code:

    public class ReceiveEventService {

    public void startListening() {
            EventHubClient client0 = receiveMessages("0");
            EventHubClient client1 = receiveMessages("1");
            try {
                log.info(this.getClass(), "startListening", "Started listening to azure");
                System.in.read();
                client0.closeSync();
                client1.closeSync();
                System.exit(0);
            } catch (ServiceBusException | IOException e) {
                log.error(this.getClass(), "error in azure startListening", e);
            }
        }

    private EventHubClient receiveMessages(final String partitionId) {
            EventHubClient client = null;
            try {
                client = EventHubClient.createFromConnectionStringSync(deviceEndPointString);
            } catch (Exception e) {
                log.error(this.getClass(), "receiveMessages - Failed to create client", e);
                System.exit(1);
            }
            try {
                client.createReceiver(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, partitionId, Instant.now())
                        .thenAccept(new Consumer<PartitionReceiver>() {
                            public void accept(PartitionReceiver receiver) {
                                log.info(this.getClass(), "receiveMessages",
                                        " *Created receiver on partition " + partitionId);
                                try {
                                    while (true) {
                                        Iterable<EventData> receivedEvents = receiver.receive(100).get();
                                        int batchSize = 0;
                                        if (receivedEvents != null) {
                                            for (EventData receivedEvent : receivedEvents) {
                                                log.info(this.getClass(), "receiveMessages", "Device ID:"
                                                        + receivedEvent.getSystemProperties()
                                                                .get("iothub-connection-device-id")
                                                        + " offset:" + receivedEvent.getSystemProperties().getOffset()
                                                        + " EnqueueTime:"
                                                        + receivedEvent.getSystemProperties().getEnqueuedTime() + "SeqNo:"
                                                        + receivedEvent.getSystemProperties().getSequenceNumber());
                                                batchSize++;
                                            }
                                        }
                                        log.debug(this.getClass(), "receiveMessages:",
                                                "Partition " + partitionId + " ReceivedBatch size" + batchSize);
                                    }
                                } catch (Exception e) {
                                    log.error(this.getClass(), "receiveMessages-Failed to receive messages", e);
                                }
                            }
                        });

            } catch (Exception e) {
                log.error(this.getClass(), "receiveMessages-Failed to create receiver", e);
            }
            return client;
        }

    }

The dependencies in my pom.xml are as below.

        <dependency>
            <groupId>com.microsoft.azure.sdk.iot</groupId>
            <artifactId>iot-service-client</artifactId>
            <version>1.3.19</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure-eventhubs-eph</artifactId>
            <version>0.13.0</version>
        </dependency>

        <dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure-eventhubs</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.azure.sdk.iot</groupId>
            <artifactId>iot-device-client</artifactId>
            <version>1.1.26</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure-servicebus</artifactId>
            <version>0.9.7</version>
        </dependency>

My main class is below. The rest of my code runs fine on Linux; only the Azure Event Hubs part throws the error and does not work.

@SpringBootApplication
public class AzureApplication extends SpringBootServletInitializer{

    public static void main(String[] args) {
        SpringApplication.run(AzureApplication.class, args);
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(AzureApplication.class);
    }

}

There is 1 best solution below


I commented out the "client.closeSync()" calls, along with "System.in.read()" and "System.exit(0)", and everything started working on both my local Linux machine and my Azure VM (Linux). I am not sure whether this is the right approach, but the workaround works for now.

    public void startListening() {
            EventHubClient client0 = receiveMessages("0");
            EventHubClient client1 = receiveMessages("1");
            try {
                log.info(this.getClass(), "startListening", "Started listening to azure");
               // System.in.read();
               // client0.closeSync();
               // client1.closeSync();
               // System.exit(0);
            } catch (Exception e) {
                log.error("error in azure startListening", e);
            }
        }
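
One possible explanation, which nothing above confirms: when the application runs on a server without an attached terminal (for example as a service, or with stdin redirected from /dev/null), `System.in.read()` can return -1 immediately instead of blocking. In that case `closeSync()` runs right after startup, and the receive loop then fails with the "instance is Closed" error. The sketch below illustrates that behavior with an empty stream and shows one alternative to commenting the calls out: closing the clients from a JVM shutdown hook. `ListenerLifecycleSketch`, `FakeClient`, and `ClientRegistry` are hypothetical names; `FakeClient` only stands in for `EventHubClient` so the example runs without the Azure SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class ListenerLifecycleSketch {

    // Performs a single read; a result of -1 means the stream is at end of input.
    static int readOnce(InputStream in) throws IOException {
        return in.read();
    }

    // Hypothetical stand-in for EventHubClient (assumption: not part of any SDK).
    static class FakeClient implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        boolean isClosed() {
            return closed.get();
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    // Tracks open clients and closes them only when closeAll() is called.
    static class ClientRegistry {
        private final List<FakeClient> clients = new ArrayList<>();

        FakeClient open() {
            FakeClient client = new FakeClient();
            clients.add(client);
            return client;
        }

        void closeAll() {
            for (FakeClient client : clients) {
                client.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // An empty stream stands in for stdin that is already at end of input,
        // such as stdin redirected from /dev/null: read() returns without blocking.
        InputStream emptyStdin = new ByteArrayInputStream(new byte[0]);
        System.out.println("read() on empty input: " + readOnce(emptyStdin));

        ClientRegistry registry = new ClientRegistry();
        FakeClient client0 = registry.open();
        FakeClient client1 = registry.open();

        // Close the clients when the JVM shuts down, not after a stdin read.
        Runtime.getRuntime().addShutdownHook(new Thread(registry::closeAll));

        System.out.println("client0 closed: " + client0.isClosed());
        System.out.println("client1 closed: " + client1.isClosed());
    }
}
```

In a Spring Boot application the same idea could be expressed by calling `closeSync()` from a method annotated with `@PreDestroy`; whether that fits depends on how the service bean is managed.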