[Bug] Unable to do partial parsing after disabling Cosmos caching.
josefbits opened this issue · 3 comments
Astronomer Cosmos Version
Other Astronomer Cosmos version (please specify below)
If "Other Astronomer Cosmos version" selected, which one?
1.4.3
dbt-core version
1.7.15
Versions of dbt adapters
dbt-postgres 1.7.15
LoadMode
DBT_LS_MANIFEST
ExecutionMode
VIRTUALENV
InvocationMode
DBT_RUNNER
airflow version
2.8.2
Operating System
Ubuntu 22.04.4 LTS
If you think it's a UI issue, which browsers are you seeing the problem on?
No response
Deployment
Docker-Compose
Deployment details
- Docker version 25.0.3, build 4debf41
- Docker Compose version v2.24.6
What happened?
After setting AIRFLOW__COSMOS__ENABLE_CACHE=False, dbt tasks are unable to use partial parsing and fall back to a full parse with the message "Unable to do partial parsing because saved manifest not found. Starting full parse."
The dbt artifacts are pre-generated in CI and the entire target/ directory is synced to the dbt project path on the Airflow host.
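For context, dbt only attempts partial parsing when it finds a saved `partial_parse.msgpack` under the invoked project's target path. A minimal sketch of that precondition (the paths and the `can_partial_parse` helper are illustrative, not Cosmos internals):

```python
import tempfile
from pathlib import Path


def can_partial_parse(project_dir: Path) -> bool:
    """Mimic dbt's precondition: a saved partial_parse.msgpack must exist
    under <project>/target/ for partial parsing to be attempted."""
    return (project_dir / "target" / "partial_parse.msgpack").exists()


with tempfile.TemporaryDirectory() as tmp:
    # Project synced from CI, including its target/ artifacts
    source_project = Path(tmp) / "jaffle_shop"
    (source_project / "target").mkdir(parents=True)
    (source_project / "target" / "partial_parse.msgpack").write_bytes(b"\x00")

    # Cosmos's per-task temp copy, where target/ is not symlinked or copied
    temp_clone = Path(tmp) / "tmp_clone"
    temp_clone.mkdir()

    print(can_partial_parse(source_project))  # True  -> partial parse possible
    print(can_partial_parse(temp_clone))      # False -> "Starting full parse."
```

Since the temp clone never receives the `target/` directory when caching is disabled, dbt cannot find the saved manifest there.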
Relevant log output
**** With AIRFLOW__COSMOS__ENABLE_CACHE=False ****
08a120f7b403
*** Found local files:
*** * /opt/airflow/logs/dag_id=cosmos_dag/run_id=manual__2024-06-11T13:37:41.235278+00:00/task_id=run_models.raw_payments_seed/attempt=1.log
[2024-06-11, 13:37:47 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:37:41.235278+00:00 [queued]>
[2024-06-11, 13:37:47 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:37:41.235278+00:00 [queued]>
[2024-06-11, 13:37:47 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 3
[2024-06-11, 13:37:47 UTC] {taskinstance.py:2214} INFO - Executing <Task(DbtSeedVirtualenvOperator): run_models.raw_payments_seed> on 2024-06-11 13:37:41.235278+00:00
[2024-06-11, 13:37:47 UTC] {standard_task_runner.py:60} INFO - Started process 1265 to run task
[2024-06-11, 13:37:47 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'cosmos_dag', 'run_models.raw_payments_seed', 'manual__2024-06-11T13:37:41.235278+00:00', '--job-id', '862', '--raw', '--subdir', 'DAGS_FOLDER/cosmos_dag.py', '--cfg-path', '/tmp/tmppaayxmna']
[2024-06-11, 13:37:47 UTC] {standard_task_runner.py:88} INFO - Job 862: Subtask run_models.raw_payments_seed
[2024-06-11, 13:37:47 UTC] {task_command.py:423} INFO - Running <TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:37:41.235278+00:00 [running]> on host 08a120f7b403
[2024-06-11, 13:37:47 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='cosmos_dag' AIRFLOW_CTX_TASK_ID='run_models.raw_payments_seed' AIRFLOW_CTX_EXECUTION_DATE='2024-06-11T13:37:41.235278+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-06-11T13:37:41.235278+00:00'
[2024-06-11, 13:37:47 UTC] {local.py:292} INFO - Cloning project to writable temp directory /tmp/tmpn1tuhvtw from /opt/airflow/dags/dbt/jaffle_shop
[2024-06-11, 13:37:47 UTC] {config.py:266} INFO - Using user-supplied profiles.yml at /opt/***/dags/dbt/jaffle_shop/profiles.yml
[2024-06-11, 13:37:47 UTC] {subprocess.py:60} INFO - Tmp dir root location:
/tmp
[2024-06-11, 13:37:47 UTC] {subprocess.py:73} INFO - Running command: ['/opt/***/dbt_venv/bin/dbt', 'seed', '--models', 'raw_payments', '--project-dir', '/tmp/tmpn1tuhvtw', '--profiles-dir', '/opt/***/dags/dbt/jaffle_shop', '--profile', 'jaffle_shop', '--target', 'dev']
[2024-06-11, 13:37:47 UTC] {subprocess.py:84} INFO - Command output:
[2024-06-11, 13:37:57 UTC] {subprocess.py:94} INFO - 13:37:57 Running with dbt=1.7.15
[2024-06-11, 13:37:58 UTC] {subprocess.py:94} INFO - 13:37:58 Registered adapter: postgres=1.7.15
[2024-06-11, 13:37:58 UTC] {subprocess.py:94} INFO - 13:37:58 Unable to do partial parsing because saved manifest not found. Starting full parse.
[2024-06-11, 13:38:05 UTC] {subprocess.py:94} INFO - 13:38:05 Found 5 models, 3 seeds, 20 tests, 0 sources, 0 exposures, 0 metrics, 401 macros, 0 groups, 0 semantic models
**** With AIRFLOW__COSMOS__ENABLE_CACHE=True ****
1cf95644acec
*** Found local files:
*** * /opt/airflow/logs/dag_id=cosmos_dag/run_id=manual__2024-06-11T13:55:34.501323+00:00/task_id=run_models.raw_payments_seed/attempt=1.log
[2024-06-11, 13:55:43 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:55:34.501323+00:00 [queued]>
[2024-06-11, 13:55:43 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:55:34.501323+00:00 [queued]>
[2024-06-11, 13:55:43 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 3
[2024-06-11, 13:55:43 UTC] {taskinstance.py:2214} INFO - Executing <Task(DbtSeedVirtualenvOperator): run_models.raw_payments_seed> on 2024-06-11 13:55:34.501323+00:00
[2024-06-11, 13:55:43 UTC] {standard_task_runner.py:60} INFO - Started process 75 to run task
[2024-06-11, 13:55:43 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'cosmos_dag', 'run_models.raw_payments_seed', 'manual__2024-06-11T13:55:34.501323+00:00', '--job-id', '874', '--raw', '--subdir', 'DAGS_FOLDER/cosmos_dag.py', '--cfg-path', '/tmp/tmpb4__880_']
[2024-06-11, 13:55:43 UTC] {standard_task_runner.py:88} INFO - Job 874: Subtask run_models.raw_payments_seed
[2024-06-11, 13:55:43 UTC] {task_command.py:423} INFO - Running <TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:55:34.501323+00:00 [running]> on host 1cf95644acec
[2024-06-11, 13:55:43 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='cosmos_dag' AIRFLOW_CTX_TASK_ID='run_models.raw_payments_seed' AIRFLOW_CTX_EXECUTION_DATE='2024-06-11T13:55:34.501323+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-06-11T13:55:34.501323+00:00'
[2024-06-11, 13:55:43 UTC] {local.py:292} INFO - Cloning project to writable temp directory /tmp/tmpeszetm6h from /opt/airflow/dags/dbt/jaffle_shop
[2024-06-11, 13:55:43 UTC] {local.py:303} INFO - Partial parse is enabled and the latest partial parse file is /opt/airflow/dags/dbt/jaffle_shop/target/partial_parse.msgpack
[2024-06-11, 13:55:44 UTC] {config.py:266} INFO - Using user-supplied profiles.yml at /opt/***/dags/dbt/jaffle_shop/profiles.yml
[2024-06-11, 13:55:44 UTC] {subprocess.py:60} INFO - Tmp dir root location:
/tmp
[2024-06-11, 13:55:44 UTC] {subprocess.py:73} INFO - Running command: ['/opt/***/dbt_venv/bin/dbt', 'seed', '--models', 'raw_payments', '--project-dir', '/tmp/tmpeszetm6h', '--profiles-dir', '/opt/***/dags/dbt/jaffle_shop', '--profile', 'jaffle_shop', '--target', 'dev']
[2024-06-11, 13:55:44 UTC] {subprocess.py:84} INFO - Command output:
[2024-06-11, 13:55:54 UTC] {subprocess.py:94} INFO - 13:55:54 Running with dbt=1.7.15
[2024-06-11, 13:55:55 UTC] {subprocess.py:94} INFO - 13:55:55 Registered adapter: postgres=1.7.15
[2024-06-11, 13:55:55 UTC] {subprocess.py:94} INFO - 13:55:55 Found 5 models, 3 seeds, 20 tests, 0 sources, 0 exposures, 0 metrics, 401 macros, 0 groups, 0 semantic models
How to reproduce
Set the AIRFLOW__COSMOS__ENABLE_CACHE=False environment variable in the Airflow configuration.
Anything else :)?
I have been able to reproduce this in our MWAA deployment as well. It's also running astronomer-cosmos v1.4.3 but on a different Airflow version, dbt-core version, and the dbt-teradata adapter so it doesn't seem to be environment specific.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Contact Details
Hello @josefbits! I'm here to help you with any bugs, questions, or contributions you might have. Let me know how I can assist you!
To resolve the issue with partial parsing in dbt after disabling Cosmos caching in Airflow, you need to ensure that the partial parse file and the manifest file are correctly copied and updated in the target dbt project directory. Here is a step-by-step approach:
- Identify the latest partial parse file: use the `_get_latest_partial_parse` function to determine the most up-to-date partial parse file.
- Update the cache with the latest partial parse file: use the `_update_partial_parse_cache` function to update the cache directory with the latest partial parse file contents.
- Copy the partial parse file to the project directory: use the `_copy_partial_parse_to_project` function to copy the partial parse file and the manifest file to the target dbt project directory.
Here is the relevant code to perform these steps:
```python
from pathlib import Path
from cosmos.dbt.project import get_partial_parse_path
from cosmos.cache import _get_latest_partial_parse, _update_partial_parse_cache, _copy_partial_parse_to_project

# Define the paths
dbt_project_path = Path("/path/to/your/dbt/project")
cache_dir = Path("/path/to/your/cache/directory")

# Step 1: Identify the latest partial parse file
latest_partial_parse_filepath = _get_latest_partial_parse(dbt_project_path, cache_dir)

if latest_partial_parse_filepath:
    # Step 2: Update the cache with the latest partial parse file
    _update_partial_parse_cache(latest_partial_parse_filepath, cache_dir)
    # Step 3: Copy the partial parse file to the project directory
    _copy_partial_parse_to_project(latest_partial_parse_filepath, dbt_project_path)
else:
    print("No partial parse file found.")
```
Ensure that the paths to your dbt project and cache directory are correctly specified. This code will help maintain the partial parse file and manifest file in sync, thus resolving the issue with partial parsing in dbt [1].
@dosu @tatiana If I understand correctly, during execution Cosmos will symlink the project files from the dbt project path to the temporary directory. However, as per the `create_symlinks` function, it ignores `logs`, `target`, and `profiles.yml`. As per the above solution, copying the manifests to the project directory doesn't solve that problem. It's possible to work around this in some cases by specifying a target path like:

```python
operator_args={"dbt_cmd_flags": ["--target-path", f"{DBT_PROJECT_PATH}/target"]},
```
However, this will not work on most managed solutions using an object storage back end as the directory is read-only. Of course it can be synced to Airflow worker local disk, but this brings us full circle to caching and whether or not concurrent context updates to the manifests will be a problem (#1042).
IMO the simplest solution would be to check for existing manifests, either in the dbt project target/ directory or, when using LoadMode.DBT_MANIFEST or AUTOMATIC, the manifest_path root provided in the ProjectConfig. These could then be copied instead to the temporary directory using a function similar to `_copy_partial_parse_to_xxx`.
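A rough sketch of that idea, assuming `manifest_path` points at a `manifest.json` file (the `copy_existing_manifests` helper name is hypothetical, not a Cosmos API):

```python
import shutil
from pathlib import Path
from typing import Optional


def copy_existing_manifests(project_path: Path, tmp_dir: Path,
                            manifest_path: Optional[Path] = None) -> bool:
    """Hypothetical helper: look for pre-built dbt artifacts in the project's
    target/ directory first, then fall back to an explicitly configured
    manifest location (e.g. ProjectConfig.manifest_path), and copy whatever
    is found into <tmp_dir>/target/, where dbt expects to find it."""
    candidates = [project_path / "target"]
    if manifest_path is not None:
        candidates.append(manifest_path.parent)
    dest = tmp_dir / "target"
    copied = False
    for candidate in candidates:
        for name in ("partial_parse.msgpack", "manifest.json"):
            src = candidate / name
            # First candidate wins; don't overwrite an already-copied file
            if src.exists() and not (dest / name).exists():
                dest.mkdir(parents=True, exist_ok=True)
                shutil.copy(src, dest / name)
                copied = True
    return copied
```

The return value lets the caller log whether partial parsing is expected to work for this task run.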
You're correct that Cosmos symlinks project files to a temporary directory, ignoring `logs`, `target`, and `profiles.yml`. This can indeed cause issues with partial parsing when `AIRFLOW__COSMOS__ENABLE_CACHE` is set to `False`.
To address this, you can modify the process to check for existing manifests in the `target/` directory or the `manifest_path` root provided in the `ProjectConfig`. If found, these manifests can be copied to the temporary directory. Here's how you can implement this:
- Check for existing manifests: before symlinking, check if the manifest files exist in the `target/` directory or at the `manifest_path`.
- Copy manifests to the temporary directory: if manifests are found, copy them to the temporary directory.
Here’s a code snippet to illustrate this approach:
```python
import os
import shutil
from pathlib import Path


def copy_manifests_to_temp(project_path: Path, tmp_dir: Path) -> None:
    """Copy existing manifest files into <tmp_dir>/target/, where dbt looks for them."""
    target_dir = project_path / "target"
    dest_dir = tmp_dir / "target"
    manifest_files = ["manifest.json", "partial_parse.msgpack"]
    for manifest_file in manifest_files:
        source_path = target_dir / manifest_file
        if source_path.exists():
            dest_dir.mkdir(exist_ok=True)
            shutil.copy(source_path, dest_dir / manifest_file)


def create_symlinks_with_manifests(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None:
    """Create symlinks to the dbt project files and copy manifests to the temp directory."""
    ignore_paths = ["logs", "target", "profiles.yml"]
    if ignore_dbt_packages:
        ignore_paths.append("dbt_packages")
    for child_name in os.listdir(project_path):
        if child_name not in ignore_paths:
            os.symlink(project_path / child_name, tmp_dir / child_name)
    # Copy manifests to the temporary directory
    copy_manifests_to_temp(project_path, tmp_dir)


# Example usage (illustrative paths)
dbt_project_path = Path("/path/to/dbt/project")
tmp_dir = Path("/path/to/temp/directory")
create_symlinks_with_manifests(dbt_project_path, tmp_dir, ignore_dbt_packages=False)
```
This approach ensures that the necessary manifest files are available in the temporary directory, enabling partial parsing even when caching is disabled. This should help mitigate issues with concurrent context updates to the manifests in environments with multiple Airflow workers or managed solutions using object storage backends [1][2].