/aiokeydb-py

Async KeyDB Python Client

Primary LanguagePython

aiokeydb

Latest Version: PyPI version

Unified Syncronous and Asyncronous Python client for KeyDB and Redis.

It implements the majority of redis-py API methods and is fully compatible with redis-py API methods, while also providing a unified Client for both sync and async methods.

Additionally, newer apis such as KeyDBClient are strongly typed to provide a more robust experience.


Usage

aiokeydb can be installed from pypi using pip or from source.

Installation

# Install from pypi
pip install aiokeydb

# Install from source
pip install git+https://github.com/trisongz/aiokeydb-py.git

Examples

Below are some examples of how to use aiokeydb. Additional examples can be found in the tests directory.

import time
import asyncio
import uuid
from aiokeydb import KeyDBClient

# The session can be explicitly initialized, or
# will be lazily initialized on first use
# through environment variables with all 
# params being prefixed with `KEYDB_`

keydb_uri = "keydb://localhost:6379/0"

# Initialize the Unified Client
KeyDBClient.init_session(
    uri = keydb_uri,
)

# Cache the results of these functions
# cachify works for both sync and async functions
# and has many params to customize the caching behavior
# and supports both `redis` and `keydb` backends
# as well as `api` frameworks such as `fastapi` and `starlette`

@KeyDBClient.cachify()
async def async_fibonacci(number: int):
    if number == 0: return 0
    elif number == 1: return 1
    return await async_fibonacci(number - 1) + await async_fibonacci(number - 2)

@KeyDBClient.cachify()
def fibonacci(number: int):
    if number == 0: return 0
    elif number == 1: return 1
    return fibonacci(number - 1) + fibonacci(number - 2)

async def test_fib(n: int = 100, runs: int = 10):
    # Test that both results are the same.
    sync_t, async_t = 0.0, 0.0

    for i in range(runs):
        t = time.time()
        print(f'[Async - {i}/{runs}] Fib Result: {await async_fibonacci(n)}')
        tt = time.time() - t
        print(f'[Async - {i}/{runs}] Fib Time: {tt:.2f}s')
        async_t += tt

        t = time.time()
        print(f'[Sync  - {i}/{runs}] Fib Result: {fibonacci(n)}')
        tt = time.time() - t
        print(f'[Sync  - {i}/{runs}] Fib Time: {tt:.2f}s')
        sync_t += tt
    
    print(f'[Async] Cache Average Time: {async_t / runs:.2f}s | Total Time: {async_t:.2f}s')
    print(f'[Sync ] Cache Average Time: {sync_t / runs:.2f}s | Total Time: {sync_t:.2f}s')
        
async def test_setget(runs: int = 10):
    # By default, the client utilizes `pickle` to serialize
    # and deserialize objects. This can be changed by setting
    # the `serializer`

    sync_t, async_t = 0.0, 0.0
    for i in range(runs):
        value = str(uuid.uuid4())
        key = f'async-test-{i}'
        t = time.time()
        await KeyDBClient.async_set(key, value)
        assert await KeyDBClient.async_get(key) == value
        tt = time.time() - t
        print(f'[Async - {i}/{runs}] Get/Set: {key} -> {value} = {tt:.2f}s')
        async_t += tt

        value = str(uuid.uuid4())
        key = f'sync-test-{i}'
        t = time.time()
        KeyDBClient.set(key, value)
        assert KeyDBClient.get(key) == value
        tt = time.time() - t
        print(f'[Sync  - {i}/{runs}] Get/Set: {key}  -> {value} = {tt:.2f}s')
        sync_t += tt
    
    print(f'[Async] GetSet Average Time: {async_t / runs:.2f}s | Total Time: {async_t:.2f}s')
    print(f'[Sync ] GetSet Average Time: {sync_t / runs:.2f}s | Total Time: {sync_t:.2f}s')


async def run_tests(fib_n: int = 100, fib_runs: int = 10, setget_runs: int = 10):
    
    # You can explicitly wait for the client to be ready
    # Sync version
    # KeyDBClient.wait_for_ready()
    await KeyDBClient.async_wait_for_ready()

    # Run the tests
    await test_fib(n = fib_n, runs = fib_runs)
    await test_setget(runs = setget_runs)


    # Utilize the current session
    await KeyDBClient.async_set('async_test_0', 'test')
    assert await KeyDBClient.async_get('async_test_0') == 'test'

    KeyDBClient.set('sync_test_0', 'test')
    assert KeyDBClient.get('sync_test_0') == 'test'


    # you can access the `KeyDBSession` object directly
    # which mirrors the APIs in `KeyDBClient`

    await KeyDBClient.session.async_set('async_test_1', 'test')
    assert await KeyDBClient.session.async_get('async_test_1') == 'test'

    KeyDBClient.session.set('sync_test_1', 'test')
    assert KeyDBClient.session.get('sync_test_1') == 'test'

    # The underlying client can be accessed directly
    # if the desired api methods aren't mirrored

    # KeyDBClient.keydb
    # KeyDBClient.async_keydb
    # Since encoding / decoding is not handled by the client
    # you must encode / decode the data yourself
    await KeyDBClient.async_keydb.set('async_test_2', b'test')
    assert await KeyDBClient.async_keydb.get('async_test_2') == b'test'

    KeyDBClient.keydb.set('sync_test_2', b'test')
    assert KeyDBClient.keydb.get('sync_test_2') == b'test'

    # You can also explicitly close the client
    # However, this closes the connectionpool and will terminate
    # all connections. This is not recommended unless you are
    # explicitly closing the client.

    # Sync version
    # KeyDBClient.close()
    await KeyDBClient.aclose()

