How did parameterizing this function make my Zio project stop working?

428 Views Asked by At

I've started writing a server that uses zio-http to forward messages from a Pulsar topic to a WebSocket. It was working fine, but I realized I was creating the consumer again when the socket was being closed, so I refactored that code to accept the consumer as a parameter. Now, the messages don't get forwarded. In fact, they don't even get put into the Queue, which puzzles me even more. I've only been using Zio for a few days, so I can't yet get my head around what I've done wrong.

Here is the socket code before the change:

package com.example.dashback

import zio.*
import zio.stream.{ZSink, ZStream}
import zhttp.http.*
import zhttp.socket.*
import zhttp.service.ChannelEvent
import zhttp.service.ChannelEvent.UserEvent.HandshakeComplete
import zhttp.service.ChannelEvent.{ChannelUnregistered, UserEventTriggered}
import com.example.dashback.pulsar.PulsarService
import org.apache.pulsar.client.api.{Message, Schema}

def log(s: String) = println(s"\u001b[38;5;226;48;5;19m${s}\u001b[0m")
def zlog(s: String) = ZIO.succeed(log(s))

object WebSocketHandler:
    private def createSocket(pulsar: PulsarService) = {
        val consumerName = s"dashboard-events-${scala.util.Random.nextInt(Int.MaxValue).toHexString}"
        val consumer = pulsar.consumer[String](consumerName, Seq("persistent://example/00000000-0000-4000-8000-000000000001/events"), Schema.STRING)

        val socket = Http.collectZIO[WebSocketChannelEvent] {
            case ChannelEvent(channel, UserEventTriggered(HandshakeComplete)) =>
                for
                    c <- consumer
                    s = ZStream.fromQueue(c.queue)
                    f <- s.foreach { msg =>
                        channel.writeAndFlush(WebSocketFrame.text(msg.getValue))
                    }
                yield s

            case ChannelEvent(channel, ChannelUnregistered) =>
                for
                    _ <- ZIO.log("WebSocket closed; closing consumer")
                    c <- consumer
                    _ <- ZIO.attempt(c.consumer.close())
                yield ()

            case e =>
                for
                    _ <- ZIO.log(e.toString)
                yield ()
        }

        socket.toSocketApp.toResponse
    }

    val app: Http[PulsarService & JwtService, Throwable, Request, Response] = Http.collectZIO[Request] {
        case r @ Method.GET -> !! / "ws" =>
            for
                pulsar <- ZIO.service[PulsarService]
                s <- createSocket(pulsar)
            yield s
    }

And here is the code after the change, where I create the consumer outside of the WebSocket message handler and pass it in as an argument, so that the same one get used both at connection and disconnection:

package com.example.dashback

import zio.*
import zio.stream.{ZSink, ZStream}
import zhttp.http.*
import zhttp.socket.*
import zhttp.service.ChannelEvent
import zhttp.service.ChannelEvent.UserEvent.HandshakeComplete
import zhttp.service.ChannelEvent.{ChannelUnregistered, UserEventTriggered}
import com.example.dashback.pulsar.{PulsarConsumer, PulsarService}
import org.apache.pulsar.client.api.{Consumer, Message, Schema}

def log(s: String) = println(s"\u001b[38;5;226;48;5;19m${s}\u001b[0m")
def zlog(s: String) = ZIO.succeed(log(s))

object WebSocketHandler:
    private def createSocket(pulsar: PulsarService) = {
        val consumerName = s"dashboard-events-${scala.util.Random.nextInt(Int.MaxValue).toHexString}"
        val consumer = pulsar.consumer[String](consumerName, Seq("persistent://example/00000000-0000-4000-8000-000000000001/events"), Schema.STRING)

        def socket(c: PulsarConsumer[String]) = Http.collectZIO[WebSocketChannelEvent] {
            case ChannelEvent(channel, UserEventTriggered(HandshakeComplete)) =>
                for
                    _ <- zlog("We get here")
                    _ <- ZStream.fromQueue(c.queue).foreach { msg =>
                        channel.writeAndFlush(WebSocketFrame.text(msg.getValue))
                    }.fork
                    _ <- channel.writeAndFlush(WebSocketFrame.text("Hello\n"))
                yield ()

            case ChannelEvent(channel, ChannelUnregistered) =>
                for
                    _ <- ZIO.log("WebSocket closed; closing consumer")
                    _ <- ZIO.attempt(c.consumer.close())
                    }
                yield ()

            case e =>
                for
                    _ <- ZIO.log(e.toString)
                yield ()
        }

        for
            c <- consumer
            s <- socket(c).toSocketApp.toResponse
        yield s
    }

    val app: Http[PulsarService & JwtService, Throwable, Request, Response] = Http.collectZIO[Request] {
        case r @ Method.GET -> !! / "ws" =>
            for
                pulsar <- ZIO.service[PulsarService]
                s <- createSocket(pulsar)
            yield s
    }

