pandas-dev/pandas

COMPAT: gbq schema compat

Closed this issue · 6 comments

Code Sample

def get_user_account_credentials():  # https://github.com/pydata/pandas/blob/master/pandas/io/gbq.py#L160
    """Load cached BigQuery OAuth2 user credentials from disk, running the
    interactive OAuth flow when nothing is cached or the cache is invalid."""
    from oauth2client.client import OAuth2WebServerFlow
    from oauth2client.file import Storage
    from oauth2client.tools import run_flow, argparser

    reauth = False

    oauth_flow = OAuth2WebServerFlow(
        client_id=('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd'
                   '.apps.googleusercontent.com'),
        client_secret='kOc9wMptUtxkcIFbtZCcrEAc',
        scope=['https://www.googleapis.com/auth/bigquery'],
        redirect_uri='urn:ietf:wg:oauth:2.0:oob')

    # Credentials are cached on disk between runs.
    credential_store = Storage('bigquery_credentials.dat')
    cached_credentials = credential_store.get()

    # Run the interactive flow when nothing is cached, the cached token is
    # invalid, or a re-auth was explicitly requested.
    if cached_credentials is None or cached_credentials.invalid or reauth:
        cached_credentials = run_flow(oauth_flow, credential_store,
                                      argparser.parse_args([]))

    return cached_credentials

def _generate_bq_schema(df, default_type='STRING'):  # https://github.com/pydata/pandas/blob/master/pandas/io/gbq.py#L735
    """ Given a passed df, generate the associated Google BigQuery schema.
    Parameters
    ----------
    df : DataFrame
    default_type : string
        The default big query type in case the type of the column
        does not exist in the schema.
    """

    type_mapping = {
        'i': 'INTEGER',
        'b': 'BOOLEAN',
        'f': 'FLOAT',
        'O': 'STRING',
        'S': 'STRING',
        'U': 'STRING',
        'M': 'TIMESTAMP'
    }

    fields = []
    for column_name, dtype in df.dtypes.iteritems():
        fields.append({'name': column_name,
                       'type': type_mapping.get(dtype.kind, default_type)})

    return {'fields': fields}


def get_service(): # https://github.com/pydata/pandas/blob/master/pandas/io/gbq.py#L267
    """Build and return an authorized BigQuery v2 service object."""
    from apiclient.discovery import build
    import httplib2

    # Wrap the HTTP transport so every request carries the OAuth2 token.
    credentials = get_user_account_credentials()
    authorized_http = credentials.authorize(httplib2.Http())
    return build('bigquery', 'v2', http=authorized_http)


def verify_schema(dataset_id, table_id, schema):  # https://github.com/pydata/pandas/blob/master/pandas/io/gbq.py#L476
    """Compare the live BigQuery table schema against *schema* for equality.

    NOTE(review): this is the buggy behavior the issue reports — the schema
    dict returned by the API includes 'mode' and sometimes 'description'
    keys per field, so the == comparison fails even when names, types, and
    column order all match the generated schema.
    """
    from apiclient.errors import HttpError
    bigquery_service = get_service()

    try:
        return (bigquery_service.tables().get(
            projectId='publicdata',
            datasetId=dataset_id,
            tableId=table_id
        ).execute()['schema']) == schema

    except HttpError as ex:
        # NOTE(review): swallowing the error makes the function implicitly
        # return None (falsy), so callers cannot distinguish "schemas
        # differ" from "request failed".
        pass
        # self.process_http_error(ex)

def main():
    # Local import: the original sample used `pd` without importing it.
    import pandas as pd

    # DataFrame whose columns mirror publicdata:samples.shakespeare.
    shakespeare_df = pd.DataFrame(
        {'word': 'foo', 'word_count': [1], 'corpus': 'bar', 'corpus_date': [2016]},
        columns=['word', 'word_count', 'corpus', 'corpus_date'])
    table_schema = _generate_bq_schema(shakespeare_df)

    # Parenthesized print works on both Python 2 and Python 3; the original
    # bare `print` statements are a SyntaxError on Python 3.
    print('Is the schema verified?')
    print(verify_schema('samples', 'shakespeare', table_schema))

if __name__ == '__main__':
    main()

Expected Output

We want verify_schema to return True since the dataframe's columns are in the correct order, are named correctly, and have the correct types. The BigQuery table schema, however, has description and mode in addition to name and type.

