qubole/rubix

S3 connection leak with PrestoS3FileSystem


S3 connections can leak when CachingInputStream reads data via DirectReadRequestChain. The general flow of events in such a read is:

  • CachingInputStream receives a read call
  • It then creates a DirectReadRequestChain (along with other Chains), passing it a PrestoS3InputStream object
  • It then runs this DirectReadRequestChain on a separate thread
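
The read path above can be sketched roughly as follows. This is a minimal model under stated assumptions, not Rubix source: DirectReadChainSketch, CachingReadSketch, readViaChain and demo are hypothetical names, and a ByteArrayInputStream stands in for PrestoS3InputStream. The key point it shows is that the chain reads from a stream it does not own, on a pool thread, while the caller blocks on Future.get().

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for DirectReadRequestChain: it reads from a stream it does not own.
class DirectReadChainSketch implements Callable<Integer> {
    private final InputStream remote; // shared stream, closed later by the caller
    private final byte[] buffer;
    volatile boolean cancelled = false;

    DirectReadChainSketch(InputStream remote, byte[] buffer) {
        this.remote = remote;
        this.buffer = buffer;
    }

    @Override
    public Integer call() throws IOException {
        int total = 0;
        // The cancel flag is checked once per iteration, before each read.
        while (total < buffer.length && !cancelled) {
            int n = remote.read(buffer, total, buffer.length - total);
            if (n < 0) {
                break;
            }
            total += n;
        }
        return total;
    }
}

// Hypothetical stand-in for the CachingInputStream read path.
class CachingReadSketch {
    // Hand the shared stream to a chain, run it on a pool thread and block on Future.get().
    static int readViaChain(ExecutorService pool, InputStream remote, byte[] buffer)
            throws InterruptedException, ExecutionException {
        DirectReadChainSketch chain = new DirectReadChainSketch(remote, buffer);
        Future<Integer> result = pool.submit(chain);
        return result.get();
    }

    // Convenience wrapper: reads up to 'length' bytes of 'data' through a chain.
    static int demo(byte[] data, int length) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return readViaChain(pool, new ByteArrayInputStream(data), new byte[length]);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, demo(new byte[] {1, 2, 3, 4, 5}, 3) returns 3, and asking for more bytes than the stream holds returns only what was available.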

If the query is cancelled while DirectReadRequestChain is in progress, the following happens on the Presto Driver thread:

  • CachingInputStream.readInternal gets an InterruptedException from Future.get()
  • In response, CachingInputStream marks the DirectReadRequestChain as cancelled and returns by rethrowing the exception
  • Eventually the record reader closes the CachingInputStream
  • CachingInputStream.close calls close() on the PrestoS3InputStream object that was passed to DirectReadRequestChain
  • PrestoS3InputStream.close closes s3Stream if it is not null
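
The Driver-thread cancellation steps can be modelled roughly as below. This is a hypothetical sketch: CancelPathSketch, Chain, readInternal and simulateCancel are illustrative names, the chain simply blocks on a latch instead of reading from S3, and a self-interrupt stands in for query cancellation. It only shows that Future.get() throws InterruptedException while the chain is still running, the chain gets flagged as cancelled, and the exception propagates; setting the flag does not stop a read the chain thread has already issued.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of the Driver-thread cancellation path; not Rubix source.
class CancelPathSketch {
    // Stand-in chain: blocks until released, like a slow remote read.
    static final class Chain implements Callable<Integer> {
        volatile boolean cancelled = false;
        final CountDownLatch release = new CountDownLatch(1);

        @Override
        public Integer call() throws InterruptedException {
            release.await();
            return cancelled ? -1 : 0;
        }
    }

    // Mirrors the described readInternal: if get() is interrupted, mark the chain
    // cancelled and rethrow. The chain thread may already be past its flag check.
    static int readInternal(Future<Integer> future, Chain chain)
            throws InterruptedException, ExecutionException {
        try {
            return future.get();
        } catch (InterruptedException e) {
            chain.cancelled = true;
            throw e;
        }
    }

    // Interrupts the calling thread while the chain is still running and reports
    // whether the cancel path ran.
    static boolean simulateCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Chain chain = new Chain();
        Future<Integer> future = pool.submit(chain);
        boolean interrupted = false;
        Thread.currentThread().interrupt(); // stands in for query cancellation
        try {
            readInternal(future, chain);
        } catch (InterruptedException e) {
            interrupted = true; // FutureTask.get() cleared the interrupt flag when it threw
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            chain.release.countDown(); // let the blocked task finish
            pool.shutdown();
        }
        return interrupted && chain.cancelled;
    }
}
```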

Meanwhile, the following can happen in the thread executing DirectReadRequestChain, in parallel with this PrestoS3InputStream.close:

  • DirectReadRequestChain may already have passed the cancelled check for the current iteration and issued a read on PrestoS3InputStream
  • This read may need to close and reopen the existing s3Stream via PrestoS3InputStream.seekStream because of a long jump in the read position
  • seekStream does this in three steps: close s3Stream -> set s3Stream to null -> open a new s3Stream

A race between these two threads can cause PrestoS3InputStream.close (on the Driver thread) to run after seekStream (on the DirectReadRequestChain thread) has set s3Stream to null but before it opens the new stream. close() then finds s3Stream null and closes nothing, and the new s3Stream that seekStream opens afterwards is never closed: DirectReadRequestChain does not close it, because it expects CachingInputStream to close the underlying streams in its close().
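
The racy interleaving can be replayed deterministically on a single thread, which makes the leak easy to see. This is a hypothetical model, not Presto or Rubix code: SeekCloseRaceSketch, FakeS3Stream and SharedInputStream are illustrative names, seekStream is split into its close-and-null half and its open half, and close() is modelled as closing s3Stream only when it is non-null, as described above. With real threads this ordering occurs only occasionally.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-threaded replay of the seekStream/close interleaving.
class SeekCloseRaceSketch {
    // Counts "S3 connections" that have been opened but not yet closed.
    static final AtomicInteger openConnections = new AtomicInteger();

    static final class FakeS3Stream implements AutoCloseable {
        private boolean closed;

        FakeS3Stream() {
            openConnections.incrementAndGet();
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                openConnections.decrementAndGet();
            }
        }
    }

    static final class SharedInputStream {
        FakeS3Stream s3Stream = new FakeS3Stream();

        // First half of seekStream: close the current stream and set it to null.
        void seekStreamCloseOld() {
            if (s3Stream != null) {
                s3Stream.close();
                s3Stream = null;
            }
        }

        // Second half of seekStream: open a stream at the new position.
        void seekStreamOpenNew() {
            s3Stream = new FakeS3Stream();
        }

        // close() only closes a non-null stream.
        void close() {
            if (s3Stream != null) {
                s3Stream.close();
                s3Stream = null;
            }
        }
    }

    // Racy order: close() lands in the window where s3Stream is null.
    static int replayRace() {
        openConnections.set(0);
        SharedInputStream in = new SharedInputStream();
        in.seekStreamCloseOld(); // chain thread: close old stream, set it to null
        in.close();              // Driver thread: sees null, closes nothing
        in.seekStreamOpenNew();  // chain thread: opens a stream nobody will close
        return openConnections.get();
    }

    // Safe order: close() runs after seekStream has finished.
    static int replaySafeOrder() {
        openConnections.set(0);
        SharedInputStream in = new SharedInputStream();
        in.seekStreamCloseOld();
        in.seekStreamOpenNew();
        in.close();
        return openConnections.get();
    }
}
```

replayRace() ends with one connection still open, while replaySafeOrder() ends with none.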

The fix for this is to let the Chains open and close their streams themselves. I tried this and saw no leaked connections.

While this solves the issue, it can hurt performance when the streaming reads API is used for sequential reads. The current code keeps reusing the same InputStream to read from the remote store across multiple read calls; if the Chains open and close their own streams, a new stream would be opened for every read.
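
A rough sketch of the proposed fix and its cost, under assumptions: ChainOwnedStreamSketch, openAt, chainRead and sequentialReads are hypothetical names, and an in-memory byte array stands in for the S3 object. Each chain opens its stream and closes it with try-with-resources on its own thread, so no other thread has to close a stream the chain may have replaced and the stillOpen counter returns to zero. The opened counter illustrates the trade-off: reading 8 bytes in 2-byte chunks opens 4 streams, where reusing one stream across reads would open only one.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the proposed fix; names are illustrative, not Rubix APIs.
class ChainOwnedStreamSketch {
    static final AtomicInteger opened = new AtomicInteger();    // total streams opened
    static final AtomicInteger stillOpen = new AtomicInteger(); // opened but not yet closed

    // Each call stands in for opening one remote connection at 'position'.
    static InputStream openAt(byte[] object, int position) {
        opened.incrementAndGet();
        stillOpen.incrementAndGet();
        InputStream raw = new ByteArrayInputStream(object, position, object.length - position);
        return new FilterInputStream(raw) {
            private boolean closed;

            @Override
            public void close() throws IOException {
                if (!closed) {
                    closed = true;
                    stillOpen.decrementAndGet();
                }
                super.close();
            }
        };
    }

    // The chain opens, reads and closes its own stream within a single call.
    static int chainRead(byte[] object, int position, byte[] dest) {
        try (InputStream in = openAt(object, position)) {
            int total = 0;
            while (total < dest.length) {
                int n = in.read(dest, total, dest.length - total);
                if (n < 0) {
                    break;
                }
                total += n;
            }
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Sequential reads in fixed-size chunks; every chunk opens a fresh stream.
    static int sequentialReads(byte[] object, int chunk) {
        int bytes = 0;
        for (int pos = 0; pos < object.length; pos += chunk) {
            bytes += chainRead(object, pos, new byte[Math.min(chunk, object.length - pos)]);
        }
        return bytes;
    }
}
```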