I'm working on automating some query extraction using python and pyodbc, and then converting to parquet format, and send to AWS S3.
My script solution is working fine so far, but I have faced a problem. I have a Schema, let us call it SCHEMA_A, and inside of it several tables, TABLE_1, TABLE_2 .... TABLE_N.
All those tables inside that schema are accessible by using the same credentials.
So I'm using a script like this one to automate the task.
def get_stream(cursor, batch_size=100000):
while True:
row = cursor.fetchmany(batch_size)
if row is None or not row:
break
yield row
cnxn = pyodbc.connect(driver='pyodbc driver here',
host='host name',
database='schema name',
user='user name,
password='password')
print('Connection stabilished ...')
cursor = cnxn.cursor()
print('Initializing cursos ...')
if len(sys.argv) > 1:
table_name = sys.argv[1]
cursor.execute('SELECT * FROM {}'.format(table_name))
else:
exit()
print('Query fetched ...')
row_batch = get_stream(cursor)
print('Getting Iterator ...')
cols = cursor.description
cols = [col[0] for col in cols]
print('Initalizin batch data frame ..')
df = pd.DataFrame(columns=cols)
start_time = time.time()
for rows in row_batch:
tmp = pd.DataFrame.from_records(rows, columns=cols)
df = df.append(tmp, ignore_index=True)
tmp = None
print("--- Batch inserted inn%s seconds ---" % (time.time() - start_time))
start_time = time.time()
I run a code similar to that inside Airflow tasks, and works just fine for all other tables. But then I have two tables, lets call TABLE_I and TABLE_II that yields the following error when I execute cursor.fetchmany(batch_size)
:
ERROR - ('ODBC SQL type -151 is not yet supported. column-index=16 type=-151', 'HY106')
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1310, in _execute_task
result = task_copy.execute(context=context)
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 128, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 58, in fetch_data
for rows in row_batch:
File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 27, in stream
row = cursor.fetchmany(batch_size)
Inspecting those tables with SQLElectron, and Querying the first few lines, I have realized that both TABLE_I and TABLE_II have a Column called 'Geolocalizacao', when I use SQL server language to find the DATA TYPE of that column with:
SELECT DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = 'TABLE_I' AND
COLUMN_NAME = 'Geolocalizacao';
It yields:
DATA_TYPE
geography
Seraching here on stack overflow I found this solution: python pyodbc SQL Server Native Client 11.0 cannot return geometry column
By the description of the user, it seem work fine by adding:
def unpack_geometry(raw_bytes):
# adapted from SSCLRT information at
# https://learn.microsoft.com/en-us/openspecs/sql_server_protocols/ms-ssclrt/dc988cb6-4812-4ec6-91cd-cce329f6ecda
tup = struct.unpack('<i2b3d', raw_bytes)
# tup contains: (unknown, Version, Serialization_Properties, X, Y, SRID)
return tup[3], tup[4], tup[5]
and then:
cnxn.add_output_converter(-151, unpack_geometry)
After creating the connection. But It's not working for the GEOGRAPHY DATA TYPE, when I use this code (add import struct
on python script), it gives me the following error:
Traceback (most recent call last):
File "benchmark.py", line 79, in <module>
for rows in row_batch:
File "benchmark.py", line 39, in get_stream
row = cursor.fetchmany(batch_size)
File "benchmark.py", line 47, in unpack_geometry
tup = struct.unpack('<i2b3d', raw_bytes)
struct.error: unpack requires a buffer of 30 bytes
An example of values that this column have, follows the given template:
{"srid":4326,"version":1,"points":[{}],"figures":[{"attribute":1,"pointOffset":0}],"shapes":[{"parentOffset":-1,"figureOffset":0,"type":1}],"segments":[]}
I honestly don't know how to adapt the code for this given structure, can someone help me? It's been working fine for all other tables, but I have those two tables with this column that are giving me a lot o headeach.
Hi this is what I have done:
and then on connection:
this will return value as SSMS.