I have created this source table in ksqlDB, with a HEADERS column:
CREATE SOURCE TABLE locations (
  id VARCHAR PRIMARY KEY,
  meta ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (
  KAFKA_TOPIC = 'locations',
  VALUE_FORMAT = 'JSON_SR',
  VALUE_SCHEMA_ID = 1
);
Then I produce data with headers:
kafka-json-schema-console-producer \
  --bootstrap-server broker:9092 \
  --topic locations \
  --property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
  --property parse.key=true \
  --property parse.headers=true \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema.id=1
x-correlationid:36d2784c-8233-11ee-b2ce-05ab766c73d2 asd {"profileId":"asd","latitude":0,"longitude":-1}
And I am able to see it in the table:
ksql> SELECT * FROM locations;
+------------------+------------------+------------------+------------------+------------------+
|ID |META |profileId |latitude |longitude |
+------------------+------------------+------------------+------------------+------------------+
|asd |[{KEY=x-correlatio|asd |0.0 |-1.0 |
| |nid, VALUE=MzZkMjc| | | |
| |4NGMtODIzMy0xMWVlL| | | |
| |WIyY2UtMDVhYjc2NmM| | | |
| |3M2Qy}] | | | |
Query terminated
ksql> describe locations;
Name : LOCATIONS
Field | Type
-----------------------------------------------------------------------
META | ARRAY<STRUCT<KEY VARCHAR(STRING), VALUE BYTES>> (headers)
ID | VARCHAR(STRING) (primary key)
profileId | VARCHAR(STRING)
latitude | DOUBLE
longitude | DOUBLE
-----------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
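As a sanity check, the VALUE printed in the SELECT * output looks like the Base64 rendering of the header bytes. A minimal Python sketch, run outside ksqlDB, that decodes the string copied from that output (with the wrapped table cells joined back together). It assumes the header value was written as UTF-8 text:

```python
import base64

# VALUE cell from the SELECT * output, with the wrapped lines joined back together.
encoded_value = "MzZkMjc4NGMtODIzMy0xMWVlLWIyY2UtMDVhYjc2NmM3M2Qy"

# Header values are raw bytes; assume the producer wrote them as UTF-8 text.
decoded_value = base64.b64decode(encoded_value).decode("utf-8")

print(decoded_value)  # prints 36d2784c-8233-11ee-b2ce-05ab766c73d2
```

That matches the x-correlationid I sent, so the header itself seems to be stored and read correctly when I select all columns.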
My problems start when I want to access the META column explicitly:
ksql> SELECT ID, META FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID |META |
+--------------------------------------------------+--------------------------------------------------+
|asd |-1.0 |
Why does META return the value of the longitude column?
Trying to access the data inside the META column does not work either:
ksql> SELECT ID, META[0]->KEY FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID |KEY |
+--------------------------------------------------+--------------------------------------------------+
|asd |null |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'ascii') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID |KSQL_COL_0 |
+--------------------------------------------------+--------------------------------------------------+
|asd |null |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'base64') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID |KSQL_COL_0 |
+--------------------------------------------------+--------------------------------------------------+
|asd |null |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'hex') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID |KSQL_COL_0 |
+--------------------------------------------------+--------------------------------------------------+
|asd |null |
Query terminated
ksql> SELECT ID, FROM_BYTES(META[0]->VALUE, 'utf8') FROM locations;
+--------------------------------------------------+--------------------------------------------------+
|ID |KSQL_COL_0 |
+--------------------------------------------------+--------------------------------------------------+
|asd |null |
Query terminated
How can I use the META column?