I am trying to insert .csv files located in GCP buckets into my MySQL database, which is a Cloud SQL instance I have set up on Google Cloud Platform. The code runs flawlessly on my local machine (when I run the notebook I created), but when I run it on Apache Airflow I get these errors in the logs:

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 181, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 198, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/workflow_test2.py", line 482, in split_data_to_MySql_db
    conn = engine.connect()
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3325, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3404, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3374, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2208, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3371, in _wrap_pool_connect
    return fn()
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 327, in connect
    return _ConnectionFairy._checkout(self)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
    rec = pool._do_get()
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 146, in _do_get
    self._dec_overflow()
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 143, in _do_get
    return self._create_connection()
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
    return _ConnectionRecord(self)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
    self.__connect()
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 691, in __connect
    pool.logger.debug("Error on connect(): %s", e)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/create.py", line 574, in connect
    return dialect.connect(*cargs, **cparams)
  File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/pooling.py", line 323, in connect
    return MySQLConnection(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/connection.py", line 173, in __init__
    self.connect(**kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/abstracts.py", line 1363, in connect
    self._open_connection()
  File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/connection.py", line 353, in _open_connection
    self._socket.open_connection()
  File "/opt/python3.8/lib/python3.8/site-packages/mysql/connector/network.py", line 760, in open_connection
    raise InterfaceError(
sqlalchemy.exc.InterfaceError: (mysql.connector.errors.InterfaceError) 2003: Can't connect to MySQL server on '35.188.119.171:3306' (110 Connection timed out)
(Background on this error at: https://sqlalche.me/e/14/rvf5)
[2024-01-17, 09:32:47 CET] {taskinstance.py:1346} INFO - Marking task as FAILED. dag_id=poke_dag, task_id=split_data_to_MySql_db_task, execution_date=20240117T081250, start_date=20240117T083032, end_date=20240117T083247
[2024-01-17, 09:32:47 CET] {standard_task_runner.py:104} ERROR - Failed to execute job 502 for task split_data_to_MySql_db_task ((mysql.connector.errors.InterfaceError) 2003: Can't connect to MySQL server on '35.188.119.171:3306' (110 Connection timed out)
(Background on this error at: https://sqlalche.me/e/14/rvf5); 908)
[2024-01-17, 09:32:47 CET] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2024-01-17, 09:32:47 CET] {taskinstance.py:2656} INFO - 0 downstream tasks scheduled from follow-on schedule check

I know that the connection cannot be established between my DB and Airflow, but I can't understand why. Moreover, here is my code that loads the data from the .csv files:

def split_data_to_MySql_db():

    from io import StringIO

    import pandas as pd
    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import sessionmaker

    # client (google.cloud.storage.Client) and bucket_name are defined at
    # module level in the DAG file
    bucket = client.get_bucket(bucket_name)

    # Fetch ebay_sales_data
    blob = bucket.blob('split_data/ebay_sales_data.csv')
    ebay_sales_data_df = pd.read_csv(StringIO(blob.download_as_text()))

    # Fetch ebay_seller_data
    blob = bucket.blob('split_data/ebay_seller_data.csv')
    ebay_seller_df = pd.read_csv(StringIO(blob.download_as_text()))

    # Fetch pokemon_card_data
    blob = bucket.blob('split_data/pokemon_card_data.csv')
    pokemon_card_df = pd.read_csv(StringIO(blob.download_as_text()))

    # Database connection parameters
    DB_USER = '<USERNAME>'
    DB_PASSWORD = '<PASSWORD>'
    DB_HOST = '<HOST>'
    DB_PORT = '<PORT>'
    DB_NAME = '<DATABASE>'

    # Create an engine for the MySQL database
    engine = create_engine(
        f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}',
        echo=True,
    )

    # Create a session for inserting the data
    Session = sessionmaker(bind=engine)
    session = Session()

    # Get a connection from the engine
    conn = engine.connect()

    # Temporarily disable foreign key checks
    conn.execute(text('SET FOREIGN_KEY_CHECKS=0;'))

    # Insert the DataFrames into the database; the inserts go through the same
    # connection so the FOREIGN_KEY_CHECKS setting actually applies to them
    ebay_seller_df.to_sql('ebay_seller', con=conn, if_exists='append', index=False)
    pokemon_card_df.to_sql('pokemon_card', con=conn, if_exists='append', index=False)
    ebay_sales_data_df.to_sql('ebay_sales_data', con=conn, if_exists='append', index=False)

    # Re-enable foreign key checks
    conn.execute(text('SET FOREIGN_KEY_CHECKS=1;'))

    # Close the session and the database connection
    session.close()
    conn.close()
    engine.dispose()

N.B.: I am calling this function from my DAG task with a PythonOperator, not a SQL operator. For context, the task is wired into the DAG roughly as shown below.
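A minimal sketch of the task declaration (the dag_id and task_id match the logs above; the start date and schedule here are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="poke_dag",
    start_date=datetime(2024, 1, 1),  # placeholder
    schedule_interval=None,           # placeholder; triggered manually while debugging
    catchup=False,
) as dag:
    split_task = PythonOperator(
        task_id="split_data_to_MySql_db_task",
        python_callable=split_data_to_MySql_db,
    )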

Thank you for your help!

Because my Python code runs in the cloud on the Airflow web server, I wanted to authorize the Airflow web server's IP on the Cloud SQL instance, but I cannot find that IP. This might not be the right solution, though.

Answered by Jack Wotherspoon:

As one of the comments above mentions, you can leverage the Cloud SQL Python Connector package to connect without needing to allowlist IP addresses, and it gives you secure connections by default.

Basic usage looks as follows:

from google.cloud.sql.connector import Connector
import sqlalchemy

# initialize Connector object
connector = Connector()

# function to return the database connection
def getconn():
    conn = connector.connect(
        "project:region:instance",
        "pymysql",
        user="my-user",
        password="my-password",
        db="my-db-name"
    )
    return conn

# create connection pool
engine = sqlalchemy.create_engine(
    "mysql+pymysql://",
    creator=getconn,
)

# ... run your queries with the engine here ...

# clean up the connector once all database work is done
connector.close()
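A quick way to sanity-check that the pool can actually reach your instance before running the real workload (a minimal sketch):

# verify connectivity before the real workload
with engine.connect() as db_conn:
    result = db_conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
    print(result)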

One thing I want to point out about your code (besides needing to rotate your secrets, since you leaked them in the original post) is that SQLAlchemy has two different APIs, SQLAlchemy Core and SQLAlchemy ORM, and you seem to be using both. You rarely ever want to mix the two APIs, and this post does a good job explaining the difference between them. It looks as though you just want a simple query builder for your use case, so you can most likely remove all the Session ORM code.
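For illustration, here is a sketch of what your load could look like with Core only, reusing the connector-backed engine from above and the DataFrame/table names from your question (no Session required):

# engine.begin() opens a connection and commits automatically on success
with engine.begin() as db_conn:
    # FOREIGN_KEY_CHECKS is a per-session variable, so running the inserts
    # through this single connection makes the setting apply to them
    db_conn.execute(sqlalchemy.text("SET FOREIGN_KEY_CHECKS=0;"))
    ebay_seller_df.to_sql("ebay_seller", con=db_conn, if_exists="append", index=False)
    pokemon_card_df.to_sql("pokemon_card", con=db_conn, if_exists="append", index=False)
    ebay_sales_data_df.to_sql("ebay_sales_data", con=db_conn, if_exists="append", index=False)
    db_conn.execute(sqlalchemy.text("SET FOREIGN_KEY_CHECKS=1;"))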