#!/usr/bin/env python3
"""
$ ./benchmark_worker_startup.py --help
usage: benchmark_worker_startup.py [-h] --num_gpus_in_cluster NUM_GPUS_IN_CLUSTER
                                   --num_cpus_in_cluster NUM_CPUS_IN_CLUSTER
                                   --num_tasks_or_actors_per_run NUM_TASKS_OR_ACTORS_PER_RUN
                                   --num_measurements_per_configuration NUM_MEASUREMENTS_PER_CONFIGURATION

This release test measures Ray worker startup time. Specifically, it measures
the time to start N different tasks or actors, where each task or actor
imports a large library (currently PyTorch). N is configurable. The test runs
under a few different configurations: {task, actor} x {runtime env, no runtime
env} x {GPU, no GPU} x {cold start, warm start} x {import torch, no imports}.

options:
  -h, --help            show this help message and exit
  --num_gpus_in_cluster NUM_GPUS_IN_CLUSTER
                        The number of GPUs in the cluster. This determines how
                        many GPU resources each actor/task requests.
  --num_cpus_in_cluster NUM_CPUS_IN_CLUSTER
                        The number of CPUs in the cluster. This determines how
                        many CPU resources each actor/task requests.
  --num_tasks_or_actors_per_run NUM_TASKS_OR_ACTORS_PER_RUN
                        The number of tasks or actors per 'run'. A run starts
                        this many tasks/actors and constitutes a single
                        measurement. Several runs can be composed within a
                        single job to measure warm start, or spread across
                        different jobs to measure cold start.
  --num_measurements_per_configuration NUM_MEASUREMENTS_PER_CONFIGURATION
                        The number of measurements to record per configuration.

This script uses test_single_configuration.py to run the actual
measurements.
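
Example invocation (illustrative values; pick numbers that match your cluster):
$ ./benchmark_worker_startup.py \
    --num_cpus_in_cluster 16 \
    --num_gpus_in_cluster 4 \
    --num_tasks_or_actors_per_run 100 \
    --num_measurements_per_configuration 5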
"""
import argparse
import asyncio
import random
import statistics
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass
import ray
from ray._private.test_utils import safe_write_to_results_json
from ray.job_submission import JobStatus, JobSubmissionClient
def main(
num_cpus_in_cluster: int,
num_gpus_in_cluster: int,
num_tasks_or_actors_per_run: int,
num_measurements_per_configuration: int,
):
"""
Generate test cases, then run them in random order via run_and_stream_logs.
"""
metrics_actor_name = "metrics_actor"
metrics_actor_namespace = "metrics_actor_namespace"
metrics_actor = MetricsActor.options( # noqa: F841
name=metrics_actor_name,
namespace=metrics_actor_namespace,
).remote(
expected_measurements_per_test=num_measurements_per_configuration,
)
print_disk_config()
run_matrix = generate_test_matrix(
num_cpus_in_cluster,
num_gpus_in_cluster,
num_tasks_or_actors_per_run,
num_measurements_per_configuration,
)
print(f"List of tests: {run_matrix}")
for test in random.sample(list(run_matrix), k=len(run_matrix)):
print(f"Running test {test}")
asyncio.run(
run_and_stream_logs(
metrics_actor_name,
metrics_actor_namespace,
test,
)
)
@ray.remote(num_cpus=0)
class MetricsActor:
"""
Actor which tests will report metrics to.
"""
def __init__(self, expected_measurements_per_test: int):
self.measurements = defaultdict(list)
self.expected_measurements_per_test = expected_measurements_per_test
def submit(self, test_name: str, latency: float):
print(f"got latency {latency} s for test {test_name}")
self.measurements[test_name].append(latency)
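        # Rewrite the results file after every measurement so partial progress
        # is preserved even if a later job fails or is interrupted.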
results = self.create_results_dict_from_measurements(
self.measurements, self.expected_measurements_per_test
)
safe_write_to_results_json(results)
assert (
len(self.measurements[test_name]) <= self.expected_measurements_per_test
), (
f"Expected {self.measurements[test_name]} to not have more elements than "
f"{self.expected_measurements_per_test}"
)
@staticmethod
def create_results_dict_from_measurements(
all_measurements, expected_measurements_per_test
):
results = {}
perf_metrics = []
for test_name, measurements in all_measurements.items():
test_summary = {
"measurements": measurements,
}
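            # Only report a p50 perf metric once this configuration has all of
            # its expected measurements; partial configurations still record
            # their raw measurements.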
if len(measurements) == expected_measurements_per_test:
median = statistics.median(measurements)
test_summary["p50"] = median
perf_metrics.append(
{
"perf_metric_name": f"p50.{test_name}",
"perf_metric_value": median,
"perf_metric_type": "LATENCY",
}
)
results[test_name] = test_summary
results["perf_metrics"] = perf_metrics
return results
def print_disk_config():
print("Getting disk sizes via df -h")
subprocess.check_call("df -h", shell=True)
def generate_test_matrix(
num_cpus_in_cluster: int,
num_gpus_in_cluster: int,
num_tasks_or_actors_per_run: int,
num_measurements_per_test: int,
):
num_repeated_jobs_or_runs = num_measurements_per_test
total_num_tasks_or_actors = num_tasks_or_actors_per_run * num_repeated_jobs_or_runs
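    # "Cold start" spreads the measurements across separate jobs (one run per
    # job, so every run pays worker startup cost); "warm start" performs all of
    # the runs inside a single job, so later runs execute against warm workers.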
num_jobs_per_type = {
"cold_start": num_repeated_jobs_or_runs,
"warm_start": 1,
}
imports_to_try = ["torch", "none"]
tests = set()
for with_tasks in [True, False]:
for with_gpu in [True, False]:
# Do not run without runtime env. TODO(cade) Infra team added cgroups to
# default runtime env, need to find some way around that if we want
# "pure" (non-runtime-env) measurements.
for with_runtime_env in [True]:
for import_to_try in imports_to_try:
for num_jobs in num_jobs_per_type.values():
num_tasks_or_actors_per_job = (
total_num_tasks_or_actors // num_jobs
)
num_runs_per_job = (
num_tasks_or_actors_per_job // num_tasks_or_actors_per_run
)
test = TestConfiguration(
num_jobs=num_jobs,
num_runs_per_job=num_runs_per_job,
num_tasks_or_actors_per_run=num_tasks_or_actors_per_run,
with_tasks=with_tasks,
with_gpu=with_gpu,
with_runtime_env=with_runtime_env,
import_to_try=import_to_try,
num_cpus_in_cluster=num_cpus_in_cluster,
num_gpus_in_cluster=num_gpus_in_cluster,
num_nodes_in_cluster=1,
)
tests.add(test)
return tests
@dataclass(eq=True, frozen=True)
class TestConfiguration:
num_jobs: int
num_runs_per_job: int
num_tasks_or_actors_per_run: int
with_gpu: bool
with_tasks: bool
with_runtime_env: bool
import_to_try: str
num_cpus_in_cluster: int
num_gpus_in_cluster: int
num_nodes_in_cluster: int
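
    # The repr doubles as the test name: it is passed to
    # test_single_configuration.py via --test_name and becomes the
    # "p50.<test_name>" perf metric in the results.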
def __repr__(self):
with_gpu_str = "with_gpu" if self.with_gpu else "without_gpu"
executable_unit = "tasks" if self.with_tasks else "actors"
cold_or_warm_start = "cold" if self.num_jobs > 1 else "warm"
with_runtime_env_str = (
"with_runtime_env" if self.with_runtime_env else "without_runtime_env"
)
single_node_or_multi_node = (
"single_node" if self.num_nodes_in_cluster == 1 else "multi_node"
)
import_torch_or_none = (
"import_torch" if self.import_to_try == "torch" else "no_import"
)
return "-".join(
[
f"seconds_to_{cold_or_warm_start}_start_"
f"{self.num_tasks_or_actors_per_run}_{executable_unit}",
import_torch_or_none,
with_gpu_str,
single_node_or_multi_node,
with_runtime_env_str,
f"{self.num_cpus_in_cluster}_CPU_{self.num_gpus_in_cluster}"
"_GPU_cluster",
]
)
async def run_and_stream_logs(
metrics_actor_name, metrics_actor_namespace, test: TestConfiguration
):
"""
Run a particular test configuration by invoking ./test_single_configuration.py.
"""
client = JobSubmissionClient("http://127.0.0.1:8265")
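    # Jobs are submitted through the Ray dashboard's job server, which listens
    # on port 8265 of the head node by default.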
entrypoint = generate_entrypoint(metrics_actor_name, metrics_actor_namespace, test)
for _ in range(test.num_jobs):
print(f"Running {entrypoint}")
if not test.with_runtime_env:
# On non-workspaces, this will run as a job but without a runtime env.
subprocess.check_call(entrypoint, shell=True)
else:
job_id = client.submit_job(
entrypoint=entrypoint,
runtime_env={"working_dir": "./"},
)
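            # The runtime env's working_dir uploads the current directory with
            # the job, so test_single_configuration.py is available on the
            # cluster.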
try:
async for lines in client.tail_job_logs(job_id):
print(lines, end="")
except KeyboardInterrupt:
print(f"Stopping job {job_id}")
client.stop_job(job_id)
raise
job_status = client.get_job_status(job_id)
if job_status != JobStatus.SUCCEEDED:
raise ValueError(
f"Job {job_id} was not successful; status is {job_status}"
)
def generate_entrypoint(
metrics_actor_name: str, metrics_actor_namespace: str, test: TestConfiguration
):
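    # Translate the TestConfiguration into the command-line flags expected by
    # test_single_configuration.py.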
task_or_actor_arg = "--with_tasks" if test.with_tasks else "--with_actors"
with_gpu_arg = "--with_gpu" if test.with_gpu else "--without_gpu"
with_runtime_env_arg = (
"--with_runtime_env" if test.with_runtime_env else "--without_runtime_env"
)
return " ".join(
[
"python ./test_single_configuration.py",
f"--metrics_actor_name {metrics_actor_name}",
f"--metrics_actor_namespace {metrics_actor_namespace}",
f"--test_name {test}",
f"--num_runs {test.num_runs_per_job} ",
f"--num_tasks_or_actors_per_run {test.num_tasks_or_actors_per_run}",
f"--num_cpus_in_cluster {test.num_cpus_in_cluster}",
f"--num_gpus_in_cluster {test.num_gpus_in_cluster}",
task_or_actor_arg,
with_gpu_arg,
with_runtime_env_arg,
f"--library_to_import {test.import_to_try}",
]
)
def parse_args():
parser = argparse.ArgumentParser(
description="This release test measures Ray worker startup time. "
"Specifically, it measures the time to start N different tasks or"
" actors, where each task or actor imports a large library ("
"currently PyTorch). N is configurable.\nThe test runs under a "
"few different configurations: {task, actor} x {runtime env, "
"no runtime env} x {GPU, no GPU} x {cold start, warm start} x "
"{import torch, no imports}.",
epilog="This script uses test_single_configuration.py to run the "
"actual measurements.",
)
parser.add_argument(
"--num_gpus_in_cluster",
type=int,
required=True,
help="The number of GPUs in the cluster. This determines how many "
"GPU resources each actor/task requests.",
)
parser.add_argument(
"--num_cpus_in_cluster",
type=int,
required=True,
help="The number of CPUs in the cluster. This determines how many "
"CPU resources each actor/task requests.",
)
parser.add_argument(
"--num_tasks_or_actors_per_run",
type=int,
required=True,
help="The number of tasks or actors per 'run'. A run starts this "
"many tasks/actors and consitutes a single measurement. Several "
"runs can be composed within a single job for measure warm start, "
"or spread across different jobs to measure cold start.",
)
parser.add_argument(
"--num_measurements_per_configuration",
type=int,
required=True,
help="The number of measurements to record per configuration.",
)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
sys.exit(
main(
args.num_cpus_in_cluster,
args.num_gpus_in_cluster,
args.num_tasks_or_actors_per_run,
args.num_measurements_per_configuration,
)
)