Accessing pubsublite message attributes in beam pipeline - Java


We have been using PubSubLite in our Go program without any issues and I just started using the Java library with Beam.

Using the PubSubLite IO, we get a PCollection of SequencedMessage, specifically: https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.SequencedMessage

Now, from it I can get the data by doing something like:

message.getMessage().getData().toByteArray()

and then doing the normal conversion.

But for attributes, I cannot seem to get just the value. In Go, I could do:

msg.Attributes["attrKey"]

but when I do:

message.getMessage().getAttributesMap().get("attrKey")

I get an Object that I cannot seem to convert to a plain String value. As far as I understand, getAttributesMap() returns a Map<String, AttributeValues>, and these types all seem to be wrappers over the internal protobuf. Also, Map is an interface, so how do I get to the actual implementation to read the underlying value of each attribute?


There is 1 best solution below


The SequencedMessage attributes represent a multimap of string to bytes, not a map of string to string as in standard Pub/Sub. The Go client, by default, returns an error if there are multiple values for a given key or if any of the values is not valid UTF-8, which is why it can present a map[string]string interface.

When you call message.getMessage().getAttributesMap().get("attrKey"), you have a value of type AttributeValues which is a holder for a list of ByteStrings. To convert this to a single String, you would need to throw if the list is not of length 1, then call toStringUtf8 on the byte string element with index 0.
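The steps just described can be sketched as a small helper. This is only an illustration: the class name LiteAttributes and the method name getSingleAttribute are made up for this example, and throwing IllegalArgumentException is one possible choice for the error handling. The calls on SequencedMessage, AttributeValues and ByteString are the generated protobuf accessors.

```java
import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.ByteString;
import java.util.Map;

// Hypothetical helper class, not part of the Pub/Sub Lite library.
public final class LiteAttributes {
  private LiteAttributes() {}

  /**
   * Returns the single UTF-8 value stored under the given key.
   * Throws if the key is absent, has zero or several values, or is not valid UTF-8.
   */
  public static String getSingleAttribute(SequencedMessage message, String key) {
    Map<String, AttributeValues> attributes = message.getMessage().getAttributesMap();
    AttributeValues values = attributes.get(key);
    if (values == null) {
      throw new IllegalArgumentException("No attribute named '" + key + "'");
    }
    if (values.getValuesCount() != 1) {
      throw new IllegalArgumentException(
          "Expected exactly 1 value for '" + key + "' but found " + values.getValuesCount());
    }
    ByteString value = values.getValues(0);
    if (!value.isValidUtf8()) {
      throw new IllegalArgumentException("Value of '" + key + "' is not valid UTF-8");
    }
    return value.toStringUtf8();
  }
}
```

Inside a DoFn or MapElements over the PCollection<SequencedMessage>, this would be called as LiteAttributes.getSingleAttribute(message, "attrKey").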

If you would rather work with the standard Pub/Sub message format, as you would in Go, you can convert to it like this:

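import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsublite.CloudPubsubTransforms;
import org.apache.beam.sdk.values.PCollection;

// messages is the output of the Pub/Sub Lite read.
PCollection<SequencedMessage> messages = ...
PCollection<PubsubMessage> transformed = messages.apply(CloudPubsubTransforms.toCloudPubsubMessages());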
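
Once the elements are Beam PubsubMessage objects, attributes are plain strings, much like msg.Attributes["attrKey"] in Go. A minimal sketch of reading one follows. The class CloudAttributes and the method attributeOrDefault are hypothetical names for this example, and falling back to a default when the key is missing is an assumption about what the pipeline wants. PubsubMessage.getAttribute returns null for a missing key.

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

// Hypothetical helper class, not part of Beam.
public final class CloudAttributes {
  private CloudAttributes() {}

  /** Returns the attribute value for key, or fallback when the message has no such attribute. */
  public static String attributeOrDefault(PubsubMessage message, String key, String fallback) {
    String value = message.getAttribute(key); // null when the key is not present
    return value != null ? value : fallback;
  }
}
```

The null check matters if the result is emitted into a PCollection<String>, since Beam's default string coder does not accept null elements.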