FAUST: AttributeError: 'NoneType' object has no attribute 'topic_partition'

957 Views Asked by At

i'm having some trouble to do a simple task with python faust, please take a look on the problem and see if you can help me.

Steps to reproduce

i used this code:

import faust

from settings import KAFKA_SERVER

app = faust.App('streams', broker=KAFKA_SERVER, producer_acks=0, store='rocksdb://')


class ProjetoRateio(faust.Record):
    codigoProjeto: str
    combinacao: str
    grade: str
    quantidade: int


projeto_rateio_topic = app.topic(
    'gua-poc-sent-rateio',
    # key_type=str,
    value_type=ProjetoRateio,
    # value_serializer='raw',
)

grade_total = app.Table('grade_total', default=int,
                        partitions=1)


@app.agent(projeto_rateio_topic)
async def projeto_rateio(rateios):
    async for rateio in rateios:
        # grade_total[f'{rateio.codigoProjeto}.{rateio.combinacao}.{rateio.grade}'] += rateio.quantidade
        grade_total[rateio.codigoProjeto] += rateio.quantidade

and got the error descibed on the title

Expected behavior

A kafka table populated

Actual behavior

Exception in callback Topic._on_published(message=<FutureMessag...d result=None>, state={<Monitor: running >: 7442.2543931}, producer=<Producer: running >)(<Future finished result=None>)
handle: <Handle Topic._on_published(message=<FutureMessag...d result=None>, state={<Monitor: running >: 7442.2543931}, producer=<Producer: running >)(<Future finished result=None>)>
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/jhon/.cache/pypoetry/virtualenvs/gua-kafka-stream-faust-poc-VnS5Y2j1-py3.8/lib/python3.8/site-packages/faust/topics.py", line 474, in _on_published
    message.message.callback(message)
  File "/home/jhon/.cache/pypoetry/virtualenvs/gua-kafka-stream-faust-poc-VnS5Y2j1-py3.8/lib/python3.8/site-packages/faust/tables/base.py", line 353, in _on_changelog_sent
    self.data.set_persisted_offset(res.topic_partition, res.offset)
AttributeError: 'NoneType' object has no attribute 'topic_partition'

Versions

  • Python version 3.8
  • Faust-streaming version 0.6.4
  • Operating system windows 10, Ubuntu 20.04 (through wsl)
  • Kafka version 2.0.0 (msk)
  • RocksDB version: 5.0

Am i doing something wrong?

1

There are 1 best solutions below

0
On

Having a similar problem, the problematic configuration is producer_acks=0, try setting it to either 1 or -1 based on the behaviour you want:

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:

  • 0: Producer will not wait for any acknowledgment from the server at all. The message will immediately be considered sent (Not recommended).
  • 1: The broker leader will write the record to its local log but will respond without awaiting full acknowledgment from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
  • -1: The broker leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

This block was taken from faust-streaming github here.