asyncio.run(run_tests())

Additionally, you can use the previous APIs that are expected to be present

from aiokeydb import KeyDB, AsyncKeyDB, from_url

sync_client = KeyDB()
async_client = AsyncKeyDB()

# Alternative methods
sync_client = from_url('keydb://localhost:6379/0')
async_client = from_url('keydb://localhost:6379/0', asyncio = True)

Setting the Global Client Settings

The library is designed to be explict and not have any global state. However, you can set the global client settings by using the KeyDBClient.configure method. This is useful if you are using the KeyDBClient class directly, or if you are using the KeyDBClient class in a library that you are developing.

For example, if you initialize a session with a specific uri, the global client settings will still inherit from the default settings.

from aiokeydb import KeyDBClient
from lazyops.utils import logger

keydb_uris = {
    'default': 'keydb://127.0.0.1:6379/0',
    'cache': 'keydb://localhost:6379/1',
    'public': 'redis://public.redis.db:6379/0',
}

sessions = {}

# these will now be initialized
for name, uri in keydb_uris.items():
    sessions[name] = KeyDBClient.init_session(
        name = name,
        uri = uri,
    )
    logger.info(f'Session {name}: uri: {sessions[name].uri}')

# however if you initialize another session
# it will use the global environment vars

sessions['test'] = KeyDBClient.init_session(
    name = 'test',
)
logger.info(f'Session test: uri: {sessions["test"].uri}')

By configuring the global settings, any downstream sessions will inherit the global settings.

from aiokeydb import KeyDBClient
from lazyops.utils import logger

default_uri = 'keydb://public.host.com:6379/0'
keydb_dbs = {
    'cache': {
        'db_id': 1,
    },
    'db': {
        'uri': 'keydb://127.0.0.1:6379/0',
    },
}

KeyDBClient.configure(
    url = default_uri,
    debug_enabled = True,
    queue_db = 1,
)

# now any sessions that are initialized will use the global settings

sessions = {}
# these will now be initialized

# Initialize the first default session
# which should utilize the `default_uri`
KeyDBClient.init_session()

for name, config in keydb_dbs.items():
    sessions[name] = KeyDBClient.init_session(
        name = name,
        **config
    )
    logger.info(f'Session {name}: uri: {sessions[name].uri}')

KeyDB Worker Queues

Released since v0.1.1

KeyDB Worker Queues is a simple, fast, and reliable queue system for KeyDB. It is designed to be used in a distributed environment, where multiple KeyDB instances are used to process jobs. It is also designed to be used in a single instance environment, where a single KeyDB instance is used to process jobs.

import asyncio
from aiokeydb import KeyDBClient
from aiokeydb.queues import TaskQueue, Worker
from lazyops.utils import logger


# Configure the KeyDB Client - the default keydb client will use 
# db = 0, and queue uses 2 so that it doesn't conflict with other
# by configuring it here, you can explicitly set the db to use
keydb_uri = "keydb://127.0.0.1:6379/0"

# Configure the Queue to use db = 1 instead of 2
KeyDBClient.configure(
    url = keydb_uri,
    debug_enabled = True,
    queue_db = 1,
)

@Worker.add_cronjob("*/1 * * * *")
async def test_cron_task(*args, **kwargs):
    logger.info("Cron task ran")
    await asyncio.sleep(5)

@Worker.add_function()
async def test_task(*args, **kwargs):
    logger.info("Task ran")
    await asyncio.sleep(5)

async def run_tests():
    queue = TaskQueue("test_queue")
    worker = Worker(queue)
    await worker.start()

asyncio.run(run_tests())

Requirements

  • deprecated>=1.2.3

  • packaging>=20.4

  • importlib-metadata >= 1.0; python_version < "3.8"

  • typing-extensions; python_version<"3.8"

  • async-timeout>=4.0.2

  • pydantic

  • anyio

  • lazyops


Major Changes:

  • v0.0.8 -> v0.0.11:

    Fully Migrates away from aioredis to redis-py @ v4.3.4.

    This inherits all API methods from redis-py to enforce compatability since deprecation of aioredis going forward

    This fork of redis-py has some modified semantics, while attempting to replicate all the API methods of redis-py to avoid compatibility issues with underlying libraries that require a pinned redis version.

    Notably, all async Classes and methods are prefixed by Async to avoid name collisions with the sync version.

  • v0.0.11 -> v0.1.0:

    Migration of aiokeydb.client -> aiokeydb.core and aiokeydb.asyncio.client -> aiokeydb.asyncio.core However these have been aliased to their original names to avoid breaking changes.

    Creating a unified API available through new KeyDBClient class that creates sessions which are KeyDBSession inherits from KeyDB and AsyncKeyDBClient class that inherits from AsyncKeyDB class