Integration test with cloud pubsub emulator. Sending and Receiving from the same code block

1.2k Views Asked by At

I've been trying to test the interactions with Cloud PubSub using the emulator. It publishes the message to the topic but the receiver doesn't get triggered. Here's the code workthrough:

func TestPubSubEmulator(t *testing.T) {
ctx := context.Background()
topic, sub, err := CreateTestTopicAndSubscription(ctx, "project-id", "topic-id")
if err != nil {
    t.Fatal(err)
}

cctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()

var messageRecieved int32

sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
    t.Log(m.Data)
    atomic.AddInt32(&messageRecieved, 1)
    m.Ack()
})

topic.Publish(ctx, &pubsub.Message{
    Data: []byte("Hello World"),
})

time.Sleep(5 * time.Second)

t.Log(messageRecieved)
if messageRecieved != 1 {
    t.Fatal("Message was never sent")
}
}

This is also the code for creating the topic and subscription:

func CreateTestTopicAndSubscription(ctx context.Context, projectID, topicID string) 
(*pubsub.Topic, *pubsub.Subscription, error) {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
    return nil, nil, fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

topic, err := client.CreateTopic(ctx, topicID)
if err != nil {
    return nil, nil, fmt.Errorf("CreateTopic: %v", err)
}

// Create a new subscription to the created topic and ensure it never expires.
sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
    Topic:            topic,
    AckDeadline:      10 * time.Second,
    ExpirationPolicy: time.Duration(0),
})
if err != nil {
    return nil, nil, err
}

  return topic, sub, nil
}

Am currently trying to send the message from a different program to see whether it gets triggered.

1

There are 1 best solutions below

0
On

Am sorry I didn't update this question early. I found that the problem was caused by the pointer to the subscription. It wasn't listening for messages. I needed to create a new pointer to the subscription that will listen for changes.

Here's the concept

// Create a new subscription to the created topic and ensure it never expires.
sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
    Topic:            topic,
    AckDeadline:      10 * time.Second,
    ExpirationPolicy: time.Duration(0),
})
if err != nil {
    return nil, nil, err
}
...
// This subscription won't work for some reason
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
   t.Log(m.Data)
   atomic.AddInt32(&messageRecieved, 1)
   m.Ack()
})

Instead, it should be implemented first be created then listened to with a new pointer.

client.CreateSubscription(ctx, subId, pubsub.SubscriptionConfig{Topic: topic})

// This subscription would be able to receive messages
sub := client.Subscription(subId)
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
   t.Log(m.Data)
   atomic.AddInt32(&messageRecieved, 1)
   m.Ack()
})