I want to use a Mage pipeline to load data from a partitioned object in GCS and export it to BigQuery. The object is named other_data and is partitioned by year, month and day. When I use the key 'other_data/*/*/*/*' it looks like the object name is wrong, even though loading from the specific path 'other_data/year=2024/month=10/day=2/40949e85e0734000910e2c0179278e00-0.parquet' works.
Data loader:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_from_google_cloud_storage(*args, **kwargs):
    """
    Template for loading data from a Google Cloud Storage bucket.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    bucket_name = 'weather_bigdata_20241'
    object_key = 'other_data/*/*/*/*'

    return GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).load(
        bucket_name,
        object_key,
    )
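In case GoogleCloudStorage.load does not expand wildcard keys, a minimal workaround sketch (my assumption, not Mage's documented API) is to glob the hive-style partitions directly with gcsfs and concatenate the parquet files with pandas; gcsfs and pyarrow are assumed to be installed and GCP credentials configured:

```python
# Workaround sketch (not Mage's loader API): glob the partitions under the
# prefix with gcsfs and concatenate the parquet files into one DataFrame.
# Assumes gcsfs + pyarrow are installed and credentials are configured.
import gcsfs
import pandas as pd

fs = gcsfs.GCSFileSystem()
paths = fs.glob('weather_bigdata_20241/other_data/*/*/*/*.parquet')

df = pd.concat(
    (pd.read_parquet(f'gs://{p}') for p in paths),
    ignore_index=True,
)
```

Reading the whole partitioned dataset in one call with pd.read_parquet('gs://weather_bigdata_20241/other_data/') should also work with the pyarrow engine and would add year/month/day as columns.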
Data exporter:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path
if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_big_query(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a BigQuery warehouse.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#bigquery
    """
    table_id = 'strong-ward-437213-j6.bigdata_20241.test_parquet'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        table_id,
        if_exists='replace',  # Specify resolution policy if table name already exists
    )
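Because load_table_from_dataframe needs at least one column to infer a schema, a small guard before the export call (a hypothetical addition on my side, not part of the Mage template) would make the failure mode clearer when the loader returns nothing:

```python
# Hypothetical guard: fail early with a readable message if the upstream
# loader produced an empty or column-less DataFrame, which BigQuery cannot
# derive a schema from.
if df is None or len(df.columns) == 0:
    raise ValueError(f'Nothing to export to BigQuery, received: {df!r}')
```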
Issue:
GoogleCloudStorage initialized
└─ Loading data frame from bucket 'weather_bigdata_20241' at key 'other_data'...
DONE
BigQuery initialized
├─ Connecting to BigQuery warehouse...DONE
└─ Exporting data to table 'strong-ward-437213-j6.bigdata_20241.test_parquet'...
BadRequest Traceback (most recent call last)
File /home/src/magic-zoomcamp/data_exporters/weather_bq.py:23, in export_data_to_big_query(df, **kwargs)
20 config_path = path.join(get_repo_path(), 'io_config.yaml')
21 config_profile = 'default'
---> 23 BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
24 df,
25 table_id,
26 if_exists='replace', # Specify resolution policy if table name already exists
27 )
File /usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py:354, in BigQuery.export(self, df, table_id, database, if_exists, overwrite_types, query_string, verbose, unique_conflict_method, unique_constraints, write_disposition, create_dataset, **configuration_params)
352 if verbose:
353 with self.printer.print_msg(f'Exporting data to table \'{table_id}\''):
--> 354 __process(database=database, write_disposition=write_disposition)
355 else:
356 __process(database=database)
File /usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py:343, in BigQuery.export.<locals>.__process(database, write_disposition)
341 elif if_exists == ExportWritePolicy.FAIL:
342 write_disposition = WriteDisposition.WRITE_EMPTY
--> 343 self.__write_table(
344 df,
345 table_id,
346 overwrite_types=overwrite_types,
347 write_disposition=write_disposition,
348 create_dataset=create_dataset,
349 **configuration_params,
350 )
File /usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py:403, in BigQuery.__write_table(self, df, table_id, overwrite_types, create_dataset, **configuration_params)
400 if type(df) is DataFrame:
401 df.columns = df.columns.str.replace(' ', '_')
--> 403 return self.client.load_table_from_dataframe(df, table_id, job_config=config).result()
File /usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py:952, in _AsyncJob.result(self, retry, timeout)
949 self._begin(retry=retry, timeout=timeout)
951 kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
--> 952 return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
File /usr/local/lib/python3.10/site-packages/google/api_core/future/polling.py:261, in PollingFuture.result(self, timeout, retry, polling)
256 self._blocking_poll(timeout=timeout, retry=retry, polling=polling)
258 if self._exception is not None:
259 # pylint: disable=raising-bad-type
260 # Pylint doesn't recognize that this is valid in this case.
--> 261 raise self._exception
263 return self._result
BadRequest: 400 Table test_parquet_eec7e159_f7d0_4966_81c2_355a00ea9287_source does not have a schema.
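One common cause of this BadRequest is load_table_from_dataframe being handed a DataFrame without any columns, so it may be worth confirming that the wildcard key actually matched objects. A quick check using the google-cloud-storage client (an assumption on my part, not Mage's loader):

```python
# List the objects under the prefix to verify the partitions are visible
# to the credentials the pipeline uses.
from google.cloud import storage

client = storage.Client()
for blob in client.list_blobs('weather_bigdata_20241', prefix='other_data/'):
    print(blob.name)
```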