astronomer/astronomer-cosmos

AthenaAccessKeyProfileMapping does not work as expected locally


When defining an AWS connection like so in airflow_settings.yaml:

airflow:
  connections:
    - conn_id: aws_default
      conn_type: aws
      conn_host:
      conn_schema:
      conn_login: {aws_access_key_id}
      conn_password: {aws_secret_access_key} 
      conn_port:
      conn_extra:

Running a DAG with AthenaAccessKeyProfileMapping raises this error:

botocore.exceptions.ClientError: An error occurred (InvalidClientTokenId) when calling the GetCallerIdentity operation: The security token included in the request is invalid

This happens because, when AthenaAccessKeyProfileMapping calls get_credentials, AwsGenericHook returns the frozen credentials exactly as defined above, i.e., without a session token.
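
To make this failure mode concrete, here is a simplified model (not the hook's actual implementation) of how an Airflow AWS connection's fields map onto credentials: the login becomes the access key ID, the password the secret key, and the optional aws_session_token extra the token. Creds and credentials_from_connection are hypothetical names; Creds only mirrors the three fields of botocore's ReadOnlyCredentials.

```python
from collections import namedtuple
from typing import Optional

# Hypothetical stand-in mirroring the fields of botocore's ReadOnlyCredentials.
Creds = namedtuple("Creds", ["access_key", "secret_key", "token"])


def credentials_from_connection(
    login: str, password: str, extra: Optional[dict] = None
) -> Creds:
    """Simplified model: map Airflow AWS connection fields to credentials.

    conn_login                       -> access key ID
    conn_password                    -> secret access key
    conn_extra["aws_session_token"]  -> session token (None if absent)
    """
    extra = extra or {}
    return Creds(login, password, extra.get("aws_session_token"))


# First connection from the issue: no extra, so there is no session token.
static = credentials_from_connection("AKIAEXAMPLE", "secret")
print(static.token)  # None

# Second connection from the issue: the token comes from conn_extra.
session = credentials_from_connection(
    "ASIAEXAMPLE", "secret", {"aws_session_token": "token-value"}
)
print(session.token)  # token-value
```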

To make the DAG run successfully, I have to run aws sts get-session-token with the AWS CLI and then define the connection like this:

airflow:
  connections:
    - conn_id: aws_default
      conn_type: aws
      conn_host:
      conn_schema:
      conn_login: {session_access_key_id}
      conn_password: {session_secret_access_key} 
      conn_port:
      conn_extra:
        aws_session_token: {session_token}
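
Until there is a code fix, the manual step can at least be scripted. The sketch below assumes you saved the JSON output of aws sts get-session-token (which contains a Credentials object with AccessKeyId, SecretAccessKey, SessionToken, and Expiration) and renders the connection block above from it. render_connection is a hypothetical helper, and the empty conn_host/conn_schema/conn_port fields are simply omitted.

```python
import json


def render_connection(sts_output: str, conn_id: str = "aws_default") -> str:
    """Render an airflow_settings.yaml connection from `aws sts get-session-token` output."""
    creds = json.loads(sts_output)["Credentials"]
    # json.dumps yields a double-quoted string, which is also a valid YAML scalar,
    # so characters such as "/", "+" and "=" in keys and tokens stay intact.
    q = json.dumps
    lines = [
        "airflow:",
        "  connections:",
        f"    - conn_id: {conn_id}",
        "      conn_type: aws",
        f"      conn_login: {q(creds['AccessKeyId'])}",
        f"      conn_password: {q(creds['SecretAccessKey'])}",
        "      conn_extra:",
        f"        aws_session_token: {q(creds['SessionToken'])}",
    ]
    return "\n".join(lines) + "\n"


# Example input shaped like the CLI's JSON output (values are made up).
example_output = json.dumps(
    {
        "Credentials": {
            "AccessKeyId": "ASIAEXAMPLE",
            "SecretAccessKey": "secret/example+key",
            "SessionToken": "token/example==",
            "Expiration": "2024-01-01T00:00:00+00:00",
        }
    }
)
print(render_connection(example_output))
```

For example, save the output with aws sts get-session-token --duration-seconds 3600 > sts.json and pass the file's contents to render_connection.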

However, repeating this every time the session expires is tedious. As a workaround, I subclassed AthenaAccessKeyProfileMapping so that it calls STS in code:

from botocore.credentials import ReadOnlyCredentials
from cosmos.profiles import (
    AthenaAccessKeyProfileMapping as BaseAthenaAccessKeyProfileMapping,
)


class AthenaAccessKeyProfileMapping(BaseAthenaAccessKeyProfileMapping):
    def _get_temporary_credentials(self):  # type: ignore
        """
        Helper function to retrieve temporary short lived credentials
        Returns an object including access_key, secret_key and token
        """
        # use StsHook to get session
        from airflow.providers.amazon.aws.hooks.sts import StsHook

        hook = StsHook(self.conn_id)  # type: ignore
        duration = self.profile_args.get("duration_seconds", 3600)
        sts_response = hook.conn.get_session_token(DurationSeconds=duration)
        credentials = ReadOnlyCredentials(
            sts_response["Credentials"]["AccessKeyId"],
            sts_response["Credentials"]["SecretAccessKey"],
            sts_response["Credentials"]["SessionToken"],
        )
        return credentials
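
One detail worth guarding in the subclass is duration_seconds: AWS documents a range of 900 to 129,600 seconds for GetSessionToken when it is called by an IAM user (the account root user is limited to 3,600). A minimal sketch of validating the value before passing it as DurationSeconds; session_duration is a hypothetical helper, not part of Cosmos.

```python
# Limits documented by AWS for sts:GetSessionToken called by an IAM user.
MIN_DURATION = 900        # 15 minutes
MAX_DURATION = 129_600    # 36 hours
DEFAULT_DURATION = 3600   # default used by the subclass above


def session_duration(profile_args: dict) -> int:
    """Read duration_seconds from profile_args and check it against the STS limits."""
    duration = int(profile_args.get("duration_seconds", DEFAULT_DURATION))
    if not MIN_DURATION <= duration <= MAX_DURATION:
        raise ValueError(
            f"duration_seconds must be between {MIN_DURATION} and {MAX_DURATION}, got {duration}"
        )
    return duration


print(session_duration({}))                           # 3600
print(session_duration({"duration_seconds": 43200}))  # 43200
```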

I'm not sure this is the best solution. I also don't think it will work on a deployed Airflow instance that runs under an IAM role and therefore already has a session: the get_session_token call will fail because of this limitation:

The temporary security credentials created by GetSessionToken can be used to make API calls to any Amazon Web Services service with the following exceptions: You cannot call any STS API except AssumeRole or GetCallerIdentity
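
One way to tell whether a connection already holds temporary credentials, without calling STS, is the access key ID prefix: AWS documents AKIA for long-term access keys and ASIA for temporary keys issued by STS. A sketch of that check follows; is_temporary_access_key is a hypothetical helper, and checking whether the credentials carry a session token remains the more direct signal.

```python
def is_temporary_access_key(access_key_id: str) -> bool:
    """Return True for STS-issued temporary keys (ASIA...), False otherwise.

    AKIA... keys are long-term IAM user keys that can be exchanged via
    GetSessionToken; ASIA... keys already belong to a session, so they
    cannot be used to call GetSessionToken.
    """
    return access_key_id.startswith("ASIA")


print(is_temporary_access_key("AKIAIOSFODNN7EXAMPLE"))  # False
print(is_temporary_access_key("ASIAIOSFODNN7EXAMPLE"))  # True
```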

Your approach of subclassing AthenaAccessKeyProfileMapping to request temporary credentials from STS is viable and fits the existing mechanism in the Cosmos repository. The current code already has a _get_temporary_credentials method, meant to retrieve temporary short-lived credentials, that obtains them through Airflow's AwsGenericHook. The profile property uses this method to set aws_access_key_id, aws_secret_access_key, and aws_session_token.

Here is the relevant part of the existing code:

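from __future__ import annotations

from typing import Any

from cosmos.profiles.base import BaseProfileMapping


class AthenaAccessKeyProfileMapping(BaseProfileMapping):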
    # ... other parts of the class ...

    @property
    def profile(self) -> dict[str, Any | None]:
        "Gets profile. The password is stored in an environment variable."

        self.temporary_credentials = self._get_temporary_credentials()  # type: ignore

        profile = {
            **self.mapped_params,
            **self.profile_args,
            "aws_access_key_id": self.temporary_credentials.access_key,
            "aws_secret_access_key": self.get_env_var_format("aws_secret_access_key"),
            "aws_session_token": self.get_env_var_format("aws_session_token"),
        }

        return self.filter_null(profile)

    def _get_temporary_credentials(self):  # type: ignore
        """
        Helper function to retrieve temporary short lived credentials
        Returns an object including access_key, secret_key and token
        """
        from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook

        hook = AwsGenericHook(self.conn_id)  # type: ignore
        credentials = hook.get_credentials()
        return credentials
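
To see what the profile property above produces, here is a stand-alone model of it: the access key ID is written inline, the secret key and session token are emitted as dbt env_var placeholders, and None values are dropped (modeled here as filter_null). env_var_format is a hypothetical stand-in for cosmos's get_env_var_format, and the variable names it generates are illustrative only.

```python
from collections import namedtuple
from typing import Any, Dict

Creds = namedtuple("Creds", ["access_key", "secret_key", "token"])


def env_var_format(field: str) -> str:
    # Hypothetical naming scheme; cosmos generates its own variable names.
    return "{{ env_var('COSMOS_CONN_" + field.upper() + "') }}"


def filter_null(d: Dict[str, Any]) -> Dict[str, Any]:
    # Drop keys whose value is None, as the profile property's last step does.
    return {k: v for k, v in d.items() if v is not None}


def build_profile(mapped: Dict[str, Any], profile_args: Dict[str, Any], creds: Creds) -> Dict[str, Any]:
    profile = {
        **mapped,
        **profile_args,
        "aws_access_key_id": creds.access_key,
        "aws_secret_access_key": env_var_format("aws_secret_access_key"),
        "aws_session_token": env_var_format("aws_session_token"),
    }
    return filter_null(profile)


profile = build_profile(
    {"region_name": "eu-west-1", "s3_staging_dir": None},
    {"database": "awsdatacatalog"},
    Creds("AKIAEXAMPLE", "secret", None),
)
print(profile)
```

Note that the aws_session_token key is present even when the credentials carry no token; what dbt receives then depends on the value placed in that environment variable.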

The repository's tests also cover the AthenaAccessKeyProfileMapping class: they check that it claims the connection type and sets the profile values, using mock objects to simulate the AWS credentials [1][2].
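
The mocking idea behind such tests, reduced to the standard library: patch the credential helper with unittest.mock.patch.object so no AWS call is made, then assert on the profile. ToyMapping is a hypothetical stand-in, not the cosmos class or its test suite.

```python
import unittest
from collections import namedtuple
from unittest import mock

Creds = namedtuple("Creds", ["access_key", "secret_key", "token"])


class ToyMapping:
    """Hypothetical, minimal stand-in for a profile mapping."""

    def _get_temporary_credentials(self) -> Creds:
        raise RuntimeError("would call AWS in real code")

    @property
    def profile(self) -> dict:
        creds = self._get_temporary_credentials()
        return {"aws_access_key_id": creds.access_key, "has_token": creds.token is not None}


class ToyMappingTest(unittest.TestCase):
    def test_profile_uses_mocked_credentials(self):
        fake = Creds("ASIAEXAMPLE", "secret", "token")
        # Replace the AWS-calling helper for the duration of the block.
        with mock.patch.object(ToyMapping, "_get_temporary_credentials", return_value=fake):
            profile = ToyMapping().profile
        self.assertEqual(profile["aws_access_key_id"], "ASIAEXAMPLE")
        self.assertTrue(profile["has_token"])


if __name__ == "__main__":
    unittest.main(exit=False)
```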

However, your concern about a deployed Airflow instance running under an IAM role that already has a session is valid: the get_session_token call can fail because of the session limitations. To address this, check whether the credentials already include a session token and call get_session_token only when they do not:

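def _get_temporary_credentials(self):  # type: ignore
    """
    Helper function to retrieve temporary short lived credentials
    Returns an object including access_key, secret_key and token
    """
    from botocore.credentials import ReadOnlyCredentials
    from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
    from airflow.providers.amazon.aws.hooks.sts import StsHook

    hook = AwsGenericHook(self.conn_id)  # type: ignore
    credentials = hook.get_credentials()

    # Long-lived keys carry no session token: exchange them for temporary ones.
    # Credentials that already have a token (e.g. an IAM role session) are used as-is.
    if not credentials.token:
        sts_hook = StsHook(self.conn_id)  # type: ignore
        duration = self.profile_args.get("duration_seconds", 3600)
        sts_response = sts_hook.conn.get_session_token(DurationSeconds=duration)
        # get_session_token returns a plain dict; wrap it so callers can keep
        # using .access_key, .secret_key and .token.
        credentials = ReadOnlyCredentials(
            sts_response["Credentials"]["AccessKeyId"],
            sts_response["Credentials"]["SecretAccessKey"],
            sts_response["Credentials"]["SessionToken"],
        )

    return credentials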

This way, the code requests a new session token only when one is not already present, avoiding the session limitation issue [1].
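
Because the fallback only branches on whether a token is present, its logic can be exercised without AWS by injecting the two calls. A minimal sketch, assuming the callables stand in for AwsGenericHook.get_credentials and StsHook(...).conn.get_session_token; Creds and credentials_with_fallback are hypothetical names.

```python
from collections import namedtuple
from typing import Any, Callable, Dict

Creds = namedtuple("Creds", ["access_key", "secret_key", "token"])


def credentials_with_fallback(
    get_credentials: Callable[[], Creds],
    get_session_token: Callable[..., Dict[str, Any]],
    duration: int = 3600,
) -> Creds:
    """Return existing credentials if they carry a token, else ask STS for a session."""
    creds = get_credentials()
    if creds.token:
        # Already temporary (e.g. an IAM role session): do not call STS again.
        return creds
    response = get_session_token(DurationSeconds=duration)["Credentials"]
    return Creds(response["AccessKeyId"], response["SecretAccessKey"], response["SessionToken"])


calls = []


def fake_sts(**kwargs):
    # Records the arguments and returns a response shaped like STS's.
    calls.append(kwargs)
    return {"Credentials": {"AccessKeyId": "ASIANEW", "SecretAccessKey": "s", "SessionToken": "t"}}


print(credentials_with_fallback(lambda: Creds("AKIAOLD", "s", None), fake_sts))
# Creds(access_key='ASIANEW', secret_key='s', token='t')
print(credentials_with_fallback(lambda: Creds("ASIAROLE", "s", "x"), fake_sts).access_key)
# ASIAROLE
print(calls)  # [{'DurationSeconds': 3600}]
```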


About Dosu
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.



Hey @tuan-h, would you like to submit a PR?

@pankajastro Yes, I can submit a PR based on the bot's suggestion if you think that's OK: use AwsGenericHook first and, if no session token is present, fall back to StsHook.