Producing avro message using php-enqueue

1.1k Views Asked by At

I'm researching a way of producing avro messages from php to kafka using php-enqueue.

Their documentation states that you can use other formats, including Apache Avro.

By default the transport serializes messages to json format but you might want to use another format such as Apache Avro. For that you have to implement Serializer interface and set it to the context, producer or consumer. If a serializer set to context it will be injected to all consumers and producers created by the context.

<?php
use Enqueue\RdKafka\Serializer;
use Enqueue\RdKafka\RdKafkaMessage;

class FooSerializer implements Serializer
{
    public function toMessage($string) {}

    public function toString(RdKafkaMessage $message) {}
}

/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$context->setSerializer(new FooSerializer());

The serializer in the example is converting to and from strings. Avro format is binary as far as I understand, so how is a custom serializer supposed to work in this case?

1

There are 1 best solutions below

0
On BEST ANSWER

Php strings can contain binary data. Here's a partial implementation of producing an avro message using a schema id that had been registered in schema registry. The serialization to avro is done using jaumo/avro implementation.

public function toString(RdKafkaMessage $message): string
{
    ...

    $message = json_decode($message->getBody(), true);

    $encodedHeader = $this->createAvroHeader($schemaId);
    $encodedMessage = Serde::encodeMessage($parsedSchema, $message);

    return $encodedHeader . $encodedMessage;
}

private function createAvroHeader(int $schemaId): string
{
    $binarySchemaId = hex2bin(sprintf("%08s", dechex($schemaId)));
    return pack("C", 0) . $binarySchemaId;
}