bqfetch

A lightweight tool to quickly fetch tables from BigQuery as pandas DataFrames, using the BigQuery Storage API combined with multiprocessing. This module also aims to fetch large tables that cannot fit into memory, by chunking the table in a smart and scalable way.

Installation

pip install bqfetch
pip install -r requirements.txt

Algorithm

  • Fetch all distinct values from the given index column.
  • Divide these values into chunks based on the available memory and the number of cores on the machine.
  • With multiprocessing:
    • Each chunk is divided into multiple sub-chunks based on the nb_cores parameter and the available memory.
    • For each sub-chunk, a temporary table is created containing all the matching rows from the whole table.
    • These temporary tables are fetched as dataframes using BigQuery Storage.
    • The dataframes are merged.
    • The temporary tables are deleted.
  • Without multiprocessing:
    • Same process, with only one temporary table and no parallel processes created.
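
The chunking steps above can be sketched in plain Python. This is only an illustration of the idea, not bqfetch's actual implementation: the helper `split_evenly` and the sample values are hypothetical, and the real tool also uses the available memory to decide how many chunks to create.

```python
from typing import List, Sequence


def split_evenly(values: Sequence, nb_parts: int) -> List[list]:
    """Split `values` into `nb_parts` contiguous parts whose sizes differ by at most 1."""
    if nb_parts <= 0:
        raise ValueError("nb_parts must be positive")
    base, extra = divmod(len(values), nb_parts)
    parts, start = [], 0
    for i in range(nb_parts):
        # The first `extra` parts receive one additional element.
        size = base + (1 if i < extra else 0)
        parts.append(list(values[start:start + size]))
        start += size
    return parts


# Hypothetical distinct values of the index column.
distinct_ids = list(range(10))

# Divide the distinct values into chunks (here, 2 chunks).
chunks = split_evenly(distinct_ids, 2)

# Multiprocessing branch: divide each chunk into one sub-chunk per core (here, 2 cores).
sub_chunks = [split_evenly(chunk, 2) for chunk in chunks]
```

Each sub-chunk would then correspond to one temporary table fetched by one process.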

Use case

Fetching a huge table of users using multiple cores

id   Name       Age
187  Bartolomé  30
188  Tristan    22
...  ...        ...
>>> table = BigQueryTable("PROJECT", "DATASET", "TABLE")
>>> fetcher = BigQueryFetcher('/path/to/service_account.json', table)
>>> chunks = fetcher.chunks('id', by_chunk_size_in_GB=5)

>>> for chunk in chunks:
...     df = fetcher.fetch(chunk, nb_cores=-1, parallel_backend='billiard')
...     # ...
  • First, we create a BigQueryTable object which contains the path to the BigQuery table stored in GCP.
  • A fetcher is then created from the absolute path to the service_account.json file. This file is mandatory in order to perform operations in GCP.
  • Next, the whole table is divided into chunks, given the column name and the chunk size. Here the id column is a perfect choice because each of its values appears the same number of times: once. As for the chunk size, with by_chunk_size_in_GB=5 each chunk fetched on the machine will be 5 GB, so it has to fit into memory. Keep about 1/3 more memory free than the chunk size, because a DataFrame object is larger than the raw fetched data.
  • Finally, each chunk is fetched.
    • nb_cores=-1 uses the number of cores available on the machine.
    • parallel_backend='billiard' | 'joblib' | 'multiprocessing' specifies the backend framework to use.
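
The memory rule of thumb above can be turned into a quick calculation. This is a minimal sketch, assuming that "1/3 more memory" means a DataFrame occupies roughly 4/3 of the raw chunk size; both helper functions are hypothetical and are not part of bqfetch.

```python
def required_memory_gb(chunk_size_gb: float, overhead_ratio: float = 1 / 3) -> float:
    """Memory to keep free for a chunk, assuming the DataFrame is
    (1 + overhead_ratio) times larger than the raw fetched data."""
    if chunk_size_gb <= 0:
        raise ValueError("chunk_size_gb must be positive")
    return chunk_size_gb * (1 + overhead_ratio)


def max_chunk_size_gb(available_memory_gb: float, overhead_ratio: float = 1 / 3) -> float:
    """Largest raw chunk size that should still fit in the available memory,
    under the same assumption."""
    if available_memory_gb <= 0:
        raise ValueError("available_memory_gb must be positive")
    return available_memory_gb / (1 + overhead_ratio)


# A 5 GB chunk needs about 6.67 GB of free memory under this assumption.
needed = required_memory_gb(5)

# With 8 GB of free memory, chunks of up to about 6 GB should fit.
largest = max_chunk_size_gb(8)
```

The result of `max_chunk_size_gb` is only a starting point for choosing by_chunk_size_in_GB; the real overhead depends on the column types of the table.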

Fetch by number of chunks

It is also possible to use by_nb_chunks instead of by_chunk_size_in_GB. It divides the table into N chunks, so you have less control over the size of each chunk.

>>> table = BigQueryTable("PROJECT", "DATASET", "TABLE")
>>> fetcher = BigQueryFetcher('/path/to/service_account.json', table)
>>> chunks = fetcher.chunks('id', by_nb_chunks=10)

>>> for chunk in chunks:
...     df = fetcher.fetch(chunk, nb_cores=-1, parallel_backend='billiard')
...     # ...

Verbose mode

>>> chunks = fetcher.chunks(column='id', by_nb_chunks=1, verbose=True)
# Available memory on device:  7.04GB
# Size of table:               2.19GB
# Prefered size of chunk:      3GB
# Size per chunk:              3GB
# Nb chunks:                   1
  
# Nb values in "id":           96
# Chunk size:                  3GB
# Nb chunks:                   1
  
>>> for chunk in chunks:
...     df = fetcher.fetch(chunk=chunk, nb_cores=1, parallel_backend='joblib', verbose=True)
# Use multiprocessing :        False
# Nb cores:                    1
# Parallel backend:            joblib

# Time to fetch:               43.21s
# Nb lines in dataframe:       3375875
# Size of dataframe:           2.83GB

Warning

We recommend using this tool only when the table to fetch contains a column that can be easily chunked (divided into small parts). The ideal column contains distinct values, or values that appear roughly the same number of times. If some values appear thousands of times and others only a few times, the chunking will not be reliable: estimating the memory needed to fetch the table efficiently relies on the assumption that all chunks are approximately the same size.

A good index column:

This column contains distinct values, so it can be divided into chunks easily.

Card number
4390 3849 ...
2903 1182 ...
0562 7205 ...
...

A bad index column:

The number of occurrences can vary a lot from one value to another in this column, so the chunking will not be reliable.

Age
18
18
64
18
...
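
One way to judge a candidate index column before chunking is to compare how often its most and least frequent values occur. The sketch below is only an illustration: `skew_ratio` is a hypothetical helper, not a bqfetch function, and it works on values already loaded in memory (for example, a sample of the column).

```python
from collections import Counter
from typing import Hashable, Iterable


def skew_ratio(values: Iterable[Hashable]) -> float:
    """Ratio between the counts of the most and least frequent values.

    1.0 means every value appears the same number of times (ideal for chunking);
    larger ratios mean chunks built from these values will differ more in size.
    """
    counts = Counter(values)
    if not counts:
        raise ValueError("values must not be empty")
    return max(counts.values()) / min(counts.values())


# Distinct values, like the id column of the use case: every value appears once.
ids = [187, 188]
good = skew_ratio(ids)

# The Age sample shown above: 18 appears three times, 64 once.
ages = [18, 18, 64, 18]
bad = skew_ratio(ages)
```

Here `good` is 1.0 and `bad` is 3.0. Which threshold counts as "too skewed" is a judgment call that depends on the table.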

More cores != faster

Keep in mind that adding more cores to the fetching process will not necessarily improve performance, and most of the time it will even be slower. The reason is that fetching depends directly on the Internet bandwidth available on your network, not on the number of working cores or the computing power. However, you can run your own tests, and in some cases multiprocessing can save time (e.g. when cloud machines such as GCP Compute Engine instances allocate a fixed amount of bandwidth per core, so multiplying the number of cores also multiplies the bandwidth).
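
The reasoning above can be illustrated with a deliberately simple model. Everything in this sketch is an assumption made for illustration: the function names, the bandwidth figures, and the model itself, which ignores the overhead of spawning processes and creating temporary tables (overhead that can make multiple cores slower in practice).

```python
from typing import Optional


def usable_bandwidth_gb_s(link_gb_s: float, nb_cores: int,
                          per_core_cap_gb_s: Optional[float] = None) -> float:
    """Bandwidth available to the fetch, in GB/s.

    Without a per-core cap, the network link is the only limit, so extra cores
    do not help. With a per-core cap, bandwidth grows with the number of cores
    until the link is saturated.
    """
    if nb_cores < 1:
        raise ValueError("nb_cores must be at least 1")
    if per_core_cap_gb_s is None:
        return link_gb_s
    return min(link_gb_s, nb_cores * per_core_cap_gb_s)


def fetch_time_s(table_size_gb: float, bandwidth_gb_s: float) -> float:
    """Transfer time in seconds when bandwidth is the only bottleneck."""
    return table_size_gb / bandwidth_gb_s


# Hypothetical figures: 2 GB table, 1 GB/s link, optional 0.25 GB/s cap per core.
for nb_cores in (1, 4, 8):
    capped = fetch_time_s(2.0, usable_bandwidth_gb_s(1.0, nb_cores, per_core_cap_gb_s=0.25))
    uncapped = fetch_time_s(2.0, usable_bandwidth_gb_s(1.0, nb_cores))
    print(f"{nb_cores} cores: capped={capped:.1f}s uncapped={uncapped:.1f}s")
```

In this model, the uncapped time stays the same regardless of the number of cores, while the capped time only improves until the link is saturated.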

Contribution

bqfetch is open to new contributors, especially for bug fixing or implementation of new features. Do not hesitate to open an issue/pull request :)

License

MIT

Copyright (c) 2021-present, Tristan Bilot