pR0Ps/zipstream-ng

Support for async

Opened this issue · 8 comments

Hi there,

Your project looks very interesting. I've been using https://github.com/kbbdy/zipstream so far to do synchronous and asynchronous zip streaming. However, I've been severely limited by the size restriction imposed by the Zip32 format. Also, the project seems abandoned. So my questions are:

  • Is Zip64 already supported? I couldn't find info on this in the README.
  • Are you planning to support async?

Thank you for your answers.

pR0Ps commented

Yes, Zip64 extensions are supported and automatically used if required (this can be triggered by either the size or the number of files).

Async support is something that I would definitely like to add; I just haven't done it yet because I haven't had a need for it myself. Also, since I don't write a lot of async Python, I want to make sure I'm familiar with the details of it first.

Cool, just to give you a use case where async is very nice: running an asynchronous API (for example, with FastAPI) where I want to pack up a bunch of files into one archive for easy download. Both the streaming and the async parts are very convenient, since doing it synchronously would block one of the workers until the download is complete.

@pR0Ps Are we still planning to implement the async version? I would very much like to contribute to this.

Yes, I'm still planning to support async.

I had a deeper look into making the library fully async-compatible at one point and kinda bounced off due to a combination of not being super happy with the state of async file IO in Python and what it would take to implement both a sync and async interface for the library. I'll write up some of my thoughts here for reference.

For file IO, there isn't an asyncio-provided way to do it, and the library that seems most used for it (aiofiles) essentially wraps the file IO calls to run them in a threadpool. This makes async file IO easy to use, but there's no inherent benefit to being async there since it's just a wrapper.
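
To illustrate the point (a minimal sketch of that pattern, not aiofiles' actual code), the "async" read below is just the same blocking call pushed onto a thread:

import asyncio

async def async_read(path):
    # The IO itself is still synchronous; the thread pool just keeps
    # the blocking call off the event loop thread.
    def _read():
        with open(path, "rb") as f:
            return f.read()
    return await asyncio.to_thread(_read)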

As far as I can tell, the IO-related functions that are required to be made async to support a fully-async interface for zipstream-ng are:

  • open
  • BufferedReader.read
  • os.stat
  • os.walk
  • os.path.exists
  • os.path.isdir

All of those except os.walk are provided by aiofiles.
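
For os.walk, a minimal wrapper in the same style could run each blocking directory scan in a worker thread (a hypothetical sketch, not anything aiofiles provides):

import asyncio
import os

_DONE = object()

async def async_walk(top):
    # Mirror os.walk() as an async generator: each blocking scan step
    # runs in a worker thread via asyncio.to_thread().
    it = os.walk(top)
    while True:
        # Passing _DONE as next()'s default avoids raising StopIteration
        # across the Future boundary, which asyncio disallows.
        entry = await asyncio.to_thread(next, it, _DONE)
        if entry is _DONE:
            break
        yield entry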

Even with all of those interfaces fully implemented by something like aiofiles, the code would have to be restructured so that every function that calls any of the above becomes async as well. This essentially means that almost every function provided by the ZipStream class would have to be async, since nearly all of them use one of the above functions at some point.

Given that, the sync interface would either have to be its own thing, or it would have to wrap the async interface. I'm not a huge fan of implementing it with a generic async --> sync wrapper, because the sync interface would then translate through two wrappers (sync file IO --> async file IO, then async function call --> sync function call). I'd feel differently if the IO itself were natively asynchronous, since a wrapper would then involve only a single (required) translation to a synchronous API.
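
To make the layering concrete, here's a rough sketch (hypothetical names, not proposed code) of a sync facade over an async core; every sync call pays for both translations:

import asyncio

class AsyncZipStreamCore:
    # Hypothetical fully-async core. Its file IO is itself a thread-pool
    # wrapper (translation 1: sync file IO --> async file IO).
    async def read_file(self, path):
        def _read():
            with open(path, "rb") as f:
                return f.read()
        return await asyncio.to_thread(_read)

class SyncZipStream:
    # Sync facade over the async core (translation 2: async function
    # call --> sync function call).
    def __init__(self):
        self._core = AsyncZipStreamCore()

    def read_file(self, path):
        # Spins up an event loop just to get back to the thread pool
        # that does the actual (synchronous) read.
        return asyncio.run(self._core.read_file(path))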

As for making it its own thing, I don't really want to maintain two implementations of basically the same thing. This is essentially what the mentioned kbbdy/zipstream project does, but because its interface is much smaller (you're expected to build the list of files to zip yourself first and the only external interface is basically .stream()), it's not as big of an issue.

Moving forward

With all that being said, I recognize the usefulness of having the library expose an async interface and don't want "perfect" to be the enemy of "working".

I think a reasonable compromise (at least for now) would be to provide some sort of higher-level sync --> async wrapper over the interface that exposes enough of the functionality that common tasks are easy in an async context, without rewriting/refactoring the entire thing.
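
In outline (a simplified sketch of the idea, not the exact code on the branch), the wrapper delegates each sync call to a worker thread:

import asyncio
import functools

from zipstream import ZipStream

_DONE = object()

class AsyncZipStream:
    # Thin async facade: the existing sync implementation does all the
    # work, with each call delegated to a worker thread.
    def __init__(self, *args, **kwargs):
        self._zip = ZipStream(*args, **kwargs)

    @functools.wraps(ZipStream.add_path)
    async def add_path(self, *args, **kwargs):
        return await asyncio.to_thread(self._zip.add_path, *args, **kwargs)

    async def __aiter__(self):
        # Stream the archive by pulling chunks from the sync generator
        # one at a time off the event loop. The _DONE sentinel avoids
        # raising StopIteration through an asyncio Future.
        it = iter(self._zip)
        while True:
            chunk = await asyncio.to_thread(next, it, _DONE)
            if chunk is _DONE:
                break
            yield chunk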

I've started a feature/async branch with a first draft of this higher-level wrapping. Feel free to test it and suggest improvements!

Have you looked at the anyio library? It makes async work quite convenient: you can send sync tasks into a thread or process pool and, conversely, run async tasks on the event loop from a synchronous call.
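
For example (a quick sketch of the two anyio primitives I mean; the function names are just placeholders):

import anyio

async def fetch_size(url):
    # Stand-in async task to call back into from the worker thread.
    return len(url)

def blocking_task(url):
    # From a worker thread, run an async function on the event loop.
    return anyio.from_thread.run(fetch_size, url)

async def main():
    # From async code, send a sync task into a thread pool.
    size = await anyio.to_thread.run_sync(blocking_task, "https://example.com/file.bin")
    print(size)

anyio.run(main)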

I envisioned adding the capability to include remotely located files using methods like add_url, which would internally download the byte stream asynchronously. Alternatively, a more general function like add_async_stream could be used, accepting an async generator as an argument. While this functionality doesn't need to be part of the main repository, it could be offered as an extension. To fully leverage the asynchronous nature of these operations, the library itself would need to be asynchronous. Making the library async-compatible would also facilitate the integration of similar future extensions.

Thanks for taking the initiative 🚀.

@Midnighter I had a look and, unless I'm misunderstanding, it seems like anyio is more of a translation layer that allows using either the asyncio or trio async backend, not something that would help with converting a synchronous interface to async. I've already used the asyncio.to_thread functionality to send synchronous tasks to a threadpool and expose sync functions as async versions on the feature/async branch. Please let me know if I've overlooked something though.

@alenpaulvarghese Yes, being able to support adding asynchronous iterators is something that I think is required before I would declare the library async-compatible. I attempted this with the following diff against the feature/async branch, but couldn't get it to work (ran into what seems like a deadlock - if you have suggestions, please let me know).

diff --git a/zipstream/ng.py b/zipstream/ng.py
index 37c603d..c761b1b 100644
--- a/zipstream/ng.py
+++ b/zipstream/ng.py
@@ -1249,6 +1249,15 @@ async def _to_async_iter(it):
                 break
             yield x

+    def _to_sync_iter(it, loop):
+        i = it.__aiter__()
+        while True:
+            try:
+                value = asyncio.run_coroutine_threadsafe(i.__anext__(), loop).result()
+            except StopAsyncIteration:
+                break
+            yield value
+
     def _make_delegate_call(name):
         @functools.wraps(getattr(ZipStream, name))
         def method(self, *args, **kwargs):
@@ -1318,4 +1327,6 @@ async def add_path(self, *args, **kwargs):

         @functools.wraps(ZipStream.add)
         async def add(self, data, *args, **kwargs):
+            if hasattr(data, "__aiter__"):
+                data = _to_sync_iter(data, asyncio.get_running_loop())
             return await to_thread(self._zip.add, data, *args, **kwargs)

I'd leave specific methods like add_url up to the users of the library though, as that's a bit out of scope. Ideally you would use your library of choice to manage all the HTTP stuff, and just pass this library the async generator that yields back the content. Something like:

zs = AsyncZipStream()
async with get_my_url(some_url) as response:
    await zs.add(response.async_stream())

@pR0Ps I couldn't replicate the deadlock myself. Here is the code I tested, and it worked flawlessly.

from zipstream import AsyncZipStream
import httpx
import asyncio


async def iterator():
    # Use the client as a context manager so the connection pool is
    # closed properly once the stream is consumed
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://mirror.nforce.com/pub/speedtests/10mb.bin") as response:
            async for chunk in response.aiter_bytes():
                yield chunk


async def main():
    async_zip = AsyncZipStream()
    await async_zip.add(iterator(), arcname="10mb.bin")
    with open("10mb.bin.zip", "wb") as fp:
        async for chunk in async_zip:
            fp.write(chunk)


asyncio.run(main())