Do you have plans and timelines for supporting sync subscriptions?
3asm opened this issue · 4 comments
Hi,
I am exploring NATS, and our chosen design requires reading a single message at a time.
Sync subscriptions are currently supported in Go and Java, but I didn't find any information on plans or timelines for supporting them in Python.
Do you have plans and timelines for supporting sync subscriptions?
Hi @3asm, for now you could emulate a sync subscription by buffering messages in a queue, as in the example below. Adding sync subscriptions sounds good though; maybe we can support something like this as part of the client...
import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    nc = NATS()
    await nc.connect(loop=loop)

    # Start session with NATS Streaming cluster.
    sc = STAN()
    await sc.connect("test-cluster", "client-123", nats=nc)

    # Synchronous Publisher, does not return until an ack
    # has been received from NATS Streaming.
    for i in range(0, 100):
        await sc.publish("test", "n:{0}".format(i).encode())

    # Unbounded in-memory buffer that the callback fills.
    queue = asyncio.Queue(maxsize=0)

    async def cb(msg):
        print("Received a message (seq={}): {}".format(msg.seq, msg.data))
        await queue.put(msg)

    # Subscribe to get all messages since beginning.
    # The subject must match the one published to above.
    await sc.subscribe("test", start_at='first', cb=cb)

    # Consume messages one by one.
    msg = await queue.get()
    print(msg)
    msg = await queue.get()
    print(msg)

    # Close NATS and NATS Streaming sessions.
    await sc.close()
    await nc.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()
Thanks @wallyqs.
Wouldn't the callback keep fetching messages from the server this way? The goal is to avoid fetching new messages unless explicitly requested.
Yes, but new messages would be fetched from the server anyway, since NATS Streaming / STAN uses a push-based model. The best we can do to process them one by one is to add them to an in-memory structure.
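A minimal sketch of how that in-memory structure could be wrapped into something that feels like a sync subscription. The `QueuedSubscription` class and its `next_msg` method are hypothetical names for illustration, not part of the NATS or STAN Python clients, and the `demo` function feeds the callback by hand instead of connecting to a server:

```python
import asyncio


class QueuedSubscription:
    """Buffers pushed messages so the caller can pull them one at a time.

    Hypothetical helper, not part of the NATS or STAN client libraries.
    """

    def __init__(self, maxsize=0):
        # maxsize=0 means the buffer is unbounded.
        self._queue = asyncio.Queue(maxsize=maxsize)

    async def cb(self, msg):
        # Pass this coroutine as the `cb` argument of subscribe().
        await self._queue.put(msg)

    async def next_msg(self, timeout=1.0):
        # Wait for the next buffered message; raises asyncio.TimeoutError
        # if nothing arrives within `timeout` seconds.
        return await asyncio.wait_for(self._queue.get(), timeout)


async def demo():
    sub = QueuedSubscription()
    # Stand-in for the client delivering messages. With a real session this
    # would be: await sc.subscribe("test", start_at='first', cb=sub.cb)
    for i in range(3):
        await sub.cb("n:{0}".format(i).encode())
    first = await sub.next_msg()
    second = await sub.next_msg()
    return first, second


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Messages still arrive from the server as they are pushed; the wrapper only controls when the application code sees each one.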
Got it, thanks for the answer.