I am listening to n
different TCP streams and would like to copy
the received data into n
different tables after some simple post-processing. Latency is important, and using this approach sequentially I've managed to get insertions down to < 1 ms.
Using libpq
from Python, I've attempted to open n
different database connections with
db = ps_abst.PGconn(
ps_impl.PQconnectdb(f"user={settings.db.user}".encode(pg_encoding)))
from n
different threading.Thread
targets and simply put_copy_data(data)
the post-processed data after executing the relevant copy
query with db.exec_(query)
. ps_abst
and ps_impl
are imports from psycopg.
It seems, however, that this approach is incorrect as the copy
operation either fails silently or I receive a warning that an ongoing copy
operation is in progress. The TCP data can, on the other hand, be printed to stdout
without issues.
Does anybody know where I'm messing up? Thanks a lot!