PySpark MQTT structured streaming with Apache Bahir

I'm using Spark 2.4 and started pyspark like this:

./bin/pyspark --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.2

pyspark starts successfully. (When I use spark-sql-streaming-mqtt_2.11:2.4.0-SNAPSHOT instead, I get an error.)

I'm trying to read data from an MQTT broker using Structured Streaming, so I ran this:

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import explode
>>> from pyspark.sql.functions import split
>>> spark = SparkSession \
...     .builder \
...     .appName("Test") \
...     .getOrCreate()
>>> lines = spark.readStream\
...     .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")\
...     .option("topic", "/sensor")\
...     .option("brokerUrl", "tcp://localhost:1883")\
...     .load()

This is the error I get:

2019-03-22 01:24:43 WARN  MQTTUtils:51 - If `clientId` is not set, a random value is picked up.
Recovering from failure is not supported in such a case.
Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/opt/spark/python/pyspark/sql/", line 400, in load
    return self._df(self._jreader.load())
  File "/opt/spark/python/lib/", line 1257, in __call__
  File "/opt/spark/python/pyspark/sql/", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o43.load.
: MqttException (0)
    at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.checkIsOpen(
    at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.getFiles(
    at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.close(
    at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.stop(MQTTStreamSource.scala:228)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:190)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(
    at java.lang.reflect.Method.invoke(
    at py4j.reflection.MethodInvoker.invoke(
    at py4j.reflection.ReflectionEngine.invoke(
    at py4j.Gateway.invoke(
    at py4j.commands.AbstractCommand.invokeMethod(
    at py4j.commands.CallCommand.execute(

I've been trying to stream MQTT data for a week and can't find a way around this error. Is there any way to solve it? Thank you.


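Try setting the persistence option to memory. The exception in the trace is thrown by Paho's MqttDefaultFilePersistence, the on-disk persistence the Bahir source uses by default; with persistence set to memory, in-memory persistence is used instead (at the cost of not recovering messages after a restart).

Example: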

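    val lines = spark.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", topic)
      .option("cleanSession", "true")
      .option("persistence", "memory") // use in-memory instead of on-disk MqttDefaultFilePersistence
      .load("tcp://localhost:1883")    // broker URL from the question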
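Since the question uses PySpark, here is a minimal sketch of the same fix through DataStreamReader.options, assuming the Bahir 2.3.2 package is loaded with --packages as in the question. The helper names mqtt_source_options and read_mqtt, and the client id value in the usage comment, are hypothetical. Setting clientId is optional; it addresses the "If `clientId` is not set, a random value is picked up" warning from the log.

```python
# Hypothetical helpers (mqtt_source_options, read_mqtt) for illustration only.
BAHIR_MQTT_SOURCE = "org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider"


def mqtt_source_options(topic, broker_url, client_id=None, persistence="memory"):
    """Build the option map passed to the Bahir MQTT source.

    persistence="memory" selects in-memory persistence instead of the
    on-disk MqttDefaultFilePersistence (no message recovery after restart).
    A fixed clientId avoids the warning about a randomly picked client id.
    """
    opts = {
        "topic": topic,
        "brokerUrl": broker_url,
        "cleanSession": "true",  # Spark options are passed as strings
        "persistence": persistence,
    }
    if client_id is not None:
        opts["clientId"] = client_id
    return opts


def read_mqtt(spark, topic, broker_url, client_id=None):
    """Return a streaming DataFrame; `spark` is an existing SparkSession."""
    return (
        spark.readStream
        .format(BAHIR_MQTT_SOURCE)
        .options(**mqtt_source_options(topic, broker_url, client_id))
        .load()
    )


# Usage in the pyspark shell (requires a running broker):
#   lines = read_mqtt(spark, "/sensor", "tcp://localhost:1883", client_id="sensor-reader")
#   query = lines.writeStream.format("console").start()
```

Keeping the options in a plain dict makes it easy to switch persistence back to the default file-based mode later (for example, together with a fixed clientId if recovery is needed) without touching the reader chain.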