Micronaut 3: How to use PubSubEmulatorContainer

Update: The link to the repo has been moved to the answer, because the repo now contains the code from the answer below.

Problem description

The current code works, but the integration tests run gcloud beta emulators pubsub from the google/cloud-sdk image, which has two drawbacks:

  • Integration tests are slow due to the size of the google/cloud-sdk image.
  • The pubsub emulator has to run on a fixed port, because there seems to be no way to tell Micronaut which port the emulator is running on.

As a result, I have to set the following environment variable in maven-surefire-plugin:

<environmentVariables>
    <PUBSUB_EMULATOR_HOST>localhost:8085</PUBSUB_EMULATOR_HOST>
</environmentVariables>

How this can be done in Spring Boot

According to Test Containers | Gcloud Module, the correct way of implementing integration tests with PubSubEmulatorContainer in Spring Boot is like this: https://github.com/saturnism/testcontainers-gcloud-examples/blob/main/springboot/pubsub-example/src/test/java/com/example/springboot/pubsub/PubSubIntegrationTests.java

This brings up the container on a random port, which is possible because Spring's DynamicPropertyRegistry lets the test register the emulator address at runtime. Micronaut seems to be missing an equivalent.

Doc: https://www.testcontainers.org/modules/gcloud/


I'm looking for a working example of a JUnit 5 or Spock integration test implemented in Micronaut 3.x that uses PubSubEmulatorContainer as described in the above doc.

Related doc: https://micronaut-projects.github.io/micronaut-gcp/latest/guide/#emulator


There are some comments on GitHub around configuring TransportChannelProvider. I'm able to inject an instance and inspect it, but I still haven't found out exactly what to do.

These are the closest leads so far: https://github.com/micronaut-projects/micronaut-gcp/issues/257 https://github.com/micronaut-projects/micronaut-gcp/pull/259

Accepted answer

Update 2023-07-23: Bumped to Micronaut 4.0.1 and removed Lombok from the Java demo. Link: pubsub-emulator-demo repo

Update 2023-05-01: Added a Kotlin/Kotest example to the pubsub-emulator-demo repo.

TL;DR

We need to start the Testcontainers container first, read the emulator host address from it, and then pass that address to ApplicationContext.run like this:

applicationContext = ApplicationContext.run(
        ["pubsub.emulator.host": emulatorHost])
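
The ordering in the TL;DR can be illustrated without Docker or Micronaut. The following is a minimal plain-Java sketch under stated assumptions: a ServerSocket bound to port 0 stands in for the emulator container, and the class name RandomPortProperties and the method emulatorProperties are hypothetical. Only the property key pubsub.emulator.host comes from the answer. In a real test, the host:port string would come from PubSubEmulatorContainer.getEmulatorEndpoint() and the map would be passed to ApplicationContext.run.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Map;

public class RandomPortProperties {

    // Builds the configuration map that would be handed to the framework at startup.
    // The key is the one used in the answer; host and port are only known at runtime.
    static Map<String, Object> emulatorProperties(String host, int port) {
        return Map.of("pubsub.emulator.host", host + ":" + port);
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free port. This socket is only a stand-in
        // (assumption) for a container that is mapped to a random host port.
        try (ServerSocket standIn = new ServerSocket(0)) {
            int port = standIn.getLocalPort();

            // Step 1: the resource is running, so its address can be read.
            // Step 2: build the properties from that address.
            Map<String, Object> props = emulatorProperties("localhost", port);

            // Step 3 (not shown): pass props to ApplicationContext.run(...).
            System.out.println(props);
        }
    }
}
```

The point of the sketch is the order of operations: the address exists only after the resource has started, so the configuration has to be assembled at that moment rather than in a static configuration file.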

Small Github repo with example code: https://github.com/roar-skinderviken/pubsub-emulator-demo

Long answer with code

I finally managed to make a working solution using Micronaut 3.0.2 and Spock. A related Micronaut PR got me on track, together with this article: Micronaut Testing Best Practices https://objectcomputing.com/files/9815/9259/7089/slide_deck_Micronaut_Testing_Best_Practices_webinar.pdf

First a PubSubEmulator class (Groovy)

package no.myproject.testframework.testcontainers

import org.testcontainers.containers.PubSubEmulatorContainer
import org.testcontainers.utility.DockerImageName

class PubSubEmulator {
    static PubSubEmulatorContainer pubSubEmulatorContainer

    static init() {
        if (pubSubEmulatorContainer == null) {
            pubSubEmulatorContainer = new PubSubEmulatorContainer(
                    DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:emulators"))
            pubSubEmulatorContainer.start()
        }
    }
}
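
The null check in init() starts the container once as long as specs run sequentially. If specs were ever run in parallel, two threads could both see null and start two containers. One possible alternative is the initialization-on-demand holder idiom, sketched here in plain Java. FakeEmulator, StartOnce and the fixed endpoint string are hypothetical stand-ins for illustration; with Testcontainers, the holder would create and start the PubSubEmulatorContainer instead.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StartOnce {

    // Counts how often the expensive start-up ran; stands in for container.start().
    static final AtomicInteger STARTS = new AtomicInteger();

    // Hypothetical stand-in for the emulator container.
    static final class FakeEmulator {
        final String endpoint;

        FakeEmulator() {
            STARTS.incrementAndGet();
            endpoint = "localhost:8085"; // illustrative value only
        }
    }

    // The JVM initializes Holder exactly once, on first access,
    // even when several threads call emulator() at the same time.
    private static final class Holder {
        static final FakeEmulator INSTANCE = new FakeEmulator();
    }

    static FakeEmulator emulator() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(StartOnce::emulator);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("starts = " + STARTS.get());
    }
}
```

All callers receive the same instance, and no explicit locking or null check is needed.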

Then a fixture for PubSubEmulator (Groovy)

package no.myproject.testframework.testcontainers

trait PubSubEmulatorFixture {
    Map<String, Object> getPubSubConfiguration() {
        if (PubSubEmulator.pubSubEmulatorContainer == null || !PubSubEmulator.pubSubEmulatorContainer.isRunning()) {
            PubSubEmulator.init()
        }
        [
                "pubsub.emulator-host": PubSubEmulator.pubSubEmulatorContainer.getEmulatorEndpoint()
        ]
    }
}

Then a specification class (Groovy) that starts the container, creates a topic and a subscription.

The key here is to pass in pubsub.emulator.host as part of the configuration when calling ApplicationContext.run.

The rest of the code is very similar to the Spring Boot example I linked to in my question.

package no.myproject.testframework

import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.AlreadyExistsException
import com.google.api.gax.rpc.FixedTransportChannelProvider
import com.google.cloud.pubsub.v1.SubscriptionAdminClient
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings
import com.google.cloud.pubsub.v1.TopicAdminClient
import com.google.cloud.pubsub.v1.TopicAdminSettings
import com.google.pubsub.v1.ProjectSubscriptionName
import com.google.pubsub.v1.PushConfig
import com.google.pubsub.v1.TopicName
import io.grpc.ManagedChannelBuilder
import io.micronaut.context.ApplicationContext
import io.micronaut.runtime.server.EmbeddedServer
import no.myproject.configuration.GcpConfigProperties
import no.myproject.configuration.PubSubConfigProperties
import no.myproject.testframework.testcontainers.PubSubEmulatorFixture
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification

// EnvironmentFixture is not shown here; it is expected to supply `environments` used below
abstract class PubSubSpecification extends Specification
        implements PubSubEmulatorFixture, EnvironmentFixture {

    @AutoCleanup
    @Shared
    EmbeddedServer embeddedServer

    @AutoCleanup
    @Shared
    ApplicationContext applicationContext

    def setupSpec() {

        // start the pubsub emulator
        def emulatorHost = getPubSubConfiguration().get("pubsub.emulator-host")

        // start a temporary applicationContext in order to read config
        // keep any pubsub subscriptions out of context at this stage
        applicationContext = ApplicationContext.run()

        def gcpConfigProperties = applicationContext.getBean(GcpConfigProperties)
        def pubSubConfigProperties = applicationContext.getBean(PubSubConfigProperties)

        // plaintext gRPC channel to the emulator, shared by both admin clients
        def channel = ManagedChannelBuilder.forTarget("dns:///" + emulatorHost)
                .usePlaintext()
                .build()

        def channelProvider =
                FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))

        // START creating topic

        def topicAdminClient =
                TopicAdminClient.create(
                        TopicAdminSettings.newBuilder()
                                .setCredentialsProvider(NoCredentialsProvider.create())
                                .setTransportChannelProvider(channelProvider)
                                .build())

        def topic = TopicName.of(
                gcpConfigProperties.getProjectId(),
                pubSubConfigProperties.getTopicName())

        try {
            topicAdminClient.createTopic(topic)
        } catch (AlreadyExistsException ignored) {
            // this is fine, already created
            topicAdminClient.getTopic(topic)
        }

        // START creating subscription

        pubSubConfigProperties.getSubscriptionNames().forEach(it -> {
            def subscription =
                    ProjectSubscriptionName.of(gcpConfigProperties.getProjectId(), it)

            def subscriptionAdminClient =
                    SubscriptionAdminClient.create(
                            SubscriptionAdminSettings.newBuilder()
                                    .setTransportChannelProvider(channelProvider)
                                    .setCredentialsProvider(NoCredentialsProvider.create())
                                    .build())

            try {
                // pull subscription (default PushConfig) with an ack deadline of 100 seconds
                subscriptionAdminClient
                        .createSubscription(
                                subscription,
                                topic,
                                PushConfig.getDefaultInstance(),
                                100)

                System.out.println("Subscription created " + subscriptionAdminClient.getSubscription(subscription))
            } catch (AlreadyExistsException ignored) {
                // this is fine, already created
                subscriptionAdminClient.getSubscription(subscription)
            }
        })

        channel.shutdown()

        // stop the temporary applicationContext
        applicationContext.stop()

        // start the actual applicationContext, now pointing at the emulator
        embeddedServer = ApplicationContext.run(
                EmbeddedServer,
                [
                        'spec.name'           : "PubSubEmulatorSpec",
                        "pubsub.emulator.host": emulatorHost
                ],
                environments)

        applicationContext = embeddedServer.applicationContext
    }
}

Then a factory class (Groovy) for mocking credentials

package no.myproject.pubsub

import com.google.auth.oauth2.AccessToken
import com.google.auth.oauth2.GoogleCredentials
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires

import jakarta.inject.Singleton


@Factory
@Requires(property = 'spec.name', value = 'PubSubEmulatorSpec')
class EmptyCredentialsFactory {

    @Singleton
    @Replaces(GoogleCredentials)
    GoogleCredentials mockCredentials() {
        return GoogleCredentials.create(new AccessToken("", new Date()))
    }
}

And finally, a Spock test spec.

package no.myproject.pubsub

import no.myproject.testframework.PubSubSpecification

import java.util.stream.IntStream

class PubSubIntegrationSpec extends PubSubSpecification {

    def NUMBER_OF_MESSAGES_IN_TEST = 5
    def DELAY_IN_MILLISECONDS_PER_MSG = 100

    def "when a number of messages is sent, same amount of messages is received"() {
        given:
        def documentPublisher = applicationContext.getBean(DocumentPublisher)
        def listener = applicationContext.getBean(IncomingDocListenerWithAck)
        def initialReceiveCount = listener.getReceiveCount()

        when:
        IntStream.rangeClosed(1, NUMBER_OF_MESSAGES_IN_TEST)
                .forEach(it -> documentPublisher.send("Hello World!"))

        // wait a bit in order to let all messages propagate through the queue
        Thread.sleep(NUMBER_OF_MESSAGES_IN_TEST * DELAY_IN_MILLISECONDS_PER_MSG)

        then:
        NUMBER_OF_MESSAGES_IN_TEST == listener.getReceiveCount() - initialReceiveCount
    }
}
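
The fixed Thread.sleep in the spec above makes the test wait the full delay even when the messages arrive early, and it can still fail on a slow machine. A common alternative is to poll until the expected count is reached or a timeout expires. Spock ships spock.util.concurrent.PollingConditions for this; the plain-Java sketch below shows the same idea for a JUnit 5 variant. The class Await and its method until are hypothetical helpers written for this example, and the AtomicInteger stands in for listener.getReceiveCount().

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class Await {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false if the timeout was reached first.
    static boolean until(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in (assumption) for the listener's receive counter.
        AtomicInteger received = new AtomicInteger();

        // Simulates five messages arriving on another thread.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                received.incrementAndGet();
            }
        });
        producer.start();

        boolean ok = until(() -> received.get() == 5,
                Duration.ofSeconds(2), Duration.ofMillis(20));
        System.out.println("all received: " + ok);
    }
}
```

In the spec, the condition would compare listener.getReceiveCount() - initialReceiveCount against NUMBER_OF_MESSAGES_IN_TEST, and the test would assert that the helper returned true.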

Second answer

The chosen answer is a good deal more complicated than necessary, and it also contains numerous typos. A better answer can be found via the Micronaut GCP codebase itself, with the key bit being:

class IntegrationTestSpec extends Specification {

    static CONTAINER_PORT = -1
    static CredentialsProvider CREDENTIALS_PROVIDER
    static TransportChannelProvider TRANSPORT_CHANNEL_PROVIDER
    static PubSubResourceAdmin pubSubResourceAdmin

    static GenericContainer pubSubContainer = new GenericContainer("google/cloud-sdk:292.0.0")
        .withCommand("gcloud", "beta", "emulators", "pubsub", "start", "--project=test-project",
                "--host-port=0.0.0.0:8085")
        .withExposedPorts(8085)
        .waitingFor(new LogMessageWaitStrategy().withRegEx("(?s).*Server started, listening on.*"))

    static {
        pubSubContainer.start()
        CONTAINER_PORT = pubSubContainer.getMappedPort(8085)
        CREDENTIALS_PROVIDER = NoCredentialsProvider.create()
        def host = "localhost:" + CONTAINER_PORT
        ManagedChannel channel = ManagedChannelBuilder.forTarget(host).usePlaintext().build()
        TRANSPORT_CHANNEL_PROVIDER =
                FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))
        pubSubResourceAdmin = new PubSubResourceAdmin(TRANSPORT_CHANNEL_PROVIDER, CREDENTIALS_PROVIDER)
    }
}

You'd then just extend that class anywhere you want to make use of PubSub. The following is a slightly cleaner example that I came up with, which also creates a topic during test startup:

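import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.FixedTransportChannelProvider
import com.google.api.gax.rpc.TransportChannelProvider
import com.google.cloud.pubsub.v1.TopicAdminClient
import com.google.cloud.pubsub.v1.TopicAdminSettings
import com.google.pubsub.v1.TopicName
import groovy.util.logging.Slf4j
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.micronaut.test.support.TestPropertyProvider
import org.testcontainers.containers.PubSubEmulatorContainer
import org.testcontainers.utility.DockerImageName
import spock.lang.Specification

@Slf4j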
abstract class PubSubSpec extends Specification implements TestPropertyProvider {

    static final String cloudSdkName = System.getenv('CLOUD_SDK_IMAGE') ?: "gcr.io/google.com/cloudsdktool/cloud-sdk:emulators"
    static final DockerImageName cloudSdkImage = DockerImageName.parse(cloudSdkName)
    static final PubSubEmulatorContainer pubsubEmulator = new PubSubEmulatorContainer(cloudSdkImage)

    static {
        pubsubEmulator.start()
        ManagedChannel channel = ManagedChannelBuilder.forTarget(pubsubEmulator.getEmulatorEndpoint()).usePlaintext().build()
        try {
            TransportChannelProvider channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))
            CredentialsProvider credentialsProvider = NoCredentialsProvider.create()
            TopicAdminClient topicClient = TopicAdminClient.create(
                TopicAdminSettings.newBuilder()
                    .setTransportChannelProvider(channelProvider)
                    .setCredentialsProvider(credentialsProvider)
                    .build()
            )
            TopicName topicName = TopicName.of("project-id", "project-topic")
            topicClient.createTopic(topicName)
        } finally {
            channel.shutdown()
        }
    }

    @Override
    Map<String, String> getProperties() {
        [
            "pubsub.emulator.host": pubsubEmulator.getEmulatorEndpoint()
        ]
    }
}