Deserializtion/Parsing error KafkaProtobuf Python

58 Views Asked by At

Serialization code (Go lang)

1. Producer

func NewProducer(kafkaBrokerURL string, kafkaSchemaRegistryUrl string) {
    producerConfig := getKafkaProducerConfig(config.EnvConfig)

    producer, err := confluent_kafka.NewProducer(producerConfig)
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Producer")
        log.Panicf("Unable to create Kafka Producer")
    }

    client, err := schemaregistry.NewClient(schemaregistry.NewConfig(kafkaSchemaRegistryUrl))
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Client")
        log.Panicf("Unable to create Kafka Client")
    }

    serializer, err := protobuf.NewSerializer(client, serde.ValueSerde, protobuf.NewSerializerConfig())
    if err != nil {
        log.WithFields(log.Fields{"err": err}).Error("Failed to create Kafka Serializer")
        log.Panicf("Unable to create Kafka Serializer")
    }

    KafkaProducerInstance = &KafkaProducer{
        producer:   producer,
        serializer: serializer,
    }

    log.Info("Created Kafka Producer and Serializer")
}

2. Sending Kafka Message

func producerHelper[kdt KafkaMesageDataTypes](message kdt, topicName string) {
    deliveryChan := make(chan confluent_kafka.Event)
    payload, err := KafkaProducerInstance.serializer.Serialize(topicName, &message)
    if err != nil {
        log.Errorf("Failed to serialize payload: %v\n", err)
        close(deliveryChan)
        return
    }

    err = KafkaProducerInstance.producer.Produce(&confluent_kafka.Message{
        TopicPartition: confluent_kafka.TopicPartition{Topic: &topicName, Partition: confluent_kafka.PartitionAny},
        Value:          payload,
    }, deliveryChan)

    if err != nil {
        log.Errorf("Failed to Produce: %v\n", err)
        close(deliveryChan)
        return
    }

    e := <-deliveryChan
    m := e.(*confluent_kafka.Message)

    if m.TopicPartition.Error != nil {
        log.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
        close(deliveryChan)
        return
    } else {
        log.Infof("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}

Trying to consumer the meesage (Diff, Application in Python)

from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2  # replace with your generated module name
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",  # Replace with your Kafka server address
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
from confluent_kafka import Consumer, KafkaError
import KafkaDiagnoseResult_pb2
from google.protobuf.message import DecodeError

# Kafka consumer configuration
conf = {
    'bootstrap.servers': "localhost:9092/v3/",
    'group.id': "myGroup",
    'auto.offset.reset': 'earliest'
}

# Create a consumer instance
consumer = Consumer(conf)

# Subscribe to a topic
consumer.subscribe(['diagnosis']) 
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Deserialize the message
        try:
            data = KafkaDiagnoseResult_pb2.KafkaDiagnoseRequest() 
            data.ParseFromString(msg.value())
        except DecodeError as e:
            print(f"Error parsing message: {e}")
            print(f"Raw message data: {msg.value()}")

        print("Received message: ", data)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Error

Error parsing message

I am trying to debug it but unable to.

  1. The proto file in both the applications is the same
  2. I have using proton to generate the pb2 file.

Your help is appreciated.

Thank you

I can get the message in raw format:

Raw Format message.

Raw message data: b'\x00\x00\x00\x00\x02\x02\x08\n$1775100a-1a47-48b2-93b7-b7a331be59b4\x12\tcompleted'

  • I have tried decoding it using UTF-8, it fails, as not all the fields are read.
   print(" Decode 1: ", dict_str)
   print("Decode 2: ", ast.literal_eval(dict_str))

Output from above code:

Unparsed Message:  b'\x00\x00\x00\x00\x02\x02\x08\n$ccb0ad7e-abb2-4af6-90d1-187381f9d47e\x12\tcompleted'
 Decode 1:  
$ccb0ad7e-abb2-4af6-90d1-187381f9d47e   completed
Inner Exception Here source code string cannot contain null bytes
2

There are 2 best solutions below

0
On BEST ANSWER

Your Go client is serializing with the Schema Registry meaning your Python code must do the same. The records are not "just Protobuf" since there's a schema ID also encoded in the bytes, therefore a regular Protobuf parser will fail.

There's example code in the repo for consuming Protobuf with the Registry integration

https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/protobuf_consumer.py

2
On

From the error message RE source code cannot contain null bytes, I'm going to assume the problem lies with the file your python code lives in, not the code itself. Did you write it on a Windows machine maybe?

Try this, from your command prompt:

python -c 'import sys; sys.stdout.write(sys.stdin.read().replace("\0", ""))' < filename.py > filename.new.py

Replace the filenames with the filename of your source code, and repeat for each python file.

This should remove any null bytes in your source files.