/nats.py

An asyncio based Python client for NATS

Primary LanguagePythonApache License 2.0Apache-2.0

NATS - Python Client for Asyncio

An asyncio (PEP 3156) Python client for the NATS messaging system.

pypi Build Status Versions License Apache 2.0

Supported platforms

Should be compatible with at least Python +3.5.1 using gnatsd as the server.

Getting Started

git clone https://github.com/nats-io/asyncio-nats
cd asyncio-nats
python setup.py install

Or via pip:

pip install asyncio-nats-client

Basic Usage

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def run(loop):
    nc = NATS()

    await nc.connect("demo.nats.io:4222", loop=loop)

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Simple publisher and async subscriber via coroutine.
    sid = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await nc.auto_unsubscribe(sid, 2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    async def help_request(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))
        await nc.publish(reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sid = await nc.subscribe("help", "workers", help_request)

    # Send a request and expect a single response
    # and trigger timeout if not faster than 200 ms.
    try:
        response = await nc.request("help", b'help me', 0.2)
        print("Received response: {message}".format(
            message=response.data.decode()))
    except ErrTimeout:
        print("Request timed out")

    # Remove interest in subscription.
    await nc.unsubscribe(sid)

    # Terminate connection to NATS.
    await nc.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Wildcard Subscriptions

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers


async def run(loop):
    nc = NATS()

    await nc.connect("nats://127.0.0.1:4222", loop=loop)

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # "*" matches any token, at any level of the subject.
    await nc.subscribe("foo.*.baz", cb=message_handler)
    await nc.subscribe("foo.bar.*", cb=message_handler)

    # ">" matches any length of the tail of a subject, and can only be the last token
    # E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
    await nc.subscribe("foo.>", cb=message_handler)

    # Matches all of the above.
    await nc.publish("foo.bar.baz", b'Hello World')

    # Gracefully close the connection.
    await nc.drain()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Advanced Usage

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout, ErrNoServers

async def run(loop):
    nc = NATS()

    try:
        # Setting explicit list of servers in a cluster.
        await nc.connect(servers=["nats://127.0.0.1:4222", "nats://127.0.0.1:4223", "nats://127.0.0.1:4224"], loop=loop)
    except ErrNoServers as e:
        print(e)
        return

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        for i in range(0, 20):
            await nc.publish(reply, "i={i}".format(i=i).encode())

    await nc.subscribe("help.>", cb=message_handler)

    async def request_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Signal the server to stop sending messages after we got 10 already.
    await nc.request(
        "help.please", b'help', expected=10, cb=request_handler)

    try:
        # Flush connection to server, returns when all messages have been processed.
        # It raises a timeout if roundtrip takes longer than 1 second.
        await nc.flush(1)
    except ErrTimeout:
        print("Flush timeout")

    await asyncio.sleep(1, loop=loop)

    # Drain gracefully closes the connection, allowing all subscribers to
    # handle any pending messages inflight that the server may have sent.
    await nc.drain()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Clustered Usage

import asyncio
from datetime import datetime
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def run(loop):

    nc = NATS()

    # Setup pool of servers from a NATS cluster.
    options = {
        "servers": [
            "nats://user1:pass1@127.0.0.1:4222",
            "nats://user2:pass2@127.0.0.1:4223",
            "nats://user3:pass3@127.0.0.1:4224",
        ],
        "loop": loop,
    }

    # Will try to connect to servers in order of configuration,
    # by defaults it connect to one in the pool randomly.
    options["dont_randomize"] = True

    # Optionally set reconnect wait and max reconnect attempts.
    # This example means 10 seconds total per backend.
    options["max_reconnect_attempts"] = 5
    options["reconnect_time_wait"] = 2

    async def disconnected_cb():
        print("Got disconnected!")

    async def reconnected_cb():
        # See who we are connected to on reconnect.
        print("Got reconnected to {url}".format(url=nc.connected_url.netloc))

    # Setup callbacks to be notified on disconnects and reconnects
    options["disconnected_cb"] = disconnected_cb
    options["reconnected_cb"] = reconnected_cb

    async def error_cb(e):
        print("There was an error: {}".format(e))

    async def closed_cb():
        print("Connection is closed")

    async def subscribe_handler(msg):
        print("Got message: ", msg.subject, msg.reply, msg.data)

    # Setup callbacks to be notified when there is an error
    # or connection is closed.
    options["error_cb"] = error_cb
    options["closed_cb"] = closed_cb

    try:
        await nc.connect(**options)
    except ErrNoServers as e:
        # Could not connect to any server in the cluster.
        print(e)
        return

    if nc.is_connected:
        await nc.subscribe("help.*", cb=subscribe_handler)

        max_messages = 1000
        start_time = datetime.now()
        print("Sending {} messages to NATS...".format(max_messages))

        for i in range(0, max_messages):
            try:
                await nc.publish("help.{}".format(i), b'A')
                await nc.flush(0.500)
            except ErrConnectionClosed as e:
                print("Connection closed prematurely.")
                break
            except ErrTimeout as e:
                print("Timeout occured when publishing msg i={}: {}".format(
                    i, e))

        end_time = datetime.now()
        await nc.close()
        duration = end_time - start_time
        print("Duration: {}".format(duration))

        try:
            await nc.publish("help", b"hello world")
        except ErrConnectionClosed:
            print("Can't publish since no longer connected.")

    err = nc.last_error
    if err is not None:
        print("Last Error: {}".format(err))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

TLS

TLS connections can be configured with an ssl context

ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
                        keyfile='client-key.pem')
await nc.connect(servers=["tls://127.0.0.1:4443"], loop=loop, tls=ssl_ctx)

Setting the scheme to tls in the connect URL will make the client create a default ssl context automatically:

import asyncio
import ssl
from nats.aio.client import Client as NATS

async def run(loop):
    nc = NATS()
    await nc.connect("tls://demo.nats.io:4443", loop=loop)

Note: If getting SSL certificate errors in OS X, try first installing the certifi certificate bundle. If using Python 3.7 for example, then run:

$ /Applications/Python\ 3.7/Install\ Certificates.command
 -- pip install --upgrade certifi
Collecting certifi
...
 -- removing any existing file or link
 -- creating symlink to certifi certificate bundle
 -- setting permissions
 -- update complete

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.