sfu-db/connector-x

Python read_sql() crash: Too many open files when used with multiprocessing

miohtama opened this issue · 1 comment

What language are you using?

Python

What version are you using?

connectorx 0.3.2

What database are you using?

PostgreSQL.

Ubuntu Linux 22.04.

What dataframe are you using?

Arrow 2

Can you describe your bug?

I am running a loop that exports data from the database in slices.

The loop and query I am using look like this:

    import connectorx as cx
    from pyarrow import Table  # for the pa_table annotation below

    while cursor < end:

        query = \
            f"SELECT id, pair_id, amount0_in, amount0_out, amount1_in, amount1_out, denormalised_ts, tx_index, log_index, tx_hash, denorm_block_number, trader_address " \
            f"from swap " \
            f"WHERE id >= {cursor} and id < {cursor + slice} and denorm_chain_id = {chain_id} " \
            f"ORDER BY pair_id, denormalised_ts"

        pa_table: Table = cx.read_sql(
            db_url,
            query,
            partition_on="id",
            partition_num=worker_threads,
            return_type="arrow2"
        )

        # Do some transformation on the pyarrow.Table here using multiprocessing

        cursor += slice

The loop uses the multiprocessing module, but the worker processes do not touch ConnectorX, so I suspect some kind of interaction between the two.
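
For context, the per-iteration multiprocessing usage has roughly this shape (a simplified, hypothetical sketch; transform_chunk stands in for my actual transformation code):

    from multiprocessing import Pool

    def transform_chunk(batch):
        # Hypothetical per-chunk transformation done in a child process.
        ...

    # Inside the export loop, after cx.read_sql() has returned pa_table:
    with Pool(processes=worker_threads) as pool:
        # On Linux this fork()s the worker processes; the children inherit
        # every file descriptor the parent has open, including anything
        # ConnectorX still holds.
        results = pool.map(transform_chunk, pa_table.to_batches())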

After running the script for a while I get:

thread 'r2d2-worker-2' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 24, kind: Uncategorized, message: "Too many open files" }', /github/home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/postgres-0.19.4/src/config.rs:342:14
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Traceback (most recent call last):
  File "/root/mikko/oracle/scripts/generic/export-trades.py", line 89, in <module>
    stats = write_trades_cache(
  File "/root/mikko/oracle/dex_ohlcv/writer/parquet_trades.py", line 877, in write_trades_cache
    stats += write_uniswap_v2_trades_connectorx(
  File "/root/mikko/oracle/dex_ohlcv/writer/parquet_trades.py", line 520, in write_uniswap_v2_trades_connectorx
    pa_table: Table = cx.read_sql(
  File "/root/.cache/pypoetry/virtualenvs/dex-ohlcv-bhZziL_o-py3.10/lib/python3.10/site-packages/connectorx/__init__.py", line 297, in read_sql
    result = _read_sql(
RuntimeError: timed out waiting for connection

What are the steps to reproduce the behavior?

Run the export script, which issues read_sql() multiple times over a long period of time.

I checked with lsof and it seems that the number of (anonymous?) FIFO pipes grows with each loop iteration.
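
For anyone who wants to watch this from inside the script rather than with lsof, here is a minimal, Linux-only sketch that counts the process's open file descriptors once per iteration (it reads /proc/self/fd directly):

    import os

    def count_open_fds() -> int:
        # Number of file descriptors currently open in this process (Linux).
        return len(os.listdir("/proc/self/fd"))

    # Called once per loop iteration; the count keeps climbing while fork()
    # happens inside the loop.
    print(f"cursor={cursor} open fds={count_open_fds()}")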

If there is a way to "reset" the ConnectorX Python bindings and internals, I can check whether that helps, e.g. by manually purging/closing any OS resources ConnectorX might still hold.

I rewrote the loop so that it does not fork() inside the loop, and the resource leak is gone. The downside is that because I can no longer rely on fork() to pass data from the parent process to the child processes, I have to serialise the data between them (the children cannot simply read it from the forked memory), and this slows down the data ETL.
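
For reference, the restructured loop has roughly this shape: the worker pool is created once before the loop, so fork() no longer happens per iteration, and each pyarrow batch is pickled to the workers instead of being read from forked memory. This is a simplified sketch; transform_chunk and the surrounding variables are the same placeholders as in the snippets above.

    import connectorx as cx
    from multiprocessing import Pool

    # The pool is created once, before any data is loaded. The workers are
    # forked here, so they never inherit descriptors that ConnectorX opens
    # later, but they also cannot read pa_table from shared forked memory.
    with Pool(processes=worker_threads) as pool:
        while cursor < end:
            # Same query as in the snippet above, rebuilt for this slice
            query = f"SELECT ... FROM swap WHERE id >= {cursor} AND id < {cursor + slice} ..."
            pa_table = cx.read_sql(
                db_url,
                query,
                partition_on="id",
                partition_num=worker_threads,
                return_type="arrow2",
            )
            # Each batch is pickled to a worker process; this is the
            # serialisation overhead mentioned above.
            results = pool.map(transform_chunk, pa_table.to_batches())
            cursor += slice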