cunarist/solie

Task durations and aggregate trades could be handled more generically with a `TimedQueue`

Opened this issue · 0 comments

import asyncio
import time

class TimedQueue:
    """A FIFO queue that only keeps items added within a certain time window.

    Every entry is stored in ``self.queue`` as a ``(timestamp, item)`` tuple,
    where ``timestamp`` is ``time.time()`` at the moment of insertion.
    Entries older than ``max_age`` seconds are evicted from the front of the
    queue each time an item is added or the contents are read.
    """

    def __init__(self, max_age):
        """
        Initialize the TimedQueue.

        Parameters:
            max_age (float): The maximum age (in seconds) allowed for items in the queue.
        """
        self.queue = asyncio.Queue()
        self.max_age = max_age
        # ``loop`` is not used by this class; it is kept only so existing code
        # that reads the attribute keeps working. ``asyncio.get_event_loop()``
        # is deprecated when no loop is running, so record the running loop
        # if there is one and ``None`` otherwise.
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = None

    async def add_item(self, item):
        """
        Add an item to the queue, then evict entries that have expired.

        Parameters:
            item: The item to add to the queue.
        """
        await self.queue.put((time.time(), item))
        await self._cleanup()

    async def _cleanup(self):
        """Remove items from the front of the queue that are older than the maximum age."""
        current_time = time.time()
        # asyncio.Queue has no public "peek", so inspect the head through the
        # private ``_queue`` container (which ``get_items`` relies on as well).
        # Peeking first matters: popping the head and re-inserting it would
        # move it to the tail and break FIFO order.
        # Entries are appended in insertion order, so (assuming the wall clock
        # does not step backwards) the first non-expired head ends the scan.
        while not self.queue.empty():
            timestamp, _ = self.queue._queue[0]
            if current_time - timestamp <= self.max_age:
                break
            self.queue.get_nowait()

    async def get_items(self):
        """
        Retrieve all non-expired items currently in the queue, oldest first.

        Expired entries are evicted before the snapshot is taken, so the
        result honours ``max_age`` even if nothing was added recently.

        Returns:
            list: A list of items in the queue.
        """
        await self._cleanup()
        return [item for _, item in self.queue._queue]

async def main():
    """Demo: feed a 10-second TimedQueue three items and print its contents.

    The items are added 5 s and then 6 s apart, so 11 s separate the first
    and the last insertion. The list returned by ``get_items`` is printed.
    """
    window = TimedQueue(10)

    # (item, seconds to wait afterwards); None means no pause after the item.
    schedule = (
        ("First item", 5),
        ("Second item", 6),
        ("Third item", None),
    )
    for label, pause in schedule:
        await window.add_item(label)
        if pause is not None:
            await asyncio.sleep(pause)  # Simulate time passing

    print(await window.get_items())

# Run the demo only when executed as a script, so importing this module
# (e.g. to reuse TimedQueue) does not start an event loop and sleep.
if __name__ == "__main__":
    asyncio.run(main())