I have the following publish method that connects to a given broker, sends a message, and then disconnects:
def publish(mqttCfg: MqttConfig, topic: String, mqttQos: MqttQos): Future[Unit] = {
  val client = asyncMqttClient(mqttCfg)
  // Define a custom wrapper type to represent the result of the publish operation
  sealed trait PublishResult
  case class SuccessfulPublish(mqttPublishResult: Mqtt5PublishResult) extends PublishResult
  case class FailedPublish(error: Throwable) extends PublishResult
  asyncMqttClient(mqttCfg).connect()
    .thenCompose(_ => client.publishWith().topic(topic).qos(mqttQos).payload("HELLO WORLD!".getBytes()).send())
    .thenAccept(result => {
      val publishResult = Try(result)
      publishResult match {
        case Success(message) =>
          println(s"publishedResult: $message") // TODO: Change to logger
        case Failure(error) =>
          println(s"Failed to publish: ${error.getMessage}") // TODO: Change to logger
      }
    })
    .thenCompose(_ => client.disconnect())
    .thenAccept(_ => println("disconnected"))
    .asScala.map(_ => ())
}
I then have a ScalaTest spec that exercises it like this:
"MqttClientFactory#publish" should "connect to a local MQTT broker and publish" in {
  val mqttConfig = MqttConfig("cpo-platform-test", "test.mosquitto.org", 1883)
  val published = MqttClientFactory.publish(
    mqttConfig,
    "cpo-test-topic",
    MqttQos.EXACTLY_ONCE
  )
  whenReady(published, timeout(Span(100, Seconds))) { Unit => {
      val client = MqttClientFactory.asyncMqttClient(mqttConfig)
      println("In here ****************** ")
      client
        .connect()
        .thenCompose(_ => client.subscribeWith().topicFilter("cpo-test-topic").qos(MqttQos.EXACTLY_ONCE).callback(println).send())
    }
  }
}
When I run this, it fails with the following error at the point where I wait for the Future to complete in whenReady(...):
The future returned an exception of type: java.util.concurrent.CompletionException, with message: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected..
ScalaTestFailureLocation: com.openelectrons.cpo.mqtt.MqttClientFactoryTest at (MqttClientFactoryTest.scala:29)
I tried several brokers on my local machine, including the Eclipse Mosquitto broker and the Cedalo broker, and all of them produce the same message. What am I doing wrong? It is frustrating that I cannot get even a simple connection to work. Any help?
EDIT: Further details added:
def asyncMqttClient(mqttCfg: MqttConfig): Mqtt5AsyncClient = {
  Mqtt5Client.builder()
    .identifier(mqttCfg.appName)
    .serverHost(mqttCfg.serverHost)
    .serverPort(mqttCfg.serverPort)
    .simpleAuth()
    .username(mqttCfg.user.getOrElse(""))
    .password(mqttCfg.pass.getOrElse("").getBytes("UTF-8"))
    .applySimpleAuth()
    .buildAsync()
}
I use the following Docker Compose file to start my local Mosquitto MQTT broker:
version: "3.7"
services:
  mqtt5:
    image: eclipse-mosquitto
    container_name: mqtt5
    ports:
      - 1883:1883 # default MQTT port
      - 9001:9001 # default MQTT port for websockets
    volumes:
      - /opt/softwares/mosquitto/mqtt5/config:/mosquitto/config
The MQTT broker is successfully started as shown in the screenshot below:
EDIT:
Here is my mosquitto.conf:
listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
Here is a screenshot of the logs:
EDIT:
joesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 mosquitto_pub -t /test/message -m 'Hello World!'
joesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 tail -f /mosquitto/log/mosquitto.log
1696296934: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696298735: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696300536: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696302337: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696304138: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696305939: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696307740: Saving in-memory database to /mosquitto/data//mosquitto.db.
1696309170: New connection from 127.0.0.1:39422 on port 1883.
1696309170: New client connected from 127.0.0.1:39422 as auto-8817AB58-2BA0-33D2-5AB0-A6176558E97C (p2, c1, k60).
1696309170: Client auto-8817AB58-2BA0-33D2-5AB0-A6176558E97C disconnected.
^Cjoesan@joesan-InfinityBook-S-14-v5:~$ docker exec -it mqtt5 mosquitto_sub -v -t /test/message
/test/message Hello World!
When I run the Scala test, I see the following broker logs:
1696310903: New connection from 192.168.208.1:57752 on port 1883.
1696310903: New client connected from 192.168.208.1:57752 as cpo-platform-test (p5, c1, k60).
1696310903: Client cpo-platform-test closed its connection.


Here is a simple POC that I was able to run locally. It doesn't validate anything: I only start an eclipse-mosquitto container, connect to the service using the hivemq-mqtt-client, publish a message, subscribe to the topic, and print the received message to stdout.
The POC consists of the following files:
build.sbt
docker-compose.yaml
/absolute/path/to/mosquitto/config/mosquitto.conf
DummyMosquittoTest.scala
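A side note on the publish method in the question: it calls asyncMqttClient(mqttCfg) twice, so connect() runs on one client instance while publishWith() runs on another instance that was never connected. That is consistent with the "MQTT client is not connected" error. Below is a minimal sketch of that effect. It deliberately uses a hypothetical FakeClient stand-in rather than the HiveMQ API, so it runs without a broker; FakeClient, newClient, publishWithTwoClients, publishWithOneClient and describe are illustrative names, not part of any library.

```scala
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for an async MQTT client; it only tracks connection state.
final class FakeClient {
  @volatile private var connected = false

  def connect(): CompletableFuture[Unit] = {
    connected = true
    CompletableFuture.completedFuture(())
  }

  def publish(payload: String): CompletableFuture[String] = {
    val result = new CompletableFuture[String]()
    if (connected) result.complete(s"published: $payload")
    else result.completeExceptionally(new IllegalStateException("client is not connected"))
    result
  }
}

object PublishSketch {
  // Each call builds a new, unconnected instance, as a client builder would.
  def newClient(): FakeClient = new FakeClient

  // Shape of the code in the question: connect() runs on a second instance.
  def publishWithTwoClients(payload: String): CompletableFuture[String] = {
    val client = newClient()
    newClient().connect().thenCompose[String](_ => client.publish(payload))
  }

  // Same chain, but every step uses the one instance that was connected.
  def publishWithOneClient(payload: String): CompletableFuture[String] = {
    val client = newClient()
    client.connect().thenCompose[String](_ => client.publish(payload))
  }

  // handle receives both outcomes; thenAccept is skipped when the stage has failed.
  def describe(result: CompletableFuture[String]): String =
    result.handle[String]((value, error) =>
      if (error == null) value else s"failed: ${error.getMessage}"
    ).join()
}
```

In the real method, the same idea would mean calling client.connect() on the val client that is later used for publishWith() and disconnect(). Note also that thenAccept only runs when the previous stage succeeded, so wrapping its argument in Try never reaches the Failure branch; handle or whenComplete receive the error as well.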