Postgres COPY stream using pg8000 (error: could not determine data type of parameter $1)


I am trying to implement a COPY statement to push a pandas DataFrame to a Cloud SQL Postgres database in an Airflow DAG. I have one limitation: I can only use the pg8000 driver. I am using this as a reference: https://github.com/tlocke/pg8000#copy-from-and-to-a-file (which I found in this thread: https://news.ycombinator.com/item?id=25402430)

Here is my code:

    def getconn() -> pg8000.native.Connection:
        conn: pg8000.native.Connection = connector.connect(
            PG_CONFIG["host"],
            "pg8000",
            user=PG_CONFIG["user"],
            password=PG_CONFIG["password"],
            db=PG_CONFIG["database"]
        )
        return conn
    engine = sqlalchemy.create_engine("postgresql+pg8000://",creator=getconn)
    engine.dialect.description_encoding = None

    stream_in = StringIO()
    csv_writer = csv.writer(stream_in)
    csv_writer.writerow([1, "electron"])
    csv_writer.writerow([2, "muon"])
    csv_writer.writerow([3, "tau"])
    stream_in.seek(0)

    conn = engine.connect()
    conn.execute("CREATE TABLE IF NOT EXISTS temp_table (user_id numeric, user_name text)")    
    conn.execute("COPY temp_table FROM STDIN WITH (FORMAT CSV)", stream=stream_in)

I have tried everything I can think of (using the DELIMITER option, passing text instead of CSV, ...), but I keep getting the error below: "could not determine data type of parameter $1"

    [SQL: COPY winappsx.aa FROM STDIN WITH (FORMAT CSV)]
    [parameters: {'stream': <_io.StringIO object at 0x7f86a58d7dc8>}]
    (Background on this error at: http://sqlalche.me/e/13/4xp6)
    Traceback (most recent call last):
      File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
        cursor, statement, parameters, context
      File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
        cursor.execute(statement, parameters)
      File "/opt/python3.6/lib/python3.6/site-packages/pg8000/dbapi.py", line 454, in execute
        statement, vals=vals, input_oids=self._input_oids, stream=stream
      File "/opt/python3.6/lib/python3.6/site-packages/pg8000/core.py", line 632, in execute_unnamed
        self.handle_messages(context)
      File "/opt/python3.6/lib/python3.6/site-packages/pg8000/core.py", line 769, in handle_messages
        raise self.error
    pg8000.exceptions.DatabaseError: {'S': 'ERROR', 'V': 'ERROR', 'C': '42P18', 'M': 'could not determine data type of parameter $1', 'F': 'postgres.c', 'L': '1363', 'R': 'exec_parse_message'}

I know the connection works because the table gets created properly. The error occurs on the COPY statement.

I suspect the problem is in the way the stream parameter is passed, but I cannot find the correct syntax. This may help: https://www.kite.com/python/docs/pg8000.Cursor.execute

Thank you for your help!


A friend found the answer ;-)

Instead of executing through a normal SQLAlchemy connection, we take the underlying pg8000 DBAPI connection with engine.raw_connection(). This is described in https://docs.sqlalchemy.org/en/13/core/connections.html#working-with-raw-dbapi-connections

Now that we have the pg8000 connection, we make a cursor from it and call cursor.execute(), following this section of the pg8000 examples: https://github.com/tlocke/pg8000#copy-from-and-to-a-file-1

That cursor.execute() is pg8000's own method, and it accepts a stream argument that supplies the data for COPY ... FROM STDIN. SQLAlchemy's conn.execute() has no stream option: it passed stream=stream_in through as a bind parameter (note [parameters: {'stream': ...}] in the traceback), so pg8000 most likely sent the StringIO object as parameter $1. The COPY statement never references $1, so Postgres could not determine its type, which is probably why it failed.
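A minimal sketch of that pattern as a reusable helper, assuming dbapi_conn is what engine.raw_connection() returns for a pg8000-backed engine. The name copy_rows is hypothetical. The table name is interpolated into the SQL string because COPY cannot take an identifier as a bind parameter, so it must be a trusted value, never user input.

```python
import csv
from io import StringIO


def copy_rows(dbapi_conn, table, rows):
    """Hypothetical helper: bulk-load `rows` into `table` with COPY ... FROM STDIN.

    Assumes `dbapi_conn` is a pg8000 DBAPI connection (e.g. from
    engine.raw_connection()) and that `table` is a trusted identifier.
    """
    # Serialize the rows as CSV into an in-memory text stream
    buf = StringIO()
    csv.writer(buf).writerows(rows)
    buf.seek(0)  # rewind so pg8000 reads from the first row

    cursor = dbapi_conn.cursor()
    try:
        # pg8000's cursor.execute() takes stream=; SQLAlchemy's conn.execute() does not
        cursor.execute(f"COPY {table} FROM STDIN WITH (FORMAT csv)", stream=buf)
        dbapi_conn.commit()
    except Exception:
        dbapi_conn.rollback()  # leave the connection usable after a failed COPY
        raise
    finally:
        cursor.close()
```

For example: conn = engine.raw_connection(), then copy_rows(conn, "temp_table", [[1, "electron"], [2, "muon"]]), then conn.close() to hand the connection back to the pool.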

Here is the code:

    import csv
    from io import StringIO

    # engine is the pg8000-backed SQLAlchemy engine created in the question
    stream_in = StringIO()
    csv_writer = csv.writer(stream_in)
    csv_writer.writerow([1, "electron"])
    csv_writer.writerow([2, "muon"])
    csv_writer.writerow([3, "tau"])
    csv_writer.writerow([4, "sean is the best"])
    stream_in.seek(0)  # rewind so COPY reads from the first row

    # Get the underlying pg8000 DBAPI connection instead of a SQLAlchemy Connection
    connPG8K = engine.raw_connection()
    # Get a pg8000 cursor; its execute() accepts the stream= argument
    cursor = connPG8K.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS temp_table (user_id numeric, user_name text)")
    # DELIMITER needs a value (e.g. DELIMITER ','); CSV format already defaults to a comma
    cursor.execute("COPY temp_table FROM STDIN WITH (FORMAT csv)", stream=stream_in)
    connPG8K.commit()
    cursor.close()
    connPG8K.close()  # returns the connection to the engine's pool
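Since the original goal was to push a pandas DataFrame, the hand-written csv_writer rows could be replaced by DataFrame.to_csv. A sketch, assuming pandas is installed and the DataFrame's column names match the table's columns; both helper names below are hypothetical. index=False and header=False keep the stream to data rows only, and to_csv writes missing values as empty fields, which COPY's CSV format reads as NULL by default.

```python
from io import StringIO


def dataframe_to_copy_stream(df):
    """Hypothetical helper: serialize a pandas DataFrame into a CSV stream for COPY.

    No index column and no header row, so the stream matches
    COPY ... WITH (FORMAT csv) without the HEADER option.
    """
    buf = StringIO()
    df.to_csv(buf, index=False, header=False)
    buf.seek(0)  # rewind so pg8000 reads from the start
    return buf


def copy_statement(table, columns):
    """Hypothetical helper: COPY with an explicit column list.

    Listing the columns makes the DataFrame's column order map predictably onto
    the table. Table and column names are interpolated directly, so they must
    be trusted identifiers.
    """
    cols = ", ".join(columns)
    return f"COPY {table} ({cols}) FROM STDIN WITH (FORMAT csv)"
```

With the cursor from the answer's code, that would be cursor.execute(copy_statement("temp_table", df.columns), stream=dataframe_to_copy_stream(df)) followed by connPG8K.commit().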