For reference, here is my Pulsar service:

package com.example.dashback.pulsar

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.jdk.CollectionConverters.*
import zio.*
import zio.stream.ZStream
import org.apache.pulsar.client.api.{Consumer, Message, MessageListener, PulsarClient, Schema, SubscriptionType}

import com.example.dashback.{log, zlog}

case class PulsarConsumer[T](consumer: Consumer[T], queue: Queue[Message[T]])

class PulsarServiceImpl extends PulsarService:
    private val client = PulsarClient.builder()
        .serviceUrl("pulsar://pulsar.example.com:6650")
        .build()

    def consumer[T](subscriptionName: String, topics: Seq[String], schema: Schema[T]): Task[PulsarConsumer[T]] =
        def createConsumer =
            ZIO.attempt {
                client.newConsumer(schema)
                    .topics(topics.asJava)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe()
            }

        def receive(consumer: Consumer[T], queue: Queue[Message[T]]): Task[Message[T]] =
            ZIO.async { cb =>
                consumer.receiveAsync().thenAccept { msg =>
                    cb {
                        for
                            _ <- zlog("Offering message to queue")
                            _ <- queue.offer(msg)
                            _ <- zlog("Acknowledging message")
                            _ <- ZIO.attempt(consumer.acknowledge(msg))
                            _ <- zlog("Acknowledged")
                            v <- ZIO.attempt(msg)
                        yield v
                    }
                }
            }

        for
            q <- Queue.unbounded[Message[T]]
            c <- createConsumer
            _ <- receive(c, q).forever.fork
        yield PulsarConsumer(c, q)

object PulsarServiceImpl:
    val layer = ZLayer.succeed(new PulsarServiceImpl)

"Offering message to the queue" does not appear in the log after the change. I do get the manually sent "Hello" on the socket, though.

I've also tried an alternative version of the receive function, and "Putting test message in the queue" doesn't even appear in the log.

        def receive(consumer: Consumer[T], queue: Queue[Message[T]]): Task[Unit] =
            for
                _ <- zlog("[Pulsar] Putting test message in the queue")
                _ <- queue.offer(MessageImpl.create(new MessageMetadata, ByteBuffer.wrap("Test message".getBytes), schema, topics.head))

                _ <- zlog("[Pulsar] Attempting to receive a message")
                _ <- ZIO.async[Any, Throwable, Message[T]] { cb =>
                    log("[Pulsar] Calling receiveAsync")
                    consumer.receiveAsync().thenAccept { msg =>
                        log("A message was received")
                        cb {
                            for
                                _ <- zlog("Offering message to queue")
                                _ <- queue.offer(msg)
                                _ <- zlog("Acknowledging message")
                                _ <- ZIO.attempt(consumer.acknowledge(msg))
                                _ <- zlog("Acknowledged")
                                v <- ZIO.attempt(msg)
                            yield v
                        }
                    }
                }
            yield ()

Update: I commented out the WebSocket parts, and directly created a consumer in my run method, and that works fine, so the problem doesn't appear to be there. I get both the manually inserted test message and any messages I publish to the Pulsar topic.

    def run = for
        _ <- ZIO.log("Dashback starting")
        c <- (new PulsarServiceImpl).consumer("test", Seq("persistent://example/00000000-0000-4000-8000-000000000001/events"), Schema.STRING)
        _ <- ZStream.fromQueue(c.queue).foreach { msg =>
            zlog(msg.getValue)
        }
    yield ()
0

There are 0 best solutions below