Source code for rastervision.aws_batch.aws_batch_runner

import logging
import os
import uuid
from typing import List, Optional

from rastervision.pipeline import rv_config_ as rv_config
from rastervision.pipeline.runner import Runner

log = logging.getLogger(__name__)
AWS_BATCH = 'batch'


def submit_job(cmd: str,
               job_name: str,
               debug: bool = False,
               profile: bool = False,
               attempts: int = 5,
               parent_job_ids: Optional[List[str]] = None,
               num_array_jobs: Optional[int] = None,
               use_gpu: bool = False,
               job_queue: Optional[str] = None,
               job_def: Optional[str] = None) -> str:
    """Submit a job to run on AWS Batch.

    Args:
        cmd: a space-separated command to run in the Docker container for the
            remote job
        job_name: the name of the Batch job
        debug: if True, run the command using a ptvsd wrapper which sets up a
            remote VS Code Python debugger server
        profile: if True, run the command using kernprof, a line profiler
        attempts: the number of times to try running the command, which is
            useful in case of failure
        parent_job_ids: optional list of parent Batch job ids. The job created
            by this will only run after the parent jobs complete successfully.
        num_array_jobs: if set, make this a Batch array job with size equal to
            num_array_jobs
        use_gpu: if True, run the job in a GPU-enabled queue
        job_queue: if set, use this job queue
        job_def: if set, use this job definition

    Returns:
        the id of the submitted Batch job
    """
    batch_config = rv_config.get_namespace_config(AWS_BATCH)

    if job_queue is None:
        if use_gpu:
            job_queue = batch_config('gpu_job_queue')
        else:
            job_queue = batch_config('cpu_job_queue')

    if job_def is None:
        if use_gpu:
            job_def = batch_config('gpu_job_def')
        else:
            job_def = batch_config('cpu_job_def')

    import boto3
    client = boto3.client('batch')

    cmd_list = cmd.split(' ')
    if debug:
        cmd_list = [
            'python', '-m', 'ptvsd', '--host', '0.0.0.0', '--port', '6006',
            '--wait', '-m'
        ] + cmd_list

    if profile:
        cmd_list = ['kernprof', '-v', '-l'] + cmd_list

    kwargs = {
        'jobName': job_name,
        'jobQueue': job_queue,
        'jobDefinition': job_def,
        'containerOverrides': {
            'command': cmd_list
        },
        'retryStrategy': {
            'attempts': attempts
        },
    }
    if parent_job_ids:
        kwargs['dependsOn'] = [{'jobId': id} for id in parent_job_ids]
    if num_array_jobs:
        kwargs['arrayProperties'] = {'size': num_array_jobs}

    job_id = client.submit_job(**kwargs)['jobId']
    msg = 'submitted job with jobName={} and jobId={} w/ parent(s)={}'.format(
        job_name, job_id, parent_job_ids)
    log.info(msg)
    log.info(cmd_list)
    return job_id
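A minimal usage sketch of `submit_job`, assuming boto3 credentials and the `[AWS_BATCH]` Everett configuration (job queues and definitions) are already in place; the command and job name below are hypothetical:

```
from rastervision.aws_batch.aws_batch_runner import submit_job

# Hypothetical one-off job: runs the pipeline CLI's help text in the
# configured CPU job definition and queue.
job_id = submit_job(
    cmd='python -m rastervision.pipeline.cli --help',
    job_name='rv-smoke-test',  # hypothetical job name
    attempts=1,
    use_gpu=False)
print(job_id)
```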
class AWSBatchRunner(Runner):
    """Runs pipelines remotely using AWS Batch.

    Requires Everett configuration of form:

    ```
    [AWS_BATCH]
    cpu_job_queue=
    cpu_job_def=
    gpu_job_queue=
    gpu_job_def=
    attempts=
    ```
    """
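For reference, these keys are read at runtime through `rv_config`, mirroring what `submit_job` above does; this sketch simply looks up two of them (the returned values are whatever was set in the `[AWS_BATCH]` section of your Raster Vision configuration):

```
from rastervision.pipeline import rv_config_ as rv_config

# 'batch' is the value of AWS_BATCH defined at the top of this module.
batch_config = rv_config.get_namespace_config('batch')
cpu_job_queue = batch_config('cpu_job_queue')
gpu_job_def = batch_config('gpu_job_def')
```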
    def run(self,
            cfg_json_uri,
            pipeline,
            commands,
            num_splits=1,
            pipeline_run_name: str = 'raster-vision'):
        parent_job_ids = []

        # pipeline-specific job queue
        if hasattr(pipeline, 'job_queue'):
            pipeline_job_queue = pipeline.job_queue
        else:
            pipeline_job_queue = None

        # pipeline-specific job definition
        if hasattr(pipeline, 'job_def'):
            pipeline_job_def = pipeline.job_def
        else:
            pipeline_job_def = None

        for command in commands:
            # command-specific job queue, job definition
            job_def = pipeline_job_def
            job_queue = pipeline_job_queue
            if hasattr(pipeline, command):
                fn = getattr(pipeline, command)
                if hasattr(fn, 'job_def'):
                    job_def = fn.job_def
                if hasattr(fn, 'job_queue'):
                    job_queue = fn.job_queue

            num_array_jobs = None
            use_gpu = command in pipeline.gpu_commands

            job_name = f'{pipeline_run_name}-{command}-{uuid.uuid4()}'

            cmd = ['python', '-m', 'rastervision.pipeline.cli']
            if rv_config.get_verbosity() > 1:
                cmd.append('-' + 'v' * (rv_config.get_verbosity() - 1))
            cmd.extend(
                ['run_command', cfg_json_uri, command, '--runner', AWS_BATCH])

            if command in pipeline.split_commands and num_splits > 1:
                num_array_jobs = num_splits
                cmd += ['--num-splits', str(num_splits)]

            job_id = submit_job(
                cmd=' '.join(cmd),
                job_name=job_name,
                parent_job_ids=parent_job_ids,
                num_array_jobs=num_array_jobs,
                use_gpu=use_gpu,
                job_queue=job_queue,
                job_def=job_def)
            parent_job_ids = [job_id]
            job_queue = None
            job_def = None
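To make the chaining concrete, this sketch builds the same command string that `run` submits for a split command (the config URI, command name, and split count are hypothetical):

```
# Mirrors the cmd list constructed in run() above for a split command.
cfg_json_uri = 's3://my-bucket/rv-output/pipeline-config.json'  # hypothetical
cmd = ['python', '-m', 'rastervision.pipeline.cli',
       'run_command', cfg_json_uri, 'predict', '--runner', 'batch',
       '--num-splits', '2']
print(' '.join(cmd))
# python -m rastervision.pipeline.cli run_command \
#     s3://my-bucket/rv-output/pipeline-config.json predict --runner batch --num-splits 2
```

Each command is submitted as a Batch job that depends on the previous command's job id, so the pipeline runs in order; split commands become array jobs of size `num_splits`.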
    def get_split_ind(self):
        return int(os.environ.get('AWS_BATCH_JOB_ARRAY_INDEX', 0))
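Inside a Batch array job, AWS sets `AWS_BATCH_JOB_ARRAY_INDEX` on each child job, and `get_split_ind` uses it to pick which split to process; outside of Batch it defaults to 0. A small sketch, simulating the variable so it runs locally:

```
import os
from rastervision.aws_batch.aws_batch_runner import AWSBatchRunner

# Simulated for illustration; on Batch this is set automatically per child job.
os.environ['AWS_BATCH_JOB_ARRAY_INDEX'] = '3'
print(AWSBatchRunner().get_split_ind())  # 3
```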