Pod Labels Not Applied Correctly Across Multiple Executors in Kubeflow Pipeline (KF 1.9, KFP SDK 2.8.0, Argo backend)
milosjava commented
Environment
- Kubeflow version: 1.9
- KFP SDK version: 2.8.0
- kfp-kubernetes version: 1.2.0
- Backend: Argo
Steps to reproduce
- Compile and run the pipeline below. Both executors end up with the same pod labels (only those defined for "addition").
- Now rename addition to waddition (so that it sorts alphabetically after divide) and run the pipeline again.
- This time both executors carry the labels defined for divide. Inspecting the compiled pipeline.yaml (see the sketch after this list) helps narrow down whether the labels are already mixed up at compile time.
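As a quick way to check whether the compiled IR already contains the wrong per-executor labels, a small script like the following can be used. This is only a diagnostic sketch, not part of the original report: it assumes PyYAML is installed and simply scans every document in pipeline.yaml for podMetadata entries instead of relying on exact key paths, since those may vary between SDK versions.

import yaml

def find_pod_metadata(node, path=""):
    # Recursively print every podMetadata block together with the path
    # (which includes the executor name) it was found under.
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "podMetadata" and isinstance(value, dict):
                print(f"{path or '<root>'}: labels={value.get('labels')}")
            else:
                find_pod_metadata(value, f"{path}/{key}" if path else key)
    elif isinstance(node, list):
        for i, item in enumerate(node):
            find_pod_metadata(item, f"{path}[{i}]")

with open("pipeline.yaml") as f:
    # The compiled file may contain more than one YAML document
    # (pipeline spec plus platform spec), so load them all.
    for doc in yaml.safe_load_all(f):
        find_pod_metadata(doc)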
Expected result
Each executor should get only its own pod labels. Currently, the labels of the alphabetically first executor are applied to ALL other executors.
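For the repro pipeline below, the expected per-executor labels would roughly be the following (the exec-* executor names are my assumption about how the compiler names the executors; KFP-managed system labels are omitted):

# Expected pod labels per executor (assumed executor names; system labels omitted).
expected_labels = {
    "exec-addition": {"fruit": "apple", "vegetable": "carrot"},
    "exec-divide": {"human": "milos", "animal": "bear"},
}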
Here's the code to reproduce the issue. Both components contain a get_pod_label helper that reads the pod's labels from the Kubernetes API so the label values can be checked.
from typing import List
from kfp import dsl, kubernetes
from kfp.dsl import Output, Dataset


@dsl.component(packages_to_install=["kubernetes", "pandas"])
def addition(a: float, b: float, labels_out: Output[Dataset]) -> float:
    import pandas as pd

    def get_pod_label() -> dict:
        import os
        from kubernetes import client, config

        # Configure API
        config.load_incluster_config()
        v1 = client.CoreV1Api()

        # Get current pod name and namespace
        pod_name = os.environ.get("HOSTNAME")
        namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read()
        if not pod_name:
            raise Exception("Cannot determine pod name")

        # Get pod information
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)

        # Extract labels
        labels = pod.metadata.labels
        print(type(labels))
        print(f"Pod Labels: {labels}")
        return labels

    print("kill-cache2")
    pd.DataFrame([get_pod_label()]).to_csv(labels_out.path, index=False)
    return a + b

@dsl.component(packages_to_install=["kubernetes", "pandas"])
def divide(a: float, b: float, labels_out: Output[Dataset]) -> float:
    import pandas as pd

    def get_pod_label() -> dict:
        import os
        import pandas as pd
        from kubernetes import client, config

        # Configure API
        config.load_incluster_config()
        v1 = client.CoreV1Api()

        # Get current pod name and namespace
        pod_name = os.environ.get("HOSTNAME")
        namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read()
        if not pod_name:
            raise Exception("Cannot determine pod name")

        # Get pod information
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)

        # Extract labels
        labels = pod.metadata.labels
        print(f"Pod Labels: {labels}")
        return labels

    print("kill-cache2")
    pd.DataFrame([get_pod_label()]).to_csv(labels_out.path, index=False)
    return a / b

@dsl.pipeline
def hello_pipeline(a: float = 1, b: float = 2, c: float = 3) -> None:
    total = addition(a=a, b=b)
    kubernetes.add_pod_label(total, "fruit", "apple")
    kubernetes.add_pod_label(total, "vegetable", "carrot")

    fraction = divide(a=total.outputs["Output"], b=c)
    kubernetes.add_pod_label(fraction, "human", "milos")
    kubernetes.add_pod_label(fraction, "animal", "bear")


from kfp import compiler

compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')
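For reference, a run can then be submitted from the compiled package via the KFP client, for example as sketched below; the host URL and the run name are placeholders and not part of the original repro.

from kfp import Client

# Placeholder endpoint; point this at your KFP API server.
client = Client(host="http://localhost:8080")

run = client.create_run_from_pipeline_package(
    "pipeline.yaml",
    arguments={"a": 1, "b": 2, "c": 3},
    run_name="pod-label-repro",  # placeholder run name
)
print(run.run_id)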
Impacted by this bug? Give it a 👍.