I have some ETL that saves data to ClickHouse with clickhouse-driver.
The save function looks exactly like this:
    def insert_data(data: Iterable[Dict], table: str, client: Client = None):
        columns = get_table_cols(table)
        client = client or get_ch_client(0)
        query = f"insert into {table} ({', '.join(columns)}) values"
        data = map(lambda row: {key: row[key] for key in columns}, data)
        client.execute(query, data)
The function that calls insert_data looks like this:
    def save_data(data: DataFrame, client: Client):
        mapper = get_mapper(my_table_map)
        data = map(lambda x: {col_new: getattr(x, col_old)
                              for col_old, col_new in map_dataframe_to_ch.items()},
                   data.collect())
        data = map(mapper, data)
        insert_data(data, 'my_table_name', client)
get_mapper returns a mapping function that looks like this:
    def map_row(row: Dict[str, Any]) -> Dict[str, Any]:
        nonlocal map_
        return {key: map_[key](val) for key, val in row.items()}
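For context, get_mapper is just a closure factory along these lines (simplified sketch, not the exact code; map_ maps each column name to a casting callable):

    def get_mapper(map_: Dict[str, Callable[[Any], Any]]):
        def map_row(row: Dict[str, Any]) -> Dict[str, Any]:
            nonlocal map_  # map_ is bound in the enclosing get_mapper scope
            return {key: map_[key](val) for key, val in row.items()}
        return map_row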
So basically, in the end I have some nested generators that produce dictionaries. To confirm this, if I put print(next(data)) just before client.execute, I get exactly the dict that I expect. Here is an example with sensitive info hidden:
    {'account_currency': '***',
     'instrument': '***',
     'operation': 'open',
     'event_time': datetime.datetime(2020, 7, 7, 19, 11, 49),
     'country': 'CN',
     'region': 'Asia and Pacific',
     'registration_source': '***',
     'account_type': '***',
     'platform': '***',
     'server_key': '***'}
The table schema is as follows:
"account_currency": "String",
"instrument": "String",
"operation": "String",
"event_time": "DateTime",
"country": "String",
"region": "String",
"registration_source": "String",
"account_type": "String",
"platform": "String",
"server_key": "String"
But for whatever reason, I get this error:
File "src/etl/usd_volume/prepare_users.py", line 356, in <module>
main()
File "src/etl/usd_volume/prepare_users.py", line 348, in main
save_data(data, client)
File "src/etl/usd_volume/prepare_users.py", line 302, in save_data
insert_data(data, 'report_traded_volume_usd', client)
File "/root/data/src/common/clickhouse_helper.py", line 14, in insert_data
client.execute(query, data)
File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 224, in execute
columnar=columnar
File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 341, in process_ordinary_query
query = self.substitute_params(query, params)
File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 422, in substitute_params
raise ValueError('Parameters are expected in dict form')
According to the docs:

    :param params: substitution parameters for SELECT queries and data for
        INSERT queries. Data for INSERT can be list, tuple or
        :data:`~types.GeneratorType`. Defaults to None (no parameters or data).
So clearly my data fits these requirements.
However, in the source code there is only this check:
    def substitute_params(self, query, params):
        if not isinstance(params, dict):
            raise ValueError('Parameters are expected in dict form')
        escaped = escape_params(params)
        return query % escaped
I couldn't really find where they check for it being a generator. The clickhouse-driver version is 0.1.4.
Any help on this problem is greatly appreciated.
Okay, further research into the source code revealed the root cause.
The function that throws the error, substitute_params, is called within the process_ordinary_query method of the Client class. That method is invoked for any query other than an INSERT. Whether a query is treated as an INSERT is decided by this part of the execute method:
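Slightly abridged from the 0.1.4 source (the full argument lists passed to the two process_* calls are elided here):

    is_insert = isinstance(params, (list, tuple, types.GeneratorType))

    if is_insert:
        rv = self.process_insert_query(...)
    else:
        rv = self.process_ordinary_query(...)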
The crux is
isinstance(params, (list, tuple, types.GeneratorType))
types.GeneratorType is defined as follows (quoting CPython's Lib/types.py, where the type is obtained by calling type() on a real generator):
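    def _g():
        yield 1
    GeneratorType = type(_g())

Which leads to this:

    >>> import types
    >>> isinstance((x for x in range(3)), types.GeneratorType)
    True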
Obviously, for my data, which is a map object:
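    >>> isinstance(map(str, [1, 2, 3]), types.GeneratorType)
    False

map returns a map object, which is an iterator but not a generator, so the isinstance check fails and execute routes the query to process_ordinary_query, which expects dict params.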
So the simplest solution to avoid this problem is to convert data into a generator with a generator expression, and that solves the problem completely.
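For example, a one-line wrap just before the insert is enough, since a generator expression over any iterable produces a real generator object:

    data = (row for row in data)  # map object -> generator; now passes the isinstance check
    insert_data(data, 'my_table_name', client)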
Or, if you are going to execute INSERT queries exclusively, you could call process_insert_query directly, which removes the need to convert the data to a generator at all.
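That would look roughly like this (process_insert_query is an internal method; in 0.1.4 the query string and the data are its first two positional arguments, but that may change in other versions):

    client.process_insert_query(query, data)

I think this is a bit of ambiguous type checking on clickhouse-driver's part, but this is what we have.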