aioauth
implements OAuth 2.0 protocol and can be used in asynchronous frameworks like FastAPI / Starlette, aiohttp. It can work with any databases like MongoDB
, PostgreSQL
, MySQL
and ORMs like gino, sqlalchemy or databases over simple BaseDB interface.
There are few great OAuth frameworks for Python like oauthlib and authlib, but they do not support asyncio and rewriting these libraries to asyncio is a significant challenge (see issues here and here).
- The OAuth 2.0 Authorization Framework
- OAuth 2.0 Token Introspection
- Proof Key for Code Exchange by OAuth Public Clients
python -m pip install aioauth
import json
from fastapi import FastAPI, Request, Response
from aioauth.server import AuthorizationServer
from aioauth.types import RequestMethod
from aioauth.config import Settings
from aioauth.base.database import BaseDB
from aioauth.requests import (
Query,
Post,
Request as OAuth2Request,
)
from aioauth.responses import Response as OAuth2Response
from aioauth.structures import CaseInsensitiveDict
class DB(BaseDB):
"""Class for interacting with the database. Used by `AuthorizationServer`.
Here you need to override the methods that are responsible for creating tokens,
creating authorization code, getting a client from the database, etc.
"""
async def create_token(self, *args, **kwargs) -> Token:
"""Create token code in db
"""
token = await super().create_token(*args, **kwargs)
# NOTE: Save data from token to db here.
return token
async def create_authorization_code(self, *args, **kwargs) -> AuthorizationCode:
"""Create authorization code in db
"""
authorization_code = await super().create_authorization_code(*args, **kwargs)
# NOTE: Save data from authorization_code to db here.
return authorization_code
async def get_token(self, *args, **kwargs) -> Optional[Token]:
"""Get token from the database by provided request from user.
Returns:
Token: if token exists in db.
None: if no token in db.
"""
token_record = ...
if token_record is not None:
return Token(
access_token=token_record.access_token,
refresh_token=token_record.refresh_token,
scope=token_record.scope,
issued_at=token_record.issued_at,
expires_in=token_record.expires_in,
client_id=token_record.client_id,
token_type=token_record.token_type,
revoked=token_record.revoked
)
async def get_client(self, *args, **kwargs) -> Optional[Client]:
"""Get client record from the database by provided request from user.
Returns:
`Client` instance if client exists in db.
`None` if no client in db.
"""
client_record = ...
if client_record is not None:
return Client(
client_id=client_record.client_id,
client_secret=client_record.client_secret,
grant_types=client_record.grant_types,
response_types=client_record.response_types,
redirect_uris=client_record.redirect_uris,
scope=client_record.scope
)
async def revoke_token(self, request: Request, token: str) -> None:
"""Revokes an existing token. The `revoked`
Flag of the Token must be set to True
"""
token_record = ...
token_record.revoked = True
token_record.save()
async def get_authorization_code(self, *args, **kwargs) -> Optional[AuthorizationCode]:
...
async def delete_authorization_code(self, *args, **kwargs) -> None:
...
async def authenticate(self, *args, **kwargs) -> bool:
...
app = FastAPI()
server = AuthorizationServer(db=DB())
# NOTE: Redefinition of the default aioauth settings
# INSECURE_TRANSPORT must be enabled for local development only!
settings = Settings(
INSECURE_TRANSPORT=True,
)
@app.post("/token")
async def token(request: Request) -> Response:
"""Endpoint to obtain an access and/or ID token by presenting an authorization grant or refresh token.
See Section 4.1.3: https://tools.ietf.org/html/rfc6749#section-4.1.3
"""
oauth2_request: OAuth2Request = await to_oauth2_request(request)
oauth2_response: OAuth2Response = await server.create_token_response(oauth2_request)
return await to_fastapi_response(oauth2_response)
@app.get("/authorize")
async def authorize(request: Request) -> Response:
"""Endpoint to interact with the resource owner and obtain an authorization grant.
See Section 4.1.1: https://tools.ietf.org/html/rfc6749#section-4.1.1
"""
oauth2_request: OAuth2Request = await to_oauth2_request(request)
oauth2_response: OAuth2Response = await server.create_authorization_response(oauth2_request)
return await to_fastapi_response(oauth2_response)
@app.get("/introspect")
async def introspect(request: Request) -> Response:
"""Endpoint returns information about a token.
See Section 2.1: https://tools.ietf.org/html/rfc7662#section-2.1
"""
oauth2_request: OAuth2Request = await to_oauth2_request(request)
oauth2_response: OAuth2Response = await server.create_token_introspection_response(oauth2_request)
return await to_fastapi_response(oauth2_response)
async def to_oauth2_request(request: Request) -> OAuth2Request:
"""Converts fastapi Request instance to OAuth2Request instance"""
form = await request.form()
post = dict(form)
query_params = dict(request.query_params)
method = request.method
headers = CaseInsensitiveDict(**request.headers)
url = str(request.url)
# NOTE: AuthenticationMiddleware must be installed
user = None
if request.user.is_authenticated:
user = request.user
return OAuth2Request(
settings=settings,
method=RequestMethod[method],
headers=headers,
post=Post(**post),
query=Query(**query_params),
url=url,
user=user,
)
async def to_fastapi_response(oauth2_response: OAuth2Response) -> Response:
"""Converts OAuth2Response instance to fastapi Response instance"""
response_content = oauth2_response.content._asdict() if oauth2_response.content is not None else {}
headers = dict(oauth2_response.headers)
status_code = oauth2_response.status_code
content = json.dumps(response_content)
return Response(content=content, headers=headers, status_code=status_code)
Setting | Default value | Description |
---|---|---|
TOKEN_EXPIRES_IN | 86400 | Access token lifetime. |
AUTHORIZATION_CODE_EXPIRES_IN | 300 | Authorization code lifetime. |
INSECURE_TRANSPORT | False | Allow connections over SSL only. When this option is disabled server will raise "HTTP method is not allowed" error. |