nats-io/stan.py

Do you have plans and timelines for supporting sync subscriptions?

3asm opened this issue · 4 comments

3asm commented

Hi,

I am exploring using Nats and the chosen implementation requires reading one single message at a time.
Sync subscription is currently supported in Go and Java, but I didn't find any information on plans or timelines to do so in Python.

Do you have plans and timelines for supporting sync subscriptions?

Hi @3asm, you could create a sync subscription by using a queue as follows for example for now. I think adding sync subscription sounds good though, maybe we can support something like this as part of the client...

import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    nc = NATS()
    await nc.connect(loop=loop)

    # Start session with NATS Streaming cluster.
    sc = STAN()
    await sc.connect("test-cluster", "client-123", nats=nc)

    # Synchronous Publisher, does not return until an ack
    # has been received from NATS Streaming.
    for i in range(0, 100):
        await sc.publish("test", "n:{0}".format(i).encode())

    queue = asyncio.Queue(maxsize=0)
    async def cb(msg):
        print("Received a message (seq={}): {}".format(msg.seq, msg.data))
        await queue.put(msg)

    # Subscribe to get all messages since beginning.
    await sc.subscribe("hi", start_at='first', cb=cb)

    # Consume one by one messages
    msg = await queue.get()
    print(msg)

    msg = await queue.get()
    print(msg)

    # Close NATS and NATS Streaming sessions
    await sc.close()
    await nc.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()
3asm commented

Thanks @wallyqs.

Wouldn't the callback keep fetching messages from the server this way? The goal is to avoid fetching a new messages without implicit order.

yes but new messages would be fetched from the server anyway since NATS Streaming / STAN is using a push based model, so best we can do to process one by one is to add them to an internal memory structure.

3asm commented

Got it, Thanks for the answer.