Get DAGs call is not working.
varun-shahdadpuri opened this issue · 4 comments
Response from the API call via Postman for /dags?limit=1:
{
"dag_id": "Disaster_Recovery",
"description": null,
"file_token": "Ii91c3IvbG9jYWwvYWlyZmxvdy9kYWdzL2NvbmNvcmRfZGFnc19kci5weSI.fWukEYmKdWxEz4D_qEy-F2BqCZw",
"fileloc": "/usr/local/airflow/dags/concord_dags_dr.py",
"is_active": true,
"is_paused": false,
"is_subdag": false,
"owners": [
"Airflow"
],
"root_dag_id": null,
"schedule_interval": {
"__type": "CronExpression",
"value": "*/5 * * * *"
},
"tags": []
}
Using the API client to get the same information leads to an error:
with airflow_client.client.ApiClient(self.configuration) as api_client:
    # Create an instance of the API class
    api_instance = dag_api.DAGApi(api_client)
    try:
        api_response = api_instance.get_dags(limit=1)
        pprint(api_response)
    except airflow_client.client.ApiException as e:
        print("Exception when calling DAGApi->get_dags: %s\n" % e)
airflow_client.client.exceptions.ApiValueError: Invalid inputs given to generate an instance of 'DAGCollectionAllOf'. The input data was invalid for the allOf schema 'DAGCollectionAllOf' in the composed schema 'DAGCollection'. Error=DAG has no attribute 'is_active' at ['received_data']['dags'][0]['is_active']
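To rule out the server side, the same endpoint can be fetched with plain requests, bypassing the generated client models entirely (a minimal sketch; the localhost URL and the airflow/airflow basic-auth credentials are assumptions, adjust them to your deployment):

import requests
from pprint import pprint

# Fetch the raw payload directly, with no client-side model validation.
resp = requests.get(
    "http://localhost:8080/api/v1/dags",
    params={"limit": 1},
    auth=("airflow", "airflow"),  # placeholder basic-auth credentials
)
resp.raise_for_status()
# If "is_active" appears here but the Python client raises, the installed client
# models are likely older than the server's API schema.
pprint(resp.json()["dags"][0])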
I tried it on many DAGs and it works fine for me.
DAG doesn't have an is_active attribute. It is only used when the DAG file is removed. See here.
It seems there is some misconfiguration in your DAG. Can you share the DAG here?
👋 hello everyone, I have the same issue with the following:
docker-compose.yaml
version: "3.9"
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.1.0
        min_airlfow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airlfow_version_comparable )); then
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo -e "\033[1;31mERROR!!!: AIRFLOW_UID not set!\e[0m"
          echo "Please follow these instructions to set AIRFLOW_UID and AIRFLOW_GID environment variables:
            https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#initializing-environment"
          exit 1
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:${AIRFLOW_GID}" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:${AIRFLOW_GID:-0}"
    volumes:
      - .:/sources

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

volumes:
  postgres-db-volume:
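For context, nothing non-standard is done to start this stack; it follows the usual quick-start sequence for this compose file:

# One-off metadata DB / admin user initialization, then start all services.
docker-compose up airflow-init
docker-compose up -d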
./dags/tutorial.py
from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]
The code I am running (inside a Jupyter Notebook running in JupyterLab)
notebook_cell
import time
import airflow_client.client as client
from airflow_client.client.api import dag_api
from airflow_client.client.model.dag_collection import DAGCollection
from airflow_client.client.model.error import Error
from pprint import pprint

# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.

# Configure HTTP basic authorization: Basic
configuration = client.Configuration(
    username='airflow',
    password='airflow',
    host="http://airflow-webserver:8080/api/v1"
)

# Enter a context with an instance of the API client
with client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = dag_api.DAGApi(api_client)
    limit = 100  # int | The numbers of items to return. (optional) if omitted the server will use the default value of 100
    offset = 0  # int | The number of items to skip before starting to collect the result set. (optional)
    order_by = 'dag_id'  # str | The name of the field to order the results by. (optional)

    # example passing only required values which don't have defaults set
    # and optional values
    try:
        # List DAGs
        api_response = api_instance.get_dags(limit=limit, offset=offset, order_by=order_by)
        pprint(api_response)
    except client.ApiException as e:
        print("Exception when calling DAGApi->get_dags: %s\n" % e)
The error I am getting:
...
/opt/conda/lib/python3.8/site-packages/airflow_client/client/model_utils.py in get_allof_instances(self, model_args, constant_args)
1628 composed_instances.append(allof_instance)
1629 except Exception as ex:
-> 1630 raise ApiValueError(
1631 "Invalid inputs given to generate an instance of '%s'. The "
1632 "input data was invalid for the allOf schema '%s' in the composed "
ApiValueError: Invalid inputs given to generate an instance of 'DAGCollectionAllOf'. The input data was invalid for the allOf schema 'DAGCollectionAllOf' in the composed schema 'DAGCollection'. Error=DAG has no attribute 'is_active' at ['received_data']['dags'][0]['is_active']
The DAG itself is running fine when I look at it in the UI.
Regarding the claim that a DAG doesn't have an is_active attribute: when you query a DAG from the API, it does.
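For example, the raw REST response for a single DAG can be checked with curl (assuming the basic-auth setup above and a DAG named tutorial; adjust both to your environment):

curl -u 'airflow:airflow' 'http://localhost:8080/api/v1/dags/tutorial' -H 'accept: application/json'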
I can confirm the issue. I tested the following version combinations:
apache-airflow==2.1.4
apache-airflow-client==2.1.0
and
apache-airflow==2.2.0
apache-airflow-client==2.1.0
In both cases I get the same error for the get_dag and get_dags methods:
airflow_client.client.exceptions.ApiAttributeError: DAG has no attribute 'is_active' at ['received_data']['is_active']
Raw API response:
curl -X 'GET' 'http://localhost:8080/api/v1/dags?limit=100&only_active=true' -H 'accept: application/json'
{
"dags": [
{
"dag_id": "test_dag",
"description": null,
"file_token": "Ii9vcHQvYWlyZmxvdy9kYWdzL3Rlc3RfZGFnLnB5Ig.OapbMJoMLHYlHJ9XnD_xClqxLKw",
"fileloc": "/opt/airflow/dags/test_dag.py",
"is_active": true,
"is_paused": true,
"is_subdag": false,
"owners": [
"airflow"
],
"root_dag_id": null,
"schedule_interval": null,
"tags": []
}
],
"total_entries": 1
}
I'm using a very simple DAG:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    'test_dag',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
) as dag:
    PythonOperator(
        task_id='test_task',
        python_callable=lambda: print('hi'),
        dag=dag,
    )
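As a stopgap while the client and server disagree on the DAG schema, it may be possible to skip the client-side response validation. OpenAPI-generated Python clients usually accept a _check_return_type flag on each endpoint method; whether it is available, and whether it avoids this particular error, depends on the installed apache-airflow-client version. A sketch, reusing the api_instance from the notebook cell earlier in this thread:

# Same call as above, but without response-type validation; the result may come
# back as a plain dict instead of a DAGCollection model.
api_response = api_instance.get_dags(limit=100, _check_return_type=False)
pprint(api_response)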
Working as expected on the latest client version (2.5.1) and the latest Airflow (2.5.1).
# Get dag list
dag_api_instance = dag_api.DAGApi(api_client)
try:
    api_response = dag_api_instance.get_dags(limit=1)
    pprint(api_response)
except airflow_client.client.OpenApiException as e:
    print("Exception when calling DagAPI->get_dags: %s\n" % e)
Response:
Closing for now, feel free to reopen if needed.
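For anyone hitting this on an older setup, keeping the server and the client package on matching release lines generally avoids this kind of schema mismatch, for example (versions shown only as an illustration, matching the combination reported to work above):

pip install "apache-airflow==2.5.1" "apache-airflow-client==2.5.1"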