Pipeline is broken because location schema was changed
Closed this issue · 2 comments
maciejzj commented
The data pipeline fails on the geolocation step of the ETL.
The traceback suggests that there is a problem with the 'city' field (traceback included below).
Python traceback (long text):
2023-03-13 00:00:00,294 [INFO] Started data pipeline
2023-03-13 00:00:00,605 [INFO] Attempting to perform data ingestion step
2023-03-13 00:00:18,594 [INFO] Data ingestion succeeded
2023-03-13 00:00:18,596 [INFO] Attempting to archive raw data in data lake
2023-03-13 00:00:19,525 [INFO] Data archival succeeded, stored under "1678665618_nofluffjobs" key
2023-03-13 00:00:19,525 [INFO] Attempting to perform data warehousing step
2023-03-13 00:00:21,057 [ERROR] Transform function failed
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 311, in transform_str_or_callable
return obj.apply(func, args=args, **kwargs)
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 4433, in apply
return SeriesApply(self, func, convert_dtype, args, kwargs).apply()
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 1082, in apply
return self.apply_standard()
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 1137, in apply_standard
mapped = lib.map_infer(
File "pandas/_libs/lib.pyx", line 2870, in pandas._libs.lib.map_infer
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 316, in <lambda>
lambda location_dict: [
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 317, in <listcomp>
self._geolocator(loc['city'])
KeyError: 'city'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3621, in get_loc
return self._engine.get_loc(casted_key)
File "pandas/_libs/index.pyx", line 136, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 163, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'places'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 223, in transform
result = self.transform_str_or_callable(func)
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 313, in transform_str_or_callable
return func(obj, *args, **kwargs)
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 318, in <lambda>
for loc in location_dict['places']
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 958, in __getitem__
return self._get_value(key)
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 1069, in _get_value
loc = self.index.get_loc(label)
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3623, in get_loc
raise KeyError(key) from err
KeyError: 'places'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_pipeline.py", line 78, in run
etl_pipeline.run(data_as_json)
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 200, in run
data = self.transform(data)
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 212, in transform
data = self._transformation_engine.extract_locations(data)
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 315, in extract_locations
data['city'] = data['location'].transform(
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 4320, in transform
).transform()
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 227, in transform
raise ValueError("Transform function failed") from err
ValueError: Transform function failed
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 311, in transform_str_or_callable
return obj.apply(func, args=args, **kwargs)
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 4433, in apply
return SeriesApply(self, func, convert_dtype, args, kwargs).apply()
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 1082, in apply
return self.apply_standard()
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 1137, in apply_standard
mapped = lib.map_infer(
File "pandas/_libs/lib.pyx", line 2870, in pandas._libs.lib.map_infer
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 316, in <lambda>
lambda location_dict: [
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 317, in <listcomp>
self._geolocator(loc['city'])
KeyError: 'city'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3621, in get_loc
return self._engine.get_loc(casted_key)
File "pandas/_libs/index.pyx", line 136, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/index.pyx", line 163, in pandas._libs.index.IndexEngine.get_loc
File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'places'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 223, in transform
result = self.transform_str_or_callable(func)
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 313, in transform_str_or_callable
return func(obj, *args, **kwargs)
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 318, in <lambda>
for loc in location_dict['places']
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 958, in __getitem__
return self._get_value(key)
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 1069, in _get_value
loc = self.index.get_loc(label)
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3623, in get_loc
raise KeyError(key) from err
KeyError: 'places'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubuntu/.local/bin/it-jobs-meta", line 8, in <module>
sys.exit(main())
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/__main__.py", line 43, in main
data_pipeline.schedule(parser.args['schedule'])
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_pipeline.py", line 50, in schedule
self.run()
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_pipeline.py", line 78, in run
etl_pipeline.run(data_as_json)
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 200, in run
data = self.transform(data)
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 212, in transform
data = self._transformation_engine.extract_locations(data)
File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 315, in extract_locations
data['city'] = data['location'].transform(
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 4320, in transform
).transform()
File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 227, in transform
raise ValueError("Transform function failed") from err
ValueError: Transform function failed
maciejzj commented
Extracted an example of the new JSON (as a dict) 'places' entry format. There are new 'province'-oriented elements in 'places' that lack the 'city' key used by the pipeline:
{
'places': [
{'city': 'Remote', 'url': 'c-developer-sii-polska-remote-12'},
{
'country': {'code': 'POL', 'name': 'Poland'},
'city': 'Warszawa',
'street': '',
'postalCode': '',
'url': 'c-developer-sii-polska-warszawa-3',
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'city': 'Wrocław',
'street': '',
'postalCode': '',
'url': 'c-developer-sii-polska-wroclaw-4',
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'kuyavian-pomeranian',
'url': 'c-developer-sii-polska-kuyavian-pomeranian',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'lodz',
'url': 'c-developer-sii-polska-lodz-1',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'lublin',
'url': 'c-developer-sii-polska-lublin',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'lubusz',
'url': 'c-developer-sii-polska-lubusz',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'lesser-poland',
'url': 'c-developer-sii-polska-lesser-poland',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'opole',
'url': 'c-developer-sii-polska-opole',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'subcarpathian',
'url': 'c-developer-sii-polska-subcarpathian',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'podlaskie',
'url': 'c-developer-sii-polska-podlaskie',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'pomeranian',
'url': 'c-developer-sii-polska-pomeranian',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'silesian',
'url': 'c-developer-sii-polska-silesian',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'holy-cross',
'url': 'c-developer-sii-polska-holy-cross',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'warmian-masurian',
'url': 'c-developer-sii-polska-warmian-masurian',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'greater-poland',
'url': 'c-developer-sii-polska-greater-poland',
'provinceOnly': True,
},
{
'country': {'code': 'POL', 'name': 'Poland'},
'province': 'west-pomeranian',
'url': 'c-developer-sii-polska-west-pomeranian',
'provinceOnly': True,
},
],
'fullyRemote': True,
'covidTimeRemotely': False,
}
maciejzj commented
The solution would be to extract only the 'places' entries with the 'city' key present (the added condition is marked with >>> <<<):
data['city'] = data['location'].transform(
lambda location_dict: [
self._geolocator(loc['city'])
for loc in location_dict['places'] >>>if 'city' in loc<<<
]
)