[Bug] Worker hangs after polling workflow task queue
What are you really trying to do?
To connect a worker to a Temporal server with an authorization header.
Describe the bug
Starting a worker with an incorrect token (causing the server to respond with "Request unauthorized.") makes the worker hang indefinitely.
Minimal Reproduction
import asyncio

from temporalio import activity, client, workflow, worker


@activity.defn
async def a() -> None:
    pass


@workflow.defn
class Workflow:
    @workflow.run
    async def run(self) -> None:
        pass


async def main():
    c = await client.Client.connect(
        "my_temporal_host:7233",
        rpc_metadata={"authorization": "wrong_token"},
        tls=True,
    )
    w = worker.Worker(
        c,
        task_queue="default",
        activities=[a],
        workflows=[Workflow],
    )
    await w.run()


if __name__ == "__main__":
    asyncio.run(main())
Output:
2024-09-03T07:26:48.707Z [temporal-worker] 2024-09-03T07:26:48.707200Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 52 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:26:48 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:26:49.544Z [temporal-worker] 2024-09-03T07:26:49.544345Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 52 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:26:49 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:27:51.707Z [temporal-worker] 2024-09-03T07:27:51.707200Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 53 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:27:51 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:27:52.968Z [temporal-worker] 2024-09-03T07:27:52.967933Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 52 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:27:52 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:00.036Z [temporal-worker] 2024-09-03T07:28:00.036155Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 53 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:00 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:00.687Z [temporal-worker] 2024-09-03T07:28:00.687484Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 53 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:00 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:03.579Z [temporal-worker] 2024-09-03T07:28:03.578940Z WARN temporal_sdk_core::worker::workflow::wft_poller: Error while polling for workflow tasks error=Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:03.970Z [temporal-worker] 2024-09-03T07:28:03.583738Z ERROR temporal_sdk_core::worker::workflow::workflow_stream: Workflow processing encountered fatal error and must shut down TonicError(Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None })
2024-09-03T07:28:03.970Z [temporal-worker] 2024-09-03T07:28:03.969933Z WARN temporal_sdk_core::worker::workflow::wft_poller: Error while polling for workflow tasks error=Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:03.970Z [temporal-worker] 2024-09-03T07:28:03.969960Z ERROR temporal_sdk_core::worker::workflow::workflow_stream: Workflow processing encountered fatal error and must shut down TonicError(Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None })
2024-09-03T07:28:04.023Z [temporal-worker] Worker failed, shutting down
2024-09-03T07:28:04.023Z [temporal-worker] Traceback (most recent call last):
2024-09-03T07:28:04.023Z [temporal-worker] File "/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 143, in run
2024-09-03T07:28:04.023Z [temporal-worker] act = await self._bridge_worker().poll_workflow_activation()
2024-09-03T07:28:04.023Z [temporal-worker] File "/lib/python3.10/site-packages/temporalio/bridge/worker.py", line 141, in poll_workflow_activation
2024-09-03T07:28:04.023Z [temporal-worker] await self._ref.poll_workflow_activation()
2024-09-03T07:28:04.023Z [temporal-worker] RuntimeError: Poll failure: Unhandled grpc error when workflow polling: Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:04.023Z [temporal-worker]
2024-09-03T07:28:04.023Z [temporal-worker] The above exception was the direct cause of the following exception:
2024-09-03T07:28:04.023Z [temporal-worker]
2024-09-03T07:28:04.023Z [temporal-worker] Traceback (most recent call last):
2024-09-03T07:28:04.023Z [temporal-worker] File "/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 153, in run
2024-09-03T07:28:04.023Z [temporal-worker] raise RuntimeError("Workflow worker failed") from err
2024-09-03T07:28:04.023Z [temporal-worker] RuntimeError: Workflow worker failed
Environment/Versions
- Temporal Version: Server v1.23.1, SDK v1.7.0
Additional context
Related issue: #459
> causes the worker to hang indefinitely.

We do check that the worker can connect to the namespace using "describe namespace". So you have a token that works on some calls but not others? Is this self-hosted or cloud?
The stack trace seems to be showing it raising an exception. Are you sure that worker.run() does not raise an exception here? It does take a minute because we retry all polling errors just in case they are spurious.
This issue is two-fold; I reported the one relevant to the SDK here:
- I have a worker which sends the correct token to a self-hosted Temporal server (v1.23.1). There is an issue at the ingress level which sometimes causes the server to respond with a 504 Gateway Timeout.
- As the worker continues to receive this response from the ingress, it exhibits the behavior reported in this bug, eventually hanging and failing to reconnect.

The second issue is what I'm reporting here: the worker ends up raising a RuntimeError and not recovering from it.
> The worker ends up raising a RuntimeError and not recovering from it.

Some client errors we can detect as recoverable. For ones we can't, we still try to recover for a minute before failing the worker. We intentionally fail the worker instead of letting it operate in a failed state on something that is not quickly/obviously recoverable. Can you confirm whether worker.run() does or does not raise an exception here? We start a worker shutdown, but a worker shutdown sends cancellation to activities and has to wait for activities to complete (see https://github.com/temporalio/sdk-python?tab=readme-ov-file#worker-shutdown).
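(Not a snippet from this thread, just a minimal sketch of what that shutdown behavior means for activity code, assuming the documented graceful_shutdown_timeout worker option; the activity name is made up.)

import asyncio

from temporalio import activity


@activity.defn
async def long_running_activity() -> None:
    try:
        # Simulated work; a real activity would also heartbeat periodically.
        await asyncio.sleep(300)
    except asyncio.CancelledError:
        # Cancellation lands here when the worker shuts down (after its graceful
        # timeout) or when the activity itself is cancelled; clean up, then re-raise.
        raise

# Passing graceful_shutdown_timeout=datetime.timedelta(seconds=30) to
# worker.Worker(...) gives in-flight activities that long to finish before
# cancellation is sent on shutdown.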
I'm not sure if you're referring to something different, but the logs shared in the original bug report show that worker.run() is raising a RuntimeError.
Ah, I see it in the trace now. This is intentional behavior. A fatal error (or at least one we can't tell is non-fatal) that doesn't fix itself after a minute will cause the worker to fail and shut down instead of pretending to work silently while continuing to fail. You may restart the worker if you wish, though many prefer not to blindly restart but rather investigate.
In our case the worker is failing after a number of intermittent network issues, so we would rather it restart instead. Is there any recommendation around how to restart the worker? I tried a simple while True loop to catch the raised RuntimeError and re-run await worker.run(), which seemed to complain (I don't have the logs from that right now but I can get them).
> I tried a simple while True loop
This should work. Similarly you can consider having whatever is monitoring the process/pod/container do the restart at an outer level. I would recommend at least also alerting or something on fatal worker error or you won't know your worker isn't working.
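Not code from this thread, but one minimal sketch of that outer-level approach: let the process exit non-zero on a fatal worker error so the supervisor (systemd, Kubernetes, etc.) performs the restart, and fire an alert first. The run_worker_once and send_alert names are placeholders.

import asyncio
import sys


async def run_worker_once() -> None:
    # Build the client and worker exactly as in the reproduction above,
    # then run until shutdown or a fatal error.
    ...


def send_alert(message: str) -> None:
    # Placeholder hook: page/notify however you normally do.
    print(f"ALERT: {message}", file=sys.stderr)


if __name__ == "__main__":
    try:
        asyncio.run(run_worker_once())
    except Exception as err:
        send_alert(f"Temporal worker failed fatally: {err}")
        # Exit non-zero so the process supervisor restarts the worker.
        sys.exit(1)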
I did try the following to retry the worker:
while True:
    try:
        await worker.run()
    except RuntimeError as e:
        print(f"RuntimeError caught: {e}. Retrying...")
        await asyncio.sleep(5)  # wait for 5 seconds before retrying
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        raise
After running for a while, the worker crashed with the following error when trying to restart:
File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 143, in run
act = await self._bridge_worker().poll_workflow_activation()
File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/bridge/worker.py", line 141, in poll_workflow_activation
await self._ref.poll_workflow_activation()
RuntimeError: Poll failure: Unhandled grpc error when workflow polling: Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Thu, 05 Sep 2024 09:43:21 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 153, in run
raise RuntimeError("Workflow worker failed") from err
RuntimeError: Workflow worker failed
RuntimeError caught: Workflow worker failed. Retrying...
An unexpected error occurred: 'NoneType' object has no attribute 'validate'
Traceback (most recent call last):
File "/home/kelkawi/Desktop/CanonicalRepos/hr-automation/boarding/worker.py", line 219, in <module>
asyncio.run(run_worker())
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/home/kelkawi/Desktop/CanonicalRepos/hr-automation/boarding/worker.py", line 207, in run_worker
await worker.run()
File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/worker/_worker.py", line 478, in run
await self._bridge_worker.validate()
File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/bridge/worker.py", line 133, in validate
await self._ref.validate()
AttributeError: 'NoneType' object has no attribute 'validate'
A worker is meant for one run/shutdown. If you want to run a new worker, you will need to create a new instance. Unfortunately some validation we added happens before that check; we will fix that.
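For anyone hitting the same AttributeError, a minimal sketch of the pattern described above, assuming the same client/worker arguments (and the a activity and Workflow class) from the reproduction at the top of this issue: create a fresh Worker on every retry instead of re-running the failed instance.

import asyncio

from temporalio import client, worker


async def run_worker_forever() -> None:
    c = await client.Client.connect(
        "my_temporal_host:7233",
        rpc_metadata={"authorization": "my_token"},  # placeholder token
        tls=True,
    )
    while True:
        # A Worker instance is single-use: build a new one for every run.
        w = worker.Worker(
            c,
            task_queue="default",
            activities=[a],
            workflows=[Workflow],
        )
        try:
            await w.run()
        except RuntimeError as err:
            print(f"Worker failed: {err}. Recreating the worker and retrying in 5s...")
            await asyncio.sleep(5)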