omnilib/aiosqlite

database is locked. Wrong usage or out of spec?

Closed this issue · 6 comments

Description

In some situations I'm getting the error "database is locked".
I was expecting that aiosqlite's connection pool would avoid that.
Perhaps I'm using the library wrong, or pushing it to an extreme that was never expected to work?

When my unittest executes more than 380 concurrent inserts, I get "database is locked". Fewer than 380 is OK.
In short, my code looks like this:

async def insert_data(db_file_path, table_name, data):
    # every call opens (and closes) its own connection
    async with aiosqlite.connect(db_file_path) as db:
        query = f'INSERT INTO {table_name} (id,type,info_table,tables,url,center_binary_file,filename,cdw_url) VALUES (?,?,?,?,?,?,?,?)'
        await db.execute(query, data)  # data: a tuple of 8 values
        await db.commit()

tasks = [insert_data(), insert_data(), insert_data().... ] # len() = 380, arguments omitted
await asyncio.gather(*tasks)
        



### Details

* OS: Win10
* Python version: 3.9
* aiosqlite version: 0.18.0
* Can you repro on 'main' branch? N/A
* Can you repro in a clean virtualenv? N/A

How long does this take to run? The default timeout in Python's sqlite is 5 seconds. Here, you're connecting 380 times, and trying to insert 380 times. They'll be queued and whichever the last connection is will be waiting for all the rest so it can do its transaction, and possibly timing out.

Remember, SQLite does not support any sort of concurrent writes. The database file will be locked during a write transaction.

You'll have to either increase your timeout parameter, or find some way to batch these inserts into one transaction. SQLite can write a lot of data very quickly, but not if every insert has its own transaction.
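
To make both options concrete, here is a minimal sketch using the standard-library sqlite3 module, whose connect() takes the same timeout parameter that aiosqlite passes through. The insert_batch helper and the cut-down two-column pages table are made up for illustration; they are not the code from this issue.

```python
import sqlite3


def insert_batch(db_path, rows, timeout=30.0):
    """Insert all rows in a single transaction and return the stored row count."""
    # timeout: seconds to wait on a locked database before sqlite3 raises
    # "database is locked" (the default is 5 seconds).
    conn = sqlite3.connect(db_path, timeout=timeout)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS pages (id TEXT UNIQUE NOT NULL, url TEXT)"
        )
        # "with conn" wraps the inserts in one transaction:
        # a single commit on success, a rollback on error.
        with conn:
            conn.executemany("INSERT INTO pages (id, url) VALUES (?, ?)", rows)
        return conn.execute("SELECT COUNT(*) FROM pages").fetchone()[0]
    finally:
        conn.close()


rows = [(str(i), f"https://{i}") for i in range(380)]
print(insert_batch(":memory:", rows))  # prints 380
```

With one commit for the whole batch, the write lock is taken once instead of 380 times, so there is far less opportunity for another connection to hit the timeout.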

aiosqlite does not use a connection pool. This is likely a limit of the underlying sqlite3 implementation. I'd suggest using a concurrency-limited gather, like aioitertools.asyncio.gather(..., limit=X), especially with many concurrent writes, where only one connection would ever be able to write to a shared database anyways.
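
If you'd rather not add a dependency, roughly the same effect can be sketched with asyncio.Semaphore. Note this is a stand-in, not aioitertools itself: gather_limited, demo, and fake_insert are hypothetical names, and the sleep only simulates a database write.

```python
import asyncio


async def gather_limited(coros, limit):
    """Await all coroutines, with at most `limit` of them running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run(coro):
        async with semaphore:
            return await coro

    # gather preserves the order of the inputs in its result list
    return await asyncio.gather(*(run(c) for c in coros))


async def demo(n=20, limit=5):
    active = 0
    peak = 0

    async def fake_insert(i):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # record the highest concurrency seen
        await asyncio.sleep(0.01)  # stand-in for a database write
        active -= 1
        return i

    results = await gather_limited([fake_insert(i) for i in range(n)], limit)
    return results, peak


results, peak = asyncio.run(demo())
print(len(results), peak)
```

Here peak never exceeds the limit, so with one connection per task you would have at most that many connections competing for the write lock at any moment.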

@Taywee My insert speed metrics were approximately 50 inserts per second. I did not get any timeout errors

insert 250 rows, time: 5.125394344329834
insert 350 rows, time: 6.984423398971558
insert 380 rows, time: 7.579339265823364

@amyreese I will consider rewriting my method to use a shared database connection:

async def insert_data(db_conn, table_name, data):
    query = f'INSERT INTO {table_name} (id,type,info_table,tables,url,center_binary_file,filename,cdw_url) VALUES (?,?,?,?,?,?,?,?)'
    await db_conn.execute(query, data)  # data: a tuple of 8 values
    await db_conn.commit()

# create long lived db_conn
# close db_conn before exit()

Thus far I've just used asyncio, but I'm going to need multiprocessing for some CPU-bound tasks. I might try sharing a db connection, or make a proxy to the database.

Edit: oops not quite that fast.
I compared new code with old code in a unittest.
The same code runs tremendously slowly in a unittest: 120000 inserts in 46.3 seconds. But that is still 5x faster by reusing the connection.
(minor code tweaks too, like the generator)

Just as bonus info:
I'm getting 3125 inserts per second now, instead of 50.
My old code was obviously terrible. I'm using the same connection now rather than creating a new one before each insert.
1200 rows inserted in 0.383 seconds. I'm not even using a concurrency limit.

Here is a self-contained example for anyone interested:

import asyncio
import aiosqlite
import time

DB_NAME = "fast_inserts.db"

TABLE_NAME = 'pages'
PAGES_SCHEMA = dict(schema=f'''
                CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
                    id TEXT UNIQUE NOT NULL,
                    type TEXT NOT NULL,
                    info_table TEXT NOT NULL,
                    tables TEXT,
                    url TEXT UNIQUE NOT NULL,
                    center_binary_file BLOB,
                    filename TEXT,
                    cdw_url TEXT
                )
            ''',
              v=2)

async def insert(db_conn, table_name, data):
    """insert regardless of schema"""
    keys = ','.join(data.keys()) # 'id,type,info_table,tables,url,center_binary_file,filename,cdw_url'
    placeholders = ','.join('?' for _ in data) # '?,?,?,?,?,?,?,?'

    query = f"INSERT INTO {table_name} ({keys}) VALUES ({placeholders})"
    # query = 'INSERT INTO pages (id,type,info_table,tables,url,center_binary_file,filename,cdw_url) VALUES (?,?,?,?,?,?,?,?)'
    values = tuple(data.values())

    await db_conn.execute(query, values)
    await db_conn.commit()


def unique_data():
    """yield rows with unique id and url; ids restart at 0 on every run,
    so delete fast_inserts.db before rerunning or the UNIQUE constraints fail"""
    id = 0
    while True:
        yield {'id': str(id), 'type': 'case', 'info_table': 'a,b,c', 'tables': 'x,y,z', 'url': f'https://{id}'}
        id += 1

if __name__ == "__main__":
    data_generator = unique_data()
    async def async_main():

        # one long-lived connection, shared by every insert
        async with aiosqlite.connect(DB_NAME) as db_conn:
            # create the table if it does not exist yet
            await db_conn.execute(PAGES_SCHEMA['schema'])
            await db_conn.commit()

            # single insert
            await insert(db_conn=db_conn, table_name=TABLE_NAME, data=next(data_generator))

            # many inserts
            num_rows = 1200
            tasks = [insert(db_conn=db_conn, table_name=TABLE_NAME, data=next(data_generator)) for _ in range(num_rows)]
            start = time.time()
            await asyncio.gather(*tasks)
            print(f"insert {num_rows} rows, time: {time.time() - start}")

    asyncio.run(async_main())

@Krogsager
You wouldn't get a timeout error. SQLite gives you a sqlite3.OperationalError: database is locked, because the underlying result code is SQLITE_BUSY. It's a lock error raised when the wait for a lock to be released times out.

See "Set A Busy Timeout" and "(5) SQLITE_BUSY" in the SQLite documentation for information on what the timeout parameter does.

This simple script verifies it:

from threading import Thread
from sqlite3 import connect
from time import sleep
from tempfile import TemporaryDirectory
from contextlib import closing

from pathlib import Path

with TemporaryDirectory() as tempdir:
    db = Path(tempdir) / 'test.db'

    def lock_sleep(time: float = 3.0) -> None:
        with connect(db, isolation_level=None, timeout=1) as connection:
            connection.create_function("sleep", 1, sleep)
            with closing(connection.cursor()) as cursor:
                print('running script')
                cursor.executescript(f'''
                    BEGIN IMMEDIATE;
                    SELECT SLEEP({time});
                    COMMIT;
                ''')
    threads = [Thread(target=lock_sleep) for _ in range(2)]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

$ python test.py
running script
running script
Exception in thread Thread-2 (lock_sleep):
Traceback (most recent call last):
  File "/usr/lib64/python3.11/threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.11/threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/tmp.0H58rpEkLv/test.py", line 17, in lock_sleep
    cursor.executescript(f'''
sqlite3.OperationalError: database is locked

I'm glad you solved it, though. Remember, in SQLite (and most SQL databases, really), inserts are very fast, commits are slow, and opening and closing the database is very slow. The more inserts you can do between each commit (within reason), the better.

You can also tune for better performance, too, using some performance-oriented PRAGMA statements.

Personally, I always use PRAGMA journal_mode = WAL (check out the documentation on Write-Ahead Logging) and PRAGMA synchronous = NORMAL, and on close I always run PRAGMA analysis_limit = 8192 and PRAGMA optimize.
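
As a rough sketch of where those statements go, here they are with the standard-library sqlite3 module (with aiosqlite you would await the same execute calls). The open_tuned and close_tuned helper names are made up for this example, and WAL mode needs a file-backed database, so a temporary file is used.

```python
import os
import sqlite3
import tempfile


def open_tuned(db_path):
    """Open a connection and apply the write-oriented PRAGMAs."""
    conn = sqlite3.connect(db_path)
    # journal_mode reports the mode actually in effect
    mode = conn.execute("PRAGMA journal_mode = WAL").fetchone()[0]
    conn.execute("PRAGMA synchronous = NORMAL")
    return conn, mode


def close_tuned(conn):
    """Let SQLite refresh its query-planner statistics, then close."""
    conn.execute("PRAGMA analysis_limit = 8192")
    conn.execute("PRAGMA optimize")
    conn.close()


with tempfile.TemporaryDirectory() as tmp:
    conn, mode = open_tuned(os.path.join(tmp, "tuned.db"))
    # reading the pragma back returns its numeric value (NORMAL is 1)
    sync = conn.execute("PRAGMA synchronous").fetchone()[0]
    close_tuned(conn)

print(mode, sync)
```

journal_mode = WAL is stored in the database file, while synchronous is a per-connection setting, so it has to be set again each time you connect.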

> aiosqlite does not use a connection pool. This is likely a limit of the underlying sqlite3 implementation. I'd suggest using a concurrency-limited gather, like aioitertools.asyncio.gather(..., limit=X), especially with many concurrent writes, where only one connection would ever be able to write to a shared database anyways.

That worked. It also seems that the machine's specs play a part: I was getting this lock error on a Linux machine, but not on macOS.