pyodbc ERROR - ('ODBC SQL type -151 is not yet supported. column-index=16 type=-151', 'HY106')


I'm working on automating some query extraction using Python and pyodbc, then converting the results to Parquet format and sending them to AWS S3.

My script has been working fine so far, but I have run into a problem. I have a schema, let's call it SCHEMA_A, and inside of it several tables: TABLE_1, TABLE_2, ..., TABLE_N.

All those tables inside that schema are accessible by using the same credentials.

So I'm using a script like this one to automate the task.

import sys
import time

import pandas as pd
import pyodbc


def get_stream(cursor, batch_size=100000):
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:  # fetchmany returns an empty list once the result set is exhausted
            break
        yield rows


cnxn = pyodbc.connect(driver='pyodbc driver here',
                      host='host name',
                      database='schema name',
                      user='user name',
                      password='password')
print('Connection established ...')

cursor = cnxn.cursor()
print('Initializing cursor ...')

if len(sys.argv) > 1:
    table_name = sys.argv[1]
    cursor.execute('SELECT * FROM {}'.format(table_name))
else:
    exit()
print('Query fetched ...')

row_batch = get_stream(cursor)
print('Getting Iterator ...')

cols = cursor.description
cols = [col[0] for col in cols]

print('Initializing batch data frame ...')
df = pd.DataFrame(columns=cols)

start_time = time.time()
for rows in row_batch:
    tmp = pd.DataFrame.from_records(rows, columns=cols)
    df = df.append(tmp, ignore_index=True)
    tmp = None
    print("--- Batch inserted inn%s seconds ---" % (time.time() - start_time))
    start_time = time.time()
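The Parquet/S3 part mentioned at the top isn't shown in the script above; just for context, a minimal sketch of that step (bucket name, key, and local path are placeholders, not my real setup), assuming pyarrow and boto3 are installed:

import boto3

# Sketch only: write the DataFrame to a local Parquet file, then upload it to S3.
parquet_path = '/tmp/{}.parquet'.format(table_name)
df.to_parquet(parquet_path, index=False)  # requires pyarrow or fastparquet

s3 = boto3.client('s3')
s3.upload_file(parquet_path, 'my-bucket', 'exports/{}.parquet'.format(table_name))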

I run code similar to this inside Airflow tasks, and it works just fine for all the other tables. But there are two tables, let's call them TABLE_I and TABLE_II, that raise the following error when I execute cursor.fetchmany(batch_size):

ERROR - ('ODBC SQL type -151 is not yet supported.  column-index=16  type=-151', 'HY106')
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1310, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 58, in fetch_data
    for rows in row_batch:
  File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 27, in stream
    row = cursor.fetchmany(batch_size)

Inspecting those tables with SQLElectron and querying the first few rows, I realized that both TABLE_I and TABLE_II have a column called 'Geolocalizacao'. When I use SQL Server to find the DATA TYPE of that column with:

SELECT DATA_TYPE 
FROM INFORMATION_SCHEMA.COLUMNS
WHERE 
     TABLE_NAME = 'TABLE_I' AND 
     COLUMN_NAME = 'Geolocalizacao';

It yields:

DATA_TYPE
geography

Searching here on Stack Overflow I found this solution: python pyodbc SQL Server Native Client 11.0 cannot return geometry column

According to that user's description, it seems to work fine after adding:

import struct

def unpack_geometry(raw_bytes):
    # adapted from SSCLRT information at
    #   https://learn.microsoft.com/en-us/openspecs/sql_server_protocols/ms-ssclrt/dc988cb6-4812-4ec6-91cd-cce329f6ecda
    tup = struct.unpack('<i2b3d', raw_bytes)
    # tup contains: (unknown, Version, Serialization_Properties, X, Y, SRID)
    return tup[3], tup[4], tup[5]

and then:

cnxn.add_output_converter(-151, unpack_geometry)

This is added right after creating the connection. But it's not working for the GEOGRAPHY data type; when I use this code, it gives me the following error:

Traceback (most recent call last):
  File "benchmark.py", line 79, in <module>
    for rows in row_batch:
  File "benchmark.py", line 39, in get_stream
    row = cursor.fetchmany(batch_size)
  File "benchmark.py", line 47, in unpack_geometry
    tup = struct.unpack('<i2b3d', raw_bytes)
struct.error: unpack requires a buffer of 30 bytes

An example of the values this column holds follows this template:

{"srid":4326,"version":1,"points":[{}],"figures":[{"attribute":1,"pointOffset":0}],"shapes":[{"parentOffset":-1,"figureOffset":0,"type":1}],"segments":[]}

I honestly don't know how to adapt the code for this structure. Can someone help me? Everything has been working fine for all the other tables, but these two tables with this column are giving me a lot of headache.

1 Answer

Hi, this is what I have done:

from binascii import hexlify

def _handle_geometry(geometry_value):
    return f"0x{hexlify(geometry_value).decode().upper()}"

and then register it on the connection:

cnxn.add_output_converter(-151, _handle_geometry)

This will return the value the same way SSMS displays it.
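If you need actual coordinates rather than the raw hex string, one alternative (my own suggestion, not part of the converter approach above, and the column list is only illustrative) is to let SQL Server do the conversion in the query itself, so pyodbc never receives type -151:

# Sketch: Lat/Long are built-in properties of the geography type (they assume
# point values); STAsText() returns the WKT representation as text.
query = """
SELECT Geolocalizacao.Lat  AS geo_lat,
       Geolocalizacao.Long AS geo_long,
       Geolocalizacao.STAsText() AS geo_wkt
FROM {}
""".format(table_name)
cursor.execute(query)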