How to SELECT and deserialize a HEADER value in ksqlDB?

I have created this table:

CREATE SOURCE TABLE locations (
    id VARCHAR PRIMARY KEY,
    meta ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (
    KAFKA_TOPIC = 'locations',
    VALUE_FORMAT = 'JSON_SR',
    VALUE_SCHEMA_ID = 1
);

Then I produce data with headers:

kafka-json-schema-console-producer \
--bootstrap-server broker:9092 \
--topic locations \
--property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
--property parse.key=true \
--property parse.headers=true \
--property schema.registry.url=http://schema-registry:8081 \
--property value.schema.id=1

x-correlationid:36d2784c-8233-11ee-b2ce-05ab766c73d2    asd {"profileId":"asd","latitude":0,"longitude":-1}

And I am able to see it in the table:

ksql> SELECT * FROM locations;
+------------------+------------------+------------------+------------------+------------------+
|ID                |META              |profileId         |latitude          |longitude         |
+------------------+------------------+------------------+------------------+------------------+
|asd               |[{KEY=x-correlatio|asd               |0.0               |-1.0              |
|                  |nid, VALUE=MzZkMjc|                  |                  |                  |
|                  |4NGMtODIzMy0xMWVlL|                  |                  |                  |
|                  |WIyY2UtMDVhYjc2NmM|                  |                  |                  |
|                  |3M2Qy}]           |                  |                  |                  |
Query terminated
ksql> describe locations;

Name                 : LOCATIONS
 Field     | Type
-----------------------------------------------------------------------
 META      | ARRAY<STRUCT<KEY VARCHAR(STRING), VALUE BYTES>> (headers)
 ID        | VARCHAR(STRING)  (primary key)
 profileId | VARCHAR(STRING)
 latitude  | DOUBLE
 longitude | DOUBLE
-----------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
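
For reference, the VALUE printed in the META column above looks like the Base64 rendering of the raw header bytes. A minimal Python sketch to check that, run outside ksqlDB (the variable names are only illustrative, and the string is the wrapped table cell joined back into one line):

```python
import base64

# VALUE as printed by `SELECT * FROM locations;`, with the table's line wraps joined.
displayed_value = "MzZkMjc4NGMtODIzMy0xMWVlLWIyY2UtMDVhYjc2NmM3M2Qy"

# The header was produced as plain text, so the decoded bytes are expected to be ASCII.
decoded = base64.b64decode(displayed_value).decode("ascii")
print(decoded)  # 36d2784c-8233-11ee-b2ce-05ab766c73d2
```

This matches the x-correlationid header I produced, so the header itself seems to be stored correctly.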

My problems start when I want to select the META column explicitly:

ksql> SELECT ID, META FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |META                                              |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |-1.0                                              |

Why does META return the values from the longitude column?

Trying to access the data inside the META column does not work either:

ksql> SELECT ID, META[0]->KEY FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KEY                                               |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'ascii') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'base64') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'hex') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'utf8') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID                                                |KSQL_COL_0                                        |
+--------------------------------------------------+--------------------------------------------------+
|asd                                               |null                                              |
Query terminated

How can I use the META column?
