I am trying to insert .csv files stored in GCP buckets into my MySQL database, a Cloud SQL instance I set up on Google Cloud Platform. It runs flawlessly on my local machine (when I run the notebook I created), but when I run it on Apache Airflow I get these errors in the logs:
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 181, in execute
return_value = self.execute_callable()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 198, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/airflow/gcs/dags/workflow_test2.py", line 482, in split_data_to_MySql_db
conn = engine.connect()
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3325, in connect
return self._connection_cls(self, close_with_result=close_with_result)
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
else engine.raw_connection()
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3404, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3374, in _wrap_pool_connect
Connection._handle_dbapi_exception_noconnection(
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2208, in _handle_dbapi_exception_noconnection
util.raise_(
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3371, in _wrap_pool_connect
return fn()
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 327, in connect
return _ConnectionFairy._checkout(self)
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
rec = pool._do_get()
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 146, in _do_get
self._dec_overflow()
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 143, in _do_get
return self._create_connection()
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
return _ConnectionRecord(self)
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
self.__connect()
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 691, in __connect
pool.logger.debug("Error on connect(): %s", e)
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
self.dbapi_connection = connection = pool._invoke_creator(self)
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/create.py", line 574, in connect
return dialect.connect(*cargs, **cparams)
File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 598, in connect
return self.dbapi.connect(*cargs, **cparams)
File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/pooling.py", line 323, in connect
return MySQLConnection(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/connection.py", line 173, in __init__
self.connect(**kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/abstracts.py", line 1363, in connect
self._open_connection()
File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/connection.py", line 353, in _open_connection
self._socket.open_connection()
File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/network.py", line 760, in open_connection
raise InterfaceError(
sqlalchemy.exc.InterfaceError: (mysql.connector.errors.InterfaceError) 2003: Can't connect to MySQL server on '35.188.119.171:3306' (110 Connection timed out)
(Background on this error at: https://sqlalche.me/e/14/rvf5)
[2024-01-17, 09:32:47 CET] {taskinstance.py:1346} INFO - Marking task as FAILED. dag_id=poke_dag, task_id=split_data_to_MySql_db_task, execution_date=20240117T081250, start_date=20240117T083032, end_date=20240117T083247
[2024-01-17, 09:32:47 CET] {standard_task_runner.py:104} ERROR - Failed to execute job 502 for task split_data_to_MySql_db_task ((mysql.connector.errors.InterfaceError) 2003: Can't connect to MySQL server on '35.188.119.171:3306' (110 Connection timed out)
(Background on this error at: https://sqlalche.me/e/14/rvf5); 908)
[2024-01-17, 09:32:47 CET] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2024-01-17, 09:32:47 CET] {taskinstance.py:2656} INFO - 0 downstream tasks scheduled from follow-on schedule check
I know the connection cannot be established between my database and Airflow, but I can't understand why. Moreover, here is my code that loads the data from the .csv files:
def split_data_to_MySql_db():
    from io import StringIO

    import pandas as pd
    from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, text
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    # client and bucket_name are defined at module level in the DAG file

    # Fetch ebay_sales_data
    object_name = 'split_data/ebay_sales_data.csv'
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(object_name)
    content = blob.download_as_text()
    ebay_sales_data_df = pd.read_csv(StringIO(content))

    # Fetch ebay_seller_data
    object_name = 'split_data/ebay_seller_data.csv'
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(object_name)
    content = blob.download_as_text()
    ebay_seller_df = pd.read_csv(StringIO(content))

    # Fetch pokemon_card_data
    object_name = 'split_data/pokemon_card_data.csv'
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(object_name)
    content = blob.download_as_text()
    pokemon_card_df = pd.read_csv(StringIO(content))

    # Database connection parameters
    DB_USER = '<USERNAME>'
    DB_PASSWORD = '<PASSWORD>'
    DB_HOST = '<HOST>'
    DB_PORT = '<PORT>'
    DB_NAME = '<DATABASE>'

    # Create a connection to the MySQL database
    engine = create_engine(f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}', echo=True)

    # Create a session to insert the data
    Session = sessionmaker(bind=engine)
    session = Session()

    # Get a connection from the engine
    conn = engine.connect()

    # Temporarily disable foreign key checks
    conn.execute(text('SET FOREIGN_KEY_CHECKS=0;'))

    # Insert the DataFrames into the database
    ebay_seller_df.to_sql('ebay_seller', con=engine, if_exists='append', index=False)
    pokemon_card_df.to_sql('pokemon_card', con=engine, if_exists='append', index=False)
    ebay_sales_data_df.to_sql('ebay_sales_data', con=engine, if_exists='append', index=False)

    # Re-enable foreign key checks
    conn.execute(text('SET FOREIGN_KEY_CHECKS=1;'))

    # Close the session and the database connection
    session.close()
    conn.close()
    engine.dispose()
N.B.: I am calling this function from my DAG as a PythonOperator task, not a SQL operator.
Thank you for your help!
Because my Python code runs in the cloud on the Airflow web server, I wanted to authorize the Airflow web server's IP on the Cloud SQL instance, but I cannot find that IP. This might not be the right solution, though.
As one of the comments above mentions, you can leverage the Cloud SQL Python Connector package to connect without needing to allowlist IP addresses, and you get secure connections by default.
Basic usage looks as follows:
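This sketch follows the connector's documented pattern; the instance connection name ("my-project:my-region:my-instance"), the credentials, and the database name are placeholders, and the pymysql driver package must be installed alongside cloud-sql-python-connector:

```python
from google.cloud.sql.connector import Connector
import sqlalchemy

# Initialize the connector (reuse one instance per process)
connector = Connector()

def getconn():
    # The first argument is your Cloud SQL instance connection name,
    # in "project:region:instance" form
    return connector.connect(
        "my-project:my-region:my-instance",
        "pymysql",  # driver used for MySQL
        user="my-user",
        password="my-password",
        db="my-db",
    )

# Hand the creator function to SQLAlchemy; no host, port, or
# IP allowlisting is needed in the URL
engine = sqlalchemy.create_engine(
    "mysql+pymysql://",
    creator=getconn,
)
```

The connector handles authentication via your environment's Google credentials (on Composer, the environment's service account), so the traffic is encrypted and authorized without touching the instance's authorized networks.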
One thing I want to point out about your code (besides the need to rotate your credentials, since you leaked them) is that SQLAlchemy has two different APIs, SQLAlchemy Core and SQLAlchemy ORM, and you seem to be using both. You rarely ever want to mix the two, and this post does a good job explaining the difference between them. It looks as though you just want a simple query builder for your use case, so you can most likely remove all the Session/ORM code.
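A minimal sketch of the trimmed-down, Core-only shape of your load step. It is demonstrated here against an in-memory SQLite database so it runs standalone; with your MySQL engine you would keep the SET FOREIGN_KEY_CHECKS statements (commented out below, since they are MySQL-specific) and the table/column names are illustrative:

```python
import pandas as pd
from sqlalchemy import create_engine, text

# In production this would be your mysql+mysqlconnector (or connector-based)
# engine; SQLite in memory keeps the sketch self-contained.
engine = create_engine("sqlite://")

pokemon_card_df = pd.DataFrame({"id": [1, 2], "name": ["Pikachu", "Eevee"]})

# engine.begin() yields a Core connection inside a transaction --
# no sessionmaker or ORM Session needed for plain inserts.
with engine.begin() as conn:
    # On MySQL, temporarily disable foreign key checks here:
    # conn.execute(text("SET FOREIGN_KEY_CHECKS=0;"))
    pokemon_card_df.to_sql("pokemon_card", con=conn, if_exists="append", index=False)
    # conn.execute(text("SET FOREIGN_KEY_CHECKS=1;"))
    rows = conn.execute(text("SELECT COUNT(*) FROM pokemon_card")).scalar()

print(rows)  # 2
engine.dispose()
```

Using a single `with engine.begin()` block also means the inserts are committed (or rolled back) as one transaction, and the connection is closed for you.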