Panini is a modern framework for fast and straightforward microservice development. Our goal is to create a fastapi/aiohttp/flask-like solution, but for NATS streaming. The framework lets you work with NATS features and some additional logic through a simple interface:
- easy to initialize application
from panini import app as panini_app
app = panini_app.App(
    service_name='your-microservice-name',
    host='127.0.0.1',
    port=4222,
)
- stream via NATS broker to some subject
@app.task()
async def publish():
    while True:
        message = get_some_update()
        await app.publish(subject='some.subject', message=message)
- subscribe to subject
@app.listen('some.subject')
async def subject_for_requests_listener(msg):
    subject = msg.subject
    message = msg.data
    # handle incoming message
- request to subject
response = await app.request(subject='some.request.subject.123', message={'request':'params'})
- receive a request from another microservice and return a response like HTTP request-response
@app.listen('some.request.subject.123')
async def request_listener(msg):
    subject = msg.subject
    message = msg.data
    # handle request
    return {'success': True, 'data': 'request has been processed'}
- create periodic tasks
@app.timer_task(interval=2)
async def your_periodic_task():
    for _ in range(10):
        await app.publish(subject='some.publish.subject', message={'some':'data'})
- synchronous and asynchronous endpoints
@app.timer_task(interval=2)
def your_periodic_task():
    for _ in range(10):
        app.publish_sync(subject='some.publish.subject', message={'some':'data'})
- accept different datatypes: dict, str, bytes
@app.timer_task(interval=2)
def your_periodic_task():
    for _ in range(10):
        app.publish_sync(subject='some.publish.subject', message=b'messageinbytesrequiresminimumoftimetosend', data_type=bytes)
- create middlewares for NATS messages
from panini.middleware import Middleware
class MyMiddleware(Middleware):
    async def send_publish(self, subject, message, publish_func, **kwargs):
        print('do something before publish')
        await publish_func(subject, message, **kwargs)
        print('do something after publish')

    async def listen_publish(self, msg, cb):
        print('do something before listen')
        await cb(msg)
        print('do something after listen')

    async def send_request(self, subject, message, request_func, **kwargs):
        print('do something before send request')
        result = await request_func(subject, message, **kwargs)
        print('do something after send request')
        return result

    async def listen_request(self, msg, cb):
        print('do something before listen request')
        result = await cb(msg)
        print('do something after listen request')
        return result
- create HTTP endpoints with aiohttp and NATS endpoints all together in one microservice
from aiohttp import web
@app.listen('some.publish.subject')
async def subject_for_requests_listener(msg):
    handle_incoming_message(msg.subject, msg.data)
@app.http.get('/get')
async def web_endpoint_listener(request):
    """
    Single HTTP endpoint
    """
    return web.Response(text="Hello, world")
@app.http.view('/path/to/rest/endpoints')
class MyView(web.View):
    """
    HTTP endpoints for REST schema
    """
    async def get(self):
        request = self.request
        return web.Response(text="Hello, REST world")

    async def post(self):
        request = self.request
        return web.Response(text="Hello, REST world")
- built-in traffic balancing between instances of the microservice if you have high loads
app = panini_app.App(
    service_name='async_publish',
    host='127.0.0.1',
    allocation_queue_group='group24',
    port=4222,
    app_strategy='asyncio',
)
# incoming traffic will be distributed among
# all microservices that are in the "group24"
What is NATS?
NATS is a high-performance messaging system written in Go. It is straightforward to use, can push millions of messages per minute through a single broker, and scales easily if you need many brokers. It has 48 well-known clients, 11 of which are supported by the maintainers and 18 by community contributors, and it provides delivery guarantees, high availability, and fault tolerance. Panini is based on the NATS Python client.
What is a subject?
It is just a string of characters that forms a name which publishers and subscribers use to find each other. NATS supports subject hierarchies and wildcards, which dramatically increase the usability of subjects.
What is a hierarchy? here
What is a wildcard? here
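For illustration, here is a minimal sketch of how wildcards could be used with Panini listeners, assuming the app object from the initialization example above and that Panini passes the subject string to NATS unchanged; the subjects and handler names below are made up for this example. In NATS, '*' matches exactly one token in the hierarchy and '>' matches one or more trailing tokens.
# Minimal sketch: wildcard subscriptions (subjects below are illustrative only)
@app.listen('store.*.orders')   # '*' matches one token, e.g. 'store.europe.orders'
async def single_level_listener(msg):
    print(f'order update from {msg.subject}: {msg.data}')

@app.listen('store.>')          # '>' matches one or more trailing tokens, e.g. 'store.europe.orders.new'
async def multi_level_listener(msg):
    print(f'store event from {msg.subject}: {msg.data}')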
Just try following the examples to see how easy it is.
The project is inspired by faust, a wonderful Python project based on Kafka Streams.
Documentation is coming...
pip install panini
Additional requirements:
- python >= 3.8.2
Run the broker from a directory that includes a docker-compose.yml file, using the command below:
docker-compose up
Stop the broker:
docker-compose down
Note: for production we recommend running the broker together with dockerized microservices. An example of a dockerized Panini project is here
For streams:
from panini import app as panini_app
app = panini_app.App(
    service_name='async_publish',
    host='127.0.0.1',
    port=4222,
    app_strategy='asyncio',
)
log = app.logger
msg = {'key1':'value1', 'key2':2, 'key3':3.0, 'key4':[1,2,3,4], 'key5':{'1':1, '2':2, '3':3, '4':4, '5':5}, 'key6':{'subkey1':'1', 'subkey2':2, '3':3, '4':4, '5':5}, 'key7':None}
@app.task()
async def publish():
    for _ in range(10):
        await app.publish(subject='some.publish.subject', message=msg)
        log.info(f'send message {msg}')
@app.timer_task(interval=2)
async def publish_periodically():
    for _ in range(10):
        await app.publish(subject='some.publish.subject', message=msg)
        log.info(f'send message from periodic task {msg}')
@app.listen('some.publish.subject')
async def receive_messages(msg):
    log.info(f'got message {msg.data}')
if __name__ == "__main__":
    app.start()
Let's say the script above is named app.py. Make sure the broker is running, then just execute:
python3 app.py
That's all! The microservice is up and running.
Classical request-response:
from panini import app as panini_app
app = panini_app.App(
    service_name='async_request',
    host='127.0.0.1',
    port=4222,
    app_strategy='asyncio',
)
log = app.logger
msg = {'key1': 'value1', 'key2': 2, 'key3': 3.0, 'key4': [1, 2, 3, 4], 'key5': {'1': 1, '2': 2, '3': 3, '4': 4, '5': 5},
       'key6': {'subkey1': '1', 'subkey2': 2, '3': 3, '4': 4, '5': 5}, 'key7': None}
@app.task()
async def request():
    for _ in range(10):
        result = await app.request(subject='some.request.subject.123', message=msg)
        log.info(f'response: {result}')
@app.listen('some.request.subject.123')
async def request_listener(msg):
    log.info('request has been processed')
    return {'success': True, 'data': f'request from {msg.subject} has been processed'}
if __name__ == "__main__":
    app.start()
The response to the request is sent to a third subject. This pattern can significantly increase throughput compared to the classical request-response model:
from panini import app as panini_app
app = panini_app.App(
    service_name='async_reply_to',
    host='127.0.0.1',
    port=4222,
    app_strategy='asyncio',
)
log = app.logger
msg = {'key1': 'value1', 'key2': 2, 'key3': 3.0, 'key4': [1, 2, 3, 4], 'key5': {'1': 1, '2': 2, '3': 3, '4': 4, '5': 5},
       'key6': {'subkey1': '1', 'subkey2': 2, '3': 3, '4': 4, '5': 5}, 'key7': None}
@app.task()
async def request_to_another_subject():
    for _ in range(10):
        await app.publish(subject='some.subject.for.request.with.response.to.another.subject',
                          message=msg,
                          reply_to='reply.to.subject')
        log.info('sent request')
@app.listen('some.subject.for.request.with.response.to.another.subject')
async def request_listener(msg):
    log.info('request has been processed')
    return {'success': True, 'data': f'request from {msg.subject} has been processed'}
@app.listen('reply.to.subject')
async def another_subject_listener(msg):
    log.info(f'received response: {msg.subject} {msg.data}')
if __name__ == "__main__":
    app.start()
A validator allows you to validate incoming messages:
from panini import app as panini_app
from panini.validator import Validator, Field
app = panini_app.App(
    service_name='validators',
    host='127.0.0.1',
    port=4222,
    app_strategy='asyncio',
)
log = app.logger
class SubTestValidator(Validator):
    subkey1 = Field(type=str)
    subkey2 = Field(type=int)
class TestValidator(Validator):
    key1 = Field(type=str)
    key2 = Field(type=int)
    key3 = Field(type=float)
    key4 = Field(type=list)
    key5 = Field(type=dict)
    key6 = Field(type=SubTestValidator)
    key7 = Field(type=int, null=True)
    key8 = Field(type=int, null=True, default=None)
msg = {'key1': 'value1', 'key2': 2, 'key3': 3.0, 'key4': [1, 2, 3, 4], 'key5': {'1': 1, '2': 2, '3': 3, '4': 4, '5': 5},
       'key6': {'subkey1': '1', 'subkey2': 2, '3': 3, '4': 4, '5': 5}, 'key7': None}
@app.task()
async def publish():
    for _ in range(10):
        await app.publish(subject='some.publish.subject', message=msg)
@app.timer_task(interval=2)
async def publish_periodically():
    for _ in range(10):
        await app.publish(subject='some.publish.subject', message=msg)
@app.listen('some.publish.subject', validator=TestValidator)
async def subject_for_requests_listener(msg):
    log.info(f'got message {msg.data}')
if __name__ == "__main__":
    app.start()
You must specify web_server=True to activate the web server. Aiohttp is used as the web server, so you can use its syntax. You can also specify web_host and web_port:
from aiohttp import web
from panini import app as panini_app
app = panini_app.App(
    service_name='async_web_server',
    host='127.0.0.1',
    port=4222,
    app_strategy='asyncio',
    web_server=True,
    web_host='127.0.0.1',
    web_port=8999,
)
log = app.logger
@app.http.get('/get')
async def web_endpoint_listener(request):
    """
    Single HTTP endpoint
    """
    return web.Response(text="Hello, world")
@app.http.view('/path/to/rest/endpoints')
class MyView(web.View):
    """
    HTTP endpoints for REST schema
    """
    async def get(self):
        request = self.request
        return web.Response(text="Hello, REST world")

    async def post(self):
        request = self.request
        return web.Response(text="Hello, REST world")
if __name__ == "__main__":
    app.start()
Not familiar with asyncio? Try the synchronous implementation:
from panini import app as panini_app
app = panini_app.App(
    service_name='ms_template_sync_by_lib',
    host='127.0.0.1',
    port=4222,
    app_strategy='sync',
)
log = app.logger
msg = {'key1':'value1', 'key2':2, 'key3':3.0, 'key4':[1,2,3,4], 'key5':{'1':1, '2':2, '3':3, '4':4, '5':5}, 'key6':{'subkey1':'1', 'subkey2':2, '3':3, '4':4, '5':5}, 'key7':None}
@app.task()
def publish():
    for _ in range(10):
        app.publish_sync(subject='some.publish.subject', message=msg)
        log.info(f'send message {msg}')
@app.timer_task(interval=2)
def publish_periodically():
    for _ in range(10):
        app.publish_sync(subject='some.publish.subject', message=msg)
        log.info(f'send message from periodic task {msg}')
@app.listen('some.publish.subject')
def subject_for_requests_listener(msg):
    log.info(f'got message {msg.data}')
if __name__ == "__main__":
    app.start()
Remember, the synchronous app_strategy is many times slower than the asynchronous one. It is designed for users who have no experience with asyncio; the sync implementation is only useful for microservices with very light workloads.
Panini creates a logfile folder in the project directory and stores all logs there. There are several ways to store your own logs there.
Logging from app object:
from panini import app as panini_app
from panini.utils.logger import get_logger
app = panini_app.App( # create app
    service_name='ms_template_sync_by_lib',
    host='127.0.0.1',
    port=4222,
    app_strategy='sync',
)
log = app.logger # create log handler
log = get_logger('ms_template_sync_by_lib') # does exactly the same thing
log.info("some log") # write log
log.warning("some warn log")
log.error("some error log")
log.exception("some exception log with automatic traceback logging")
Separately (after setup at startup, you can get any registered logger with the get_logger function):
from panini.utils.logger import get_logger
log = get_logger('some_logger_name')
log.warning("some log") # write log
By default, Panini does logging in a separate process to speed up the app, but you can change this when the app is created:
from panini import app as panini_app
app = panini_app.App(
    service_name='ms_template_sync_by_lib',
    host='127.0.0.1',
    port=4222,
    app_strategy='sync',
    logger_in_separate_process=False,  # specify this option for logging in main process
)
Panini lets you choose between the default logger configuration (recommended by the developers) and a custom one. If you want to use a custom logging config, just create a config/log_config.json file with your logger configuration at the app root. Panini will automatically detect and apply it; after that you can get your logger with the get_logger function.
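As a rough illustration only, here is a sketch of generating such a file; the exact schema Panini expects is an assumption here (the standard logging.config dictConfig format), as is the logger name:
# Hypothetical sketch: writes a config/log_config.json at the app root.
# Assumption: the file follows the standard logging.config dictConfig schema.
import json
import pathlib

log_config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "default"},
    },
    "loggers": {
        "some_logger_name": {"handlers": ["console"], "level": "INFO"},
    },
}

pathlib.Path("config").mkdir(exist_ok=True)
pathlib.Path("config/log_config.json").write_text(json.dumps(log_config, indent=2))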
We use pytest for testing
To run the tests (note that nats-server must be running on port 4222 for the tests):
cd tests/
./run_test.sh
Welcome, contributor! We are looking for developers to help make Panini a great project.
Working on your first Pull Request? You can learn how from this free series, How to Contribute to an Open Source Project on GitHub.
Here's how you can help:
- suggest new features or report a bug here
- review a pull request
- fix an issue
- write a tutorial
- always follow this guide for your contributions
At this point, you're ready to make your changes! Feel free to ask for help 😸