@egafni
Created April 11, 2023 16:40
"""
Single entry point for running a script locally inside docker, as a GKE job, or as an ai-platform job.

Example (ai-platform):
    python myproject/run.py ai-platform \
        --master-accelerator type=nvidia-tesla-t4,count=1 \
        --master-machine-type n1-standard-8 \
        -n $JOBNAME \
        --polling-interval 10 --region us-central1 \
        --docker-uri gcr.io/xyz/myproject:$TAG -- \
        python3 myproject/train.py --train_config $CONFIG_PATH
"""
import argparse
import io
import json
import logging
import os
import shlex
import subprocess
import time
from dataclasses import dataclass
from getpass import getuser
from subprocess import check_output
from tempfile import NamedTemporaryFile
from typing import List, Optional

import yaml
from more_itertools import one

# put in repo/constants.py
REPO_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


def run(cmd, do_print=True, must_succeed=True):
    """run `cmd` inside a bash shell and stream output to console"""
    if do_print:
        logging.info(f"*** Running Command: ****\n$ {cmd}")
    returncode = os.system(cmd)
    if returncode != 0:
        msg = f"returncode: {returncode}, cmd: {cmd}"
        if must_succeed:
            raise RuntimeError(msg)
        else:
            logging.error(msg)


def build_docker(docker_uri):
    # build the docker container
    # note: If in the future we need docker build to have ssh credentials (for example to clone a git repo),
    # we could add "--ssh default" to the docker build command to safely give docker build the ssh credentials
    # of the user running this build. However, it requires that the user have ssh agent set up properly, which
    # has proven non-trivial and a pain for ML developers.
    run(f"cd {REPO_DIR} && export DOCKER_URI={docker_uri} && bash scripts/build_docker.sh")


def push_docker(docker_uri):
    # push the container
    run(f"docker push {docker_uri}")


DEFAULT_REGION = "us-central1"
REGION_CHOICES = ("us-central1", "us-west1")


def get_git_branch():
    # note: this will cause an exception if called from inside a Docker container
    # since we avoid copying the .git files
    return subprocess.check_output("git rev-parse --abbrev-ref HEAD", shell=True).strip()


def get_docker_uri(project="myproject", image="repo_name"):
    git_branch = get_git_branch().decode().strip()
    # remove slashes because docker tags can't have a / in them, but branches are often named user/feature
    git_branch = git_branch.replace("/", "___")
    return f"gcr.io/{project}/{image}:{git_branch}"


@dataclass
class Resource:
    cpu: Optional[int] = None
    gpu: Optional[int] = None
    gpu_type: Optional[str] = None  # ex: 'nvidia-tesla-t4'

    @property
    def limit(self):
        """kubernetes config value for limits variable"""
        d = {"limits": {}}
        if self.cpu:
            d["limits"]["cpu"] = self.cpu
        if self.gpu:
            d["limits"]["nvidia.com/gpu"] = self.gpu
        return d


RESOURCES = {
    "1-cpu": Resource(cpu=1),
    "8-cpu": Resource(cpu=8),
    "16-cpu": Resource(cpu=16),
    "1-gpu-a100": Resource(gpu=1, gpu_type="nvidia-tesla-a100"),
    "1-gpu-t4": Resource(gpu=1, gpu_type="nvidia-tesla-t4"),
}
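
# Example (illustrative): RESOURCES["1-gpu-t4"].limit == {"limits": {"nvidia.com/gpu": 1}};
# gpu_type is not part of the limits dict, it is applied as a nodeSelector in get_gke_job_job_dict below.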

DEFAULT_CLUSTER_NAME = "ml-cluster2"

JOB_TEMPLATE_YAML = """
apiVersion: batch/v1
kind: Job
metadata:
  name: REPLACE_ME
spec:
  backoffLimit: 1  # only retry once before considering a job failed
  template:
    metadata:
      annotations:
        # This version MUST match the TensorFlow version that your model is built on.
        # TODO I think I can remove this entire annotation
        tf-version.cloud-tpus.google.com: "2.3"
    spec:
      volumes:
        - name: dshm  # required to change the size of /dev/shm
          emptyDir:
            medium: Memory
      restartPolicy: Never
      containers:
        - name: myproject
          image: REPLACE_ME
          command:
            - REPLACE_ME
          volumeMounts:
            - mountPath: /dev/shm  # required to change the size of /dev/shm
              name: dshm
"""  # noqa: W291


def get_gke_job_job_dict(name: str, command: List[str], docker_uri: str, resources: str, env: Optional[dict] = None):
    if env is None:
        env = dict()
    with io.StringIO(JOB_TEMPLATE_YAML) as fp:
        config = yaml.load(fp, Loader=yaml.Loader)
    assert len(config), "config cannot be empty"
    config["metadata"]["name"] = f"ml-{getuser()}-{name}"
    container = one(config["spec"]["template"]["spec"]["containers"])
    container["command"] = command
    # kubernetes expects env as a list of {"name": ..., "value": ...} mappings
    container["env"] = [{"name": k, "value": v} for k, v in env.items()]
    container["image"] = docker_uri
    resource = RESOURCES[resources]
    container["resources"] = resource.limit
    if resource.gpu_type:
        spec = config["spec"]["template"]["spec"]
        spec.setdefault("nodeSelector", dict())["cloud.google.com/gke-accelerator"] = resource.gpu_type
        # by default the node is pre-emptible, should probably re-enable this though
        # container['nodeSelector'] = {'cloud.google.com/gke-preemptible': "true"}
    assert len(config["metadata"]["name"]) < 52, "job name must be under 52 characters"
    return config
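
# Example (illustrative): get_gke_job_job_dict("exp1", ["python3", "myproject/train.py"],
# "gcr.io/myproject/repo_name:main", "1-gpu-t4") returns a Job dict whose metadata.name is "ml-<user>-exp1",
# whose container requests one nvidia.com/gpu, and whose pod selects nodes with
# cloud.google.com/gke-accelerator=nvidia-tesla-t4.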


def main():
    p = argparse.ArgumentParser()
    sps = p.add_subparsers(help="job type")

    p1 = sps.add_parser("docker", help="run command locally inside docker")
    p1.set_defaults(job_type="docker")
    p1.add_argument(
        "--docker-uri",
        default=get_docker_uri(),
        help=f"docker uri to use, default uses the branch name for the tag and is {get_docker_uri()}",
    )
    # note: the docker branch below checks args.skip_docker_build, so this flag must exist on this subparser
    p1.add_argument("--skip-docker-build", action="store_true", help="skip building the docker container")
    p1.add_argument("command", nargs="*", help="The command to run")

    p2 = sps.add_parser("gke_job", help="submit job to google cloud and run inside docker")
    p2.set_defaults(job_type="gke_job")
    p2.add_argument(
        "-n",
        "--name",
        required=True,
        help="Name of the experiment. Will also be the job name, if this is an ai-platform job.",
    )
    p2.add_argument("-r", "--resources", help=f"If job_type is `gke_job`, must be one of: {RESOURCES.keys()}")
    p2.add_argument("-c", "--cluster_name", help="kubernetes cluster to submit to", default=DEFAULT_CLUSTER_NAME)
    p2.add_argument("command", nargs="*", help="The command to run")
    p2.add_argument(
        "--docker-uri",
        default=get_docker_uri(),
        help=f"docker uri to use, default uses the branch name for the tag and is {get_docker_uri()}",
    )

    p3 = sps.add_parser("ai-platform", help="submit job to google cloud using ai-platform")
    p3.set_defaults(job_type="ai-platform")
    p3.add_argument(
        "-n",
        "--name",
        required=True,
        help="Name of the experiment. Will also be the job name, if this is an ai-platform job.",
    )
    p3.add_argument("--master-machine-type", default="n1-standard-8", help='ex "n1-standard-16"')
    p3.add_argument("--master-accelerator", help='ex "type=nvidia-tesla-t4,count=1"')
    p3.add_argument(
        "-s",
        "--skip-stream-logs",
        action="store_true",
        help="exit immediately after job is submitted without streaming logs",
    )
    p3.add_argument("--region", help="region to launch job", default=DEFAULT_REGION, choices=REGION_CHOICES)
    p3.add_argument("-pi", "--polling-interval", type=int, default=5, help="polling interval for stream logs")
    p3.add_argument("--skip-docker-build", action="store_true", help="skip building/pushing the docker container")
    p3.add_argument(
        "--docker-uri",
        default=get_docker_uri(),
        help=f"docker uri to use, default uses the branch name for the tag and is {get_docker_uri()}",
    )
    p3.add_argument(
        "--billing-labels",
        required=True,
        type=str,
        help="""List of label KEY=VALUE pairs to add.
        Keys must start with a lowercase character and contain only hyphens
        (-), underscores (_), lowercase characters, and numbers. Values must
        contain only hyphens (-), underscores (_), lowercase characters, and
        numbers.""",
    )
    p3.add_argument("command", nargs="*", help="The command to run")

    args = p.parse_args()
    print(args)
    # FIXME make sure repo is not dirty so we can tag with the checksum and log the checksum in order to reproduce runs
    # FIXME check if ai-platform job name is available early

    # parse the command back into a command line string that can be passed to the training script
    if len(args.command) == 0:
        raise ValueError(f"Syntax is run.py RUNARGS -- COMMAND: {args.command}")
    # todo: replace with shlex.join after upgrading python version
    # note: shlex.quote twice because the Dockerfile entry point is "bash -c"
    command_str = shlex.quote(" ".join(shlex.quote(s) for s in args.command))
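    # Example (illustrative): args.command == ["python3", "train.py", "--lr", "0.1"] becomes the single
    # shell word 'python3 train.py --lr 0.1', which the image's "bash -c" entry point receives as one argument.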
if args.job_type == "docker":
# notes:
# --runtime=nvidia enables gpu
# -v "$HOME/.config/gke_job/application_default_credentials.json":/gcp/creds.json:ro' mounts local google auth
# --env GOOGLE_APPLICATION_CREDENTIALS=/gcp/creds.json' lets tools know where the google auth creds are
if not args.skip_docker_build:
build_docker(args.docker_uri)
run(
f"docker run --runtime=nvidia -it --rm --shm-size=16g"
f' -v "$HOME/.config/gcloud/application_default_credentials.json":/gcp/creds.json:ro'
f" --env GOOGLE_APPLICATION_CREDENTIALS=/gcp/creds.json"
f' --env CUDA_VISIBLE_DEVICES={os.environ["CUDA_VISIBLE_DEVICES"]}'
f" {args.docker_uri}"
f" {command_str}"
)
elif args.job_type == "gke_job":
if "_" in args.name:
raise ValueError("GKE does not allow experiment names with underscores")
assert args.resources in RESOURCES.keys(), f"{args.resources} must be one of {RESOURCES.keys()}"
config = get_gke_job_job_dict(
name=args.name, command=args.command, docker_uri=args.docker_uri, env=dict(), resources=args.resources
)
# write job config to a temporary yaml file
with NamedTemporaryFile(mode="w") as fp:
fp.write(yaml.dump(config))
fp.flush()
run(f"cat {fp.name}")
run(f"gcloud container clusters get-credentials {args.cluster_name} && kubectl create -f {fp.name}")
run(f'kubectl describe pod -l job-name={config["metadata"]["name"]}')
print("\n*** Useful Commands ***")
print(f'+ kubectl describe pod -l job-name={config["metadata"]["name"]}')
print(f'+ kubectl logs -l job-name={config["metadata"]["name"]}')
print(f'+ kubectl logs -f -l job-name={config["metadata"]["name"]}')
print(f'+ kubectl attach -l job-name={config["metadata"]["name"]}')
print(f'+ kubectl exec -l job-name={config["metadata"]["name"]} -- ls')
print(f'+ kubectl delete pod -l job-name={config["metadata"]["name"]}')
print(f'+ kubectl get pod -l job-name={config["metadata"]["name"]} --output=yaml')
print("helpful links")
print("https://kubernetes.io/docs/tasks/debug-application-cluster/determine-reason-pod-failure")
elif args.job_type == "ai-platform":
if "-" in args.name and args.job_type == "ai-platform":
raise ValueError("ai-platform does not allow experiment names with dashes")
if not args.skip_docker_build:
build_docker(args.docker_uri)
push_docker(args.docker_uri)
job_name = args.name
cmd = [
f"gcloud ai-platform jobs submit training {job_name} --scale-tier custom",
f"--master-image-uri {args.docker_uri}",
f"--master-machine-type {args.master_machine_type}",
"--enable-web-access",
f"--region {args.region}",
]
if args.master_accelerator:
cmd.append(f"--master-accelerator {args.master_accelerator}")
if args.billing_labels:
cmd.append(f"--labels {args.billing_labels}")
run(" ".join(cmd) + " -- " + command_str)
run(f"gcloud ai-platform jobs describe {job_name}")
print("\n*** Useful Commands ***")
print(f"+ gcloud ai-platform jobs cancel {job_name}")
# wait for state to not be preparing
state = "QUEUED"
describe_output = dict()
while state in ["QUEUED", "PREPARING"]:
describe_output = json.loads(
check_output(f"gcloud ai-platform jobs describe {job_name} --format json", shell=True)
)
state = describe_output["state"].strip()
print(state, end=" ", flush=True)
time.sleep(10)
else:
print()
print(describe_output, flush=True)
if not args.skip_stream_logs:
run(f"gcloud ai-platform jobs stream-logs {job_name} --polling-interval {args.polling_interval}")
else:
raise ValueError(f"{args.job_type} is invalid")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()