Twint to Kafka -> Dont receive any events in my topic

103 Views Asked by At

i'm trying to load some tweets via TWINT into a kafka topic with the confluent-kafka[avro] Producer. I don't get any errors but my topic wont receive any events from twint. I even get succes msg, when debugging(with try and except).

My Code:

import twint
import sys
import json
from time import sleep
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Define Avro schema
value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
         { "name": "id",        "type": "long"   },
         { "name": "tweet",     "type": "string" },
         { "name": "datetime",  "type": "string" },
         { "name": "username",  "type": "string" },
         { "name": "user_id",   "type": "long"   },
         { "name": "name",      "type": "string" }

   ]
}
"""
key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

kafka_broker = 'host.docker.internal:9092'
schema_registry = 'http://host.docker.internal:8081'

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)

producer = AvroProducer({
    'bootstrap.servers': kafka_broker,
    'schema.registry.url': schema_registry
    }, default_key_schema=key_schema, default_value_schema=value_schema)


module = sys.modules["twint.storage.write"]
   

def Json(obj, config):
    
    tweet = obj.__dict__
    tweet_new = {}
    tweet_new['id'] = tweet['id']
    tweet_new['tweet'] = tweet['tweet']
    tweet_new['datetime'] = tweet['datetime']
    tweet_new['username'] = tweet['username']
    tweet_new['user_id'] = tweet['user_id']
    tweet_new['name'] = tweet['name']
    print(tweet_new)
    try:
        producer.produce(topic='tweets_test', key={"name": "Key"}, value=tweet_new)
    except Exception as e:
        print(f"Exception while producing record value - {tweet_new} to topic - tweets_test: {e}")
    else:
        print(f"Successfully producing record value - {tweet_new} to topic - tweets_test")
    try:
        producer.flush()
    except Exception as e:
        print(f"Exception while flush record value - {tweet_new} to topic - tweets_test: {e}")
    else:
        print(f"Successfully flushed record value - {tweet_new} to topic - tweets_test")


module.Json = Json

c2 = twint.Config()
c2.Search = "corona OR regen OR \"stark regen\" OR \"sturm\" OR überschwemmung OR landunter OR @hagel OR @regen OR @sturm OR flut"
c2.Store_json = True
c2.Custom["user"] = ["id", "tweet", "user_id", "username", "hashtags"]
c2.User_full = True
c2.Output = "tweets.json"
c2.Since = '2019-05-20'
c2.Hide_output = True
twint.run.Search(c2)

When i run it i get the following output:

    {'id': 1513818741057937408, 'tweet': 'RKI: Bundesweite Sieben-Tage-Inzidenz steigt leicht auf 1087 | #corona #rki #test ', 'datetime': '2022-04-12 09:58:07 UTC', 'username': 'flashupde', 'user_id': 1179376986516606978, 'name': 'FLASH UP'}
Successfully producing record value - {'id': 1513818741057937408, 'tweet': 'RKI: Bundesweite Sieben-Tage-Inzidenz steigt leicht auf 1087 | #corona #rki #test  ', 'datetime': '2022-04-12 09:58:07 UTC', 'username': 'flashupde', 'user_id': 1179376986516606978, 'name': 'FLASH UP'} to topic - tweets_test
Successfully flushed record value - {'id': 1513818741057937408, 'tweet': 'RKI: Bundesweite Sieben-Tage-Inzidenz steigt leicht auf 1087 | #corona #rki #test  ', 'datetime': '2022-04-12 09:58:07 UTC', 'username': 'flashupde', 'user_id': 1179376986516606978, 'name': 'FLASH UP'} to topic - tweets_test

Any help how i can debug it better or any advice would be awesome.

0

There are 0 best solutions below