laughingman7743/PyAthena

EntityTooSmall when calling the CompleteMultipartUpload operation

vikalfc opened this issue · 7 comments

Hi, after issue #483 was closed, I upgraded PyAthena to 3.8.2.

But now data.to_csv() fails when the data is large, with the following error:
"An error occurred (EntityTooSmall) when calling the CompleteMultipartUpload operation: Your proposed upload is smaller than the minimum allowed size."

Note: My file is around 60.1 MB, and I had no trouble writing with version 2.17.0.

Can you please check? TY :)

I have checked the CSV upload with the following simple code and it is fine.
https://github.com/laughingman7743/PyAthena/blob/master/tests/pyathena/filesystem/test_s3.py#L736-L747

import logging
import uuid

import fsspec
import pandas


logging.getLogger("pyathena").addHandler(logging.StreamHandler())
logging.getLogger("pyathena").setLevel(logging.DEBUG)

fsspec.register_implementation("s3", "pyathena.filesystem.s3.S3FileSystem", clobber=True)
fsspec.register_implementation("s3a", "pyathena.filesystem.s3.S3FileSystem", clobber=True)

bucket = "laughingman7743-athena"

# small file
path_small = f"s3://{bucket}/tmp/{uuid.uuid4()}"
df = pandas.DataFrame({"a": [1] * (100 * 2**20), "b": [2] * (100 * 2**20)})
df.to_csv(path_small, index=False)

# large file
path_large = f"s3://{bucket}/tmp/{uuid.uuid4()}"
df = pandas.DataFrame({"a": [1] * (100 * 2 ** 20), "b": [2] * (100 * 2 ** 20)})
df.to_csv(path_large, index=False)

Please share the code that reproduces the error and the stack trace of the error.

Versions

pandas==2.2.2
PyAthena==3.8.2
s3fs==2024.5.0

Note that the same code works fine with PyAthena==2.17.0.

Okay, so I have a connection:

from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
bucket = "my_test_bucket"
region_name = "eu-central-1"
cursor = connect(
    s3_staging_dir=f"s3://{bucket}/tmp/outputs/",
    region_name=region_name,
    cursor_class=PandasCursor,
).cursor(schema_name="my_schema")

I run a query:

data = cursor.execute("""
SELECT 
    column_1,
    column_2,
    column_3,
    column_4,
    column_5,
    column_6,
    column_7,
    column_8,
    column_9,
    column_10,
    column_11,
    column_12,
    column_13,
    column_14,
    column_15,
    column_16,
    column_17
    FROM my_columns AS l
    LEFT JOIN my_extra_columns AS cm ON l.id=cm.id
    LEFT JOIN my_extra_extra_columns AS rel ON l.id=rel.id
""").as_pandas()

I got my data with shape
719303 rows × 17 columns

This is what I do next:

data.to_csv(f"s3://{bucket}/tmp/exports/test.csv")

And here is what I get:

ClientError                               Traceback (most recent call last)
/var/folders/7h/vnq03l3s05q6p4pdzmhdnk200000gn/T/ipykernel_21465/3217657796.py in <module>
----> 1 data.to_csv("s3://my_test_bucket/tmp/exports/test.csv")

~/opt/anaconda3/lib/python3.9/site-packages/pandas/util/_decorators.py in wrapper(*args, **kwargs)
    331                     stacklevel=find_stack_level(),
    332                 )
--> 333             return func(*args, **kwargs)
    334
    335         # error: "Callable[[VarArg(Any), KwArg(Any)], Any]" has no

~/opt/anaconda3/lib/python3.9/site-packages/pandas/core/generic.py in to_csv(self, path_or_buf, sep, na_rep, float_format, columns, header, index, index_label, mode, encoding, compression, quoting, quotechar, lineterminator, chunksize, date_format, doublequote, escapechar, decimal, errors, storage_options)
   3965         )
   3966
-> 3967         return DataFrameRenderer(formatter).to_csv(
   3968             path_or_buf,
   3969             lineterminator=lineterminator,

~/opt/anaconda3/lib/python3.9/site-packages/pandas/io/formats/format.py in to_csv(self, path_or_buf, encoding, sep, columns, index_label, mode, compression, quoting, quotechar, lineterminator, chunksize, date_format, doublequote, escapechar, errors, storage_options)
   1012             formatter=self.fmt,
   1013         )
-> 1014         csv_formatter.save()
   1015
   1016         if created_buffer:

~/opt/anaconda3/lib/python3.9/site-packages/pandas/io/formats/csvs.py in save(self)
    268             )
    269
--> 270             self._save()
    271
    272     def _save(self) -> None:

~/opt/anaconda3/lib/python3.9/site-packages/pandas/io/common.py in __exit__(self, exc_type, exc_value, traceback)
    155         traceback: TracebackType | None,
    156     ) -> None:
--> 157         self.close()
    158
    159

~/opt/anaconda3/lib/python3.9/site-packages/pandas/io/common.py in close(self)
    142             self.created_handles.remove(self.handle)
    143         for handle in self.created_handles:
--> 144             handle.close()
    145         self.created_handles = []
    146         self.is_wrapped = False

~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in close(self)
   1021
   1022     def close(self) -> None:
-> 1023         super().close()
   1024         self._executor.shutdown()
   1025

~/opt/anaconda3/lib/python3.9/site-packages/fsspec/spec.py in close(self)
   1947         else:
   1948             if not self.forced:
-> 1949                 self.flush(force=True)
   1950
   1951             if self.fs is not None:

~/opt/anaconda3/lib/python3.9/site-packages/fsspec/spec.py in flush(self, force)
   1811                 raise
   1812
-> 1813         if self._upload_chunk(final=force) is not False:
   1814             self.offset += self.buffer.seek(0, 2)
   1815             self.buffer = io.BytesIO()

~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in _upload_chunk(self, final)
   1098
   1099         if self.autocommit and final:
-> 1100             self.commit()
   1101         return True
   1102

~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in commit(self)
   1131                 )
   1132             parts.sort(key=lambda x: x["PartNumber"])
-> 1133             self.fs._complete_multipart_upload(
   1134                 bucket=self.bucket,
   1135                 key=self.key,

~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in _complete_multipart_upload(self, bucket, key, upload_id, parts, **kwargs)
    930
    931         _logger.debug(f"Complete multipart upload {upload_id} to s3://{bucket}/{key}.")
--> 932         response = self._call(
    933             self._client.complete_multipart_upload,
    934             **request,

~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in _call(self, method, **kwargs)
    942         else:
    943             func = method
--> 944         response = retry_api_call(
    945             func, config=self._retry_config, logger=_logger, **kwargs, **self.request_kwargs
    946         )

~/opt/anaconda3/lib/python3.9/site-packages/pyathena/util.py in retry_api_call(func, config, logger, *args, **kwargs)
     82         reraise=True,
     83     )
---> 84     return retry(func, *args, **kwargs)

~/opt/anaconda3/lib/python3.9/site-packages/tenacity/__init__.py in __call__(self, fn, *args, **kwargs)
    377         retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
    378         while True:
--> 379             do = self.iter(retry_state=retry_state)
    380             if isinstance(do, DoAttempt):
    381                 try:

~/opt/anaconda3/lib/python3.9/site-packages/tenacity/__init__.py in iter(self, retry_state)
    312         is_explicit_retry = fut.failed and isinstance(fut.exception(), TryAgain)
    313         if not (is_explicit_retry or self.retry(retry_state)):
--> 314             return fut.result()
    315
    316         if self.after is not None:

~/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py in result(self, timeout)
    437                     raise CancelledError()
    438                 elif self._state == FINISHED:
--> 439                     return self.__get_result()
    440
    441                 self._condition.wait(timeout)

~/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py in __get_result(self)
    389         if self._exception:
    390             try:
--> 391                 raise self._exception
    392             finally:
    393                 # Break a reference cycle with the exception in self._exception

~/opt/anaconda3/lib/python3.9/site-packages/tenacity/__init__.py in __call__(self, fn, *args, **kwargs)
    380             if isinstance(do, DoAttempt):
    381                 try:
--> 382                     result = fn(*args, **kwargs)
    383                 except BaseException:  # noqa: B902
    384                     retry_state.set_exception(sys.exc_info())  # type: ignore[arg-type]

~/opt/anaconda3/lib/python3.9/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    563                 )
    564             # The "self" in this scope is referring to the BaseClient.
--> 565             return self._make_api_call(operation_name, kwargs)
    566
    567         _api_call.__name__ = str(py_operation_name)

~/opt/anaconda3/lib/python3.9/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
   1019             )
   1020             error_class = self.exceptions.from_code(error_code)
-> 1021             raise error_class(parsed_response, operation_name)
   1022         else:
   1023             return parsed_response

ClientError: An error occurred (EntityTooSmall) when calling the CompleteMultipartUpload operation: Your proposed upload is smaller than the minimum allowed size
 

With a very similar workflow to above, I'm getting essentially the same error and stack trace on v3.8.2. Behavior is strange since I have no problems uploading other dataframes of similar size. I'll add more details if I'm able.

https://github.com/laughingman7743/PyAthena/blob/master/pyathena/filesystem/s3.py#L1024-L1027
If the file size is smaller than the block size, the file is not uploaded in multipart.
https://github.com/laughingman7743/PyAthena/blob/master/pyathena/filesystem/s3.py#L994-L997
When writing, the block size is validated to be no smaller than the minimum size for multipart uploads.

I am not sure in what case this error occurs. 🤔
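To make the two checks described above concrete, here is a minimal sketch. It is not PyAthena's actual code: the function names are hypothetical, and treating "smaller than the block size" as a strict `<` is an assumption. The second helper encodes the S3 rule that every part except the last must be at least 5 MiB, which is the condition behind EntityTooSmall.

```python
# Hypothetical helpers for illustration only; not PyAthena's implementation.
MIN_PART_SIZE = 5 * 2**20  # S3 minimum for every part except the last (5 MiB)


def choose_upload_strategy(total_size: int, block_size: int) -> str:
    """Return "single" or "multipart" for a write of total_size bytes."""
    # The block size is validated against the multipart minimum up front.
    if block_size < MIN_PART_SIZE:
        raise ValueError("block size must be at least 5 MiB")
    # Assumption: data smaller than one block is sent as a single request.
    return "single" if total_size < block_size else "multipart"


def undersized_parts(part_sizes: list[int]) -> list[int]:
    """Return the 1-based numbers of parts that S3 would reject as too small."""
    # The final part is exempt from the minimum, so it is not checked.
    return [
        number
        for number, size in enumerate(part_sizes[:-1], start=1)
        if size < MIN_PART_SIZE
    ]
```

For example, `undersized_parts([5 * 2**20, 3 * 2**20, 5 * 2**20])` flags part 2: a short part is only a problem when another part follows it.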

For now, I can confirm that my data with a shape of
34158 rows × 17 columns is okay (probably because the file size is smaller than the block size).

However, 127434 rows × 17 columns throws the same error as before.

If I can help with something, I'll be glad to do it.

@laughingman7743 Not sure if this helps, but I used your logging code to try and understand where my upload is going wrong. Surprisingly, I do get a message logging that the multipart upload completed, but immediately after that the EntityTooSmall error is thrown. I can confirm that the csv I attempted to save is not present. We may end up just downgrading PyAthena versions but I can try to provide more information if possible.

https://github.com/fsspec/s3fs/blob/main/s3fs/core.py#L2343-L2351
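The error is probably caused by the last part of a multipart request being less than 5 MB while more parts are still to come; S3 only allows the final part of the whole upload to be below that minimum.
If the last part of a write is less than 5 MB, it needs to be merged with the previous part before uploading, as s3fs does in the code linked above.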
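A minimal sketch of that merge step, assuming the buffered data is available as one bytes object. The function name and parameters are hypothetical and this is not the s3fs or PyAthena implementation. It also ignores the case where the merged part would exceed S3's 5 GiB maximum part size.

```python
MIN_PART_SIZE = 5 * 2**20  # S3 minimum for every part except the last (5 MiB)


def split_into_parts(
    data: bytes, block_size: int, min_part_size: int = MIN_PART_SIZE
) -> list[bytes]:
    """Split data into block-sized parts, folding a short tail into its predecessor."""
    parts = [data[i : i + block_size] for i in range(0, len(data), block_size)]
    # A trailing chunk below the minimum is appended to the previous part,
    # so no part produced here is undersized unless it is the only one.
    if len(parts) > 1 and len(parts[-1]) < min_part_size:
        tail = parts.pop()
        parts[-1] += tail
    return parts
```

With small numbers for readability, `split_into_parts(b"x" * 23, block_size=10, min_part_size=5)` yields parts of 10 and 13 bytes instead of 10, 10 and 3.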