How to create a DGS GraphQL Subscription to an ActiveMQ Topic

837 Views Asked by At

I have a bit of a complicated technology stack. I am leveraging Netflix DGS to provide a GraphQL service. Behind the scenes are a bunch of JMS components sending and receiving data from various services. I have everything working outside of a GraphQL subscription.

Specifically what I am trying to do is is create a GraphQL subscription for messages from an ActiveMQ topic.

So I have a SubscriptionDataFetcher as follows:

@DgsComponent
public class SurveyResultsSubscriptionDataFetcher {

    private final Publisher<SurveyResult> surveyResultsReactiveSource;

    @Autowired
    public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
        this.surveyResultsReactiveSource = surveyResultsReactiveSource;
    }

    @DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        return surveyResultsReactiveSource;
    }
}

Inside my Spring configuration, I am using the following Spring Integration Flow:

@Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
    SurveyResultMessageConverter converter = new SurveyResultMessageConverter();

    return Flux.from(
            IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .log()
                    .toReactivePublisher())
            .map((message) -> converter.fromMessage(message, SurveyResult.class));
}

I will say a few things:

  1. I have a separate @JmsListener that is receiving these messages off the topic
  2. I do not see more than one consumer, even after a web socket connection is established.
  3. If I hook up a Mongo Reactive Spring Data Repository to this GraphQL subscription, data is received by the client.

When I connect the client to the subscription, I see the following logs:

PublishSubscribeChannel      : Channel 'unknown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler          : Subscription started for 1

I suspect that the message listener container isn't activated when the web socket connection is established. Am I supposed to "activate" the channel adapter? What am I missing?

Tech Stack:

    // spring boots - version 2.4.3
    implementation "org.springframework.boot:spring-boot-starter-web"
    implementation "org.springframework.boot:spring-boot-starter-activemq"
    implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
    implementation "org.springframework.boot:spring-boot-starter-integration"
    implementation 'org.springframework.boot:spring-boot-starter-security'

    // spring integration
    implementation group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.4.4'

    // dgs
    implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
    implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'

Update 1:

For what its worth, if I update the subscription to the following I get results on the client side.

    @DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        // repository is a ReactiveMongoRepository
        return repository.findAll();
    }

Update 2:

This is the finalized bean in case it helps someone out based on accepted solution. I needed the listener on a topic, not a queue.

    @Bean
    public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
        SurveyResultMessageConverter converter = new SurveyResultMessageConverter();

        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
                Jms.container(connectionFactory(), surveyDestination).pubSubDomain(true))
                .jmsMessageConverter(converter))
                .toReactivePublisher();
    }
1

There are 1 best solutions below

2
On BEST ANSWER

Your problem is here:

return Flux.from(
        IntegrationFlows.from(

And the framework just doesn't see that inner IntegrationFlow instance to parse and register properly.

To make it working you need to consider to declare that IntegrationFlow as a top-level bean.

Something like this:

@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {  
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .transform([transform to SurveyResult])
                    .toReactivePublisher();
}

Now the framework knows that this logical IntegrationFlow container has to be parsed and all the beans have to be registered and started.

You probably need to rethink your SurveyResultMessageConverter logic to a plain transform() if you can't supply a Jms.messageDrivenChannelAdapter with your converter.

Then in your SurveyResultsSubscriptionDataFetcher you just need to have:

 return surveyResultsReactiveSource.map(Message::getPayload);