I am working on real-time processing of user access-log data; it basically tracks user attendance in the office.
Whenever a user swipes at the door, a log event is produced to a Kafka topic. After consuming that event, I check whether the user has accessed for the first time. If so, the attendance time is marked; if the user's data is already present in Kafka, I simply increment a count of how many times the user has accessed. This is the simple use case I am working on right now.
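To make the intent concrete, here is roughly the behaviour I want, written as plain Python over an in-memory dict (just a sketch of the logic, not Flink code; the handle_event name and the seen dict are only for illustration, the field names match my event schema):

# Plain-Python sketch of the desired behaviour (not Flink code):
# the first swipe marks attendance, later swipes only bump a counter.
seen = {}  # name -> {"attendance_time": ..., "count": ...}

def handle_event(event):
    name = event["name"]
    if name not in seen:
        # first access: mark the attendance time
        seen[name] = {"attendance_time": event["access_time"], "count": 1}
    else:
        # repeat access: just increase the access count
        seen[name]["count"] += 1
    return seen[name]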
I am very new to Flink, so I have prepared the basic Flink code below in Python.
import json

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment
def my_map(obj):
    # helper (not used below); the incoming value is decoded twice here
    json_obj = json.loads(json.loads(obj))
    return json.dumps(json_obj["name"])
def kafkaread():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    jars_path = "/home/vishal/flink/flink-pipeline/flink-env3.10/flink_test_1/"
    jar_files = [
        "file:///" + jars_path + "flink-sql-connector-kafka-3.1.0-1.18.jar",
    ]
    jar_files_str = ";".join(jar_files)
    env.add_jars(jar_files_str)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.get_config().set('table.exec.source.idle-timeout', '1000 ms')

    # With the table below I read only the latest data arriving in Kafka
    create_table_sql = """
        CREATE TABLE access_logs_source (
            `name` STRING,
            `door` STRING,
            `access_time` TIMESTAMP
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'access_logs_topic_5',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'flink-group-access_logs_5',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        );
    """

    # This table reads everything already present in Kafka (plus new data),
    # so I can check whether the latest record has already been seen
    historical_table_sql = """
        CREATE TABLE access_logs_historical (
            `name` STRING,
            `door` STRING,
            `access_time` TIMESTAMP
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'access_logs_topic_5',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'flink-group-access_logs_5',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json'
        );
    """

    # Execute the SQL commands to create both tables
    t_env.execute_sql(create_table_sql)
    t_env.execute_sql(historical_table_sql)
    # Define the SQL command that selects data from the "latest" table
    select_sql = "SELECT name FROM access_logs_source"

    # Execute the query; collect() returns an iterator over the streaming result
    result_table = t_env.execute_sql(select_sql)
    # result_table.print()
    result_rows = result_table.collect()

    # name from the newest log event
    name_to_filter = ''
    for row in result_rows:
        # print(row[0])
        name_to_filter = row[0]
        # break
        print('-------', name_to_filter)
        # result_rows.close()

        # Check whether the new log record is already present in the Kafka topic
        check_query = """
            SELECT * FROM access_logs_historical WHERE name = '{0}'
        """
        check_query = check_query.format(name_to_filter)
        print('++++++++++ ', check_query)
        # t_env.execute_sql(check_query)
        table = t_env.sql_query(check_query)
        table_result = table.execute()
        table_result.print()

    # Execute the Flink job
    # t_env.execute("Create and Select Table Job")


if __name__ == '__main__':
    kafkaread()
But the problem is that nothing is printed when data with a different name arrives in Kafka.
Suppose I am sending this event to Kafka: my_data = { 'name': 'Bhuvi19', 'door': 'door 220', 'access_time': '2024-03-05 13:08:00' }
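For reference, I push the test events with a small kafka-python script roughly like the one below (the exact producer script may differ, but the payload is exactly the dict above):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    # serialise the dict to a JSON string before sending
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

my_data = {'name': 'Bhuvi19', 'door': 'door 220', 'access_time': '2024-03-05 13:08:00'}
producer.send('access_logs_topic_5', my_data)
producer.flush()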
Whenever I send data with name = 'Bhuvi19', it prints the data, but whenever I send a different name like 'Bhuvi20', it shows nothing. I thought that whenever I sent new data into Kafka, the process would run again and repeat the steps I described, but with new data it just seems to hang there.
Can you please help me with how to achieve this?
Thanks.