Add DBAPI executemany INSERT batching support
Opened this issue · 2 comments
Is your feature request related to a problem? Please describe.
It appears that the BigQuery DBAPI does not support multi-row INSERT batching for more performant python-based DML transactions. Current executemany INSERT statements are executed one at a time, leading to massive slowdowns in batch INSERT DML operations.
Example multi-row insert from BiqQuery documentation:
https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#insert_examples
INSERT dataset.Inventory (product, quantity)
VALUES('top load washer', 10),
('front load washer', 20),
('dryer', 30),
('refrigerator', 10),
('microwave', 20),
('dishwasher', 30),
('oven', 5)
Describe the solution you'd like
Add multi-row INSERT batching support.
MySQL DBAPI example: https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/cursors.py#L194
Describe alternatives you've considered
I will probably crudely make a patch to my sqlalchemy-bigquery DBAPI cursor to enable this support for my project that needs this performance boost for my ORM based application.
Additional context
sqlalchemy-bigquery related ticket: googleapis/python-bigquery-sqlalchemy#497
sqlalchemy related discussion: sqlalchemy/sqlalchemy#12038
I just made a crude version of what this could look like from within the sqlalchemy-bigquery dialect "do_executemany" function that seems to be working. I have not implemented any paging to handle query character length limits yet however:
#Hotfix for this issue: https://github.com/googleapis/python-bigquery/issues/2048
import copy
import re
from typing import List
from google.cloud.bigquery import dbapi as bq_dbapi
from sqlalchemy_bigquery.base import BigQueryDialect
def do_executemany(self, cursor, statement:str, parameters:List[dict], context=None):
#NOTE: Borrowed from MySQL DBAPI: https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/cursors.py#L157
if not parameters:
return
#: Regular expression for :meth:`Cursor.executemany`.
#: executemany only supports simple bulk insert.
#: You can use it to load large dataset.
RE_INSERT_VALUES = re.compile(
r"\s*((?:INSERT|REPLACE)\b.+\bVALUES?\s*)"
+ r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))"
+ r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
re.IGNORECASE | re.DOTALL,
)
m = RE_INSERT_VALUES.match(statement)
if m:
q_prefix = m.group(1) % ()
q_values = m.group(2).rstrip()
q_postfix = m.group(3) or ""
assert q_values[0] == "(" and q_values[-1] == ")"
new_sql_stmt = q_prefix
new_sql_params = {}
set_idx = 0
for param_set in parameters:
#formatted_operation, parameter_types = bq_dbapi.cursor._format_operation(statement, param_set)
#query_parameters = bq_dbapi._helpers.to_query_parameters(param_set, parameter_types)
set_params = {}
set_values = copy.copy(q_values)
for param_k,param_v in param_set.items():
new_param_k = f'{param_k}__{set_idx}'
set_params[new_param_k] = param_v
set_values = set_values.replace(f'%({param_k}:', f'%({new_param_k}:')
new_sql_stmt = f'{new_sql_stmt}{set_values},'
new_sql_params.update(set_params)
set_idx += 1
new_sql_stmt = new_sql_stmt[:-1] # remove trailing comma
new_sql_stmt = f'{new_sql_stmt}{q_postfix}'
rowcount = cursor.execute(new_sql_stmt, new_sql_params)
else:
#Current implementation of this function only supports serial inserting.
rowcount = cursor.executemany(statement, parameters)
BigQueryDialect.do_executemany = do_executemany
Looks like I immediately hit issues with not batching this function in chucks of 10k parameter sets due to this BigQuery quota: "A GoogleSQL query can have up to 10,000 parameters". Looks like I may need to implement that after all 😅