[FEATURE] Trigger Ray jobs from training containers
Closed this issue · 4 comments
srinivasreddych commented
Describe the solution you'd like
Please provide a sample for triggering Ray jobs from training containers (app-specific). For getting started, it could be within the same namespace, but ideally it should target across namespaces.
kukushking commented
Example:
apiVersion: batch/v1
kind: Job
metadata:
name: ray-hello-1
namespace: ray
spec:
template:
spec:
containers:
- name: ray-hello-1
image: python:3.9.19
command: ['sh', '-c', 'pip install ray"[client]"==2.30.0 && python -c "import ray; import os; RAY_ADDRESS = os.environ[\"RAY_ADDRESS\"]; ray.init(address=RAY_ADDRESS); print(ray.cluster_resources())"']
env:
- name: RAY_ADDRESS
value: "ray://kuberay-head-svc:10001"
restartPolicy: Never
kukushking commented
kukushking commented
E2E example job submission from a container:
apiVersion: batch/v1
kind: Job
metadata:
name: pytorch-training-e2e-1
namespace: ray
spec:
template:
spec:
serviceAccountName: ray-operator-ray-operator
containers:
- name: pytorch-training-e2e
image: python:3.9.19
command: ['sh', '-c', 'pip install ray"[default,client]"==2.30.0 && cd /home/ray/sample/ && ray job submit --address ${RAY_ADDRESS} --working-dir="." -- python pytorch_training_e2e.py']
env:
- name: RAY_ADDRESS
value: "http://kuberay-head-svc:8265"
volumeMounts:
- name: code-sample
mountPath: /home/ray/sample
restartPolicy: Never
volumes:
- name: code-sample
configMap:
name: pytorch-training-e2e
items:
- key: pytorch_training_e2e.py
path: pytorch_training_e2e.py
---
apiVersion: v1
kind: ConfigMap
metadata:
name: pytorch-training-e2e
namespace: ray
data:
pytorch_training_e2e.py: |
import click
import time
import json
import os
import tempfile
from typing import Dict
import numpy as np
from torchvision import transforms
from torchvision.models import resnet18
import torch
import torch.nn as nn
import torch.optim as optim
import ray
from ray import train
from ray.train import Checkpoint, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer
def add_fake_labels(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch_size = len(batch["image"])
batch["label"] = np.zeros([batch_size], dtype=int)
return batch
def transform_image(
batch: Dict[str, np.ndarray], transform: torch.nn.Module
) -> Dict[str, np.ndarray]:
transformed_tensors = [transform(image).numpy() for image in batch["image"]]
batch["image"] = transformed_tensors
return batch
def train_loop_per_worker(config):
raw_model = resnet18(pretrained=True)
model = train.torch.prepare_model(raw_model)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
train_dataset_shard = train.get_dataset_shard("train")
for epoch in range(config["num_epochs"]):
running_loss = 0.0
for i, data in enumerate(
train_dataset_shard.iter_torch_batches(batch_size=config["batch_size"])
):
# get the inputs; data is a list of [inputs, labels]
inputs = data["image"].to(device=train.torch.get_device())
labels = data["label"].to(device=train.torch.get_device())
# zero the parameter gradients
optimizer.zero_grad()
# forward + backward + optimize
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# print statistics
running_loss += loss.item()
if i % 2000 == 1999: # print every 2000 mini-batches
print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
running_loss = 0.0
with tempfile.TemporaryDirectory() as tmpdir:
torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
train.report(
dict(running_loss=running_loss),
checkpoint=Checkpoint.from_directory(tmpdir),
)
@click.command(help="Run Batch prediction on Pytorch ResNet models.")
@click.option("--data-size-gb", type=int, default=1)
@click.option("--num-epochs", type=int, default=2)
@click.option("--num-workers", type=int, default=1)
@click.option("--smoke-test", is_flag=True, default=False)
def main(data_size_gb: int, num_epochs=2, num_workers=1, smoke_test: bool = False):
data_url = (
f"s3://anonymous@air-example-data-2/{data_size_gb}G-image-data-synthetic-raw"
)
print(
"Running Pytorch image model training with "
f"{data_size_gb}GB data from {data_url}"
)
print(f"Training for {num_epochs} epochs with {num_workers} workers.")
start = time.time()
if smoke_test:
# Only read one image
data_url = [data_url + "/dog.jpg"]
print("Running smoke test on CPU with a single example")
else:
print(f"Running GPU training with {data_size_gb}GB data from {data_url}")
dataset = ray.data.read_images(data_url, size=(256, 256))
transform = transforms.Compose(
[
transforms.ToTensor(),
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
)
dataset = dataset.map_batches(add_fake_labels)
dataset = dataset.map_batches(transform_image, fn_kwargs={"transform": transform})
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
train_loop_config={"batch_size": 64, "num_epochs": num_epochs},
datasets={"train": dataset},
scaling_config=ScalingConfig(
num_workers=num_workers, use_gpu=int(not smoke_test)
),
run_config=RunConfig(storage_path="/ray/export"),
)
trainer.fit()
total_time_s = round(time.time() - start, 2)
# For structured output integration with internal tooling
results = {"data_size_gb": data_size_gb, "num_epochs": num_epochs}
results["perf_metrics"] = [
{
"perf_metric_name": "total_time_s",
"perf_metric_value": total_time_s,
"perf_metric_type": "LATENCY",
},
{
"perf_metric_name": "throughout_MB_s",
"perf_metric_value": round(
num_epochs * data_size_gb * 1024 / total_time_s, 2
),
"perf_metric_type": "THROUGHPUT",
},
]
test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/release_test_out.json")
with open(test_output_json, "wt") as f:
json.dump(results, f)
print(results)
if __name__ == "__main__":
main()
kukushking commented
Closed as completed.