astronomer/astronomer-cosmos

[Bug] Unable to do partial parsing after disabling Cosmos caching.

josefbits opened this issue · 3 comments

Astronomer Cosmos Version

Other Astronomer Cosmos version (please specify below)

If "Other Astronomer Cosmos version" selected, which one?

1.4.3

dbt-core version

1.7.15

Versions of dbt adapters

dbt-postgres 1.7.15

LoadMode

DBT_LS_MANIFEST

ExecutionMode

VIRTUALENV

InvocationMode

DBT_RUNNER

airflow version

2.8.2

Operating System

Ubuntu 22.04.4 LTS

If you think it's a UI issue, what browsers are you seeing the problem on?

No response

Deployment

Docker-Compose

Deployment details

  • Docker version 25.0.3, build 4debf41
  • Docker Compose version v2.24.6

What happened?

After setting AIRFLOW__COSMOS__ENABLE_CACHE=False, dbt tasks can no longer use partial parsing and fall back to a full parse with the error "Unable to do partial parsing because saved manifest not found. Starting full parse."

The dbt artifacts are pre-generated in CI, and the entire target/ directory is synced to the dbt project path on the Airflow host.
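For reference, the artifacts are produced with dbt parse in CI; a minimal sketch using dbt's programmatic runner (the workspace path is illustrative, not taken from our setup):

# Sketch of a CI step that pre-generates the dbt artifacts before syncing target/.
from dbt.cli.main import dbtRunner

project_dir = "/ci/workspace/jaffle_shop"  # illustrative path

# "dbt parse" writes target/manifest.json and target/partial_parse.msgpack,
# which are then synced to the dbt project path on the Airflow host.
result = dbtRunner().invoke(
    ["parse", "--project-dir", project_dir, "--profiles-dir", project_dir]
)
assert result.success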

Relevant log output

**** With AIRFLOW__COSMOS__ENABLE_CACHE=False ****
08a120f7b403
*** Found local files:
***   * /opt/airflow/logs/dag_id=cosmos_dag/run_id=manual__2024-06-11T13:37:41.235278+00:00/task_id=run_models.raw_payments_seed/attempt=1.log
[2024-06-11, 13:37:47 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:37:41.235278+00:00 [queued]>
[2024-06-11, 13:37:47 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:37:41.235278+00:00 [queued]>
[2024-06-11, 13:37:47 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 3
[2024-06-11, 13:37:47 UTC] {taskinstance.py:2214} INFO - Executing <Task(DbtSeedVirtualenvOperator): run_models.raw_payments_seed> on 2024-06-11 13:37:41.235278+00:00
[2024-06-11, 13:37:47 UTC] {standard_task_runner.py:60} INFO - Started process 1265 to run task
[2024-06-11, 13:37:47 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'cosmos_dag', 'run_models.raw_payments_seed', 'manual__2024-06-11T13:37:41.235278+00:00', '--job-id', '862', '--raw', '--subdir', 'DAGS_FOLDER/cosmos_dag.py', '--cfg-path', '/tmp/tmppaayxmna']
[2024-06-11, 13:37:47 UTC] {standard_task_runner.py:88} INFO - Job 862: Subtask run_models.raw_payments_seed
[2024-06-11, 13:37:47 UTC] {task_command.py:423} INFO - Running <TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:37:41.235278+00:00 [running]> on host 08a120f7b403
[2024-06-11, 13:37:47 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='cosmos_dag' AIRFLOW_CTX_TASK_ID='run_models.raw_payments_seed' AIRFLOW_CTX_EXECUTION_DATE='2024-06-11T13:37:41.235278+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-06-11T13:37:41.235278+00:00'
[2024-06-11, 13:37:47 UTC] {local.py:292} INFO - Cloning project to writable temp directory /tmp/tmpn1tuhvtw from /opt/airflow/dags/dbt/jaffle_shop
[2024-06-11, 13:37:47 UTC] {config.py:266} INFO - Using user-supplied profiles.yml at /opt/***/dags/dbt/jaffle_shop/profiles.yml
[2024-06-11, 13:37:47 UTC] {subprocess.py:60} INFO - Tmp dir root location: /tmp
[2024-06-11, 13:37:47 UTC] {subprocess.py:73} INFO - Running command: ['/opt/***/dbt_venv/bin/dbt', 'seed', '--models', 'raw_payments', '--project-dir', '/tmp/tmpn1tuhvtw', '--profiles-dir', '/opt/***/dags/dbt/jaffle_shop', '--profile', 'jaffle_shop', '--target', 'dev']
[2024-06-11, 13:37:47 UTC] {subprocess.py:84} INFO - Command output:
[2024-06-11, 13:37:57 UTC] {subprocess.py:94} INFO - 13:37:57  Running with dbt=1.7.15
[2024-06-11, 13:37:58 UTC] {subprocess.py:94} INFO - 13:37:58  Registered adapter: postgres=1.7.15
[2024-06-11, 13:37:58 UTC] {subprocess.py:94} INFO - 13:37:58  Unable to do partial parsing because saved manifest not found. Starting full parse.
[2024-06-11, 13:38:05 UTC] {subprocess.py:94} INFO - 13:38:05  Found 5 models, 3 seeds, 20 tests, 0 sources, 0 exposures, 0 metrics, 401 macros, 0 groups, 0 semantic models


**** With AIRFLOW__COSMOS__ENABLE_CACHE=True ****
1cf95644acec
*** Found local files:
***   * /opt/airflow/logs/dag_id=cosmos_dag/run_id=manual__2024-06-11T13:55:34.501323+00:00/task_id=run_models.raw_payments_seed/attempt=1.log
[2024-06-11, 13:55:43 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:55:34.501323+00:00 [queued]>
[2024-06-11, 13:55:43 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:55:34.501323+00:00 [queued]>
[2024-06-11, 13:55:43 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 3
[2024-06-11, 13:55:43 UTC] {taskinstance.py:2214} INFO - Executing <Task(DbtSeedVirtualenvOperator): run_models.raw_payments_seed> on 2024-06-11 13:55:34.501323+00:00
[2024-06-11, 13:55:43 UTC] {standard_task_runner.py:60} INFO - Started process 75 to run task
[2024-06-11, 13:55:43 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'cosmos_dag', 'run_models.raw_payments_seed', 'manual__2024-06-11T13:55:34.501323+00:00', '--job-id', '874', '--raw', '--subdir', 'DAGS_FOLDER/cosmos_dag.py', '--cfg-path', '/tmp/tmpb4__880_']
[2024-06-11, 13:55:43 UTC] {standard_task_runner.py:88} INFO - Job 874: Subtask run_models.raw_payments_seed
[2024-06-11, 13:55:43 UTC] {task_command.py:423} INFO - Running <TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:55:34.501323+00:00 [running]> on host 1cf95644acec
[2024-06-11, 13:55:43 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='cosmos_dag' AIRFLOW_CTX_TASK_ID='run_models.raw_payments_seed' AIRFLOW_CTX_EXECUTION_DATE='2024-06-11T13:55:34.501323+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-06-11T13:55:34.501323+00:00'
[2024-06-11, 13:55:43 UTC] {local.py:292} INFO - Cloning project to writable temp directory /tmp/tmpeszetm6h from /opt/airflow/dags/dbt/jaffle_shop
[2024-06-11, 13:55:43 UTC] {local.py:303} INFO - Partial parse is enabled and the latest partial parse file is /opt/airflow/dags/dbt/jaffle_shop/target/partial_parse.msgpack
[2024-06-11, 13:55:44 UTC] {config.py:266} INFO - Using user-supplied profiles.yml at /opt/***/dags/dbt/jaffle_shop/profiles.yml
[2024-06-11, 13:55:44 UTC] {subprocess.py:60} INFO - Tmp dir root location: /tmp
[2024-06-11, 13:55:44 UTC] {subprocess.py:73} INFO - Running command: ['/opt/***/dbt_venv/bin/dbt', 'seed', '--models', 'raw_payments', '--project-dir', '/tmp/tmpeszetm6h', '--profiles-dir', '/opt/***/dags/dbt/jaffle_shop', '--profile', 'jaffle_shop', '--target', 'dev']
[2024-06-11, 13:55:44 UTC] {subprocess.py:84} INFO - Command output:
[2024-06-11, 13:55:54 UTC] {subprocess.py:94} INFO - 13:55:54  Running with dbt=1.7.15
[2024-06-11, 13:55:55 UTC] {subprocess.py:94} INFO - 13:55:55  Registered adapter: postgres=1.7.15
[2024-06-11, 13:55:55 UTC] {subprocess.py:94} INFO - 13:55:55  Found 5 models, 3 seeds, 20 tests, 0 sources, 0 exposures, 0 metrics, 401 macros, 0 groups, 0 semantic models

How to reproduce

Set the AIRFLOW__COSMOS__ENABLE_CACHE=False environment variable in the Airflow configuration.
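For context, a minimal DAG along the lines of the reported setup might look like this (a sketch only; the paths, profile names, and requirement pins are assumptions rather than values taken from the issue):

from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode, LoadMode

DBT_PROJECT_PATH = Path("/opt/airflow/dags/dbt/jaffle_shop")  # illustrative

cosmos_dag = DbtDag(
    dag_id="cosmos_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    project_config=ProjectConfig(
        dbt_project_path=DBT_PROJECT_PATH,
        # manifest pre-generated in CI and synced together with target/
        manifest_path=DBT_PROJECT_PATH / "target" / "manifest.json",
    ),
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        profiles_yml_filepath=DBT_PROJECT_PATH / "profiles.yml",
    ),
    render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST),
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.VIRTUALENV),
    operator_args={"py_requirements": ["dbt-postgres==1.7.15"]},
)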

Anything else :)?

I have been able to reproduce this in our MWAA deployment as well. It also runs astronomer-cosmos v1.4.3, but on a different Airflow version, a different dbt-core version, and the dbt-teradata adapter, so the problem doesn't seem to be environment-specific.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Contact Details

No response

Hello @josefbits! I'm here to help you with any bugs, questions, or contributions you might have. Let me know how I can assist you!

To resolve the issue with partial parsing in dbt after disabling Cosmos caching in Airflow, you need to ensure that the partial parse file and the manifest file are correctly copied and updated in the target dbt project directory. Here is a step-by-step approach:

  1. Identify the latest partial parse file:
    Use the _get_latest_partial_parse function to determine the most up-to-date partial parse file.

  2. Update the cache with the latest partial parse file:
    Use the _update_partial_parse_cache function to update the cache directory with the latest partial parse file contents.

  3. Copy the partial parse file to the project directory:
    Use the _copy_partial_parse_to_project function to copy the partial parse file and the manifest file to the target dbt project directory.

Here is the relevant code to perform these steps:

from pathlib import Path

from cosmos.cache import _get_latest_partial_parse, _update_partial_parse_cache, _copy_partial_parse_to_project

# Define the paths (illustrative; point these at your project and cache)
dbt_project_path = Path("/path/to/your/dbt/project")
cache_dir = Path("/path/to/your/cache/directory")

# Step 1: Identify the latest partial parse file in either location
latest_partial_parse_filepath = _get_latest_partial_parse(dbt_project_path, cache_dir)

if latest_partial_parse_filepath:
    # Step 2: Update the cache with the latest partial parse file
    _update_partial_parse_cache(latest_partial_parse_filepath, cache_dir)

    # Step 3: Copy the partial parse file (and manifest) into the project directory
    _copy_partial_parse_to_project(latest_partial_parse_filepath, dbt_project_path)
else:
    print("No partial parse file found.")

Ensure that the paths to your dbt project and cache directory are correctly specified. This keeps the partial parse file and the manifest file in sync, resolving the partial parsing issue in dbt [1].

To continue talking to Dosu, mention @dosu.


@dosu @tatiana If I understand correctly, during execution Cosmos symlinks the project files from the dbt project path into the temporary directory. However, as the create_symlinks function shows, it ignores logs, target, and profiles.yml, so copying the manifests into the project directory as in the above solution doesn't solve the problem. It's possible to work around this in some cases by specifying a target path like:

operator_args={"dbt_cmd_flags": ["--target-path", f"{DBT_PROJECT_PATH}/target"],},

However, this will not work on most managed solutions using an object storage backend, as the directory is read-only. It can of course be synced to the Airflow worker's local disk, but that brings us full circle to caching and whether concurrent context updates to the manifests will be a problem (#1042).

IMO the simplest solution would be to check for existing manifests, either in the dbt project's target/ directory or, when using LoadMode.DBT_MANIFEST or AUTOMATIC, the manifest_path root provided in the ProjectConfig. These could then be copied into the temporary directory using a function similar to _copy_partial_parse_to_xxx.
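A rough sketch of what I have in mind, assuming a hypothetical helper name and that the manifest_path from the ProjectConfig is passed through (both are illustrative, not existing Cosmos API):

import shutil
from pathlib import Path
from typing import Optional

def _copy_manifests_to_tmp_dir(project_path: Path, manifest_path: Optional[Path], tmp_dir: Path) -> None:
    """Hypothetical helper: copy pre-existing dbt artifacts into the temp dir.

    Checks the project's target/ directory first, then falls back to the
    manifest_path root from the ProjectConfig (e.g. for LoadMode.DBT_MANIFEST).
    """
    # dbt reads partial_parse.msgpack from the target path, so the copies
    # must land in tmp_dir/target.
    dest_dir = tmp_dir / "target"
    dest_dir.mkdir(exist_ok=True)
    for name in ("partial_parse.msgpack", "manifest.json"):
        candidates = [project_path / "target" / name]
        if manifest_path is not None:
            candidates.append(manifest_path.parent / name)
        for candidate in candidates:
            if candidate.exists():
                shutil.copy(candidate, dest_dir / name)
                break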

You're correct that Cosmos symlinks project files to a temporary directory, ignoring logs, target, and profiles.yml. This can indeed cause issues with partial parsing when AIRFLOW__COSMOS__ENABLE_CACHE is set to False.

To address this, you can modify the process to check for existing manifests in the target/ directory or the manifest_path root provided in the ProjectConfig. If found, these manifests can be copied to the temporary directory. Here’s how you can implement this:

  1. Check for Existing Manifests: Before symlinking, check if the manifest files exist in the target/ directory or the manifest_path.

  2. Copy Manifests to Temporary Directory: If manifests are found, copy them to the temporary directory.

Here’s a code snippet to illustrate this approach:

import os
import shutil
from pathlib import Path

def copy_manifests_to_temp(project_path: Path, tmp_dir: Path) -> None:
    """Copy existing manifest files into the temp directory's target/ folder."""
    source_dir = project_path / "target"
    # dbt looks for partial_parse.msgpack (and manifest.json) under the
    # target path, so the copies must land in tmp_dir/target.
    dest_dir = tmp_dir / "target"
    dest_dir.mkdir(exist_ok=True)

    for manifest_file in ("manifest.json", "partial_parse.msgpack"):
        source_path = source_dir / manifest_file
        if source_path.exists():
            shutil.copy(source_path, dest_dir / manifest_file)

def create_symlinks_with_manifests(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None:
    """Create symlinks to the dbt project files and copy manifests to the temp directory."""
    ignore_paths = ["logs", "target", "profiles.yml"]
    if ignore_dbt_packages:
        ignore_paths.append("dbt_packages")

    for child_name in os.listdir(project_path):
        if child_name not in ignore_paths:
            os.symlink(project_path / child_name, tmp_dir / child_name)

    # Copy manifests after symlinking, since target/ itself is never symlinked
    copy_manifests_to_temp(project_path, tmp_dir)

# Example usage
dbt_project_path = Path("/path/to/dbt/project")
tmp_dir = Path("/path/to/temp/directory")
create_symlinks_with_manifests(dbt_project_path, tmp_dir, ignore_dbt_packages=False)

This approach ensures that the necessary manifest files are available in the temporary directory, enabling partial parsing even when caching is disabled. This should help mitigate issues with concurrent context updates to the manifests in environments with multiple Airflow workers or managed solutions using object storage backends [1][2].

To continue talking to Dosu, mention @dosu.