maciejzj/it-jobs-meta

Pipeline is broken because location schema was changed

Closed this issue · 2 comments

Data pipeline fails on geolocation step of ETL.

Traceback suggests that there is a problem with 'city' field (traceback included below).

Python traceback (long text):
2023-03-13 00:00:00,294 [INFO] Started data pipeline
2023-03-13 00:00:00,605 [INFO] Attempting to perform data ingestion step
2023-03-13 00:00:18,594 [INFO] Data ingestion succeeded
2023-03-13 00:00:18,596 [INFO] Attempting to archive raw data in data lake
2023-03-13 00:00:19,525 [INFO] Data archival succeeded, stored under "1678665618_nofluffjobs" key
2023-03-13 00:00:19,525 [INFO] Attempting to perform data warehousing step
2023-03-13 00:00:21,057 [ERROR] Transform function failed
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 311, in transform_str_or_callable
    return obj.apply(func, args=args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 4433, in apply
    return SeriesApply(self, func, convert_dtype, args, kwargs).apply()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 1082, in apply
    return self.apply_standard()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 1137, in apply_standard
    mapped = lib.map_infer(
  File "pandas/_libs/lib.pyx", line 2870, in pandas._libs.lib.map_infer
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 316, in <lambda>
    lambda location_dict: [
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 317, in <listcomp>
    self._geolocator(loc['city'])
KeyError: 'city'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3621, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas/_libs/index.pyx", line 136, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 163, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'places'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 223, in transform
    result = self.transform_str_or_callable(func)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 313, in transform_str_or_callable
    return func(obj, *args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 318, in <lambda>
    for loc in location_dict['places']
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 958, in __getitem__
    return self._get_value(key)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 1069, in _get_value
    loc = self.index.get_loc(label)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3623, in get_loc
    raise KeyError(key) from err
KeyError: 'places'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_pipeline.py", line 78, in run
    etl_pipeline.run(data_as_json)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 200, in run
    data = self.transform(data)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 212, in transform
    data = self._transformation_engine.extract_locations(data)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 315, in extract_locations
    data['city'] = data['location'].transform(
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 4320, in transform
    ).transform()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 227, in transform
    raise ValueError("Transform function failed") from err
ValueError: Transform function failed
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 311, in transform_str_or_callable
    return obj.apply(func, args=args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 4433, in apply
    return SeriesApply(self, func, convert_dtype, args, kwargs).apply()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 1082, in apply
    return self.apply_standard()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 1137, in apply_standard
    mapped = lib.map_infer(
  File "pandas/_libs/lib.pyx", line 2870, in pandas._libs.lib.map_infer
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 316, in <lambda>
    lambda location_dict: [
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 317, in <listcomp>
    self._geolocator(loc['city'])
KeyError: 'city'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3621, in get_loc
    return self._engine.get_loc(casted_key)
  File "pandas/_libs/index.pyx", line 136, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 163, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'places'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 223, in transform
    result = self.transform_str_or_callable(func)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 313, in transform_str_or_callable
    return func(obj, *args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 318, in <lambda>
    for loc in location_dict['places']
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 958, in __getitem__
    return self._get_value(key)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 1069, in _get_value
    loc = self.index.get_loc(label)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3623, in get_loc
    raise KeyError(key) from err
KeyError: 'places'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/.local/bin/it-jobs-meta", line 8, in <module>
    sys.exit(main())
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/__main__.py", line 43, in main
    data_pipeline.schedule(parser.args['schedule'])
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_pipeline.py", line 50, in schedule
    self.run()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_pipeline.py", line 78, in run
    etl_pipeline.run(data_as_json)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 200, in run
    data = self.transform(data)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 212, in transform
    data = self._transformation_engine.extract_locations(data)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/it_jobs_meta/data_pipeline/data_etl.py", line 315, in extract_locations
    data['city'] = data['location'].transform(
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/series.py", line 4320, in transform
    ).transform()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pandas/core/apply.py", line 227, in transform
    raise ValueError("Transform function failed") from err
ValueError: Transform function failed

Extracted an example of new JSON (as dict) places entry format. There are new 'province'-oriented elements places that lack the 'city' key used by the pipeline:

{
    'places': [
        {'city': 'Remote', 'url': 'c-developer-sii-polska-remote-12'},
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'city': 'Warszawa',
            'street': '',
            'postalCode': '',
            'url': 'c-developer-sii-polska-warszawa-3',
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'city': 'Wrocław',
            'street': '',
            'postalCode': '',
            'url': 'c-developer-sii-polska-wroclaw-4',
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'kuyavian-pomeranian',
            'url': 'c-developer-sii-polska-kuyavian-pomeranian',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'lodz',
            'url': 'c-developer-sii-polska-lodz-1',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'lublin',
            'url': 'c-developer-sii-polska-lublin',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'lubusz',
            'url': 'c-developer-sii-polska-lubusz',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'lesser-poland',
            'url': 'c-developer-sii-polska-lesser-poland',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'opole',
            'url': 'c-developer-sii-polska-opole',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'subcarpathian',
            'url': 'c-developer-sii-polska-subcarpathian',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'podlaskie',
            'url': 'c-developer-sii-polska-podlaskie',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'pomeranian',
            'url': 'c-developer-sii-polska-pomeranian',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'silesian',
            'url': 'c-developer-sii-polska-silesian',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'holy-cross',
            'url': 'c-developer-sii-polska-holy-cross',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'warmian-masurian',
            'url': 'c-developer-sii-polska-warmian-masurian',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'greater-poland',
            'url': 'c-developer-sii-polska-greater-poland',
            'provinceOnly': True,
        },
        {
            'country': {'code': 'POL', 'name': 'Poland'},
            'province': 'west-pomeranian',
            'url': 'c-developer-sii-polska-west-pomeranian',
            'provinceOnly': True,
        },
    ],
    'fullyRemote': True,
    'covidTimeRemotely': False,
}

The solution would be to extract only the places entries with 'city' key present:

data['city'] = data['location'].transform(
    lambda location_dict: [
        self._geolocator(loc['city'])
        for loc in location_dict['places'] >>>if 'city' in loc<<<
    ]
)