kubeflow/pipelines

Pod Labels Not Applied Correctly Across Multiple Executors in Kubeflow Pipeline (KF1.9 , KFP SDK 2.8.0, Argo backend)

Closed this issue · 1 comments

Environment

Kubeflow version: 1.9
KFP SDK version: 2.8.0
kubernetes-kfp version: 1.2.0

Backend: Argo

Steps to reproduce

  1. Run the pipeline. Both executors will have the same pod labels (as defined only for "addition").
  2. now just rename addition to waddition (we want it to alphabetically be after divide) and run the pipeline
  3. In this case, both executors will have the labels defined for "divide".

Expected result

Each executor should have its own pod labels. Currently, the labels of the first executor (alphabetically) are applied to ALL other executors.

Here's the code to reproduce this issue. There is a "get_pod_label" helper in both components to read back the pod label values actually applied.

from typing import List

from kfp import dsl, kubernetes
from kfp.dsl import Output, Dataset


@dsl.component(packages_to_install=["kubernetes", "pandas"])
def addition(a: float, b: float, labels_out: Output[Dataset]) -> float:
    """Add two floats and dump this pod's labels to ``labels_out`` as CSV.

    The label dump exists only to observe which pod labels the KFP
    kubernetes extension actually applied to this executor's pod.
    """
    import pandas as pd

    def get_pod_label() -> dict:
        """Return the labels of the pod this component is running in.

        Requires in-cluster credentials (runs inside a Kubernetes pod).
        """
        import os

        from kubernetes import client, config

        # Configure API using the pod's service-account credentials.
        config.load_incluster_config()
        v1 = client.CoreV1Api()

        # Get current pod name and namespace. HOSTNAME defaults to the
        # pod name inside Kubernetes; the namespace is mounted by the
        # service-account volume.
        pod_name = os.environ.get("HOSTNAME")
        with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
            namespace = f.read()

        if not pod_name:
            raise Exception("Cannot determine pod name")

        # Get pod information from the API server.
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)

        # Extract labels (dict[str, str] or None if the pod has none).
        labels = pod.metadata.labels

        print(type(labels))

        print(f"Pod Labels: {labels}")

        return labels

    # Unique print to bust KFP result caching between runs.
    print("kill-cache2")
    pd.DataFrame([get_pod_label()]).to_csv(labels_out.path, index=False)

    return a + b

@dsl.component(packages_to_install=["kubernetes", "pandas"])
def divide(a: float, b: float, labels_out: Output[Dataset]) -> float:
    """Divide ``a`` by ``b`` and dump this pod's labels to ``labels_out`` as CSV.

    The label dump exists only to observe which pod labels the KFP
    kubernetes extension actually applied to this executor's pod.
    """
    import pandas as pd

    def get_pod_label() -> dict:
        """Return the labels of the pod this component is running in.

        Requires in-cluster credentials (runs inside a Kubernetes pod).
        """
        import os

        from kubernetes import client, config

        # Configure API using the pod's service-account credentials.
        config.load_incluster_config()
        v1 = client.CoreV1Api()

        # Get current pod name and namespace. HOSTNAME defaults to the
        # pod name inside Kubernetes; the namespace is mounted by the
        # service-account volume.
        pod_name = os.environ.get("HOSTNAME")
        with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
            namespace = f.read()

        if not pod_name:
            raise Exception("Cannot determine pod name")

        # Get pod information from the API server.
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)

        # Extract labels (dict[str, str] or None if the pod has none).
        labels = pod.metadata.labels

        print(f"Pod Labels: {labels}")

        return labels

    # Unique print to bust KFP result caching between runs.
    print("kill-cache2")
    pd.DataFrame([get_pod_label()]).to_csv(labels_out.path, index=False)

    return a / b

@dsl.pipeline
def hello_pipeline(a: float = 1, b: float = 2, c: float = 3) -> None:
    """Two-step pipeline; each step gets its own, distinct pod labels."""
    # First executor: labeled with fruit/vegetable.
    sum_task = addition(a=a, b=b)
    kubernetes.add_pod_label(sum_task, "fruit", "apple")
    kubernetes.add_pod_label(sum_task, "vegetable", "carrot")

    # Second executor, consuming the first's output: labeled with human/animal.
    div_task = divide(a=sum_task.outputs["Output"], b=c)
    kubernetes.add_pod_label(div_task, "human", "milos")
    kubernetes.add_pod_label(div_task, "animal", "bear")

from kfp import compiler

# Compile the pipeline to Argo-compatible YAML for submission.
compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')

Impacted by this bug? Give it a 👍.

Closing, because the same problem is described (with additional comments) in #11077.