fsspec/s3fs

Confused about using s3fs asynchronously

pb222xd opened this issue · 7 comments

After reading the s3fs documentation on asynchronous usage (https://s3fs.readthedocs.io/en/latest/#async), I tried several approaches, but only the following code seems to work:

import asyncio
from s3fs import S3FileSystem


async def main():
    s3 = S3FileSystem(key="sA3HIztxVB31tvWV9uQv",
                      secret="up8OZN7yje6I4HK6seNkXfccdSzmQA86PudmkfpR",
                      client_kwargs={'endpoint_url': "http://127.0.0.1:9000",
                                     'region_name': "auto"})
    session = await s3.set_session()
    # noinspection PyProtectedMember
    await s3._put("test.txt", "test2.txt")  # pylint: disable=protected-access
    await session.close()


if __name__ == '__main__':
    asyncio.run(main())

Is there any potential problem with my implementation? Thank you very much.

Your implementation looks good.
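For reference, the async section of the s3fs docs constructs the filesystem with asynchronous=True before calling set_session(). Below is a minimal sketch of the snippet above in that style. It assumes a MinIO-like endpoint and a remote path of the form "bucket/key"; the function name upload and its parameters are hypothetical. The try/finally closes the client even if the upload fails:

```python
async def upload(lpath: str, rpath: str, endpoint_url: str, key: str, secret: str) -> None:
    """Hypothetical helper: copy one local file to S3 using s3fs's async methods."""
    # Imported inside the function so this module loads even without s3fs installed.
    from s3fs import S3FileSystem

    s3 = S3FileSystem(
        key=key,
        secret=secret,
        asynchronous=True,  # this instance will be driven from a running event loop
        client_kwargs={"endpoint_url": endpoint_url, "region_name": "auto"},
    )
    # refresh=True forces a new client, in case fsspec's instance cache hands back
    # an instance whose client was already closed by an earlier call.
    session = await s3.set_session(refresh=True)
    try:
        await s3._put(lpath, rpath)  # rpath is assumed to look like "bucket/key"
    finally:
        # Close the client even if _put raises.
        await session.close()
```

It would be called as, for example, asyncio.run(upload("test.txt", "my-bucket/test2.txt", "http://127.0.0.1:9000", key, secret)), where my-bucket is a placeholder bucket name.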

🫡

Sorry to bother you again, but there is another point I don't quite understand. What is the difference between the two classes below?

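from s3fs import S3FileSystem


# Version 1: create a fresh client on entry and close it explicitly on exit
class MyS3: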
    def __init__(self):
        self.s3 = S3FileSystem(key="AMIECjhoqrJSQZw9zl3S",
                               secret="SNNcoLOzxG8fX9rTjXABaiBkMlDfxAiWkQGmTH5N",
                               client_kwargs={'endpoint_url': "http://127.0.0.1:9000",
                                              'region_name': "auto"})
        self.session = None

    async def __aenter__(self):
        self.session = await self.s3.set_session(refresh=True)
        return self.s3

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()


# Version 2: reuse the cached client and never close it explicitly
class MyS3:
    def __init__(self):
        self.s3 = S3FileSystem(key="AMIECjhoqrJSQZw9zl3S",
                               secret="SNNcoLOzxG8fX9rTjXABaiBkMlDfxAiWkQGmTH5N",
                               client_kwargs={'endpoint_url': "http://127.0.0.1:9000",
                                              'region_name': "auto"})
        self.session = None

    async def __aenter__(self):
        self.session = await self.s3.set_session()
        return self.s3

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

Thank you very much for answering my question

You mean omitting await self.session.close()? Ideally that happens automatically when the instance is cleaned up, but because cleanup can happen in any thread, and potentially at interpreter shutdown, you may see (harmless) warnings about unclosed sockets and coroutines that were never awaited.
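To make the difference concrete without needing an S3 endpoint, here is a small sketch with hypothetical stand-ins (FakeClient, FakeFS and open_session are illustrative names, not s3fs API). It mirrors the first MyS3 class using contextlib.asynccontextmanager: the try/finally guarantees close() runs when the block exits, even on error, instead of relying on garbage collection.

```python
import asyncio
import contextlib


class FakeClient:
    """Hypothetical stand-in for the client returned by S3FileSystem.set_session()."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class FakeFS:
    """Hypothetical stand-in for S3FileSystem; only set_session() is modelled."""

    async def set_session(self, refresh: bool = False) -> FakeClient:
        return FakeClient()


@contextlib.asynccontextmanager
async def open_session(fs: FakeFS):
    # Same shape as the first MyS3 class: new client on entry, explicit close on exit.
    client = await fs.set_session(refresh=True)
    try:
        yield client
    finally:
        # Runs on normal exit and when the body raises.
        await client.close()


async def demo_ok() -> tuple[bool, bool]:
    async with open_session(FakeFS()) as client:
        open_inside = not client.closed
    return open_inside, client.closed


async def demo_error() -> bool:
    captured = None
    try:
        async with open_session(FakeFS()) as client:
            captured = client
            raise RuntimeError("simulated upload failure")
    except RuntimeError:
        pass
    return captured.closed


open_inside, closed_after = asyncio.run(demo_ok())
closed_after_error = asyncio.run(demo_error())
print(open_inside, closed_after, closed_after_error)  # True True True
```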

I noticed this check in set_session:

if self._s3 is not None and not refresh:

In other words, can I reuse the session as in the second version, without calling session.close(), and just let it be garbage collected?
Thank you again for your patient answers.
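The quoted check is what makes reuse work: once a client exists, set_session() returns it unless refresh=True. A minimal stand-in (SessionCachingFS is hypothetical and models only that one check, not the rest of s3fs) shows the effect:

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for the client object; numbered so instances are easy to tell apart."""

    def __init__(self, number: int) -> None:
        self.number = number


class SessionCachingFS:
    """Hypothetical stand-in modelling only the cached-client check in set_session()."""

    def __init__(self) -> None:
        self._s3 = None
        self.created = 0

    async def set_session(self, refresh: bool = False) -> FakeClient:
        if self._s3 is not None and not refresh:
            # Reuse the existing client (in the real library, along with its connection pool).
            return self._s3
        # No client yet, or refresh requested: build a new one.
        # (This sketch does not model what happens to the previous client.)
        self.created += 1
        self._s3 = FakeClient(self.created)
        return self._s3


async def demo():
    fs = SessionCachingFS()
    first = await fs.set_session()
    second = await fs.set_session()             # cached: same object as first
    third = await fs.set_session(refresh=True)  # forced: a new object
    return first, second, third, fs.created


first, second, third, created = asyncio.run(demo())
print(first is second, third is first, created)  # True False 2
```

One consequence of this check: if a client has already been closed, a later set_session() call without refresh=True would hand back that same closed object.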

Correct. If you don't want to manage instances yourself, you can reuse them. fsspec caches all instances unless explicitly told not to (for example with skip_instance_cache=True), so if you use the same init arguments, you get exactly the same object back and can expect the connection pool etc. to still be there. Whether you use the context-manager formalism for state hygiene is up to you.
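The instance caching described above can be illustrated with a simplified metaclass. This is a hypothetical sketch of the idea (CachedMeta and DemoFS are made-up names), not fsspec's actual implementation: identical constructor arguments return the cached object, while skip_instance_cache=True bypasses the cache.

```python
class CachedMeta(type):
    """Hypothetical, simplified instance cache keyed on constructor arguments."""

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        cls._cache = {}  # one cache per class

    def __call__(cls, *args, skip_instance_cache=False, **kwargs):
        # repr() gives a string key even when arguments contain dicts (e.g. client_kwargs).
        token = repr((args, sorted(kwargs.items())))
        if not skip_instance_cache and token in cls._cache:
            return cls._cache[token]
        obj = super().__call__(*args, **kwargs)
        if not skip_instance_cache:
            cls._cache[token] = obj
        return obj


class DemoFS(metaclass=CachedMeta):
    """Hypothetical filesystem class; it only stores its init arguments."""

    def __init__(self, key=None, secret=None, client_kwargs=None):
        self.key = key
        self.secret = secret
        self.client_kwargs = client_kwargs or {}


opts = {"endpoint_url": "http://127.0.0.1:9000", "region_name": "auto"}
a = DemoFS(key="k", secret="s", client_kwargs=opts)
b = DemoFS(key="k", secret="s", client_kwargs=opts)
c = DemoFS(key="k", secret="s", client_kwargs=opts, skip_instance_cache=True)
d = DemoFS(key="other", secret="s", client_kwargs=opts)
print(a is b, a is c, a is d)  # True False False
```

Because the key is derived from the arguments, changing the key or endpoint yields a different instance, which matches the "same init arguments" condition above.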

Thank you very much again🫡