This is a work in progress. I had a need for some services to automate admin tasks in RabbitMQ, so I decided to start building some things.
This is a class that makes one connection to the Rabbit Admin API and one to a direct exchange/queue named: rmq_utils/account_creator
. It listens for messages, creates a new user/password/vhost via the Rabbit Admin API, and calls a post_create
method for each. The message must be a JSON object:
{
"user_key": "whatever"
}
The post_create
method is implemented in your sub-class. It will receive two input parameters:
user_key
- The same value received in the messagecredentials
- A dictionary representing the Rabbit account info for the account that was just created. It has the following keys:username
,password
,vhost
Example:
from rmq_utils import AccountCreator
RABBIT_URL = 'rabbit-url-to-listen-for-messages'
ROUTING_KEY = 'unique-key-for-server'
ADMIN_HOST = 'rabbit-admin-host'
ADMIN_USER = 'admin-user'
ADMIN_PASSWORD = 'admin-password'
class ExampleAccountCreator(AccountCreator):
@staticmethod
def post_create(user_key, creds):
# Do whatever you want here, perhaps make a call to
# update a record in a database using the user_key
print('{0}: {1}'.format(user_key, creds['username']))
def main():
creator = ExampleAccountCreator(
rabbit_url=RABBIT_URL,
routing_key=ROUTING_KEY,
mgmt_host=ADMIN_HOST,
mgmt_user=ADMIN_USER,
mgmt_password=ADMIN_PASSWORD)
try:
creator.run()
except KeyboardInterrupt:
creator.stop()
if __name__ == '__main__':
main()
This is a class that makes one connection to the Rabbit Admin API and one to a direct exchange/queue named: rmq_utils/account_destroyer
. It listens for messages, deletes an existing user/vhost via the Rabbit Admin API, and calls a post_delete
method for each. The message must be a JSON object:
{
"user_key": "whatever",
"rabbit_user": "the_username"
}
The post_delete
method is implemented in your sub-class. It will receive one input parameter:
user_key
- The same value received in the message
Example:
from rmq_utils import AccountDestroyer
RABBIT_URL = 'rabbit-url-to-listen-for-messages'
ROUTING_KEY = 'unique-key-for-server'
ADMIN_HOST = 'rabbit-admin-host'
ADMIN_USER = 'admin-user'
ADMIN_PASSWORD = 'admin-password'
class ExampleAccountDestroyer(AccountDestroyer):
@staticmethod
def post_delete(user_key):
# Do whatever you want here, perhaps make a call to
# update a record in a database using the user_key
print('{0}'.format(user_key))
def main():
destroyer = ExampleAccountDestroyer(
rabbit_url=RABBIT_URL,
routing_key=ROUTING_KEY,
mgmt_host=ADMIN_HOST,
mgmt_user=ADMIN_USER,
mgmt_password=ADMIN_PASSWORD)
try:
destroyer.run()
except KeyboardInterrupt:
destroyer.stop()
if __name__ == '__main__':
main()
This service maintains a map of usernames to an array of connection names. It calls a user-defined method (max_connections_by_user
) that returns a map of usernames to integers representing the maximum number of concurrent connections allowed for each. It compares the current number of open connections for each to the max, if above the maximum it closes connections to maintain the maximum.
Example:
from rmq_utils import ConnectionLimiter
class ExampleConnectionLimiter(ConnectionLimiter):
@staticmethod
def max_connections_by_user():
# Do whatever you want here, perhaps make a call to
# a database to get the max connection per user
return {
'test': 1
}
def main():
limiter = ExampleConnectionLimiter(
mgmt_host=ADMIN_HOST,
mgmt_user=ADMIN_USER,
mgmt_password=ADMIN_PASSWORD)
try:
limiter.run()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()