Therefore verify_schema should remove description and mode from the BigQuery table schema when comparing it to the dataframe's schema.

Solution:

def get_user_account_credentials():  # https://github.com/pydata/pandas/blob/master/pandas/io/gbq.py#L160
    """Fetch cached BigQuery OAuth2 user credentials, or run the interactive
    OAuth flow when none are stored or the stored ones are invalid."""
    from oauth2client.client import OAuth2WebServerFlow
    from oauth2client.file import Storage
    from oauth2client.tools import run_flow, argparser

    # Setting reauth=True would force the interactive flow even when a
    # valid credential cache exists.
    reauth = False

    flow = OAuth2WebServerFlow(
        client_id=('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd'
                   '.apps.googleusercontent.com'),
        client_secret='kOc9wMptUtxkcIFbtZCcrEAc',
        scope=['https://www.googleapis.com/auth/bigquery'],
        redirect_uri='urn:ietf:wg:oauth:2.0:oob')

    # Credentials are cached on disk between runs.
    storage = Storage('bigquery_credentials.dat')
    credentials = storage.get()

    if credentials is None or credentials.invalid or reauth:
        credentials = run_flow(flow, storage, argparser.parse_args([]))

    return credentials

def _generate_bq_schema(df, default_type='STRING'):  # https://github.com/pydata/pandas/blob/master/pandas/io/gbq.py#L735
    """ Given a passed df, generate the associated Google BigQuery schema.
    Parameters
    ----------
    df : DataFrame
    default_type : string
        The default big query type in case the type of the column
        does not exist in the schema.
    """

    type_mapping = {
        'i': 'INTEGER',
        'b': 'BOOLEAN',
        'f': 'FLOAT',
        'O': 'STRING',
        'S': 'STRING',
        'U': 'STRING',
        'M': 'TIMESTAMP'
    }

    fields = []
    for column_name, dtype in df.dtypes.iteritems():
        fields.append({'name': column_name,
                       'type': type_mapping.get(dtype.kind, default_type)})

    return {'fields': fields}


def get_service(): # https://github.com/pydata/pandas/blob/master/pandas/io/gbq.py#L267
    """Build and return an authorized BigQuery v2 service object."""
    from apiclient.discovery import build
    import httplib2

    http = httplib2.Http()
    credentials = get_user_account_credentials()
    # Wrap the transport so every request carries the OAuth2 token.
    http = credentials.authorize(http)
    return build('bigquery', 'v2', http=http)


def verify_schema(dataset_id, table_id, schema):  # https://github.com/pydata/pandas/blob/master/pandas/io/gbq.py#L476
    """Compare the live BigQuery table schema against *schema* for equality.

    NOTE(review): kept as-is to show the current (buggy) behavior — the
    API's schema dict includes 'mode'/'description' keys per field, so this
    comparison fails even when names/types/order match. See
    verify_schema_modified below for the proposed fix.
    """
    from apiclient.errors import HttpError
    bigquery_service = get_service()

    try:
        return (bigquery_service.tables().get(
            projectId='publicdata',
            datasetId=dataset_id,
            tableId=table_id
        ).execute()['schema']) == schema

    except HttpError as ex:
        # NOTE(review): swallowed errors make this implicitly return None,
        # which is indistinguishable from "schemas differ" for callers.
        pass
        # self.process_http_error(ex)

def verify_schema_modified(dataset_id, table_id, schema):
    """Like verify_schema, but ignore the 'mode' and 'description' keys
    BigQuery reports per field, comparing only the name/type pairs that
    _generate_bq_schema produces."""
    from apiclient.errors import HttpError
    bigquery_service = get_service()
    try:
        remote_schema = bigquery_service.tables().get(
            projectId='publicdata',
            datasetId=dataset_id,
            tableId=table_id
        ).execute()['schema']

        # Strip the metadata keys the generated schema never contains.
        stripped_fields = []
        for field in remote_schema['fields']:
            stripped_fields.append(
                {key: field[key] for key in field
                 if key not in ('mode', 'description')})

        return {'fields': stripped_fields} == schema
    except HttpError as ex:
        # Errors are silently swallowed; the function then returns None.
        pass
        #self.process_http_error(ex)

