PubsubMessage and MessageMetadata not found when receiving messages in Pub/Sub Lite

I am following the Pub/Sub Lite documentation below and trying to receive the messages that I published using the code below. I am getting an error in the callback function.
https://cloud.google.com/pubsub/lite/docs/subscribing
Publishing message

# Code for publishing messages, which runs fine.
import json
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
project_number = 97466088
cloud_region = "us-west1"
zone_id = "b"
topic_id = "mytestopic"
# num_messages = 100

location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
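    # Load the key/value pairs to publish; the file is closed when this block exits.
    with open('fivekey_10000.json') as f:
        records = json.load(f)
    #file = open("data_new_file.txt", "r")
    #file1 = file.read().split("\n")
    # Each JSON key becomes the message payload and its value the ordering key.
    for message in records.items():
        data = message[0].encode("utf-8")
        ordering_key = message[1]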
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messages can get published to different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data=data, ordering_key=ordering_key)
        # result() blocks. To resolve api futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Published messages with ordering keys to {topic_path}."
)

There is 1 best solution below

I tried publishing messages to the topic using ordering keys as per this documentation.

To receive the messages, you need to request them from a Lite subscription. You can do this with the gcloud command-line tool or any client library. Note that Pub/Sub Lite topics and subscriptions are zonal resources, i.e. they need to be in the same zone and project.

When receiving messages, the callback function takes a PubsubMessage as its only argument. It prints the message data and metadata, then acknowledges the message.

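# Both names used below must be imported before the callback is defined.
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.types import MessageMetadata

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    # Decode the message ID into its partition and offset.
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    # Acknowledge so the message is not redelivered.
    message.ack()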
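
For context, here is a minimal sketch of how such a callback might be wired into a subscriber, loosely following the subscribing guide linked in the question. The helper `format_received`, the function `receive_messages`, the flow control limits, and the 90-second default timeout are illustrative assumptions, not values from the question. The client-library imports sit inside the function only so that the formatting helper can be exercised without the library installed.

```python
from concurrent.futures import TimeoutError


def format_received(data: bytes, ordering_key: str, metadata: object) -> str:
    """Build the log line for one received message (hypothetical helper)."""
    return (
        f"Received {data.decode('utf-8')} of ordering key {ordering_key} "
        f"with id {metadata}."
    )


def receive_messages(project_number: int, cloud_region: str, zone_id: str,
                     subscription_id: str, timeout: float = 90) -> None:
    """Listen on a Lite subscription until `timeout` seconds have passed."""
    # Imported here so format_received() stays usable without the client library.
    from google.pubsub_v1 import PubsubMessage
    from google.cloud.pubsublite.cloudpubsub import SubscriberClient
    from google.cloud.pubsublite.types import (
        CloudRegion,
        CloudZone,
        FlowControlSettings,
        MessageMetadata,
        SubscriptionPath,
    )

    # The subscription must live in the same zone and project as the topic.
    location = CloudZone(CloudRegion(cloud_region), zone_id)
    subscription_path = SubscriptionPath(project_number, location, subscription_id)

    # Assumed limits: at most 1,000 messages or 10 MiB outstanding per partition.
    flow_control = FlowControlSettings(
        messages_outstanding=1000,
        bytes_outstanding=10 * 1024 * 1024,
    )

    def callback(message: PubsubMessage) -> None:
        # Decode the message ID into partition and offset, log it, then ack.
        metadata = MessageMetadata.decode(message.message_id)
        print(format_received(message.data, message.ordering_key, metadata))
        message.ack()

    # SubscriberClient must be used in a `with` block, like PublisherClient.
    with SubscriberClient() as subscriber_client:
        streaming_pull_future = subscriber_client.subscribe(
            subscription_path,
            callback=callback,
            per_partition_flow_control_settings=flow_control,
        )
        try:
            # Block until the timeout elapses, then stop pulling.
            streaming_pull_future.result(timeout=timeout)
        except TimeoutError:
            streaming_pull_future.cancel()
            assert streaming_pull_future.done()
```

Calling `receive_messages(97466088, "us-west1", "b", "mysubscription")` would then listen for about 90 seconds, assuming credentials are configured and a Lite subscription with the hypothetical ID `mysubscription` exists in the same zone and project as the topic.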

If you’re still facing issues, please provide more information about the error, along with the full traceback.