Flaky failures in test_concurrency
Closed this issue · 1 comments
musicinmybrain commented
On Fedora Linux 37, x86_64, I see the following failure in roughly one out of every three to five runs:
$ pytest tests/test_concurrency.py -v
============================================== test session starts ===============================================
platform linux -- Python 3.11.1, pytest-6.2.5, py-1.11.0, pluggy-1.0.0 -- /home/ben/src/forks/PyrateLimiter/_e/bin
/python3
cachedir: .pytest_cache
rootdir: /home/ben/src/forks/PyrateLimiter
plugins: forked-1.4.0, cov-3.0.0, xdist-2.5.0, asyncio-0.12.0
collected 3 items
tests/test_concurrency.py::test_concurrency[ThreadPoolExecutor-SQLiteBucket] PASSED [ 33%]
tests/test_concurrency.py::test_concurrency[ProcessPoolExecutor-SQLiteBucket] FAILED [ 66%]
tests/test_concurrency.py::test_filelock_concurrency PASSED [100%]
==================================================== FAILURES ====================================================
_______________________________ test_concurrency[ProcessPoolExecutor-SQLiteBucket] _______________________________
executor_class = <class 'concurrent.futures.process.ProcessPoolExecutor'>
bucket_class = <class 'pyrate_limiter.sqlite_bucket.SQLiteBucket'>
@pytest.mark.parametrize("bucket_class", [SQLiteBucket])
@pytest.mark.parametrize("executor_class", [ThreadPoolExecutor, ProcessPoolExecutor])
def test_concurrency(executor_class, bucket_class):
"""Make a fixed number of concurrent requests using a shared Limiter, and check the total time
they take to run
"""
logger.info(f"Testing {bucket_class.__name__} with {executor_class.__name__}")
# Set up limiter
bucket_kwargs = {
"path": join(gettempdir(), f"test_{executor_class.__name__}.sqlite"),
}
limiter = Limiter(
RequestRate(LIMIT_REQUESTS_PER_SECOND, Duration.SECOND),
bucket_class=bucket_class,
bucket_kwargs=bucket_kwargs,
)
# Set up request function
bucket_ids = [f"{executor_class.__name__}_bucket_{i}" for i in range(N_BUCKETS)]
start_time = perf_counter()
request_func = partial(_send_request, limiter, bucket_ids, start_time)
# Distribute requests across workers
with executor_class(max_workers=N_WORKERS) as executor:
list(executor.map(request_func, range(N_REQUESTS), timeout=300))
# Check total time, with debug logging
elapsed = perf_counter() - start_time
expected_min_time = (N_REQUESTS - 1) / LIMIT_REQUESTS_PER_SECOND
worker_type = "threads" if executor_class is ThreadPoolExecutor else "processes"
logger.info(
f"Ran {N_REQUESTS} requests with {N_WORKERS} {worker_type} in {elapsed:.2f} seconds\n"
f"With a rate limit of {LIMIT_REQUESTS_PER_SECOND}/second, expected at least "
f"{expected_min_time} seconds"
)
> assert elapsed >= expected_min_time
E assert 9.634221867017914 >= 10.0
tests/test_concurrency.py:68: AssertionError
---------------------------------------------- Captured stderr call ----------------------------------------------
INFO:pyrate_limiter.tests:Testing SQLiteBucket with ProcessPoolExecutor
INFO:pyrate_limiter.tests:Ran 101 requests with 7 processes in 9.63 seconds
With a rate limit of 10/second, expected at least 10.0 seconds
----------------------------------------------- Captured log call ------------------------------------------------
INFO pyrate_limiter.tests:test_concurrency.py:38 Testing SQLiteBucket with ProcessPoolExecutor
INFO pyrate_limiter.tests:test_concurrency.py:63 Ran 101 requests with 7 processes in 9.63 seconds
With a rate limit of 10/second, expected at least 10.0 seconds
============================================ short test summary info =============================================
FAILED tests/test_concurrency.py::test_concurrency[ProcessPoolExecutor-SQLiteBucket] - assert 9.634221867017914...
========================================== 1 failed, 2 passed in 50.16s ==========================================
While trying to package PyrateLimiter as an RPM for Fedora Linux Rawhide (the development version), we have also seen:
=================================== FAILURES ===================================
______________ test_concurrency[ProcessPoolExecutor-SQLiteBucket] ______________
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib64/python3.11/concurrent/futures/process.py", line 256, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/concurrent/futures/process.py", line 205, in _process_chunk
return [fn(*args) for args in chunk]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/concurrent/futures/process.py", line 205, in <listcomp>
return [fn(*args) for args in chunk]
^^^^^^^^^
File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/tests/test_concurrency.py", line 75, in _send_request
with limiter.ratelimit(*bucket_ids, delay=True):
File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/limit_context_decorator.py", line 66, in __enter__
self.delayed_acquire()
File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/limit_context_decorator.py", line 82, in delayed_acquire
self.try_acquire()
File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/limiter.py", line 92, in try_acquire
volume = bucket.size()
^^^^^^^^^^^^^
File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/sqlite_bucket.py", line 95, in size
self._size = self._query_size()
^^^^^^^^^^^^^^^^^^
File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/sqlite_bucket.py", line 100, in _query_size
return self.connection.execute(f"SELECT COUNT(*) FROM {self.table}").fetchone()[0]
^^^^^^^^^^^^^^^
File "/builddir/build/BUILD/PyrateLimiter-103252ca8d5336dc19b69fda6b65798eac932fd2/pyrate_limiter/sqlite_bucket.py", line 73, in connection
self._connection.execute(
sqlite3.OperationalError: database is locked
"""
The above exception was the direct cause of the following exception:
executor_class = <class 'concurrent.futures.process.ProcessPoolExecutor'>
bucket_class = <class 'pyrate_limiter.sqlite_bucket.SQLiteBucket'>
@pytest.mark.parametrize("bucket_class", [SQLiteBucket])
@pytest.mark.parametrize("executor_class", [ThreadPoolExecutor, ProcessPoolExecutor])
def test_concurrency(executor_class, bucket_class):
"""Make a fixed number of concurrent requests using a shared Limiter, and check the total time
they take to run
"""
logger.info(f"Testing {bucket_class.__name__} with {executor_class.__name__}")
# Set up limiter
bucket_kwargs = {
"path": join(gettempdir(), f"test_{executor_class.__name__}.sqlite"),
}
limiter = Limiter(
RequestRate(LIMIT_REQUESTS_PER_SECOND, Duration.SECOND),
bucket_class=bucket_class,
bucket_kwargs=bucket_kwargs,
)
# Set up request function
bucket_ids = [f"{executor_class.__name__}_bucket_{i}" for i in range(N_BUCKETS)]
start_time = perf_counter()
request_func = partial(_send_request, limiter, bucket_ids, start_time)
# Distribute requests across workers
with executor_class(max_workers=N_WORKERS) as executor:
> list(executor.map(request_func, range(N_REQUESTS), timeout=300))
tests/test_concurrency.py:57:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/lib64/python3.11/concurrent/futures/process.py:597: in _chain_from_iterable_of_lists
for element in iterable:
/usr/lib64/python3.11/concurrent/futures/_base.py:621: in result_iterator
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
/usr/lib64/python3.11/concurrent/futures/_base.py:317: in _result_or_cancel
return fut.result(timeout)
/usr/lib64/python3.11/concurrent/futures/_base.py:449: in result
return self.__get_result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = None
def __get_result(self):
if self._exception:
try:
> raise self._exception
E sqlite3.OperationalError: database is locked
/usr/lib64/python3.11/concurrent/futures/_base.py:401: OperationalError
----------------------------- Captured stderr call -----------------------------
INFO:pyrate_limiter.tests:Testing SQLiteBucket with ProcessPoolExecutor
------------------------------ Captured log call -------------------------------
INFO pyrate_limiter.tests:test_concurrency.py:38 Testing SQLiteBucket with ProcessPoolExecutor
=========================== short test summary info ============================
FAILED tests/test_concurrency.py::test_concurrency[ProcessPoolExecutor-SQLiteBucket]
=================== 1 failed, 54 passed in 209.79s (0:03:29) ===================
but this second failure mode (sqlite3.OperationalError: database is locked) is harder to reproduce reliably.