DataTalksClub/data-engineering-zoomcamp

I want to use Mage pipeline to load data from partitioned object in gcs and export it to bigquery. The object's name is other_data and partitioned by year, month and day. But it seems like a wrong object name when I use the name 'other_data/*/*/*/*' because I've tried to load from specific path 'other_data/year=2024/month=10/day=2/40949e85e0734000910e2c0179278e00-0.parquet' and it worked

phuccodetrau opened this issue · 0 comments

Data loader:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test

@data_loader
def load_from_google_cloud_storage(*args, **kwargs):
"""
Template for loading data from a Google Cloud Storage bucket.
Specify your configuration settings in 'io_config.yaml'.

Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage
"""
config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'

bucket_name = 'weather_bigdata_20241'
object_key = 'other_data/*/*/*/*'

return GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).load(
    bucket_name,
    object_key,
)

Data exporter:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_data_to_big_query(df: DataFrame, **kwargs) -> None:
"""
Template for exporting data to a BigQuery warehouse.
Specify your configuration settings in 'io_config.yaml'.

Docs: https://docs.mage.ai/design/data-loading#bigquery
"""
table_id = 'strong-ward-437213-j6.bigdata_20241.test_parquet'
config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'

BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
    df,
    table_id,
    if_exists='replace',  # Specify resolution policy if table name already exists
)

Issue:
GoogleCloudStorage initialized

└─ Loading data frame from bucket 'weather_bigdata_20241' at key 'other_data'...

DONE

BigQuery initialized

├─ Connecting to BigQuery warehouse...DONE

└─ Exporting data to table 'strong-ward-437213-j6.bigdata_20241.test_parquet'...


BadRequest Traceback (most recent call last)

File /home/src/magic-zoomcamp/data_exporters/weather_bq.py:23, in export_data_to_big_query(df, **kwargs)

 20 config_path = path.join(get_repo_path(), 'io_config.yaml')

 21 config_profile = 'default'

---> 23 BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(

 24     df,

 25     table_id,

 26     if_exists='replace',  # Specify resolution policy if table name already exists

 27 )

File /usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py:354, in BigQuery.export(self, df, table_id, database, if_exists, overwrite_types, query_string, verbose, unique_conflict_method, unique_constraints, write_disposition, create_dataset, **configuration_params)

352 if verbose:

353     with self.printer.print_msg(f'Exporting data to table \'{table_id}\''):

--> 354 __process(database=database, write_disposition=write_disposition)

355 else:

356     __process(database=database)

File /usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py:343, in BigQuery.export..__process(database, write_disposition)

341     elif if_exists == ExportWritePolicy.FAIL:

342         write_disposition = WriteDisposition.WRITE_EMPTY

--> 343 self.__write_table(

344     df,

345     table_id,

346     overwrite_types=overwrite_types,

347     write_disposition=write_disposition,

348     create_dataset=create_dataset,

349     **configuration_params,

350 )

File /usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py:403, in BigQuery.__write_table(self, df, table_id, overwrite_types, create_dataset, **configuration_params)

400 if type(df) is DataFrame:

401     df.columns = df.columns.str.replace(' ', '_')

--> 403 return self.client.load_table_from_dataframe(df, table_id, job_config=config).result()

File /usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py:952, in _AsyncJob.result(self, retry, timeout)

949     self._begin(retry=retry, timeout=timeout)

951 kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}

--> 952 return super(_AsyncJob, self).result(timeout=timeout, **kwargs)

File /usr/local/lib/python3.10/site-packages/google/api_core/future/polling.py:261, in PollingFuture.result(self, timeout, retry, polling)

256 self._blocking_poll(timeout=timeout, retry=retry, polling=polling)

258 if self._exception is not None:

259     # pylint: disable=raising-bad-type

260     # Pylint doesn't recognize that this is valid in this case.

--> 261 raise self._exception

263 return self._result

BadRequest: 400 Table test_parquet_eec7e159_f7d0_4966_81c2_355a00ea9287_source does not have a schema.