I am building an application that reads data from a Kinesis data stream and sends it to an external database through an endpoint. I used the PyFlink Table API to achieve this, and the code below works on my local system. Since I had multiple JAR files, I combined them into a single fat JAR so the dependencies can be resolved. However, when I deploy the application to AWS Managed Apache Flink, the job does not run. What could be the potential issue? The CloudWatch logs weren't conclusive.
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json

# 1. Create a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
statement_set = table_env.create_statement_set()  # note: currently unused below

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on kda

is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment
if is_local:
    # only for local runs: overwrite the properties path and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/mydep.jar",
    )
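# (As I understand it from the AWS samples, pipeline.jars only takes effect
# for local runs. On Managed Apache Flink the jar is instead declared in the
# application's runtime properties, e.g. a property group named
# "kinesis.analytics.flink.run.options" with "jarfile" set to "lib/mydep.jar".
# The group/key names here are taken from the AWS examples repo, so treat
# them as my assumption about the setup, not verified config.)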
def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))
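# For reference, the file parsed above should look roughly like this; the
# group and key names match what main() reads below, while the values are
# made-up examples:
# [
#   {
#     "PropertyGroupId": "consumer.config.0",
#     "PropertyMap": {
#       "input.stream.name": "ExampleInputStream",
#       "aws.region": "us-east-1",
#       "flink.stream.initpos": "LATEST"
#     }
#   }
# ]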
def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
def create_source_table(table_name, stream_name, region, stream_initpos):
    """
    Defines the Kinesis source table schema and configuration.
    """
    return f"""
        CREATE TABLE {table_name} (
            Trial_ID VARCHAR(255),
            PC_Time_s DOUBLE,
            Arduino_Time_ms BIGINT,
            Resistivity_ohm DOUBLE,
            Temperature_C DOUBLE,
            PWM_value BIGINT
        )
        PARTITIONED BY (Trial_ID)
        WITH (
            'connector' = 'kinesis',
            'stream' = '{stream_name}',
            'aws.region' = '{region}',
            'scan.stream.initpos' = '{stream_initpos}',
            'format' = 'json',
            'json.timestamp-format.standard' = 'ISO-8601'
        )
    """
def create_jdbc_sink_table(table_name, url, username, password):
    return f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            Trial_ID VARCHAR(255),
            PC_Time_s DOUBLE,
            Arduino_Time_ms BIGINT,
            Resistivity_ohm DOUBLE,
            Temperature_C DOUBLE,
            PWM_value BIGINT
        ) WITH (
            'connector' = 'jdbc',
            'url' = '{url}',
            'table-name' = '{table_name}',
            'username' = '{username}',
            'password' = '{password}'
        )
    """
def main():
    # Application property keys
    input_property_group_key = "consumer.config.0"

    # Table names
    input_table_name = "input_table"
    output_table_name = "trainingdata"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    # get application properties
    props = get_application_properties()
    input_property_map = property_map(props, input_property_group_key)

    # Replace with your MySQL connection details
    DATABASE_URL = "jdbc:mysql://***string**/db_name"
    DATABASE_USERNAME = "username"
    DATABASE_PASSWORD = "password"

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]
    # 2. Create a Kinesis source table
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. Create a JDBC sink table for the MySQL database
    table_env.execute_sql(
        create_jdbc_sink_table(output_table_name, DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD)
    )
    # 4. Insert data from the Kinesis stream into the MySQL table
    table_result = table_env.execute_sql(
        "INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, input_table_name)
    )
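    # (INSERT INTO ... SELECT * maps columns by position; it works here only
    # because the source and sink DDL declare the same columns in the same
    # order.)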
    # 5. Wait for job completion (local mode) or print the job status
    if is_local:
        table_result.wait()
    else:
        # get the job status through the TableResult
        print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()
I packaged the whole Python directory, including the fat JAR, as a zip, uploaded it to S3, and pointed the Managed Flink application at it, but the job still does not run. However, if I run the sample code from GitHub here, it works.
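For completeness, the zip I upload is laid out roughly like this (the entry-script name "main.py" is a placeholder for whatever the script above is called):

main.zip
├── main.py          <- the script above
└── lib/
    └── mydep.jar    <- the fat JAR (presumably the Kinesis and JDBC connectors plus the MySQL driver combined)

application_properties.json sits next to main.py for local runs only; on Managed Flink the service provides /etc/flink/application_properties.json itself.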