How to explicitly declare charset=utf8 for Airflow connections


This sequence:

from airflow.hooks.mysql_hook import MySqlHook
conn = MySqlHook(mysql_conn_id='conn_id')
engine = conn.get_sqlalchemy_engine()
df.to_sql('test_table', engine, if_exists='append', index=False)

produces the following:

UnicodeEncodeError: 'latin-1' codec can't encode character '\ufffd' in position 57: ordinal not in range(256)
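
For context, the encoding failure itself can be reproduced without a database. The sketch below only assumes what the traceback states, namely that the character '\ufffd' is being encoded with the latin-1 codec; that the MySQL driver falls back to latin-1 when no charset is given is an assumption drawn from the error message, not something verified here.

```python
bad_char = '\ufffd'  # U+FFFD REPLACEMENT CHARACTER, the one named in the traceback

try:
    bad_char.encode('latin-1')  # latin-1 only covers code points 0..255
    latin1_ok = True
except UnicodeEncodeError:
    latin1_ok = False

utf8_bytes = bad_char.encode('utf-8')  # utf-8 can represent any code point
print(latin1_ok, utf8_bytes)
```

This is why the connection has to be told to use utf8 explicitly.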

This sequence works great:

from sqlalchemy import create_engine
engine = create_engine("mysql://{0}:{1}@{2}/capone?charset=utf8".format(user, pwd, host))
df.to_sql('test_table', engine, if_exists='append', index=False)

The key is explicitly declaring the charset. I tried to do the same in Airflow by adding {"charset": "utf8"} to the connection's Extra field.

But this has not fixed the error. I've restarted my dev environment since making the changes, and the admin panel confirms that the edit was saved. How can I make Airflow connections use utf8 as their charset?


1
On BEST ANSWER

I realised that this is a bug in Airflow and I have reported it here: https://issues.apache.org/jira/browse/AIRFLOW-4824

For now I have a workaround with the following code:

def get_uri(hook):
    conn = hook.get_connection(getattr(hook, hook.conn_name_attr))
    login = ''
    if conn.login:
        login = '{conn.login}:{conn.password}@'.format(conn=conn)
    host = conn.host
    if conn.port is not None:
        host += ':{port}'.format(port=conn.port)
    charset = ''
    if conn.extra_dejson.get('charset', False):
        chrs = conn.extra_dejson["charset"]
        if chrs.lower() == 'utf8' or chrs.lower() == 'utf-8':
            charset = '?charset=utf8'
    return '{conn.conn_type}://{login}{host}/{conn.schema}{charset}'.format(
        conn=conn, login=login, host=host, charset=charset)

And then use it as follows:

from sqlalchemy import create_engine

# sql_hook is an existing hook instance, e.g. MySqlHook(mysql_conn_id='conn_id')
url = get_uri(sql_hook)
engine = create_engine(url)

The real fix would be a pull request to the project that overrides get_uri in mysql_hook.py.
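
As a rough illustration of what such a URI builder could compute, here is a minimal sketch that runs without Airflow. The function name build_uri and the SimpleNamespace stand-in are hypothetical; the attribute names (conn_type, login, password, host, port, schema, extra_dejson) are the ones the workaround above reads. Unlike the workaround, this variant forwards any declared charset and URL-escapes the credentials, which is a design choice of this sketch and not something taken from the Airflow code base.

```python
from types import SimpleNamespace
from urllib.parse import quote_plus


def build_uri(conn):
    """Build a SQLAlchemy URI from a connection-like object, keeping its charset."""
    login = ''
    if conn.login:
        # Escape credentials so characters such as '@' or ':' cannot break the URI.
        login = '{}:{}@'.format(quote_plus(conn.login), quote_plus(conn.password or ''))
    host = conn.host
    if conn.port is not None:
        host += ':{}'.format(conn.port)
    # Forward whatever charset the Extra JSON declares, not only utf8.
    charset = conn.extra_dejson.get('charset')
    query = '?charset={}'.format(charset) if charset else ''
    return '{}://{}{}/{}{}'.format(conn.conn_type, login, host, conn.schema, query)


# Stand-in for an Airflow Connection; every value here is made up.
demo = SimpleNamespace(conn_type='mysql', login='user', password='p@ss',
                       host='localhost', port=3306, schema='capone',
                       extra_dejson={'charset': 'utf8'})
print(build_uri(demo))  # mysql://user:p%40ss@localhost:3306/capone?charset=utf8
```

With a real hook, the object returned by hook.get_connection(...) would take the place of demo.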

0
On

I fixed the problem this way and it works well (edit the following line in airflow.cfg):

sql_alchemy_conn = mysql://user:password@host:port/airflow?charset=utf8
0
On
from sqlalchemy import create_engine
from airflow.hooks.mysql_hook import MySqlHook

conn = MySqlHook(mysql_conn_id='conn_id')
uri = conn.get_uri()
engine = create_engine(uri+'?charset=utf8')
df.to_sql('test_table', engine, if_exists='append', index=False)

I fixed the problem with the code above.
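
One caveat with plain string concatenation: if the URI already carries a query string, appending '?charset=utf8' produces an invalid URI. A slightly more defensive sketch, using only the standard library, is shown below. The helper name with_charset is hypothetical, and the example URIs are made up.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_charset(uri, charset='utf8'):
    """Return uri with a charset query parameter, keeping any existing parameters."""
    parts = urlsplit(uri)
    params = dict(parse_qsl(parts.query))
    # Only add the charset if the URI does not already declare one.
    params.setdefault('charset', charset)
    return urlunsplit(parts._replace(query=urlencode(params)))


print(with_charset('mysql://user:pwd@host:3306/capone'))
print(with_charset('mysql://user:pwd@host:3306/capone?ssl=true'))
```

The result can be passed to create_engine in place of uri+'?charset=utf8'.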