AthenaAccessKeyProfileMapping does not work as expected locally
Opened this issue · 3 comments
When defining an AWS connection like so in airflow_settings.yaml:
airflow:
  connections:
    - conn_id: aws_default
      conn_type: aws
      conn_host:
      conn_schema:
      conn_login: {aws_access_key_id}
      conn_password: {aws_secret_access_key}
      conn_port:
      conn_extra:
Running a DAG with AthenaAccessKeyProfileMapping returns this error:
botocore.exceptions.ClientError: An error occurred (InvalidClientTokenId) when calling the GetCallerIdentity operation: The security token included in the request is invalid
That's because AwsGenericHook returns frozen_credentials exactly as defined above, i.e. with no session token, when get_credentials is called in AthenaAccessKeyProfileMapping.
To make the DAG run successfully, I have to run the AWS CLI command aws sts get-session-token and then define the connection like this:
airflow:
  connections:
    - conn_id: aws_default
      conn_type: aws
      conn_host:
      conn_schema:
      conn_login: {session_access_key_id}
      conn_password: {session_secret_access_key}
      conn_port:
      conn_extra:
        aws_session_token: {session_token}
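For reference, the manual step can be sketched in Python. This is only an illustration: the helper name connection_fields_from_sts is hypothetical, and it assumes the JSON printed by aws sts get-session-token has a top-level "Credentials" object with AccessKeyId, SecretAccessKey and SessionToken keys.

```python
import json


def connection_fields_from_sts(sts_json: str) -> dict:
    """Map `aws sts get-session-token` JSON output to connection fields.

    Hypothetical helper for illustration; the returned keys mirror the
    airflow_settings.yaml fields used in the connection above.
    """
    creds = json.loads(sts_json)["Credentials"]
    return {
        "conn_login": creds["AccessKeyId"],
        "conn_password": creds["SecretAccessKey"],
        "conn_extra": {"aws_session_token": creds["SessionToken"]},
    }


# Placeholder values only, not real credentials.
sample_output = json.dumps(
    {
        "Credentials": {
            "AccessKeyId": "ASIAEXAMPLE",
            "SecretAccessKey": "secret-example",
            "SessionToken": "token-example",
            "Expiration": "2024-01-01T00:00:00Z",
        }
    }
)
fields = connection_fields_from_sts(sample_output)
```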
However, doing this every time a session expires is annoying. To fix it, I subclassed AthenaAccessKeyProfileMapping to make the call to STS in code:
from botocore.credentials import ReadOnlyCredentials
from cosmos.profiles import (
    AthenaAccessKeyProfileMapping as BaseAthenaAccessKeyProfileMapping,
)


class AthenaAccessKeyProfileMapping(BaseAthenaAccessKeyProfileMapping):
    def _get_temporary_credentials(self):  # type: ignore
        """
        Helper function to retrieve temporary short lived credentials
        Returns an object including access_key, secret_key and token
        """
        # use StsHook to get session
        from airflow.providers.amazon.aws.hooks.sts import StsHook

        hook = StsHook(self.conn_id)  # type: ignore
        duration = self.profile_args.get("duration_seconds", 3600)
        sts_response = hook.conn.get_session_token(DurationSeconds=duration)
        credentials = ReadOnlyCredentials(
            sts_response["Credentials"]["AccessKeyId"],
            sts_response["Credentials"]["SecretAccessKey"],
            sts_response["Credentials"]["SessionToken"],
        )
        return credentials
I'm not sure if this is the best solution. Additionally, I don't think this will work with a deployed Airflow that runs with an IAM role, since those credentials already belong to a session. That is, the call to get_session_token will fail due to this session limitation:
The temporary security credentials created by GetSessionToken can be used to make API calls to any Amazon Web Services service with the following exceptions: You cannot call any STS API except AssumeRole or GetCallerIdentity
Your approach of subclassing AthenaAccessKeyProfileMapping to call STS for temporary credentials is viable and aligns with the existing mechanisms in the Cosmos repository. The existing class already includes a method, _get_temporary_credentials, that retrieves temporary short-lived credentials using the AwsGenericHook from Airflow. This method is used within the profile property to set aws_access_key_id, aws_secret_access_key, and aws_session_token.
Here is the relevant part of the code that confirms this approach:
class AthenaAccessKeyProfileMapping(BaseProfileMapping):
    # ... other parts of the class ...

    @property
    def profile(self) -> dict[str, Any | None]:
        "Gets profile. The password is stored in an environment variable."
        self.temporary_credentials = self._get_temporary_credentials()  # type: ignore
        profile = {
            **self.mapped_params,
            **self.profile_args,
            "aws_access_key_id": self.temporary_credentials.access_key,
            "aws_secret_access_key": self.get_env_var_format("aws_secret_access_key"),
            "aws_session_token": self.get_env_var_format("aws_session_token"),
        }
        return self.filter_null(profile)

    def _get_temporary_credentials(self):  # type: ignore
        """
        Helper function to retrieve temporary short lived credentials
        Returns an object including access_key, secret_key and token
        """
        from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook

        hook = AwsGenericHook(self.conn_id)  # type: ignore
        credentials = hook.get_credentials()
        return credentials
Additionally, the tests provided in the repository validate the functionality of the AthenaAccessKeyProfileMapping class, ensuring that it correctly claims the connection type and sets the profile values. The tests use mock objects to simulate the AWS credentials and verify that the profile mapping behaves as expected [1][2].
However, your concern about a deployed Airflow running with an IAM role that already has a session is valid. The get_session_token call might fail due to the session limitation. To address this, you can check whether a session token is already present and call get_session_token only if it is not:
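def _get_temporary_credentials(self):  # type: ignore
    """
    Helper function to retrieve temporary short lived credentials
    Returns an object including access_key, secret_key and token
    """
    from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
    from airflow.providers.amazon.aws.hooks.sts import StsHook
    from botocore.credentials import ReadOnlyCredentials

    hook = AwsGenericHook(self.conn_id)  # type: ignore
    credentials = hook.get_credentials()
    if not credentials.token:
        # Static access keys only: request a short-lived session from STS.
        sts_hook = StsHook(self.conn_id)  # type: ignore
        duration = self.profile_args.get("duration_seconds", 3600)
        sts_response = sts_hook.conn.get_session_token(DurationSeconds=duration)
        # get_session_token returns a dict, so wrap it to expose
        # access_key, secret_key and token like the hook's credentials do.
        credentials = ReadOnlyCredentials(
            sts_response["Credentials"]["AccessKeyId"],
            sts_response["Credentials"]["SecretAccessKey"],
            sts_response["Credentials"]["SessionToken"],
        )
    return credentials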
This way, the code will only request a new session token if one is not already present, avoiding the session limitation issue [1].
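The fallback logic can also be sketched on its own, without Airflow, so that it is easy to unit test. This is a minimal sketch under stated assumptions: the Credentials namedtuple is a stand-in that only mirrors the field names of botocore's ReadOnlyCredentials, and resolve_credentials and fake_fetch are hypothetical names, not part of Cosmos or Airflow.

```python
from collections import namedtuple

# Stand-in with the same field names as botocore's ReadOnlyCredentials.
Credentials = namedtuple("Credentials", ["access_key", "secret_key", "token"])


def resolve_credentials(base, fetch_session_token, duration_seconds=3600):
    """Return `base` if it already has a session token, else ask STS for one.

    `fetch_session_token` is any callable with the shape of the STS client's
    get_session_token method (assumption: in the profile mapping this could
    be StsHook(conn_id).conn.get_session_token).
    """
    if base.token:
        # Already temporary credentials (e.g. from an IAM role): reuse them
        # and skip the GetSessionToken call entirely.
        return base
    response = fetch_session_token(DurationSeconds=duration_seconds)
    creds = response["Credentials"]
    return Credentials(
        creds["AccessKeyId"], creds["SecretAccessKey"], creds["SessionToken"]
    )


# Usage with a fake STS call that records the requested durations.
calls = []


def fake_fetch(DurationSeconds):
    calls.append(DurationSeconds)
    return {
        "Credentials": {
            "AccessKeyId": "A",
            "SecretAccessKey": "S",
            "SessionToken": "T",
        }
    }


from_static = resolve_credentials(Credentials("k", "s", None), fake_fetch, 900)
existing = Credentials("k", "s", "tok")
from_session = resolve_credentials(existing, fake_fetch)
```

Passing the STS call in as a parameter keeps the decision logic separate from the hooks, so the two branches can be exercised with a fake as shown.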
Hey @tuan-h, would you like to submit a PR?
@pankajastro Yes, I can submit a PR based on the bot's suggestion if you think that's ok. Specifically, using AwsGenericHook first and, if the token is not present, falling back to StsHook.