aws-msk-iam-sasl-signer-python

This is an Amazon MSK library for Python. It provides a function that generates a base64-encoded signed URL to enable authentication and authorization with an MSK cluster. The signed URL is generated using your IAM credentials.
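
Because the token is only an encoded URL, it can be decoded locally to check which endpoint and query parameters were signed. The following is a minimal sketch, not part of the library: the helper names decode_signed_url and describe_token are hypothetical, and it assumes the token is URL-safe base64 whose trailing padding may have been stripped.

```python
import base64
from urllib.parse import parse_qs, urlsplit


def decode_signed_url(token: str) -> str:
    """Decode a URL-safe base64 token back into the signed URL text."""
    # Re-add any '=' padding that was stripped so the length is a multiple of 4.
    padded = token + "=" * (-len(token) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8")


def describe_token(token: str) -> dict:
    """Return the host and the sorted query parameter names of the signed URL."""
    parts = urlsplit(decode_signed_url(token))
    return {"host": parts.hostname, "params": sorted(parse_qs(parts.query))}
```

Passing a freshly generated token to describe_token() shows the signing host and parameter names without contacting the cluster.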

  • Free software: Apache Software License 2.0

Features

  • Provides a function to generate an auth token using IAM credentials from the AWS default credentials chain.
  • Provides a function to generate an auth token using IAM credentials from an AWS named profile.
  • Provides a function to generate an auth token using the credentials of an assumed IAM role.
  • Provides a function to generate an auth token using a CredentialProvider. The provider must inherit from the botocore.credentials.CredentialProvider class.

Get Started

  • For installation, refer to the installation guide.
  • To use the signer library with a Kafka client library over the SASL/OAUTHBEARER mechanism, add the token callback function to your code.
  • For example, here is the sample code to use with the dpkp/kafka-python library:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('<my aws region>')
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers='<my bootstrap string>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
)

topic = "<my-topic>"
while True:
    try:
        inp=input(">")
        producer.send(topic, inp.encode())
        producer.flush()
        print("Produced!")
    except Exception as e:
        print("Failed to send message:", e)

producer.close()
  • Here is the sample code to use with the confluentinc/confluent-kafka-python library:
from confluent_kafka import Consumer
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

def oauth_cb(oauth_config):
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("<my aws region>")
    # Note that this library expects oauth_cb to return expiry time in seconds since epoch, while the token generator returns expiry in ms
    return auth_token, expiry_ms/1000

c = Consumer({
    "debug": "all",
    'bootstrap.servers': "<my bootstrap string>",
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['<my-topic>'])

print("Starting consumer!")

while True:
    msg = c.poll(5)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()
  • To generate the token from a named profile, replace the token() function with the code below:
class MSKTokenProvider():
    def token(self):
        oauth2_token, _ = MSKAuthTokenProvider.generate_auth_token_from_profile('<your aws region>', '<named_profile>')
        return oauth2_token
  • To generate the token from a role ARN, replace the token() function with the code below:
class MSKTokenProvider():
    def token(self):
        oauth2_token, _ = MSKAuthTokenProvider.generate_auth_token_from_role_arn('<your aws region>', '<role_arn>')
        return oauth2_token
  • To generate the token from a custom credentials provider, replace the token() function with the code below:
class MSKTokenProvider():
    def token(self):
        # Replace the placeholder with an instance of a botocore.credentials.CredentialProvider subclass
        oauth2_token, _ = MSKAuthTokenProvider.generate_auth_token_from_credentials_provider('<your aws region>', '<your_credentials_provider>')
        return oauth2_token
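
The variants above differ only in which generator function is called, so one adapter can serve both client libraries. The sketch below is an illustration rather than part of the signer library: MSKTokenAdapter is a hypothetical name, and it assumes every generator returns a (token, expiry in milliseconds) tuple, as the samples above unpack it.

```python
from typing import Callable, Tuple

# Any zero-argument callable returning (token, expiry_ms), for example
# functools.partial(MSKAuthTokenProvider.generate_auth_token, '<my aws region>').
TokenGenerator = Callable[[], Tuple[str, float]]


class MSKTokenAdapter:
    """Hypothetical adapter exposing the hooks used in the samples above."""

    def __init__(self, generate: TokenGenerator):
        self._generate = generate

    def token(self) -> str:
        # kafka-python style: the token provider object exposes token().
        auth_token, _expiry_ms = self._generate()
        return auth_token

    def oauth_cb(self, oauth_config) -> Tuple[str, float]:
        # confluent-kafka style: return the expiry in seconds, not milliseconds.
        auth_token, expiry_ms = self._generate()
        return auth_token, expiry_ms / 1000
```

For a named profile, pass functools.partial(MSKAuthTokenProvider.generate_auth_token_from_profile, '<your aws region>', '<named_profile>') as generate, then hand the adapter to sasl_oauth_token_provider, or its oauth_cb method to the 'oauth_cb' setting.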

Running Tests

You can run tests in all supported Python versions using pytest. By default, it will run all of the unit tests.

$ pytest

You can also run tests with setup.py:

$ python setup.py test

To fix lint issues, run the pre-commit command:

$ pre-commit run --all-files

To run tests with coverage information, run:

$ coverage run --source=aws_msk_iam_sasl_signer.MSKAuthTokenProvider -m pytest tests/test_auth_token_provider.py
$ coverage report -m

Troubleshooting

Finding out which identity is being used

You may receive an Access denied error without being sure which credential is actually in use. The credential may come from a role ARN, an EC2 instance profile, a credential profile, and so on. If you call generate_auth_token() with the aws_debug_creds argument set to True and set client-side logging to DEBUG, the signer library prints a debug log of the form:

MSKAuthTokenProvider.generate_auth_token('<my aws region>', aws_debug_creds = True)
Credentials Identity: {UserId: ABCD:test124, Account: 1234567890, Arn: arn:aws:sts::1234567890:assumed-role/abc/test124}

The log line provides the IAM account, the IAM user ID, and the ARN of the IAM principal corresponding to the credential being used.
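
The two settings can be combined in a small helper. This is a sketch under assumptions: debug_identity is a hypothetical name, the generate parameter exists only so the call can be exercised without AWS access, and raising the root logger to DEBUG is one way (not necessarily the only one) to enable client-side debug logging.

```python
import logging


def debug_identity(region: str, generate=None):
    """Generate a token with identity debugging switched on."""
    if generate is None:
        # Imported lazily so the helper can be tested with a stand-in generator.
        from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
        generate = MSKAuthTokenProvider.generate_auth_token

    # Make sure DEBUG records are emitted; basicConfig only adds a handler
    # when the root logger has none, so the level is also set explicitly.
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)

    return generate(region, aws_debug_creds=True)
```

Calling debug_identity('<my aws region>') once while diagnosing the error should be enough to surface the Credentials Identity line in the client logs.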

Getting Help

Please use these community resources for getting help. We use GitHub issues for tracking bugs and feature requests.

This repository provides a library that plugs into any Python Kafka client supporting the SASL/OAUTHBEARER mechanism. For more information about the SASL/OAUTHBEARER mechanism, see KIP 255.

Opening Issues

If you encounter a bug with the AWS MSK IAM SASL Signer for Python, we would like to hear about it. Before opening a new issue, search the existing Issues to see whether others are experiencing the same problem. Please include the versions of AWS MSK IAM SASL Signer for Python, Python, and the OS you are using, and a reproduction case when appropriate.

The GitHub issues are intended for bug reports and feature requests. For help and questions about using AWS MSK IAM SASL Signer for Python, please make use of the resources listed in the Getting Help section. Keeping the list of open issues lean will help us respond in a timely manner.

Contributing

We value feedback and contributions from our community. Whether it's a bug report, new feature, correction, or additional documentation, we welcome your issues and pull requests. Please read through this CONTRIBUTING document before submitting any issues or pull requests to ensure we have all the necessary information to effectively respond to your contribution.

Credits

This package was created with Cookiecutter and the audreyr/cookiecutter-pypackage project template.