apache/airflow-client-python

Get Dags call is not working.

varun-shahdadpuri opened this issue · 4 comments

Response from the API call via Postman:

GET /dags?limit=1

{
    "dag_id": "Disaster_Recovery",
    "description": null,
    "file_token": "Ii91c3IvbG9jYWwvYWlyZmxvdy9kYWdzL2NvbmNvcmRfZGFnc19kci5weSI.fWukEYmKdWxEz4D_qEy-F2BqCZw",
    "fileloc": "/usr/local/airflow/dags/concord_dags_dr.py",
    "is_active": true,
    "is_paused": false,
    "is_subdag": false,
    "owners": [
        "Airflow"
    ],
    "root_dag_id": null,
    "schedule_interval": {
        "__type": "CronExpression",
        "value": "*/5 * * * *"
    },
    "tags": []
}

Using the api-client to get the same information leads to an error:

        with airflow_client.client.ApiClient(self.configuration) as api_client:
            # Create an instance of the API class
            api_instance = dag_api.DAGApi(api_client)
            try:
                api_response = api_instance.get_dags(limit=1)
                pprint(api_response)
            except airflow_client.client.ApiException as e:
                print("Exception when calling DAGApi->get_dags: %s\n" % e)

airflow_client.client.exceptions.ApiValueError: Invalid inputs given to generate an instance of 'DAGCollectionAllOf'. The input data was invalid for the allOf schema 'DAGCollectionAllOf' in the composed schema 'DAGCollection'. Error=DAG has no attribute 'is_active' at ['received_data']['dags'][0]['is_active']

I tried it on many DAGs and it works fine for me.

A DAG doesn’t have an is_active attribute. It is only used when the DAG file is removed. See here.

It seems there is some misconfiguration in your DAG. Can you share the DAG here?

👋 hello everyone, I have the same issue with the following:

docker-compose.yaml

version: "3.9"

x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy  

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.1.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo -e "\033[1;31mERROR!!!: AIRFLOW_UID not set!\e[0m"
          echo "Please follow these instructions to set AIRFLOW_UID and AIRFLOW_GID environment variables:
            https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#initializing-environment"
          exit 1
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:${AIRFLOW_GID}" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:${AIRFLOW_GID:-0}"
    volumes:
      - .:/sources

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

volumes:
  postgres-db-volume:
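
As a quick sanity check that another container (e.g. the Jupyter one) can reach the webserver at all, you can hit the same /health endpoint the compose healthcheck above curls; a minimal sketch with requests (the airflow-webserver host name assumes you are on the compose network):

import requests

# Same endpoint the webserver healthcheck above curls; from outside the
# compose network use http://localhost:8080/health instead.
resp = requests.get("http://airflow-webserver:8080/health", timeout=10)
resp.raise_for_status()
print(resp.json())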

./dags/tutorial.py

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]

The code I am running (inside a Jupyter Notebook running in JupyterLab)
notebook_cell

import time
import airflow_client.client as client
from airflow_client.client.api import dag_api
from airflow_client.client.model.dag_collection import DAGCollection
from airflow_client.client.model.error import Error
from pprint import pprint

# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.

# Configure HTTP basic authorization: Basic
configuration = client.Configuration(
    username = 'airflow',
    password = 'airflow',
    host = "http://airflow-webserver:8080/api/v1"
)

# Enter a context with an instance of the API client
with client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = dag_api.DAGApi(api_client)
    limit = 100 # int | The number of items to return. (optional) if omitted the server will use the default value of 100
    offset = 0 # int | The number of items to skip before starting to collect the result set. (optional)
    order_by = 'dag_id' # str | The name of the field to order the results by. (optional; must be defined before being passed below)
    # example passing only required values which don't have defaults set
    # and optional values
    try:
        # List DAGs
        api_response = api_instance.get_dags(limit=limit, offset=offset, order_by=order_by)
        pprint(api_response)
    except client.ApiException as e:
        print("Exception when calling DAGApi->get_dags: %s\n" % e)

The error I am getting:

...
/opt/conda/lib/python3.8/site-packages/airflow_client/client/model_utils.py in get_allof_instances(self, model_args, constant_args)
   1628             composed_instances.append(allof_instance)
   1629         except Exception as ex:
-> 1630             raise ApiValueError(
   1631                 "Invalid inputs given to generate an instance of '%s'. The "
   1632                 "input data was invalid for the allOf schema '%s' in the composed "

ApiValueError: Invalid inputs given to generate an instance of 'DAGCollectionAllOf'. The input data was invalid for the allOf schema 'DAGCollectionAllOf' in the composed schema 'DAGCollection'. Error=DAG has no attribute 'is_active' at ['received_data']['dags'][0]['is_active']
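
An aside on why this surfaces as an uncaught traceback: in the generated client's exception hierarchy, ApiValueError is raised during response deserialization and is not a subclass of ApiException, so the except clause in the cell above never sees it. Catching the common base class OpenApiException at least keeps the failure inside the handler; a minimal sketch, reusing the names from the notebook cell above:

try:
    api_response = api_instance.get_dags(limit=limit, offset=offset, order_by=order_by)
    pprint(api_response)
except client.OpenApiException as e:
    # OpenApiException is the common base of ApiException (HTTP errors) and of
    # the validation errors (ApiValueError, ApiAttributeError) raised while
    # deserializing the response body.
    print("Exception when calling DAGApi->get_dags: %s\n" % e)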

The DAG is running fine, judging from the UI:
[screenshot: the DAG in the Airflow UI]

“Dag doesn’t have is_active attribute”

When you query a DAG from the API, it does.

[Postman screenshot]

I confirm the issue. I tested following versions combinations:

apache-airflow==2.1.4
apache-airflow-client==2.1.0

and

apache-airflow==2.2.0
apache-airflow-client==2.1.0

In both cases I get the same error for the get_dag and get_dags methods:

airflow_client.client.exceptions.ApiAttributeError: DAG has no attribute 'is_active' at ['received_data']['is_active']

Raw API response:

curl -X 'GET' 'http://localhost:8080/api/v1/dags?limit=100&only_active=true' -H 'accept: application/json'
{
  "dags": [
    {
      "dag_id": "test_dag",
      "description": null,
      "file_token": "Ii9vcHQvYWlyZmxvdy9kYWdzL3Rlc3RfZGFnLnB5Ig.OapbMJoMLHYlHJ9XnD_xClqxLKw",
      "fileloc": "/opt/airflow/dags/test_dag.py",
      "is_active": true,
      "is_paused": true,
      "is_subdag": false,
      "owners": [
        "airflow"
      ],
      "root_dag_id": null,
      "schedule_interval": null,
      "tags": []
    }
  ],
  "total_entries": 1
}
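
To confirm the payload itself is well-formed, you can also bypass the generated models entirely; a minimal sketch with requests (the host, credentials, and basic-auth scheme are assumptions matching the setups above):

import requests

# Fetch the same endpoint the client wraps, with no schema validation.
resp = requests.get(
    "http://localhost:8080/api/v1/dags",
    params={"limit": 100, "only_active": "true"},
    auth=("airflow", "airflow"),
    headers={"Accept": "application/json"},
)
resp.raise_for_status()
# is_active is present in the raw payload, so the ApiValueError happens while
# the client validates the response against its older DAG model.
print(resp.json()["dags"][0]["is_active"])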

I'm using a very simple DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


with DAG(
        'test_dag',
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
) as dag:
    PythonOperator(
        task_id='test_task',
        python_callable=lambda: print('hi'),
    )
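
Until the client's models catch up with the server's schema (the comment below reports this resolved with client 2.5.1), one possible stopgap, not a fix, is to skip response-type validation for the call; a sketch assuming a client generated by openapi-generator, whose endpoint methods accept a _check_return_type flag:

# Returns the deserialized data without validating it against the DAG model,
# so the field the model does not know about no longer raises ApiValueError.
api_response = api_instance.get_dags(limit=100, _check_return_type=False)
pprint(api_response)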

Working as expected on the latest client version (2.5.1) and the latest Airflow (2.5.1).

    # Get dag list
    dag_api_instance = dag_api.DAGApi(api_client)
    try:
        api_response = dag_api_instance.get_dags(limit=1)
        pprint(api_response)
    except airflow_client.client.OpenApiException as e:
        print("Exception when calling DagAPI->get_dags: %s\n" % e)

Response:

[screenshot: the successful response]

Closing for now, feel free to reopen if needed.