I've been trying to test the interactions with Cloud PubSub using the emulator. It publishes the message to the topic but the receiver doesn't get triggered. Here's the code workthrough:
func TestPubSubEmulator(t *testing.T) {
ctx := context.Background()
topic, sub, err := CreateTestTopicAndSubscription(ctx, "project-id", "topic-id")
if err != nil {
t.Fatal(err)
}
cctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
var messageRecieved int32
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
t.Log(m.Data)
atomic.AddInt32(&messageRecieved, 1)
m.Ack()
})
topic.Publish(ctx, &pubsub.Message{
Data: []byte("Hello World"),
})
time.Sleep(5 * time.Second)
t.Log(messageRecieved)
if messageRecieved != 1 {
t.Fatal("Message was never sent")
}
}
This is also the code for creating the topic and subscription:
func CreateTestTopicAndSubscription(ctx context.Context, projectID, topicID string)
(*pubsub.Topic, *pubsub.Subscription, error) {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, nil, fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic, err := client.CreateTopic(ctx, topicID)
if err != nil {
return nil, nil, fmt.Errorf("CreateTopic: %v", err)
}
// Create a new subscription to the created topic and ensure it never expires.
sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
ExpirationPolicy: time.Duration(0),
})
if err != nil {
return nil, nil, err
}
return topic, sub, nil
}
Am currently trying to send the message from a different program to see whether it gets triggered.
Am sorry I didn't update this question early. I found that the problem was caused by the pointer to the subscription. It wasn't listening for messages. I needed to create a new pointer to the subscription that will listen for changes.
Here's the concept
Instead, it should be implemented first be created then listened to with a new pointer.