awslabs/aiops-modules

[FEATURE]Trigger ray jobs from training containers

Closed this issue · 4 comments

Describe the solution you'd like

Please provide a sample for triggering ray jobs from training containers (app specific). for getting started, it could be within the same namespace, but ideally target across namespaces

Example:

apiVersion: batch/v1
kind: Job
metadata:
  name: ray-hello-1
  namespace: ray
spec:
  template:
    spec:
      containers:
      - name: ray-hello-1
        image: python:3.9.19
        command: ['sh', '-c', 'pip install ray"[client]"==2.30.0 && python -c "import ray; import os; RAY_ADDRESS = os.environ[\"RAY_ADDRESS\"]; ray.init(address=RAY_ADDRESS); print(ray.cluster_resources())"']
        env:
          - name: RAY_ADDRESS
            value: "ray://kuberay-head-svc:10001"
      restartPolicy: Never

E2E example job submission from a container:

apiVersion: batch/v1
kind: Job
metadata:
  name: pytorch-training-e2e-1
  namespace: ray
spec:
  template:
    spec:
      serviceAccountName: ray-operator-ray-operator
      containers:
      - name: pytorch-training-e2e
        image: python:3.9.19
        command: ['sh', '-c', 'pip install ray"[default,client]"==2.30.0 && cd /home/ray/sample/ && ray job submit --address ${RAY_ADDRESS} --working-dir="." -- python pytorch_training_e2e.py']
        env:
          - name: RAY_ADDRESS
            value: "http://kuberay-head-svc:8265"
        volumeMounts:
        - name: code-sample
          mountPath: /home/ray/sample
      restartPolicy: Never
      volumes:
        - name: code-sample
          configMap:
            name: pytorch-training-e2e
            items:
              - key: pytorch_training_e2e.py
                path: pytorch_training_e2e.py
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: pytorch-training-e2e
  namespace: ray
data:
  pytorch_training_e2e.py: |
    import click
    import time
    import json
    import os
    import tempfile
    from typing import Dict
    
    import numpy as np
    from torchvision import transforms
    from torchvision.models import resnet18
    import torch
    import torch.nn as nn
    import torch.optim as optim
    
    import ray
    from ray import train
    from ray.train import Checkpoint, RunConfig, ScalingConfig
    from ray.train.torch import TorchTrainer
    
    
    def add_fake_labels(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        batch_size = len(batch["image"])
        batch["label"] = np.zeros([batch_size], dtype=int)
        return batch
    
    
    def transform_image(
        batch: Dict[str, np.ndarray], transform: torch.nn.Module
    ) -> Dict[str, np.ndarray]:
        transformed_tensors = [transform(image).numpy() for image in batch["image"]]
        batch["image"] = transformed_tensors
        return batch
    
    
    def train_loop_per_worker(config):
        raw_model = resnet18(pretrained=True)
        model = train.torch.prepare_model(raw_model)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    
        train_dataset_shard = train.get_dataset_shard("train")
    
        for epoch in range(config["num_epochs"]):
            running_loss = 0.0
            for i, data in enumerate(
                train_dataset_shard.iter_torch_batches(batch_size=config["batch_size"])
            ):
                # get the inputs; data is a list of [inputs, labels]
                inputs = data["image"].to(device=train.torch.get_device())
                labels = data["label"].to(device=train.torch.get_device())
                # zero the parameter gradients
                optimizer.zero_grad()
    
                # forward + backward + optimize
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
    
                # print statistics
                running_loss += loss.item()
                if i % 2000 == 1999:  # print every 2000 mini-batches
                    print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
                    running_loss = 0.0
    
            with tempfile.TemporaryDirectory() as tmpdir:
                torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
                train.report(
                    dict(running_loss=running_loss),
                    checkpoint=Checkpoint.from_directory(tmpdir),
                )
    
    
    @click.command(help="Run Batch prediction on Pytorch ResNet models.")
    @click.option("--data-size-gb", type=int, default=1)
    @click.option("--num-epochs", type=int, default=2)
    @click.option("--num-workers", type=int, default=1)
    @click.option("--smoke-test", is_flag=True, default=False)
    def main(data_size_gb: int, num_epochs=2, num_workers=1, smoke_test: bool = False):
        data_url = (
            f"s3://anonymous@air-example-data-2/{data_size_gb}G-image-data-synthetic-raw"
        )
        print(
            "Running Pytorch image model training with "
            f"{data_size_gb}GB data from {data_url}"
        )
        print(f"Training for {num_epochs} epochs with {num_workers} workers.")
        start = time.time()
    
        if smoke_test:
            # Only read one image
            data_url = [data_url + "/dog.jpg"]
            print("Running smoke test on CPU with a single example")
        else:
            print(f"Running GPU training with {data_size_gb}GB data from {data_url}")
    
        dataset = ray.data.read_images(data_url, size=(256, 256))
    
        transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ]
        )
    
        dataset = dataset.map_batches(add_fake_labels)
        dataset = dataset.map_batches(transform_image, fn_kwargs={"transform": transform})
    
        trainer = TorchTrainer(
            train_loop_per_worker=train_loop_per_worker,
            train_loop_config={"batch_size": 64, "num_epochs": num_epochs},
            datasets={"train": dataset},
            scaling_config=ScalingConfig(
                num_workers=num_workers, use_gpu=int(not smoke_test)
            ),
            run_config=RunConfig(storage_path="/ray/export"),
        )
        trainer.fit()
    
        total_time_s = round(time.time() - start, 2)
    
        # For structured output integration with internal tooling
        results = {"data_size_gb": data_size_gb, "num_epochs": num_epochs}
        results["perf_metrics"] = [
            {
                "perf_metric_name": "total_time_s",
                "perf_metric_value": total_time_s,
                "perf_metric_type": "LATENCY",
            },
            {
                "perf_metric_name": "throughout_MB_s",
                "perf_metric_value": round(
                    num_epochs * data_size_gb * 1024 / total_time_s, 2
                ),
                "perf_metric_type": "THROUGHPUT",
            },
        ]
    
        test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/release_test_out.json")
        with open(test_output_json, "wt") as f:
            json.dump(results, f)
    
        print(results)
    
    
    if __name__ == "__main__":
        main()

Closed as completed.