snjypl/airflow-provider-grafana-loki

Not pushing logs to Loki

Closed this issue · 12 comments

Hi, I have attempted to use this provider on Airflow 2.6.2, and it seems that it doesn't push the logs to Loki.

Once I open the log view for a task, it loads the local logs, but I can see in the Docker logs that it is trying to load them from Loki.

airflow-airflow-webserver-1  | [2023-07-08T06:39:49.086+0000] {loki_task_handler.py:134} INFO - loki log query params {'query': ' {dag_id="crm-elastic-dag",task_id="hello"}\n
      | json try_number="try_number",map_index="map_index",run_id="run_id"\n                    | try_number="1" and\n                      map_index="-1" and\n                      run_id="manual__2023-07-08T06:39:32.209086+00:00"\n                    | __error__!="JSONParserErr"\n                ', 'start': '2023-06-23T06:39:34.475096+00:00', 'end': '2023-07-08T07:39:34.724944+00:00', 'limit': 5000, 'direction': 'forward'}

Using the provider manually in a task works fine, like this:

 from grafana_loki_provider.hooks.loki import LokiHook
 loki_hook = LokiHook(loki_conn_id=loki_conn_id)
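
For reference, here is a slightly fuller sketch of that manual usage; it mirrors the query_range call that appears, commented out, in the DAG further down, and assumes a working connection with id "loki":

from grafana_loki_provider.hooks.loki import LokiHook

loki_hook = LokiHook(loki_conn_id="loki")

# any LogQL stream selector works here; {foo="bar2"} is the one used in the DAG below
params = {
    "query": '{foo="bar2"}',
    "limit": 5000,
    "direction": "forward",
}

resp = loki_hook.query_range(params)  # range query against Loki via the hook's connection
print(resp)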

Logs from task execution:

airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.041+0000] {scheduler_job_runner.py:412} INFO - 1 tasks up for execution:
airflow-airflow-scheduler-1  |  <TaskInstance: crm-elastic-dag.hello manual__2023-07-08T06:46:49.915800+00:00 [scheduled]>
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.041+0000] {scheduler_job_runner.py:480} INFO - DAG crm-elastic-dag has 0/16 running and queued tasks
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.042+0000] {scheduler_job_runner.py:587} INFO - Setting the following tasks to queued state:
airflow-airflow-scheduler-1  |  <TaskInstance: crm-elastic-dag.hello manual__2023-07-08T06:46:49.915800+00:00 [scheduled]>
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.046+0000] {scheduler_job_runner.py:625} INFO - Sending TaskInstanceKey(dag_id='crm-elastic-dag', task_id='hello', run_id='manual__2023-07-08T06:46:49.915800+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.047+0000] {base_executor.py:147} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'crm-elastic-dag', 'hello', 'manual__2023-07-08T06:46:49.915800+00:00', '--local', '--subdir', 'DAGS_FOLDER/crm-elastig-dag.py']
airflow-airflow-worker-1     | [2023-07-08 06:46:51,056: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[8a3bb8d5-9a63-4495-b234-ef7762a1a788] received
airflow-airflow-worker-1     | [2023-07-08 06:46:51,066: INFO/ForkPoolWorker-15] [8a3bb8d5-9a63-4495-b234-ef7762a1a788] Executing command in Celery: ['airflow', 'tasks', 'run', 'crm-elastic-dag', 'hello', 'manual__2023-07-08T06:46:49.915800+00:00', '--local', '--subdir', 'DAGS_FOLDER/crm-elastig-dag.py']
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.107+0000] {scheduler_job_runner.py:677} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='crm-elastic-dag', task_id='hello', run_id='manual__2023-07-08T06:46:49.915800+00:00', try_number=1, map_index=-1)
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.131+0000] {scheduler_job_runner.py:703} INFO - Setting external_id for <TaskInstance: crm-elastic-dag.hello manual__2023-07-08T06:46:49.915800+00:00 [queued]> to 8a3bb8d5-9a63-4495-b234-ef7762a1a788
airflow-airflow-worker-1     | [2023-07-08 06:46:51,151: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/crm-elastig-dag.py
airflow-airflow-worker-1     | [2023-07-08 06:46:51,191: INFO/ForkPoolWorker-15] This is a log message
airflow-airflow-worker-1     | [2023-07-08 06:46:52,330: INFO/ForkPoolWorker-15] Running <TaskInstance: crm-elastic-dag.hello manual__2023-07-08T06:46:49.915800+00:00 [queued]> on host d5752f79742d
airflow-airflow-webserver-1  | 127.0.0.1 - - [08/Jul/2023:06:46:53 +0000] "GET /health HTTP/1.1" 200 243 "-" "curl/7.74.0"
airflow-airflow-worker-1     | [2023-07-08 06:46:53,394: INFO/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[8a3bb8d5-9a63-4495-b234-ef7762a1a788] succeeded in 2.335277110338211s: None
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.344+0000] {dagrun.py:616} INFO - Marking run <DagRun crm-elastic-dag @ 2023-07-08 06:46:49.915800+00:00: manual__2023-07-08T06:46:49.915800+00:00, state:running, queued_at: 2023-07-08 06:46:49.935860+00:00. externally triggered: True> successful
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.344+0000] {dagrun.py:682} INFO - DagRun Finished: dag_id=crm-elastic-dag, execution_date=2023-07-08 06:46:49.915800+00:00, run_id=manual__2023-07-08T06:46:49.915800+00:00, run_start_date=2023-07-08 06:46:50.980423+00:00, run_end_date=2023-07-08 06:46:54.344670+00:00, run_duration=3.364247, state=success, external_trigger=True, run_type=manual, data_interval_start=2023-07-07 00:00:00+00:00, data_interval_end=2023-07-08 00:00:00+00:00, dag_hash=c848848d668b428fd5345193e82ebc08
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.354+0000] {dag.py:3490} INFO - Setting next_dagrun for crm-elastic-dag to 2023-07-08T00:00:00+00:00, run_after=2023-07-09T00:00:00+00:00
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.391+0000] {scheduler_job_runner.py:677} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='crm-elastic-dag', task_id='hello', run_id='manual__2023-07-08T06:46:49.915800+00:00', try_number=1, map_index=-1)
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.398+0000] {scheduler_job_runner.py:733} INFO - TaskInstance Finished: dag_id=crm-elastic-dag, task_id=hello, run_id=manual__2023-07-08T06:46:49.915800+00:00, map_index=-1, run_start_date=2023-07-08 06:46:52.740642+00:00, run_end_date=2023-07-08 06:46:53.212918+00:00, run_duration=0.472276, state=success, executor_state=success, try_number=1, max_tries=0, job_id=67, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2023-07-08 06:46:51.043588+00:00, queued_by_job_id=63, pid=8798
airflow-airflow-webserver-1  | 172.29.0.23 - - [08/Jul/2023:06:46:55 +0000] "GET /get_logs_with_metadata?dag_id=crm-elastic-dag&task_id=hello&map_index=-1&execution_date=2023-07-08T06%3A39%3A32.209086%2B00%3A00&try_number=1&metadata=null HTTP/1.1" 200 2451 "https://airflow.local.lab.com/log?dag_id=crm-elastic-dag&task_id=hello&execution_date=2023-07-08T06%3A39%3A32.209086%2B00%3A00&map_index=-1" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36

Has anyone tried it recently for automatically pushing logs to Loki? Or does anyone have an example of a DAG task that works with it?

I'm having the same issue.

@markoftw @PApostol

can you please share some more details:

  1. airflow version
  2. airflow grafana provider version
  3. the loki log handler configuration
  4. are you getting the error for all the tasks?
  5. there is a known issue with fetching logs for older task retries. see if you can fetch the log for the latest task try.
  6. are the task logs visible in grafana?
  7. describe your airflow setup. are you using airflow standalone mode?
  8. can you try reproducing the issue with an example dag?

can you please share some more logs around the loki query?

  1. v2.6.2
  2. 0.0.2
  3. Host 'loki' is a resolvable container name in Docker (same Docker network) [screenshot]
  4. Yes
  5. I have tried; it is the same for all of them
  6. I also don't see a Loki label within the Grafana explorer
  7. Using the docker-compose setup https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
  8. Example dag I have tried

crm-elastig-dag.py

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from includes.test import hello


#from grafana_loki_provider.hooks.loki import LokiHook
#loki_hook = LokiHook(loki_conn_id="loki")
#import logging
#logging.info("HELLO")

import logging

logger = logging.getLogger(__name__)
logger.info("This is a log message")


# This seems to work, but it's a manual configuration
"""
params = {
    "query": "{foo=\"bar2\"}",
    "limit":5000,
    "direction": "forward",
}

resp = loki_hook.query_range(params)

print(resp)
"""


args = {
    'owner': 'Marko',
    'start_date': days_ago(1)
}

dag = DAG(
    dag_id='crm-elastic-dag',
    default_args=args,
    schedule_interval='@daily'
)

with dag:
    hello_world = PythonOperator(
        task_id='hello',
        python_callable=hello,
        # provide_context=True
    )

test.py

def hello():
    print('Hello!')
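
Note that the module-level logger.info("This is a log message") in the DAG file runs at parse time, which is why it shows up in the worker's "Filling up the DagBag" output above rather than in the task log. To get a line into the task log (and therefore into whatever the task handler ships to Loki), log from inside the callable. A minimal sketch of test.py doing that, using the airflow.task logger that the "task" handler is attached to in the logging config shared later in this thread:

import logging

log = logging.getLogger("airflow.task")


def hello():
    log.info("Hello from inside the task")  # goes through the task log handler
    print("Hello!")  # stdout is also captured into the task log, as the logs below show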

Logs from execution

[screenshot]

Logs from graph:

*** Found local files:
***   * /opt/airflow/logs/dag_id=crm-elastic-dag/run_id=scheduled__2023-07-23T00:00:00+00:00/task_id=hello/attempt=1.log
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: crm-elastic-dag.hello scheduled__2023-07-23T00:00:00+00:00 [queued]>
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: crm-elastic-dag.hello scheduled__2023-07-23T00:00:00+00:00 [queued]>
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): hello> on 2023-07-23 00:00:00+00:00
[2023-07-24, 21:38:33 UTC] {standard_task_runner.py:57} INFO - Started process 64 to run task
[2023-07-24, 21:38:33 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'crm-elastic-dag', 'hello', 'scheduled__2023-07-23T00:00:00+00:00', '--job-id', '78', '--raw', '--subdir', 'DAGS_FOLDER/crm-elastig-dag.py', '--cfg-path', '/tmp/tmpxhij1w1e']
[2023-07-24, 21:38:33 UTC] {standard_task_runner.py:85} INFO - Job 78: Subtask hello
[2023-07-24, 21:38:33 UTC] {task_command.py:410} INFO - Running <TaskInstance: crm-elastic-dag.hello scheduled__2023-07-23T00:00:00+00:00 [running]> on host 2862783298b4
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='Marko Smej' AIRFLOW_CTX_DAG_ID='crm-elastic-dag' AIRFLOW_CTX_TASK_ID='hello' AIRFLOW_CTX_EXECUTION_DATE='2023-07-23T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-23T00:00:00+00:00'
[2023-07-24, 21:38:33 UTC] {logging_mixin.py:149} INFO - Hello!
[2023-07-24, 21:38:33 UTC] {python.py:183} INFO - Done. Returned value was: None
[2023-07-24, 21:38:33 UTC] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=crm-elastic-dag, task_id=hello, execution_date=20230723T000000, start_date=20230724T213833, end_date=20230724T213833
[2023-07-24, 21:38:33 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-07-24, 21:38:33 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

Query when opening logs:
[screenshot]

snjypl commented

@markoftw

i am not able to reproduce the issue using the docker-compose deployment you shared.

it will help if you could run the debugging code below inside the webserver container.

import airflow
from grafana_loki_provider.hooks.loki import LokiHook
from grafana_loki_provider.log.loki_task_handler import LokiTaskHandler
from datetime import datetime as dt, timedelta


hook = LokiHook(loki_conn_id='loki')
c = hook.get_conn()


print("base_url: {}".format(hook.base_url))
endpoint = hook.v1_base_endpoint.format(method='push')
url = hook.url_from_endpoint(endpoint)
print("url: {}".format(url))

log_handler = LokiTaskHandler(base_log_folder = "", name='test')

log_handler.hook = hook

log_handler.labels = {"name":"test_loki_log"}
log_handler.extras = {}

r = log_handler.loki_write(['test_loki_line_1', 'test_loki_line_2'])

print("write response")
print(r)
print("wrote log to loki")
print('going to read log from loki')

params = {
	"query": '{name="test_loki_log"}',
	"limit":1000,
	"direction": "forward",
}
print("params: {}".format(params))

data = hook.query_range(params)
print(data)

you can do it using the command below; please replace AIRFLOW_WEBSERVER_CONTAINER_NAME with the actual container name:

docker exec AIRFLOW_WEBSERVER_CONTAINER_NAME   /bin/bash  -c "curl -k -s  https://gist.githubusercontent.com/snjypl/e73f13a090ce65acb7bf5d0d70ad3038/raw/ef9a2a3484bb602a600423e63cbd30e4e2b3c3c9/debug_loki_logging -o /tmp/debug_loki.py  & python /tmp/debug_loki.py"

Here are the logs from it:

curl -k -s  https://gist.githubusercontent.com/snjypl/e73f13a090ce65acb7bf5d0d70ad3038/raw/ef9a2a3484bb602a600423e63cbd30e4e2b3c3c9/debug_loki_logging -o /tmp/debug_loki.py  & python /tmp/debug_loki.py
[1] 16801
[2023-08-04T10:10:24.856+0000] {base.py:73} INFO - Using connection ID 'loki' for task execution.
base_url: http://loki:3100
url: http://loki:3100/loki/api/v1/push
[2023-08-04T10:10:24.861+0000] {base.py:73} INFO - Using connection ID 'loki' for task execution.
write response
None
wrote log to loki
going to read log from loki
params: {'query': '{name="test_loki_log"}', 'limit': 1000, 'direction': 'forward'}
[2023-08-04T10:10:24.873+0000] {base.py:73} INFO - Using connection ID 'loki' for task execution.
{'status': 'success', 'data': {'resultType': 'streams', 'result': [{'stream': {'name': 'test_loki_log'}, 'values': [['1691143824858186496', '{"line": "test_loki_line_1"}'], ['1691143824858215936', '{"line": "test_loki_line_2"}']]}], 'stats': {'summary': {'bytesProcessedPerSecond': 2028, 'linesProcessedPerSecond': 72, 'totalBytesProcessed': 56, 'totalLinesProcessed': 2, 'execTime': 0.027612, 'queueTime': 0.083636, 'subqueries': 0, 'totalEntriesReturned': 2, 'splits': 3, 'shards': 48}, 'querier': {'store': {'totalChunksRef': 0, 'totalChunksDownloaded': 0, 'chunksDownloadTime': 0, 'chunk': {'headChunkBytes': 0, 'headChunkLines': 0, 'decompressedBytes': 0, 'decompressedLines': 0, 'compressedBytes': 0, 'totalDuplicates': 0}}}, 'ingester': {'totalReached': 48, 'totalChunksMatched': 1, 'totalBatches': 1, 'totalLinesSent': 2, 'store': {'totalChunksRef': 0, 'totalChunksDownloaded': 0, 'chunksDownloadTime': 0, 'chunk': {'headChunkBytes': 56, 'headChunkLines': 2, 'decompressedBytes': 0, 'decompressedLines': 0, 'compressedBytes': 0, 'totalDuplicates': 0}}}, 'cache': {'chunk': {'entriesFound': 0, 'entriesRequested': 0, 'entriesStored': 0, 'bytesReceived': 0, 'bytesSent': 0, 'requests': 0, 'downloadTime': 0}, 'index': {'entriesFound': 0, 'entriesRequested': 0, 'entriesStored': 0, 'bytesReceived': 0, 'bytesSent': 0, 'requests': 0, 'downloadTime': 0}, 'result': {'entriesFound': 0, 'entriesRequested': 0, 'entriesStored': 0, 'bytesReceived': 0, 'bytesSent': 0, 'requests': 0, 'downloadTime': 0}}}}}
[1]+  Done                    curl -k -s https://gist.githubusercontent.com/snjypl/e73f13a090ce65acb7bf5d0d70ad3038/raw/ef9a2a3484bb602a600423e63cbd30e4e2b3c3c9/debug_loki_logging -o /tmp/debug_loki.py

This seems to have created the test_loki_log label in Loki successfully:

[screenshot]

snjypl commented

@markoftw i don't see any issue there. you were able to push to loki and also read from it.
maybe the issue is with the logger configuration.

can you share the airflow_local_settings.py or equivalent where you are configuring the LokiTaskHandler? it would be better if you share the whole module.

also the output of the following:
airflow config list --section logging

Please make sure it does not contain any confidential info before sharing it here.

Sure, here they are:

airflow_local_settings.py:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()


# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
  "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")

COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")

COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")

PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
  "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)

# FILENAME_TEMPLATE is only used in Remote Logging Handlers since Airflow 2.3.3.
# All of these handlers inherit from FileTaskHandler, and providing any value other than None
# would raise a deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")

DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
  "version": 1,
  "disable_existing_loggers": False,
  "formatters": {
      "airflow": {
          "format": LOG_FORMAT,
          "class": LOG_FORMATTER_CLASS,
      },
      "airflow_coloured": {
          "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
          "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
      },
      "source_processor": {
          "format": DAG_PROCESSOR_LOG_FORMAT,
          "class": LOG_FORMATTER_CLASS,
      },
  },
  "filters": {
      "mask_secrets": {
          "()": "airflow.utils.log.secrets_masker.SecretsMasker",
      },
  },
  "handlers": {
      "console": {
          "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
          "formatter": "airflow_coloured",
          "stream": "sys.stdout",
          "filters": ["mask_secrets"],
      },
      "task": {
          "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
          "formatter": "airflow",
          "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
          "filters": ["mask_secrets"],
      },
      "processor": {
          "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
          "formatter": "airflow",
          "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
          "filename_template": PROCESSOR_FILENAME_TEMPLATE,
          "filters": ["mask_secrets"],
      },
      "processor_to_stdout": {
          "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
          "formatter": "source_processor",
          "stream": "sys.stdout",
          "filters": ["mask_secrets"],
      },
  },
  "loggers": {
      "airflow.processor": {
          "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
          "level": LOG_LEVEL,
          # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
          "propagate": True,
      },
      "airflow.task": {
          "handlers": ["task"],
          "level": LOG_LEVEL,
          # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
          "propagate": True,
          "filters": ["mask_secrets"],
      },
      "flask_appbuilder": {
          "handlers": ["console"],
          "level": FAB_LOG_LEVEL,
          "propagate": True,
      },
  },
  "root": {
      "handlers": ["console"],
      "level": LOG_LEVEL,
      "filters": ["mask_secrets"],
  },
}

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
  new_loggers = {
      logger_name.strip(): {
          "handlers": ["console"],
          "level": LOG_LEVEL,
          "propagate": True,
      }
      for logger_name in EXTRA_LOGGER_NAMES.split(",")
  }
  DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)

DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
  "handlers": {
      "processor_manager": {
          "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
          "formatter": "airflow",
          "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
          "mode": "a",
          "maxBytes": 104857600,  # 100MB
          "backupCount": 5,
      }
  },
  "loggers": {
      "airflow.processor_manager": {
          "handlers": ["processor_manager"],
          "level": LOG_LEVEL,
          "propagate": False,
      }
  },
}

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
  DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
  DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

  # Manually create log directory for processor_manager handler as RotatingFileHandler
  # will only create file but not the directory.
  processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
      "processor_manager"
  ]
  directory: str = os.path.dirname(processor_manager_handler_config["filename"])
  Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)

##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

if REMOTE_LOGGING:

  ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

  # Storage bucket URL for remote logging
  # S3 buckets should start with "s3://"
  # Cloudwatch log groups should start with "cloudwatch://"
  # GCS buckets should start with "gs://"
  # WASB buckets should start with "wasb"
  # HDFS path should start with "hdfs://"
  # just to help Airflow select correct handler
  REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
  REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})

  if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
      S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
          "task": {
              "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
              "filename_template": FILENAME_TEMPLATE,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
      url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
      CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
          "task": {
              "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "log_group_arn": url_parts.netloc + url_parts.path,
              "filename_template": FILENAME_TEMPLATE,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
      key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
      GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
          "task": {
              "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
              "filename_template": FILENAME_TEMPLATE,
              "gcp_key_path": key_path,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
      WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
          "task": {
              "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
              "wasb_container": "airflow-logs",
              "filename_template": FILENAME_TEMPLATE,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
      key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
      # stackdriver:///airflow-tasks => airflow-tasks
      log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
      STACKDRIVER_REMOTE_HANDLERS = {
          "task": {
              "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
              "formatter": "airflow",
              "name": log_name,
              "gcp_key_path": key_path,
          }
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
      OSS_REMOTE_HANDLERS = {
          "task": {
              "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
              "formatter": "airflow",
              "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
              "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
              "filename_template": FILENAME_TEMPLATE,
          },
      }
      DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"):
      HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
          "task": {
              "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "hdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
              "filename_template": FILENAME_TEMPLATE,
          },
      }
      DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
  elif ELASTICSEARCH_HOST:
      ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
      ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
      ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
      ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
      ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
      ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
      ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

      ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
          "task": {
              "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
              "formatter": "airflow",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "filename_template": FILENAME_TEMPLATE,
              "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
              "host": ELASTICSEARCH_HOST,
              "frontend": ELASTICSEARCH_FRONTEND,
              "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
              "json_format": ELASTICSEARCH_JSON_FORMAT,
              "json_fields": ELASTICSEARCH_JSON_FIELDS,
              "host_field": ELASTICSEARCH_HOST_FIELD,
              "offset_field": ELASTICSEARCH_OFFSET_FIELD,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
  elif REMOTE_BASE_LOG_FOLDER.startswith("loki"):
      LOKI_HANDLER: dict[str, dict[str, str | bool | None]] = {
          "task": {
              "class": "grafana_loki_provider.log.loki_task_handler.LokiTaskHandler",
              "formatter": "airflow",
              "name": "airflow_task",
              "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
              "filename_template": FILENAME_TEMPLATE,
          },
      }

      DEFAULT_LOGGING_CONFIG["handlers"].update(LOKI_HANDLER)
  else:
      raise AirflowException(
          "Incorrect remote log configuration. Please check the configuration of option 'host' in "
          "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
          "'remote_base_log_folder' option in the 'logging' section."
      )
  DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)

airflow config list --section logging

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = loki
delete_local_logs = False
google_key_path =
remote_base_log_folder = loki
remote_task_handler_kwargs =
encrypt_s3_logs = False
logging_level = DEBUG
celery_logging_level =
fab_logging_level = WARNING
logging_config_class = log_config.DEFAULT_LOGGING_CONFIG
colored_console_log = True
colored_log_format = [%(blue)s%(asctime)s%(reset)s] {%(blue)s%(filename)s:%(reset)s%(lineno)d} %(log_color)s%(levelname)s%(reset)s - %(log_color)s%(message)s%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s
simple_log_format = %(asctime)s %(levelname)s - %(message)s
dag_processor_log_target = file
dag_processor_log_format = [%(asctime)s] [SOURCE:DAG_PROCESSOR] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
secret_mask_adapter =
task_log_prefix_template =
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793
trigger_log_server_port = 8794
file_task_handler_new_folder_permissions = 0o775
file_task_handler_new_file_permissions = 0o664
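
As a quick sanity check of the two artifacts above (a sketch; it assumes python is run inside the webserver or worker container, where importing airflow puts the folder holding log_config.py on sys.path, as it does in the stock docker-compose setup):

import airflow  # makes $AIRFLOW_HOME/config importable, so log_config can be resolved
from log_config import DEFAULT_LOGGING_CONFIG

print(DEFAULT_LOGGING_CONFIG["handlers"]["task"]["class"])
# expected: grafana_loki_provider.log.loki_task_handler.LokiTaskHandler
# seeing airflow.utils.log.file_task_handler.FileTaskHandler instead would mean the
# remote-logging branch was never taken in this container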
snjypl commented

@markoftw, i tried your dag and logger config. i am still not able to reproduce the issue.

in your logs, the "Using connection ID 'loki' for task execution." line is missing. it looks like the loki connection is not being used; it is most likely a configuration issue.


[2023-08-05T12:18:08.135+0530] {task_command.py:410} INFO - Running <TaskInstance: crm-elastic-dag.hello manual__2023-08-05T06:48:06.821339+00:00 [queued]> on host PF3Y5KBZ.
[2023-08-05T12:18:08.797+0530] {base.py:73} INFO - Using connection ID 'loki' for task execution.
[2023-08-05T12:18:08.811+0530] {dagrun.py:630} INFO - Marking run <DagRun crm-elastic-dag @ 2023-08-05 06:48:06.821339+00:00: manual__2023-08-05T06:48:06.821339+00:00, state:running, queued_at: 2023-08-05 06:48:06.849665+00:00. externally triggered: True> successful

i think it would be good if we could connect on slack and troubleshoot the issue.

otherwise, make sure the webserver, worker and scheduler all have the same configuration and that you are not overriding the log handlers in any of your dag code.

from inside the worker container you can try executing the below code:

import airflow
import logging

# inspect the handler attached to the airflow.task logger; with the Loki config
# applied you should see a LokiTaskHandler here, while a plain FileTaskHandler
# means this container is still using the default logging config
logger = logging.root.manager.loggerDict['airflow.task']
print(logger)
loki_handler = logger.handlers[0]
print(loki_handler)
print(loki_handler.hook)

@snjypl I attempted to use the "airflow-provider-grafana-loki" provider to send logs to Loki, but the logs do not appear in the Airflow UI. Does the Loki connection need read access, i.e. does the Airflow UI retrieve logs directly from Loki?

snjypl commented

@ragavanks the airflow webserver reads the logs from Loki. it would be better if you could open a new issue with details, e.g. log configuration, airflow task log, airflow webserver log, etc.

you can also try the debugging steps mentioned in this issue and see if it helps.

The issue was solved.

It was a misconfiguration in the docker-compose.yml for the worker container.
The worker and webserver containers (and possibly others, depending on your setup) need to have the correct airflow.cfg bound to their volumes.
e.g.

    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
      - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
      - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
      - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
      - ${AIRFLOW_PROJ_DIR:-.}/airflow.cfg:/opt/airflow/airflow.cfg

Verify in each container with airflow config list --section logging that the following settings are present:

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = loki
logging_config_class = log_config.DEFAULT_LOGGING_CONFIG
...
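
The same check can also be scripted from Python if that is more convenient (a sketch; run with python inside each container):

from airflow.configuration import conf

for key in ("remote_logging", "remote_log_conn_id", "remote_base_log_folder", "logging_config_class"):
    print(key, "=", conf.get("logging", key))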

Thanks to @snjypl!