Python clickhouse-driver: ValueError: Parameters are expected in dict form


I have some ETL that saves data to ClickHouse with clickhouse-driver.

The save function looks exactly like this:

def insert_data(data: Iterable[Dict], table: str, client: Client = None):
    columns = get_table_cols(table)
    client = client or get_ch_client(0)
    query = f"insert into {table} ({', '.join(columns)}) values"
    data = map(lambda row: {key: row[key] for key in columns}, data)
    client.execute(query, data)

The function that calls insert_data looks like this:

def save_data(data: DataFrame, client: Client):

    mapper = get_mapper(my_table_map)
    data = map(lambda x: {col_new: getattr(x, col_old)
                          for col_old, col_new in map_dataframe_to_ch.items()},
               data.collect())
    data = map(mapper, data)
    insert_data(data, 'my_table_name', client)

get_mapper returns a mapping function that looks like this:

def map_row(row: Dict[str, Any]) -> Dict[str, Any]:
    nonlocal map_
    return {key: map_[key](val) for key, val in row.items()}
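
For context, get_mapper is essentially a closure factory; a simplified, hypothetical sketch of it (the real converter dict is whatever my_table_map provides) would be:

from typing import Any, Callable, Dict

def get_mapper(map_: Dict[str, Callable[[Any], Any]]) -> Callable[[Dict[str, Any]], Dict[str, Any]]:
    # map_ maps each column name to a converter callable (e.g. str, int, a datetime parser)
    def map_row(row: Dict[str, Any]) -> Dict[str, Any]:
        nonlocal map_
        return {key: map_[key](val) for key, val in row.items()}
    return map_row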

So basically, in the end I have some nested generators that produce dictionaries. To confirm this, if I put print(next(data)) just before client.execute, I get exactly the dict I expect. Here is an example with sensitive info hidden:

{'account_currency': '***', 
 'instrument': '***',
 'operation': 'open',
 'event_time': datetime.datetime(2020, 7, 7, 19, 11, 49),
 'country': 'CN',
 'region': 'Asia and Pacific',
 'registration_source': '***',
 'account_type': '***',
 'platform': '***',
 'server_key': '***'}

Table schema is as follows:

"account_currency": "String",
"instrument": "String",
"operation": "String",
"event_time": "DateTime",
"country": "String",
"region": "String",
"registration_source": "String",
"account_type": "String",
"platform": "String",
"server_key": "String"

But for whatever reason I get this error:

  File "src/etl/usd_volume/prepare_users.py", line 356, in <module>
    main()
  File "src/etl/usd_volume/prepare_users.py", line 348, in main
    save_data(data, client)
  File "src/etl/usd_volume/prepare_users.py", line 302, in save_data
    insert_data(data, 'report_traded_volume_usd', client)
  File "/root/data/src/common/clickhouse_helper.py", line 14, in insert_data
    client.execute(query, data)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 224, in execute
    columnar=columnar
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 341, in process_ordinary_query
    query = self.substitute_params(query, params)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 422, in substitute_params
    raise ValueError('Parameters are expected in dict form')

According to the docs:

:param params: substitution parameters for SELECT queries and data for INSERT queries. Data for INSERT can be list, tuple or :data:~types.GeneratorType. Defaults to None (no parameters or data).

So clearly my data fits these requirements.
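
For reference, the documented INSERT pattern is roughly this (a minimal sketch; the table, columns and values here are made up):

from clickhouse_driver import Client

client = Client('localhost')
# Data for the INSERT goes into the second argument of execute(),
# e.g. as a list of dicts keyed by column name.
client.execute(
    'INSERT INTO some_table (name, value) VALUES',
    [{'name': 'a', 'value': 1}, {'name': 'b', 'value': 2}],
)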

However in the source code there is only this check:

def substitute_params(self, query, params):
    if not isinstance(params, dict):
        raise ValueError('Parameters are expected in dict form')

    escaped = escape_params(params)
    return query % escaped

I couldn't find where they check for it being a generator. The clickhouse-driver version is 0.1.4.

Any help on this problem is greatly appreciated.


There is 1 best solution below.

BEST ANSWER

Okay, further research into the source code revealed the root cause.

The function that raises the error, substitute_params, is called inside the process_ordinary_query method of the Client class. That method is called for every query that is not an INSERT.

Whether a query is treated as an INSERT or as an ordinary query is decided by this part of the execute method:

is_insert = isinstance(params, (list, tuple, types.GeneratorType))

if is_insert:
    rv = self.process_insert_query(
        query, params, external_tables=external_tables,
        query_id=query_id, types_check=types_check,
        columnar=columnar
    )
else:
    rv = self.process_ordinary_query(
        query, params=params, with_column_types=with_column_types,
        external_tables=external_tables,
        query_id=query_id, types_check=types_check,
        columnar=columnar
    )

The crux is the isinstance(params, (list, tuple, types.GeneratorType)) check.

types.GeneratorType is defined as follows:

def _g():
    yield 1
GeneratorType = type(_g())

Which leads to this:

>>> GeneratorType
<class 'generator'>

Obviously, for my data, which is a map object:

>>> type(map(...))
<class 'map'>
>>> isinstance(map(...), GeneratorType)
False

So the simplest way to avoid this problem is to convert the data into a generator with a generator expression, and that solves the problem completely.

>>> data = (i for i in data)
>>> isinstance(data, GeneratorType)
True
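
Applied to the insert_data function above (reusing get_table_cols and get_ch_client from it), the fix is a one-line change, for example:

def insert_data(data: Iterable[Dict], table: str, client: Client = None):
    columns = get_table_cols(table)
    client = client or get_ch_client(0)
    query = f"insert into {table} ({', '.join(columns)}) values"
    data = map(lambda row: {key: row[key] for key in columns}, data)
    # Wrap the map object in a generator expression so it passes the
    # isinstance(params, (list, tuple, types.GeneratorType)) check in execute().
    client.execute(query, (row for row in data))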

Alternatively, if you are going to execute INSERT queries exclusively, you could call process_insert_query directly, which removes the need to convert the data to a generator.
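
Based on the call shown above, that would look roughly like this (a sketch; process_insert_query is an internal method, so it skips some of the bookkeeping done by execute() and may change between versions):

# Bypass the isinstance() dispatch in execute() and go straight to the INSERT path;
# the remaining keyword arguments are left at their defaults.
client.process_insert_query(query, data)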

I think this type checking in clickhouse-driver is a bit too narrow, but this is what we have.