Using Faust with the Azure Event Hubs Kafka endpoint


We are migrating from Kafka to Azure Event Hubs. Since Event Hubs exposes a Kafka-compatible endpoint, we are trying to reuse our existing Python "faust" code and change only the connection settings, but it is not working. (We don't want to switch to the native Event Hubs Python SDK.)

Code for the existing Kafka (not Event Hubs) connection:

import faust

KAFKA_HOST_LIST = "..."
app = faust.App(
    'app',
    broker=KAFKA_HOST_LIST,
    stream_wait_empty=False,
    store='memory://',
    stream_buffer_maxsize=100000,
    loghandlers=log.handlers,  # `log` is defined elsewhere in our code
)

Changes I tried in order to connect to the Event Hubs Kafka endpoint through faust:

import faust
import ssl

EVENTHUB_NAMESPACE = "iothub-ns...667"
SASL_USERNAME = "$ConnectionString"
SASL_PASSWORD = f"Endpoint=sb://{EVENTHUB_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=...;EntityPath=..."

app = faust.App(
    'app',
    broker=f"kafka://{EVENTHUB_NAMESPACE}.servicebus.windows.net:9093",
    broker_credentials=faust.SASLCredentials(
        username=SASL_USERNAME,
        password=SASL_PASSWORD,
        ssl_context=ssl.create_default_context()
        # stream_wait_empty=False,       # uncommenting this raises an error
        # store='memory://',             # uncommenting this raises an error
        # stream_buffer_maxsize=100000,  # uncommenting this raises an error
        # loghandlers=log.handlers       # uncommenting this raises an error
        )
)
print(app)

BATCH_SIZE = 200
TIME_FRAME = 5


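@app.agent()
async def process(stream):
    try:
        print("Agent Started!")
        # Collect up to BATCH_SIZE events, or whatever arrived within TIME_FRAME seconds.
        diagnostics_stream = stream.take(BATCH_SIZE, within=TIME_FRAME)
        async for diagnostic_chunk in diagnostics_stream:
            print(diagnostic_chunk)
    except Exception as exc:
        # Report the failure, then re-raise so the worker still sees it.
        print(f"Agent failed: {exc!r}")
        raise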

I run the above code with "faust --datadir=/app -A scriptname -l info worker --web-port=6069 -f /dev/null". Instead of running as a streaming worker, it exits on its own with the output below:

    <App: <non-finalized> 0x7f13e4459f10>
┌ƒaµS† v1.7.3─┬───────────────────────────────────────────────────────────────────────────────────────┐
│ id          │ app                                                │
│ transport   │ [URL('kafka://EVENTHUB_NAMESPACE.servicebus.windows.net:9093')]  │
│ store       │ memory:                                                                               │
│ web         │ http://ABC123:6069                                                          │
│ log         │ /dev/null (info)                                                                      │
│ pid         │ 1853917                                                                               │
│ hostname    │ ABC123                                                                      │
│ platform    │ CPython 3.7.17 (Linux x86_64)                                                         │
│ drivers     │                                                                                       │
│   transport │ aiokafka=1.0.6                                                                        │
│   web       │ aiohttp=3.8.3                                                                         │
│ datadir     │ /app                                                                                  │
│ appdir      │ /app/v1                                                                               │
└─────────────┴───────────────────────────────────────────────────────────────────────────────────────┘

I used this issue as a reference: https://github.com/robinhood/faust/issues/483

Can anyone help me connect to the Event Hubs Kafka endpoint using faust?
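
For reference, here is a minimal sketch of the configuration I am aiming for. It is untested against a real namespace and rests on several assumptions. The helper names `eventhub_kafka_settings` and `build_app` are hypothetical, and `topic_name` is assumed to be the name of the Event Hub to read from. The sketch passes the app-level settings (`stream_wait_empty`, `store`, `stream_buffer_maxsize`) to `faust.App` rather than to `faust.SASLCredentials`, since they are App settings, and it binds the agent to an explicitly named topic. Whether this resolves the early exit is not confirmed.

```python
import ssl

BATCH_SIZE = 200
TIME_FRAME = 5


def eventhub_kafka_settings(namespace: str, connection_string: str) -> dict:
    """Collect the connection values for an Event Hubs Kafka endpoint (hypothetical helper)."""
    return {
        # The Kafka endpoint of an Event Hubs namespace listens on port 9093.
        "broker": f"kafka://{namespace}.servicebus.windows.net:9093",
        # The SASL user name is the literal string "$ConnectionString".
        "username": "$ConnectionString",
        # The SASL password is the full connection string.
        "password": connection_string,
    }


def build_app(namespace: str, connection_string: str, topic_name: str):
    """Create a Faust app with one agent reading from `topic_name` (hypothetical helper)."""
    import faust  # imported here so the helper above can be used without Faust installed

    settings = eventhub_kafka_settings(namespace, connection_string)
    app = faust.App(
        "app",
        broker=settings["broker"],
        broker_credentials=faust.SASLCredentials(
            username=settings["username"],
            password=settings["password"],
            ssl_context=ssl.create_default_context(),
        ),
        # App-level settings are keyword arguments of faust.App.
        stream_wait_empty=False,
        store="memory://",
        stream_buffer_maxsize=100000,
    )

    # Assumption: the Event Hub name doubles as the Kafka topic name.
    # The default serializer is used here; adjust it to the actual payload format.
    topic = app.topic(topic_name)

    @app.agent(topic)
    async def process(stream):
        # Yield batches of up to BATCH_SIZE events, waiting at most TIME_FRAME seconds.
        async for chunk in stream.take(BATCH_SIZE, within=TIME_FRAME):
            print(chunk)

    return app
```

In the worker script this would be used as `app = build_app(EVENTHUB_NAMESPACE, SASL_PASSWORD, "<event hub name>")` at module level, so that "faust -A scriptname worker" can find `app`.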
