#!/usr/bin/env python3

"""
$ ./benchmark_worker_startup.py --help

usage: benchmark_worker_startup.py [-h] --num_gpus_in_cluster
                                   NUM_GPUS_IN_CLUSTER
                                   --num_cpus_in_cluster
                                   NUM_CPUS_IN_CLUSTER
                                   --num_tasks_or_actors_per_run
                                   NUM_TASKS_OR_ACTORS_PER_RUN
                                   --num_measurements_per_configuration
                                   NUM_MEASUREMENTS_PER_CONFIGURATION

This release test measures Ray worker startup time. Specifically, it
measures the time to start N different tasks or actors, where each task or
actor imports a large library (currently PyTorch). N is configurable. The
test runs under a few different configurations: {task, actor} x {runtime
env, no runtime env} x {GPU, no GPU} x {cold start, warm start} x {import
torch, no imports}.

options:
  -h, --help            show this help message and exit
  --num_gpus_in_cluster NUM_GPUS_IN_CLUSTER
                        The number of GPUs in the cluster. This determines
                        how many GPU resources each actor/task requests.
  --num_cpus_in_cluster NUM_CPUS_IN_CLUSTER
                        The number of CPUs in the cluster. This determines
                        how many CPU resources each actor/task requests.
  --num_tasks_or_actors_per_run NUM_TASKS_OR_ACTORS_PER_RUN
                        The number of tasks or actors per 'run'. A run
                        starts this many tasks/actors and constitutes a
                        single measurement. Several runs can be composed
                        within a single job to measure warm start, or
                        spread across different jobs to measure cold start.
  --num_measurements_per_configuration NUM_MEASUREMENTS_PER_CONFIGURATION
                        The number of measurements to record per
                        configuration.

This script uses test_single_configuration.py to run the actual
measurements.
"""

import argparse
import asyncio
import random
import statistics
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass

import ray
from ray._private.test_utils import safe_write_to_results_json
from ray.job_submission import JobStatus, JobSubmissionClient


def main(
    num_cpus_in_cluster: int,
    num_gpus_in_cluster: int,
    num_tasks_or_actors_per_run: int,
    num_measurements_per_configuration: int,
):
    """
    Generate test cases, then run them in random order via run_and_stream_logs.
    """
    metrics_actor_name = "metrics_actor"
    metrics_actor_namespace = "metrics_actor_namespace"
    metrics_actor = MetricsActor.options(  # noqa: F841
        name=metrics_actor_name,
        namespace=metrics_actor_namespace,
    ).remote(
        expected_measurements_per_test=num_measurements_per_configuration,
    )

    print_disk_config()

    run_matrix = generate_test_matrix(
        num_cpus_in_cluster,
        num_gpus_in_cluster,
        num_tasks_or_actors_per_run,
        num_measurements_per_configuration,
    )
    print(f"List of tests: {run_matrix}")

    for test in random.sample(list(run_matrix), k=len(run_matrix)):
        print(f"Running test {test}")
        asyncio.run(
            run_and_stream_logs(
                metrics_actor_name,
                metrics_actor_namespace,
                test,
            )
        )


@ray.remote(num_cpus=0)
class MetricsActor:
    """
    Actor which tests will report metrics to.
    """

    def __init__(self, expected_measurements_per_test: int):
        self.measurements = defaultdict(list)
        self.expected_measurements_per_test = expected_measurements_per_test

    def submit(self, test_name: str, latency: float):
        print(f"Got latency {latency} s for test {test_name}")
        self.measurements[test_name].append(latency)
        results = self.create_results_dict_from_measurements(
            self.measurements, self.expected_measurements_per_test
        )
        safe_write_to_results_json(results)

        assert (
            len(self.measurements[test_name]) <= self.expected_measurements_per_test
        ), (
            f"Expected test {test_name} to have at most "
            f"{self.expected_measurements_per_test} measurements, but it has "
            f"{len(self.measurements[test_name])}"
        )

    @staticmethod
    def create_results_dict_from_measurements(
        all_measurements, expected_measurements_per_test
    ):
        results = {}
        perf_metrics = []

        for test_name, measurements in all_measurements.items():
            test_summary = {
                "measurements": measurements,
            }

            if len(measurements) == expected_measurements_per_test:
                median = statistics.median(measurements)
                test_summary["p50"] = median
                perf_metrics.append(
                    {
                        "perf_metric_name": f"p50.{test_name}",
                        "perf_metric_value": median,
                        "perf_metric_type": "LATENCY",
                    }
                )

            results[test_name] = test_summary

        results["perf_metrics"] = perf_metrics
        return results
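
# How child jobs are assumed to report in (a sketch, not verified against
# test_single_configuration.py): the child looks up the metrics actor by the
# name/namespace passed on its command line and submits one latency per run:
#
#   metrics_actor = ray.get_actor(metrics_actor_name, metrics_actor_namespace)
#   ray.get(metrics_actor.submit.remote(test_name, latency))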


def print_disk_config():
    print("Getting disk sizes via df -h")
    subprocess.check_call("df -h", shell=True)


def generate_test_matrix(
    num_cpus_in_cluster: int,
    num_gpus_in_cluster: int,
    num_tasks_or_actors_per_run: int,
    num_measurements_per_test: int,
):
    num_repeated_jobs_or_runs = num_measurements_per_test
    total_num_tasks_or_actors = num_tasks_or_actors_per_run * num_repeated_jobs_or_runs
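
    # Worked example (illustrative numbers): with num_tasks_or_actors_per_run=100
    # and num_measurements_per_test=5, total_num_tasks_or_actors=500. Cold start
    # then uses 5 jobs x 1 run x 100 tasks/actors (each measurement starts in a
    # fresh job), while warm start uses 1 job x 5 runs x 100 tasks/actors (runs
    # after the first reuse already-started workers).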

    num_jobs_per_type = {
        "cold_start": num_repeated_jobs_or_runs,
        "warm_start": 1,
    }

    imports_to_try = ["torch", "none"]

    tests = set()

    for with_tasks in [True, False]:
        for with_gpu in [True, False]:
            # Do not run without runtime env. TODO(cade) Infra team added cgroups to
            # default runtime env, need to find some way around that if we want
            # "pure" (non-runtime-env) measurements.
            for with_runtime_env in [True]:
                for import_to_try in imports_to_try:
                    for num_jobs in num_jobs_per_type.values():
                        num_tasks_or_actors_per_job = (
                            total_num_tasks_or_actors // num_jobs
                        )
                        num_runs_per_job = (
                            num_tasks_or_actors_per_job // num_tasks_or_actors_per_run
                        )

                        test = TestConfiguration(
                            num_jobs=num_jobs,
                            num_runs_per_job=num_runs_per_job,
                            num_tasks_or_actors_per_run=num_tasks_or_actors_per_run,
                            with_tasks=with_tasks,
                            with_gpu=with_gpu,
                            with_runtime_env=with_runtime_env,
                            import_to_try=import_to_try,
                            num_cpus_in_cluster=num_cpus_in_cluster,
                            num_gpus_in_cluster=num_gpus_in_cluster,
                            num_nodes_in_cluster=1,
                        )
                        tests.add(test)

    return tests


@dataclass(eq=True, frozen=True)
class TestConfiguration:
    num_jobs: int
    num_runs_per_job: int
    num_tasks_or_actors_per_run: int
    with_gpu: bool
    with_tasks: bool
    with_runtime_env: bool
    import_to_try: str
    num_cpus_in_cluster: int
    num_gpus_in_cluster: int
    num_nodes_in_cluster: int

    def __repr__(self):
        with_gpu_str = "with_gpu" if self.with_gpu else "without_gpu"
        executable_unit = "tasks" if self.with_tasks else "actors"
        cold_or_warm_start = "cold" if self.num_jobs > 1 else "warm"
        with_runtime_env_str = (
            "with_runtime_env" if self.with_runtime_env else "without_runtime_env"
        )
        single_node_or_multi_node = (
            "single_node" if self.num_nodes_in_cluster == 1 else "multi_node"
        )
        import_torch_or_none = (
            "import_torch" if self.import_to_try == "torch" else "no_import"
        )

        return "-".join(
            [
                f"seconds_to_{cold_or_warm_start}_start_"
                f"{self.num_tasks_or_actors_per_run}_{executable_unit}",
                import_torch_or_none,
                with_gpu_str,
                single_node_or_multi_node,
                with_runtime_env_str,
                f"{self.num_cpus_in_cluster}_CPU_{self.num_gpus_in_cluster}"
                "_GPU_cluster",
            ]
        )
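
# For illustration, a cold-start task configuration with 100 tasks per run on
# a 16-CPU/4-GPU cluster (values assumed) renders via __repr__ above as:
#
#   seconds_to_cold_start_100_tasks-import_torch-with_gpu-single_node-\
#   with_runtime_env-16_CPU_4_GPU_cluster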


async def run_and_stream_logs(
    metrics_actor_name, metrics_actor_namespace, test: TestConfiguration
):
    """
    Run a particular test configuration by invoking ./test_single_configuration.py.
    """
    client = JobSubmissionClient("http://127.0.0.1:8265")
    entrypoint = generate_entrypoint(metrics_actor_name, metrics_actor_namespace, test)

    for _ in range(test.num_jobs):
        print(f"Running {entrypoint}")

        if not test.with_runtime_env:
            # On non-workspaces, this will run as a job but without a runtime env.
            subprocess.check_call(entrypoint, shell=True)
        else:
            job_id = client.submit_job(
                entrypoint=entrypoint,
                runtime_env={"working_dir": "./"},
            )

            try:
                async for lines in client.tail_job_logs(job_id):
                    print(lines, end="")
            except KeyboardInterrupt:
                print(f"Stopping job {job_id}")
                client.stop_job(job_id)
                raise

            job_status = client.get_job_status(job_id)
            if job_status != JobStatus.SUCCEEDED:
                raise ValueError(
                    f"Job {job_id} was not successful; status is {job_status}"
                )


def generate_entrypoint(
    metrics_actor_name: str, metrics_actor_namespace: str, test: TestConfiguration
):
    task_or_actor_arg = "--with_tasks" if test.with_tasks else "--with_actors"
    with_gpu_arg = "--with_gpu" if test.with_gpu else "--without_gpu"
    with_runtime_env_arg = (
        "--with_runtime_env" if test.with_runtime_env else "--without_runtime_env"
    )
    return " ".join(
        [
            "python ./test_single_configuration.py",
            f"--metrics_actor_name {metrics_actor_name}",
            f"--metrics_actor_namespace {metrics_actor_namespace}",
            f"--test_name {test}",
            f"--num_runs {test.num_runs_per_job}",
            f"--num_tasks_or_actors_per_run {test.num_tasks_or_actors_per_run}",
            f"--num_cpus_in_cluster {test.num_cpus_in_cluster}",
            f"--num_gpus_in_cluster {test.num_gpus_in_cluster}",
            task_or_actor_arg,
            with_gpu_arg,
            with_runtime_env_arg,
            f"--library_to_import {test.import_to_try}",
        ]
    )
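
# A generated entrypoint looks roughly like the following (one line in
# practice; test name and cluster values are illustrative):
#
#   python ./test_single_configuration.py --metrics_actor_name metrics_actor \
#       --metrics_actor_namespace metrics_actor_namespace \
#       --test_name seconds_to_warm_start_100_tasks-... --num_runs 5 \
#       --num_tasks_or_actors_per_run 100 --num_cpus_in_cluster 16 \
#       --num_gpus_in_cluster 4 --with_tasks --with_gpu --with_runtime_env \
#       --library_to_import torch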


def parse_args():
    parser = argparse.ArgumentParser(
        description="This release test measures Ray worker startup time. "
        "Specifically, it measures the time to start N different tasks or"
        " actors, where each task or actor imports a large library ("
        "currently PyTorch). N is configurable.\nThe test runs under a "
        "few different configurations: {task, actor} x {runtime env, "
        "no runtime env} x {GPU, no GPU} x {cold start, warm start} x "
        "{import torch, no imports}.",
        epilog="This script uses test_single_configuration.py to run the "
        "actual measurements.",
    )
    parser.add_argument(
        "--num_gpus_in_cluster",
        type=int,
        required=True,
        help="The number of GPUs in the cluster. This determines how many "
        "GPU resources each actor/task requests.",
    )
    parser.add_argument(
        "--num_cpus_in_cluster",
        type=int,
        required=True,
        help="The number of CPUs in the cluster. This determines how many "
        "CPU resources each actor/task requests.",
    )
    parser.add_argument(
        "--num_tasks_or_actors_per_run",
        type=int,
        required=True,
        help="The number of tasks or actors per 'run'. A run starts this "
        "many tasks/actors and constitutes a single measurement. Several "
        "runs can be composed within a single job to measure warm start, "
        "or spread across different jobs to measure cold start.",
    )
    parser.add_argument(
        "--num_measurements_per_configuration",
        type=int,
        required=True,
        help="The number of measurements to record per configuration.",
    )

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    sys.exit(
        main(
            args.num_cpus_in_cluster,
            args.num_gpus_in_cluster,
            args.num_tasks_or_actors_per_run,
            args.num_measurements_per_configuration,
        )
    )