noxdafox/pebble

error while running @concurrent.process repetitively

makesnosense opened this issue ยท 4 comments

Hey.

I use pebble to run a script every 1000 seconds, with timeout of 300. It works just fine for ~two days, then dies completely with the error below, which does not even seem to be handled or catched by the logger. After restarting, it works just fine (for some time more).
Maybe I do something wrong? Anyways, I would be very grateful for any suggestions on how to further investigate the issue.

Exception in thread Thread-81 (_worker_handler):
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.12/site-packages/pebble/concurrent/process.py", line 137, in _worker_handler
    if result.status == SUCCESS:
       ^^^^^^^^^^^^^
AttributeError: 'AttributeError' object has no attribute 'status'

The code I run is very-very similar to one of the examples that you provide:

logg = configure_logger(__name__)

@concurrent.process(timeout=300)
def main():
#some code


if __name__ == '__main__':
    while True:
        try:
            future = main()
            results = future.result()
        except TimeoutError as error:
            logg.info(f"Timeout {error.args[1]} seconds")
        except Exception as error:
            logg.error(error, exc_info=True)
            logg.info(f'There's an error {error.__repr__()}')
        
        time.sleep(1000)

Hello,

will try to reproduce the issue. What OS are you using?

I use official Python docker image, python:3.12.2-bookworm, linux amd64, which is Debian 12.5

Host is Ubuntu 23.10, usual DO droplet with 2 GB Memory, if that matters.

I'm testing the new version after d32346d and it seems to handle arising exception as it should ๐Ÿ‘ and overall program doesn't crash. (I have occasional error coming from google sheets API & gspread lib)

Looks good on my side, thank you!

Full error log just in case:

01.03.24 06:00:10 ERROR module: run __main__ func: <module> line: 57 msg: 'dict' object has no attribute 'text'
  โ”† File "/app/run.py", line 53, in <module>
  โ”†     49   if __name__ == '__main__':
  โ”†     50       while True:
  โ”†     51           try:
  โ”†     52               future = main()
  โ”† --> 53               results = future.result()
  โ”†     54           except TimeoutError as error:
  โ”†     ..................................................
  โ”†      __name__ = '__main__'
  โ”†      future = <ProcessFuture at 0x7f464b201c70 state=finished raised Attri
  โ”†                buteError>
  โ”†      results = None
  โ”†      future.result = <method 'Future.result' of <ProcessFuture at 0x7f464b201c70
  โ”†                       state=finished raised AttributeError> _base.py:428>
  โ”†      error = AttributeError("'dict' object has no attribute 'text'")
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 456, in result
  โ”†     428  def result(self, timeout=None):
  โ”†  (...)
  โ”†     452
  โ”†     453              if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
  โ”†     454                  raise CancelledError()
  โ”†     455              elif self._state == FINISHED:
  โ”† --> 456                  return self.__get_result()
  โ”†     457              else:
  โ”†     ..................................................
  โ”†      self = None
  โ”†      timeout = None
  โ”†      self._state = # AttributeError
  โ”†           self = None
  โ”†      CANCELLED = 'CANCELLED'
  โ”†      CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
  โ”†      FINISHED = 'FINISHED'
  โ”†      self.__get_result = # AttributeError
  โ”†           self = None
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
  โ”†     398  def __get_result(self):
  โ”†     399      if self._exception:
  โ”†     400          try:
  โ”† --> 401              raise self._exception
  โ”†     402          finally:
  โ”†     ..................................................
  โ”†      self = None
  โ”†      self._exception = # AttributeError
  โ”†           self = None
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/site-packages/pebble/concurrent/process.py", line 178, in _get_result
  โ”†     163  def _get_result(
  โ”†     164          future: ProcessFuture,
  โ”†     165          pipe: multiprocessing.Pipe,
  โ”†     166          timeout: float
  โ”†     167  ) -> Any:
  โ”†  (...)
  โ”†     174                  return Result(FAILURE, TimeoutError('Task Timeout', timeout))
  โ”†     175              if future.cancelled():
  โ”†     176                  return Result(FAILURE, CancelledError())
  โ”†     177
  โ”† --> 178          return pipe.recv()
  โ”†     179      except (EOFError, OSError):
  โ”†     ..................................................
  โ”†      future = <ProcessFuture at 0x7f464b201c70 state=finished raised Attri
  โ”†                buteError>
  โ”†      pipe = <multiprocessing.connection.Connection object at 0x7f4649be4
  โ”†              770>
  โ”†      multiprocessing.Pipe = <method 'BaseContext.Pipe' of <multiprocessing.context.Defau
  โ”†                              ltContext object at 0x7f4650658530> context.py:60>
  โ”†      timeout = 500
  โ”†      FAILURE = 1
  โ”†      future.cancelled = <method 'Future.cancelled' of <ProcessFuture at 0x7f464b201c
  โ”†                          70 state=finished raised AttributeError> _base.py:383>
  โ”†      pipe.recv = <method '_ConnectionBase.recv' of <multiprocessing.connectio
  โ”†                   n.Connection object at 0x7f4649be4770> connection.py:246>
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 251, in recv
  โ”†     246  def recv(self):
  โ”†     247      """Receive a (picklable) object"""
  โ”†     248      self._check_closed()
  โ”†     249      self._check_readable()
  โ”†     250      buf = self._recv_bytes()
  โ”† --> 251      return _ForkingPickler.loads(buf.getbuffer())
  โ”†     ..................................................
  โ”†      self = <multiprocessing.connection.Connection object at 0x7f4649be4
  โ”†              770>
  โ”†      self._check_closed = <method '_ConnectionBase._check_closed' of <multiprocessing.
  โ”†                            connection.Connection object at 0x7f4649be4770> connection.p
  โ”†                            y:135>
  โ”†      self._check_readable = <method '_ConnectionBase._check_readable' of <multiprocessin
  โ”†                              g.connection.Connection object at 0x7f4649be4770> connection
  โ”†                              .py:139>
  โ”†      buf = <_io.BytesIO object at 0x7f46491c19e0>
  โ”†      self._recv_bytes = <method 'Connection._recv_bytes' of <multiprocessing.connect
  โ”†                          ion.Connection object at 0x7f4649be4770> connection.py:429>
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/site-packages/gspread/exceptions.py", line 43, in __init__
  โ”†     42   def __init__(self, response: Response):
  โ”† --> 43       super().__init__(self._extract_text(response))
  โ”†     44       self.response: Response = response
  โ”†     ..................................................
  โ”†      self = APIError({'code': 503, 'message': 'The service is currently
  โ”†              unavailable.', 'status': 'UNAVAILABLE'})
  โ”†      response = {'code': 503,
  โ”†                  'message': 'The service is currently unavailable.',
  โ”†                  'status': 'UNAVAILABLE'}
  โ”†      self._extract_text = <method 'APIError._extract_text' of {'code': 503, 'message':
  โ”†                             'The service is currently unavailable.', 'status': 'UNAVAIL
  โ”†                            ABLE'} exceptions.py:46>
  โ”†      self.response = # AttributeError
  โ”†           self = APIError({'code': 503, 'message': 'The service is cur
  โ”†            rently unavailable.', 'status': 'UNAVAILABLE'})
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/site-packages/gspread/exceptions.py", line 49, in _extract_text
  โ”†     46   def _extract_text(
  โ”†     47       self, response: Response
  โ”†     48   ) -> Union[Dict[str, Union[int, str]], str]:
  โ”† --> 49       return self._text_from_detail(response) or response.text
  โ”†     ..................................................
  โ”†      self = APIError({'code': 503, 'message': 'The service is currently
  โ”†              unavailable.', 'status': 'UNAVAILABLE'})
  โ”†      response = {'code': 503,
  โ”†                  'message': 'The service is currently unavailable.',
  โ”†                  'status': 'UNAVAILABLE'}
  โ”†      self._text_from_detail = <method 'APIError._text_from_detail' of {'code': 503, 'messa
  โ”†                                ge': 'The service is currently unavailable.', 'status': 'UNA
  โ”†                                VAILABLE'} exceptions.py:51>
  โ”†      response.text = # AttributeError
  โ”†           response = {'code': 503,
  โ”†               'message': 'The service is currently unavailable.',
  โ”†               'status': 'UNAVAILABLE'}
  โ”†     ..................................................
  โ”†
  โ”† ---- (full traceback above) ----
  โ”† File "/app/run.py", line 53, in <module>
  โ”†     results = future.result()
  โ”† File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 456, in result
  โ”†     return self.__get_result()
  โ”† File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
  โ”†     raise self._exception
  โ”† File "/usr/local/lib/python3.12/site-packages/pebble/concurrent/process.py", line 178, in _get_result
  โ”†     return pipe.recv()
  โ”† File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 251, in recv
  โ”†     return _ForkingPickler.loads(buf.getbuffer())
  โ”† File "/usr/local/lib/python3.12/site-packages/gspread/exceptions.py", line 43, in __init__
  โ”†     super().__init__(self._extract_text(response))
  โ”† File "/usr/local/lib/python3.12/site-packages/gspread/exceptions.py", line 49, in _extract_text
  โ”†     return self._text_from_detail(response) or response.text
  โ”†
  โ”† AttributeError: 'dict' object has no attribute 'text'
```01.03.24 06:00:10 ERROR module: run __main__ func: <module> line: 57 msg: 'dict' object has no attribute 'text'
  โ”† File "/app/run.py", line 53, in <module>
  โ”†     49   if __name__ == '__main__':
  โ”†     50       while True:
  โ”†     51           try:
  โ”†     52               future = main()
  โ”† --> 53               results = future.result()
  โ”†     54           except TimeoutError as error:
  โ”†     ..................................................
  โ”†      __name__ = '__main__'
  โ”†      future = <ProcessFuture at 0x7f464b201c70 state=finished raised Attri
  โ”†                buteError>
  โ”†      results = None
  โ”†      future.result = <method 'Future.result' of <ProcessFuture at 0x7f464b201c70
  โ”†                       state=finished raised AttributeError> _base.py:428>
  โ”†      error = AttributeError("'dict' object has no attribute 'text'")
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 456, in result
  โ”†     428  def result(self, timeout=None):
  โ”†  (...)
  โ”†     452
  โ”†     453              if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
  โ”†     454                  raise CancelledError()
  โ”†     455              elif self._state == FINISHED:
  โ”† --> 456                  return self.__get_result()
  โ”†     457              else:
  โ”†     ..................................................
  โ”†      self = None
  โ”†      timeout = None
  โ”†      self._state = # AttributeError
  โ”†           self = None
  โ”†      CANCELLED = 'CANCELLED'
  โ”†      CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
  โ”†      FINISHED = 'FINISHED'
  โ”†      self.__get_result = # AttributeError
  โ”†           self = None
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
  โ”†     398  def __get_result(self):
  โ”†     399      if self._exception:
  โ”†     400          try:
  โ”† --> 401              raise self._exception
  โ”†     402          finally:
  โ”†     ..................................................
  โ”†      self = None
  โ”†      self._exception = # AttributeError
  โ”†           self = None
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/site-packages/pebble/concurrent/process.py", line 178, in _get_result
  โ”†     163  def _get_result(
  โ”†     164          future: ProcessFuture,
  โ”†     165          pipe: multiprocessing.Pipe,
  โ”†     166          timeout: float
  โ”†     167  ) -> Any:
  โ”†  (...)
  โ”†     174                  return Result(FAILURE, TimeoutError('Task Timeout', timeout))
  โ”†     175              if future.cancelled():
  โ”†     176                  return Result(FAILURE, CancelledError())
  โ”†     177
  โ”† --> 178          return pipe.recv()
  โ”†     179      except (EOFError, OSError):
  โ”†     ..................................................
  โ”†      future = <ProcessFuture at 0x7f464b201c70 state=finished raised Attri
  โ”†                buteError>
  โ”†      pipe = <multiprocessing.connection.Connection object at 0x7f4649be4
  โ”†              770>
  โ”†      multiprocessing.Pipe = <method 'BaseContext.Pipe' of <multiprocessing.context.Defau
  โ”†                              ltContext object at 0x7f4650658530> context.py:60>
  โ”†      timeout = 500
  โ”†      FAILURE = 1
  โ”†      future.cancelled = <method 'Future.cancelled' of <ProcessFuture at 0x7f464b201c
  โ”†                          70 state=finished raised AttributeError> _base.py:383>
  โ”†      pipe.recv = <method '_ConnectionBase.recv' of <multiprocessing.connectio
  โ”†                   n.Connection object at 0x7f4649be4770> connection.py:246>
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 251, in recv
  โ”†     246  def recv(self):
  โ”†     247      """Receive a (picklable) object"""
  โ”†     248      self._check_closed()
  โ”†     249      self._check_readable()
  โ”†     250      buf = self._recv_bytes()
  โ”† --> 251      return _ForkingPickler.loads(buf.getbuffer())
  โ”†     ..................................................
  โ”†      self = <multiprocessing.connection.Connection object at 0x7f4649be4
  โ”†              770>
  โ”†      self._check_closed = <method '_ConnectionBase._check_closed' of <multiprocessing.
  โ”†                            connection.Connection object at 0x7f4649be4770> connection.p
  โ”†                            y:135>
  โ”†      self._check_readable = <method '_ConnectionBase._check_readable' of <multiprocessin
  โ”†                              g.connection.Connection object at 0x7f4649be4770> connection
  โ”†                              .py:139>
  โ”†      buf = <_io.BytesIO object at 0x7f46491c19e0>
  โ”†      self._recv_bytes = <method 'Connection._recv_bytes' of <multiprocessing.connect
  โ”†                          ion.Connection object at 0x7f4649be4770> connection.py:429>
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/site-packages/gspread/exceptions.py", line 43, in __init__
  โ”†     42   def __init__(self, response: Response):
  โ”† --> 43       super().__init__(self._extract_text(response))
  โ”†     44       self.response: Response = response
  โ”†     ..................................................
  โ”†      self = APIError({'code': 503, 'message': 'The service is currently
  โ”†              unavailable.', 'status': 'UNAVAILABLE'})
  โ”†      response = {'code': 503,
  โ”†                  'message': 'The service is currently unavailable.',
  โ”†                  'status': 'UNAVAILABLE'}
  โ”†      self._extract_text = <method 'APIError._extract_text' of {'code': 503, 'message':
  โ”†                             'The service is currently unavailable.', 'status': 'UNAVAIL
  โ”†                            ABLE'} exceptions.py:46>
  โ”†      self.response = # AttributeError
  โ”†           self = APIError({'code': 503, 'message': 'The service is cur
  โ”†            rently unavailable.', 'status': 'UNAVAILABLE'})
  โ”†     ..................................................
  โ”†
  โ”† File "/usr/local/lib/python3.12/site-packages/gspread/exceptions.py", line 49, in _extract_text
  โ”†     46   def _extract_text(
  โ”†     47       self, response: Response
  โ”†     48   ) -> Union[Dict[str, Union[int, str]], str]:
  โ”† --> 49       return self._text_from_detail(response) or response.text
  โ”†     ..................................................
  โ”†      self = APIError({'code': 503, 'message': 'The service is currently
  โ”†              unavailable.', 'status': 'UNAVAILABLE'})
  โ”†      response = {'code': 503,
  โ”†                  'message': 'The service is currently unavailable.',
  โ”†                  'status': 'UNAVAILABLE'}
  โ”†      self._text_from_detail = <method 'APIError._text_from_detail' of {'code': 503, 'messa
  โ”†                                ge': 'The service is currently unavailable.', 'status': 'UNA
  โ”†                                VAILABLE'} exceptions.py:51>
  โ”†      response.text = # AttributeError
  โ”†           response = {'code': 503,
  โ”†               'message': 'The service is currently unavailable.',
  โ”†               'status': 'UNAVAILABLE'}
  โ”†     ..................................................
  โ”†
  โ”† ---- (full traceback above) ----
  โ”† File "/app/run.py", line 53, in <module>
  โ”†     results = future.result()
  โ”† File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 456, in result
  โ”†     return self.__get_result()
  โ”† File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
  โ”†     raise self._exception
  โ”† File "/usr/local/lib/python3.12/site-packages/pebble/concurrent/process.py", line 178, in _get_result
  โ”†     return pipe.recv()
  โ”† File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 251, in recv
  โ”†     return _ForkingPickler.loads(buf.getbuffer())
  โ”† File "/usr/local/lib/python3.12/site-packages/gspread/exceptions.py", line 43, in __init__
  โ”†     super().__init__(self._extract_text(response))
  โ”† File "/usr/local/lib/python3.12/site-packages/gspread/exceptions.py", line 49, in _extract_text
  โ”†     return self._text_from_detail(response) or response.text
  โ”†
  โ”† AttributeError: 'dict' object has no attribute 'text'

Issue fixed in release 5.0.7, please re-open this issue if the problem persists.