def main():
    # Local import: the original sample used `pd` without importing it.
    import pandas as pd

    # DataFrame whose columns mirror publicdata:samples.shakespeare.
    shakespeare_df = pd.DataFrame(
        {'word': 'foo', 'word_count': [1], 'corpus': 'bar', 'corpus_date': [2016]},
        columns=['word', 'word_count', 'corpus', 'corpus_date'])
    table_schema = _generate_bq_schema(shakespeare_df)

    # Parenthesized print works on both Python 2 and Python 3; the original
    # bare `print` statements are a SyntaxError on Python 3.
    print('Is the schema verified?')
    print(verify_schema('samples', 'shakespeare', table_schema))
    print(verify_schema_modified('samples', 'shakespeare', table_schema))

if __name__ == '__main__':
    main()

output of pd.show_versions()

INSTALLED VERSIONS
------------------
commit: 5fd6ed036324f5de28816e2b66348e8a56b96f04
python: 2.7.10.final.0
python-bits: 64
OS: Darwin
OS-release: 15.3.0
machine: x86_64
processor: i386
byteorder: little
LC_ALL: None
LANG: None

pandas: 0.18.1+8.g5fd6ed0
nose: 1.3.6
pip: 8.1.1
setuptools: 20.8.1
Cython: 0.22
numpy: 1.11.0
scipy: 0.13.0b1
statsmodels: None
xarray: None
IPython: 4.0.0
sphinx: 1.3.1
patsy: None
dateutil: 2.5.2
pytz: 2016.3
blosc: None
bottleneck: None
tables: None
numexpr: None
matplotlib: 1.3.1
openpyxl: None
xlrd: None
xlwt: None
xlsxwriter: None
lxml: None
bs4: None
html5lib: None
httplib2: 0.9.2
apiclient: 1.5.0
sqlalchemy: None
pymysql: None
psycopg2: None
jinja2: 2.8
boto: 2.38.0
pandas_datareader: None

What exactly is the problem? You are showing lots of code, which makes this very hard to grok.

Yeah, just trying to adhere to the contributing guidelines. Sorry if it's unclear!

Basically I'd like verify_schema code to change. Currently it takes the schema of a BigQuery table, which will usually have name, type, mode, and possibly a description.

The _generate_bq_schema code takes a pandas dataframe and creates a BigQuery-like schema that only has name and type (not mode or description).

I think that verify_schema should only compare the BigQuery table's schema fields name and type to the dataframe's "schema".

I just submitted my pull request here although I haven't completed all the checkboxes. That might explain it better.

Appending dataframes to existing BigQuery tables (that weren't created by the to_gbq method) unnecessarily raises the InvalidSchema PandasError due to a bug in the GbqConnector.verify_schema method.

Creating a dataframe will result in the following generated_schema:

df = pandas.DataFrame({'word': '', 'word_count': [1], 'corpus': '', 'corpus_date': [1]},
            columns=['word', 'word_count', 'corpus', 'corpus_date'])
generated_schema = pandas.io.gbq.generate_bq_schema(df)
generated_schema
{'fields': [{'name': 'word', 'type': 'STRING'},
            {'name': 'word_count', 'type': 'INTEGER'},
            {'name': 'corpus', 'type': 'STRING'},
            {'name': 'corpus_date', 'type': 'INTEGER'}]}

This generated_schema will fail the verify_schema function since the schema from the BigQuery table contains mode for each field and sometimes description.

{u'fields': [{u'description': u'A single unique word (where whitespace is the delimiter) extracted from a corpus.',
              u'mode': u'REQUIRED',
              u'name': u'word',
              u'type': u'STRING'},
             {u'description': u'The number of times this word appears in this corpus.',
              u'mode': u'REQUIRED',
              u'name': u'word_count',
              u'type': u'INTEGER'},
             {u'description': u'The work from which this word was extracted.',
              u'mode': u'REQUIRED',
              u'name': u'corpus',
              u'type': u'STRING'},
             {u'description': u'The year in which this corpus was published.',
              u'mode': u'REQUIRED',
              u'name': u'corpus_date',
              u'type': u'INTEGER'}]}

The solution is to modify verify_schema to strip the BigQuery table's schema of its mode and description entries.

Can you rebase/update and show test results?

Hi, any ETA on this? I'm currently running into this exact issue.