pg8000 copy from CSV


I'm using pg8000 in an App Engine Flask application so that I can process a CSV file and insert it into a PostgreSQL instance (hosted on Azure).

Why am I using pg8000 and not psycopg2? Because App Engine doesn't support psycopg2.

So far, the pg8000 docs don't mention a function for this like the copy_from / copy_expert helpers psycopg2 has. I haven't found an example that achieves this on SO or anywhere else, including the docs.
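
For reference, here is a rough sketch of the psycopg2 approach I mean (table name, file name and connection details are just placeholders):

import psycopg2

conn = psycopg2.connect(host='...', user='...', password='...', dbname='...')
cur = conn.cursor()

# copy_expert streams the CSV straight into the table via COPY ... FROM STDIN
with open('my-data.csv', 'r') as f:
    cur.copy_expert('COPY my_table FROM STDIN WITH (FORMAT csv, HEADER)', f)

conn.commit()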

Does anyone know if this is possible?


There are 3 answers below.

BEST ANSWER

Looking at the source code, there does not seem to be a way to directly import CSVs, nor does the library appear to have any built-in wrapper around INSERT queries that would handle this for you.

You do have the option of reading the CSV manually and using executemany:

import csv
import pg8000

conn = pg8000.connect(user="postgres", password="C.P.Snow")
cursor = conn.cursor()

# One %s per CSV column; executemany runs the INSERT once per row
command = 'INSERT INTO book (title) VALUES (%s)'
with open('my-data.csv', 'r', newline='') as fl:
    data = list(csv.reader(fl))
    cursor.executemany(command, data)

conn.commit()

As a word of caution: depending on the size of your data, it may be better to process the rows in batches with itertools.islice instead of loading the whole file at once:

import itertools

with open('my-data.csv', 'r', newline='') as fl:
    reader = csv.reader(fl)
    batch = list(itertools.islice(reader, 100))
    while batch:
        cursor.executemany(command, batch)
        batch = list(itertools.islice(reader, 100))

conn.commit()

All these methods were too slow for me. When working with large data, the COPY command works really well: it runs in a couple of seconds vs. the 10 minutes the other methods took.

# Import pandas, sqlalchemy and an in-memory text buffer
from io import StringIO

import pandas as pd
import sqlalchemy

# Create a database connection pool
# (a pg8000 URL looks like "postgresql+pg8000://user:password@host/dbname")
pool = sqlalchemy.create_engine()  # Fill with login/pg8000 connector

# Read data into a dataframe from a csv file
df = pd.read_csv('data.csv')

# Alternatively, create a dataframe from a dictionary of data
# df = pd.DataFrame(data)

# Create an empty buffer to hold the csv data
buffer = StringIO()

# Write the dataframe to the buffer without the index column
df.to_csv(buffer, index=False)

# Open a raw pg8000 connection from the pool
connPG8K = pool.raw_connection()

# Create a cursor object to execute queries
cursor = connPG8K.cursor()

# Reset the buffer position to the beginning
buffer.seek(0)

# Stream the buffer into the table with COPY (csv format, header row skipped).
# The table must already exist; COPY appends the rows to it.
cursor.execute('COPY "OverwriteTable" FROM STDIN WITH (FORMAT csv, HEADER);', stream=buffer)

# Commit the changes to the database
connPG8K.commit()
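
If the CSV already exists on disk and its columns match the table, you can skip pandas entirely and pass the open file object as the stream instead (a minimal sketch, reusing the connection and table from above):

# Same connection/table as above; stream the file itself instead of a StringIO buffer
with open('data.csv', 'r') as f:
    cursor.execute('COPY "OverwriteTable" FROM STDIN WITH (FORMAT csv, HEADER);', stream=f)

connPG8K.commit()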


As suggested in another question here, you can call next() on the file object to skip the CSV header row before handing the file to csv.reader.

Sorry in advance for not adding this as a comment on the previous answer, but I don't have enough points to do so.

I was having the same issue and solved it with the code below. Note that executemany is called on the cursor object, not on the connection.

import csv
import pg8000

conn = pg8000.connect(user='username', password='password', host='host', port=5432, database='database name')
cursor = conn.cursor()

command = "INSERT INTO public.salesforce_accounts (field1, field2, field3, field4, field5, field6) VALUES (%s, %s, %s, %s, %s, %s)"
with open('test.csv', 'r', newline='') as file:
    next(file)  # skip the header row
    data = list(csv.reader(file))
    cursor.executemany(command, data)

conn.commit()