Integrating Confluent Schema Registry with Apache Atlas


Problem Definition

I am trying to integrate the data stored in Confluent Schema Registry with Apache Atlas. I have found many links that say this is possible, but none of them give any technical detail on how the integration is actually done.

Question

Could anyone help me import the data (and metadata) from Schema Registry into Apache Atlas in real time? Is there a hook, an event listener, or something similar that I can use to implement this?
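
To make the "something similar" concrete: absent a ready-made hook, one fallback is to poll the Schema Registry REST API and detect versions that have not been pushed to Atlas yet. This is near-real-time polling, not an event listener. The sketch below assumes the standard `GET /subjects` and `GET /subjects/{subject}/versions` endpoints; `find_new_versions`, `http_fetcher`, and the `seen` set are hypothetical names introduced only for illustration.

```python
import json
import urllib.parse
import urllib.request


def http_fetcher(base_url):
    """Return a function that GETs a path on the registry and decodes the JSON body."""
    def fetch_json(path):
        with urllib.request.urlopen(base_url.rstrip("/") + path) as resp:
            return json.load(resp)
    return fetch_json


def find_new_versions(seen, fetch_json):
    """List (subject, version) pairs present in the registry but not in `seen`.

    `seen` is a set of pairs already sent to Atlas (hypothetical bookkeeping).
    `fetch_json` maps a REST path to decoded JSON, so it can be faked in tests.
    """
    new_pairs = []
    for subject in fetch_json("/subjects"):
        # Subject names may contain characters that need URL escaping.
        path = "/subjects/" + urllib.parse.quote(subject, safe="") + "/versions"
        for version in fetch_json(path):
            if (subject, version) not in seen:
                new_pairs.append((subject, version))
    return new_pairs
```

Calling `find_new_versions(seen, http_fetcher("http://localhost:8081"))` on a timer would give the list of schemas still to be imported; the URL is a placeholder for the actual registry address.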

Example

Here is what I have from Schema Registry:

{
   "subject":"order-value",
   "version":1,
   "id":101,
   "schema":"{\"type\":\"record\",\"name\":\"cart_closed\",\"namespace\":\"com.akbar.avro\",\"fields\":[{\"name\":\"_g\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"_s\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"_u\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"application_version\",\"type\":[\"int\",\"null\"],\"default\":null},{\"name\":\"client_time\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"event_fingerprint\",\"type\":[\"string\",\"null\"],\"default\":null},{\"name\":\"os\",\"type\":[\"string\",\"null\"],\"default\":null},{\"name\":\"php_session_id\",\"type\":[\"string\",\"null\"],\"default\":null},{\"name\":\"platform\",\"type\":[\"string\",\"null\"],\"default\":null},{\"name\":\"server_time\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"site\",\"type\":[\"string\",\"null\"],\"default\":null},{\"name\":\"user_agent\",\"type\":[\"string\",\"null\"],\"default\":null},{\"name\":\"payment_method_id\",\"type\":[\"int\",\"null\"],\"default\":null},{\"name\":\"page_view\",\"type\":[\"boolean\",\"null\"],\"default\":null},{\"name\":\"items\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"item\",\"fields\":[{\"name\":\"brand_id\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"category_id\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"discount\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"order_item_id\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"price\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"product_id\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"quantity\",\"type\":[\"int\",\"null\"],\"default\":null},{\"name\":\"seller_id\",\"type\":[\"long\",\"null\"],\"default\":null},{\"name\":\"variant_id\",\"type\":[\"long\",\"null\"],\"default\":null}]}}},{\"name\":\"cart_id\",\"type\":[\"long\",\"null\"],\"default\":null}]}"
}

How can I import this into Apache Atlas?
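
To illustrate the kind of mapping being asked about, here is a minimal sketch that turns one Schema Registry response into a payload for the Atlas v2 entity endpoint (`POST /api/atlas/v2/entity`). It rests on several assumptions: the Atlas type name `avro_schema` may or may not be registered in a given installation, the `qualifiedName` format is invented for this example, and `registry_to_atlas_entity`, `field_names`, and `post_entity` are hypothetical helper names. The sample uses a shortened version of the schema above.

```python
import json
import urllib.request


def registry_to_atlas_entity(registry_response, type_name="avro_schema"):
    """Map one Schema Registry response to an Atlas v2 entity payload.

    `type_name` is an assumption: use whatever type exists in your Atlas.
    """
    # The registry returns the Avro schema as a JSON-encoded string.
    schema = json.loads(registry_response["schema"])
    subject = registry_response["subject"]
    version = registry_response["version"]
    namespace = schema.get("namespace", "")
    full_name = f"{namespace}.{schema['name']}" if namespace else schema["name"]
    return {
        "entity": {
            "typeName": type_name,
            "attributes": {
                # Hypothetical naming scheme; must be unique within Atlas.
                "qualifiedName": f"{subject}@v{version}",
                "name": full_name,
                "description": f"Schema Registry id {registry_response['id']}",
            },
        }
    }


def field_names(schema):
    """Return the top-level field names of a parsed Avro record schema."""
    return [field["name"] for field in schema.get("fields", [])]


def post_entity(atlas_url, payload, auth_header):
    """Send the payload to Atlas; not called here because it needs a live server."""
    request = urllib.request.Request(
        atlas_url.rstrip("/") + "/api/atlas/v2/entity",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": auth_header},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Shortened version of the registry response shown above.
sample = {
    "subject": "order-value",
    "version": 1,
    "id": 101,
    "schema": json.dumps({
        "type": "record",
        "name": "cart_closed",
        "namespace": "com.akbar.avro",
        "fields": [{"name": "cart_id", "type": ["long", "null"], "default": None}],
    }),
}
payload = registry_to_atlas_entity(sample)
```

Building the payload separately from sending it keeps the mapping testable without a running Atlas instance; the nested `items` record would need its own entity or attribute, which this sketch does not attempt.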

What I have done

I checked the Schema Registry documentation, which shows the following architecture:

[Image: schema registry architecture]

So I decided to set the Kafka URL, but I couldn't find anywhere to set the Kafka configuration. I tried changing the atlas.kafka.bootstrap.servers property in atlas-application.properties. I also tried running import-kafka.sh from the hook-bin directory, but it failed.

Error log

2021-04-25 15:48:34,162 ERROR - [main:] ~ Thread Thread[main,5,main] died (NIOServerCnxnFactory$1:92)
org.apache.atlas.exception.AtlasBaseException: EmbeddedServer.Start: failed!
    at org.apache.atlas.web.service.EmbeddedServer.start(EmbeddedServer.java:115)
    at org.apache.atlas.Atlas.main(Atlas.java:133)
Caused by: java.lang.NullPointerException
    at org.apache.atlas.util.BeanUtil.getBean(BeanUtil.java:36)
    at org.apache.atlas.web.service.EmbeddedServer.auditServerStatus(EmbeddedServer.java:128)
    at org.apache.atlas.web.service.EmbeddedServer.start(EmbeddedServer.java:111)
    ... 1 more
