replicate/replicate-python

Async Error: RuntimeError: Event loop is closed

Leoputera2407 opened this issue · 1 comment

Hi,

I'm encountering a bug when I run an async_run call in a loop. I'm running the SDXL model for my task, and I get this error:

  |   File "/Users/Laputa9072/.pyenv/versions/3.11.4/lib/python3.11/asyncio/runners.py", line 118, in run
  |     return self._loop.run_until_complete(task)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/Users/Laputa9072/.pyenv/versions/3.11.4/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
  |     return future.result()
  |            ^^^^^^^^^^^^^^^
  |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/video_enhancer.py", line 380, in create_images_async
  |     async with asyncio.TaskGroup() as tg:
  |   File "/Users/Laputa9072/.pyenv/versions/3.11.4/lib/python3.11/asyncio/taskgroups.py", line 147, in __aexit__
  |     raise me from None
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/openai_wrapper.py", line 289, in create_image
    |     response = await replicate.async_run(
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/replicate/client.py", line 153, in async_run
    |     return await async_run(self, ref, input, **params)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/replicate/run.py", line 84, in async_run
    |     prediction = await client.predictions.async_create(
    |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/replicate/prediction.py", line 311, in async_create
    |     resp = await self._client._async_request(
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/replicate/client.py", line 84, in _async_request
    |     resp = await self._async_client.request(method, path, **kwargs)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1530, in request
    |     return await self.send(request, auth=auth, follow_redirects=follow_redirects)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1617, in send
    |     response = await self._send_handling_auth(
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
    |     response = await self._send_handling_redirects(
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
    |     response = await self._send_single_request(request)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1719, in _send_single_request
    |     response = await transport.handle_async_request(request)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/replicate/client.py", line 253, in handle_async_request
    |     response = await self._wrapped_transport.handle_async_request(request)  # type: ignore
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 366, in handle_async_request
    |     resp = await self._pool.handle_async_request(req)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 234, in handle_async_request
    |     await self._close_expired_connections()
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 195, in _close_expired_connections
    |     await connection.aclose()
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpcore/_async/connection.py", line 173, in aclose
    |     await self._connection.aclose()
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpcore/_async/http11.py", line 253, in aclose
    |     await self._network_stream.aclose()
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 54, in aclose
    |     await self._stream.aclose()
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/anyio/streams/tls.py", line 193, in aclose
    |     await self.transport_stream.aclose()
    |   File "/Users/Laputa9072/Downloads/tiktok-content-creator/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1261, in aclose
    |     self._transport.close()
    |   File "/Users/Laputa9072/.pyenv/versions/3.11.4/lib/python3.11/asyncio/selector_events.py", line 860, in close
    |     self._loop.call_soon(self._call_connection_lost, None)
    |   File "/Users/Laputa9072/.pyenv/versions/3.11.4/lib/python3.11/asyncio/base_events.py", line 761, in call_soon
    |     self._check_closed()
    |   File "/Users/Laputa9072/.pyenv/versions/3.11.4/lib/python3.11/asyncio/base_events.py", line 519, in _check_closed
    |     raise RuntimeError('Event loop is closed')
    | RuntimeError: Event loop is closed
    +------------------------------------    

Here's my code

async def create_image(prompt):
    prompt += " ultra-realistic style that is true-to-life. Natural and believable composition dreamy glow.dslr, ultra quality, sharp focus, tack sharp, dof, film grain, Fujifilm XT3, crystal clear, 8K UHD"
    # Run the SDXL model on Replicate and wait for its output
    response = await replicate.async_run(
        "stability-ai/sdxl:39ed52f2a78e934b3ba6e2a89f5b1c712de7dfea535525255b1aa35c5565e08b",
        input={
            "width": 1152,
            "height": 648,
            "prompt": prompt,
            "refine": "expert_ensemble_refiner",
            "scheduler": "KarrasDPM",
            "lora_scale": 0.0,
            "num_outputs": 1,
            "guidance_scale": 7.5,
            "apply_watermark": False,
            "high_noise_frac": 0.8,
            "prompt_strength": 1,
            "negative_prompt": "words, letters, text, writing, disfigured, ugly, bad, immature, cartoon, fingers",
            "num_inference_steps": 42
        }
    )
    image_url = response[0]  # Adjusted to access the URL correctly

    async with aiohttp.ClientSession() as session:
        async with session.get(image_url) as resp:
            if resp.status == 200:
                folder = os.path.join(os.getcwd(), 'images')
                if not os.path.exists(folder):
                    os.makedirs(folder)
                
                # Generate a random file name
                filename = os.path.join(folder, str(random.randint(0, 10000)) + '.jpg')
                
                # Write the content asynchronously
                with open(filename, 'wb') as out_file:
                    while True:
                        chunk = await resp.content.read(1024)
                        if not chunk:
                            break
                        out_file.write(chunk)

                print(f"Image successfully downloaded: {filename}")
                return filename
            else:
                # Handle error or unsuccessful status
                print(f"Failed to download image, status code: {resp.status}")
                return None


async def create_images_async(prompts):
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(
                openai_wrapper.create_image(prompt) 
            ) for prompt in prompts
        ]

    filenames = await asyncio.gather(*tasks)
    return filenames


prompts = ["stuff animals", "cool food"]
for _ in range(2):
    asyncio.run(create_images_async(prompts))

It works on the first iteration of the loop, but it always fails on the second iteration.

We've tried implementing a graceful shutdown as described here (https://stackoverflow.com/questions/73418725/python-3-10-5-asyncio-runtimeerror-event-loop-is-closed), and also a function wrapper that suppresses these runtime errors (https://pythonalgos.com/2022/01/09/runtimeerror-event-loop-is-closed-asyncio-fix/), although according to the article this seems to be more of a Windows issue. Any ideas on how to fix this?

I'm running on

replicate==0.20.0
python version 3.11.4
Darwin MacBook-Pro.local 20.6.0 Darwin Kernel Version 20.6.0

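Hi @Leoputera2407. I don't see anything to indicate a problem in the Replicate library itself. The error you're seeing happens because asyncio.run() is designed to create a new event loop and close it at the end of the call. When you call it multiple times in a loop, the second call runs on a fresh event loop, but anything created during the first call is still bound to the loop that has already been closed. Your traceback suggests that this is what happens here: the HTTP connection pool tries to close an expired connection left over from the first iteration (_close_expired_connections), and that fails in _check_closed with "Event loop is closed".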
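
To make the asyncio.run() behaviour concrete, here is a minimal standard-library sketch. It doesn't touch Replicate or httpx at all; `get_loop` is just a helper name made up for this illustration. It shows that each asyncio.run() call gets its own loop, that the loop is closed once the call returns, and that scheduling anything on the closed loop raises the same RuntimeError as in the traceback.

```python
import asyncio


async def get_loop() -> asyncio.AbstractEventLoop:
    # Return the event loop that asyncio.run() created for this call.
    return asyncio.get_running_loop()


first_loop = asyncio.run(get_loop())
second_loop = asyncio.run(get_loop())

# Each asyncio.run() call creates its own loop and closes it on exit.
loops_differ = first_loop is not second_loop
first_closed = first_loop.is_closed()

# Scheduling a callback on the closed loop fails in _check_closed(),
# which is the last frame of the traceback above.
try:
    first_loop.call_soon(lambda: None)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(loops_differ, first_closed, error_message)
```

Any object that keeps a reference to the first loop (an open connection, for example) would hit the same error if it is used or cleaned up during a later asyncio.run() call.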

I can't help you debug your async code, but my recommendation would be to start small with something that works, like the async sample code in this project's README, and then iterate towards a full, working solution.