# Source code for rastervision.core.rv_pipeline.rv_pipeline

import logging
from os.path import join
import tempfile
import shutil
from typing import TYPE_CHECKING, Optional, List
from functools import lru_cache

import click
import numpy as np
from tqdm.auto import tqdm

from rastervision.pipeline.pipeline import Pipeline
from rastervision.core.box import Box
from rastervision.core.data_sample import DataSample
from rastervision.core.data import Scene, Labels
from rastervision.core.backend import Backend
from rastervision.core.rv_pipeline import TRAIN, VALIDATION
from rastervision.pipeline.file_system.utils import (
    download_if_needed, zipdir, get_local_path, upload_or_copy, make_dir,
    sync_to_dir, file_exists)

# Module-level logger shared by every command in this file.
log = logging.getLogger(__name__)

# RVPipelineConfig is referenced only inside string annotations, so it is
# imported for static type checkers only and never at runtime.
if TYPE_CHECKING:
    from rastervision.core.rv_pipeline import RVPipelineConfig

# Every command a pipeline may expose; RVPipeline.commands starts from a copy
# of this list.
ALL_COMMANDS = ['analyze', 'chip', 'train', 'predict', 'eval', 'bundle']
# Commands that accept (split_ind, num_splits) arguments.
SPLITTABLE_COMMANDS = ['chip', 'predict']
# Commands reported by RVPipeline.gpu_commands.
GPU_COMMANDS = ['train', 'predict']

class RVPipeline(Pipeline):
    """Base class of all Raster Vision Pipelines.

    This can be subclassed to implement Pipelines for different computer vision
    tasks over geospatial imagery. The commands and what they produce include:

    - analyze: metrics on the imagery and labels
    - chip: small training and validation images taken from larger scenes
    - train: model trained on chips
    - predict: predictions over entire validation and test scenes
    - eval: evaluation metrics for predictions generated by model
    - bundle: bundle containing model and any other files needed to make
      predictions using the Predictor.
    """

    # NOTE(review): this class was recovered from a text extraction that had
    # dropped several dotted call targets (`<config>.build(...)`, `log.info`,
    # `scene.id`). They have been restored from context; confirm them against
    # the upstream file before relying on this copy.

    def __init__(self, config: 'RVPipelineConfig', tmp_dir: str):
        """Initialize the pipeline.

        Args:
            config: Configuration for this pipeline. Forwarded to the parent
                constructor.
            tmp_dir: Directory for temporary files. Forwarded to the parent
                constructor.
        """
        super().__init__(config, tmp_dir)
        # Built lazily by predict() and kept so that repeated calls can reuse
        # an already-loaded model.
        self.backend: Optional['Backend'] = None
        # Annotation only: narrows the type of the attribute for type checkers.
        self.config: 'RVPipelineConfig'

    @property
    def commands(self):
        """Commands this pipeline exposes.

        Starts from a copy of ALL_COMMANDS, drops 'analyze' (and prints a
        notice) when no analyzers are configured, then lets the backend config
        filter the remaining commands.
        """
        commands = ALL_COMMANDS[:]
        if len(self.config.analyzers) == 0 and 'analyze' in commands:
            commands.remove('analyze')
            click.secho(
                "Skipping 'analyze' command...", fg='green', bold=True)

        commands = self.config.backend.filter_commands(commands)
        return commands

    @property
    def split_commands(self):
        """SPLITTABLE_COMMANDS after filtering by the backend config."""
        return self.config.backend.filter_commands(SPLITTABLE_COMMANDS)

    @property
    def gpu_commands(self):
        """GPU_COMMANDS after filtering by the backend config."""
        return self.config.backend.filter_commands(GPU_COMMANDS)

    def analyze(self):
        """Run each analyzer over training scenes."""
        dataset = self.config.dataset
        class_config = dataset.class_config
        scene_id_to_cfg = {s.id: s for s in dataset.all_scenes}

        # Cache built scenes by id so that a scene appearing in more than one
        # group is only built once.
        @lru_cache(maxsize=len(dataset.all_scenes))
        def build_scene(scene_id: str) -> Scene:
            cfg = scene_id_to_cfg[scene_id]
            scene = cfg.build(
                class_config, self.tmp_dir, use_transformers=False)
            return scene

        # build and run each AnalyzerConfig for each scene group
        for a in self.config.analyzers:
            for group_name, group_ids in dataset.scene_groups.items():
                if len(group_ids) == 0:
                    log.info(f'Skipping scene group "{group_name}". '
                             'Empty scene group.')
                    continue

                # Lazy generator: scenes are only built when consumed.
                group_scenes = (
                    build_scene(scene_id) for scene_id in group_ids)
                analyzer = a.build(scene_group=(group_name, group_scenes))

                log.info(f'Running {type(analyzer).__name__} on '
                         f'scene group "{group_name}"...')
                analyzer.process(group_scenes, self.tmp_dir)

    def get_train_windows(self, scene: Scene) -> List[Box]:
        """Return the training windows for a Scene.

        Each training window represents the spatial extent of a training chip
        to generate.

        Args:
            scene: Scene to generate windows for

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    def get_train_labels(self, window: Box, scene: Scene) -> Labels:
        """Return the training labels in a window for a scene.

        Args:
            window: Window to get labels for
            scene: Scene the window belongs to

        Returns:
            Labels that lie within window

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    def chip(self, split_ind: int = 0, num_splits: int = 1):
        """Save training and validation chips.

        Args:
            split_ind: Index of the dataset split to process.
            num_splits: Total number of splits the dataset is divided into.
        """
        cfg = self.config
        backend = cfg.backend.build(cfg, self.tmp_dir)
        dataset = cfg.dataset.get_split_config(split_ind, num_splits)
        # Nothing to chip in this split.
        if not dataset.train_scenes and not dataset.validation_scenes:
            return

        class_cfg = dataset.class_config
        with backend.get_sample_writer() as writer:

            def chip_scene(scene, split):
                # Write one sample per training window of the scene.
                windows = self.get_train_windows(scene)
                with tqdm(
                        windows,
                        desc=f'Making {split} chips from scene {scene.id}',
                        mininterval=0.5) as bar:
                    for window in bar:
                        chip = scene.raster_source.get_chip(window)
                        labels = self.get_train_labels(window, scene)
                        sample = DataSample(
                            chip=chip,
                            window=window,
                            labels=labels,
                            scene_id=str(scene.id),
                            is_train=split == TRAIN)
                        sample = self.post_process_sample(sample)
                        writer.write_sample(sample)

            for s in dataset.train_scenes:
                chip_scene(s.build(class_cfg, self.tmp_dir), TRAIN)
            for s in dataset.validation_scenes:
                chip_scene(s.build(class_cfg, self.tmp_dir), VALIDATION)

    def train(self):
        """Train a model and save it."""
        backend = self.config.backend.build(self.config, self.tmp_dir)
        backend.train(source_bundle_uri=self.config.source_bundle_uri)

    def post_process_sample(self, sample: DataSample) -> DataSample:
        """Post-process sample in pipeline-specific way.

        This should be called before writing a sample during chipping. The
        base implementation returns the sample unchanged.
        """
        return sample

    def post_process_batch(self, windows: List[Box], chips: np.ndarray,
                           labels: Labels) -> Labels:
        """Post-process a batch of predictions.

        The base implementation returns the labels unchanged.
        """
        return labels

    def post_process_predictions(self, labels: Labels,
                                 scene: Scene) -> Labels:
        """Post-process all labels at end of prediction.

        The base implementation returns the labels unchanged.
        """
        return labels

    def predict(self, split_ind=0, num_splits=1):
        """Make predictions over each validation and test scene.

        This uses a sliding window.

        Args:
            split_ind: Index of the dataset split to process.
            num_splits: Total number of splits the dataset is divided into.
        """
        # Cache backend so subsequent calls will be faster. This is useful for
        # the predictor.
        if self.backend is None:
            self.backend = self.config.backend.build(self.config,
                                                     self.tmp_dir)
            self.backend.load_model()

        class_config = self.config.dataset.class_config
        dataset = self.config.dataset.get_split_config(split_ind, num_splits)

        for scene_config in (dataset.validation_scenes + dataset.test_scenes):
            scene = scene_config.build(class_config, self.tmp_dir)
            labels = self.predict_scene(scene, self.backend)
            labels = self.post_process_predictions(labels, scene)
            # NOTE(review): the extracted text discarded `labels` here. The
            # save call is restored on the assumption that the scene exposes
            # its prediction store as `label_store` -- confirm upstream.
            scene.label_store.save(labels)

    def predict_scene(self, scene: Scene, backend: Backend) -> Labels:
        """Predict labels for a single scene.

        The stride equals the configured chip size, so the prediction windows
        requested from the backend do not overlap.

        Args:
            scene: Scene to predict on
            backend: Backend used to make the predictions

        Returns:
            Whatever backend.predict_scene returns for the scene.
        """
        chip_sz = self.config.predict_chip_sz
        stride = chip_sz
        return backend.predict_scene(scene, chip_sz=chip_sz, stride=stride)

    def eval(self):
        """Evaluate predictions against ground truth."""
        dataset = self.config.dataset
        class_config = dataset.class_config
        # it might make sense to make excluded_groups a field in an EvalConfig
        # in the future
        excluded_groups = ['train_scenes']

        scene_id_to_cfg = {s.id: s for s in dataset.all_scenes}

        # Cache built scenes by id so that a scene appearing in more than one
        # group is only built once.
        @lru_cache(maxsize=len(dataset.all_scenes))
        def build_scene(scene_id: str) -> Scene:
            cfg = scene_id_to_cfg[scene_id]
            scene = cfg.build(
                class_config, self.tmp_dir, use_transformers=True)
            return scene

        # build and run each EvaluatorConfig for each scene group
        for e in self.config.evaluators:
            for group_name, group_ids in dataset.scene_groups.items():
                if group_name in excluded_groups:
                    continue
                if len(group_ids) == 0:
                    log.info(f'Skipping scene group "{group_name}". '
                             'Empty scene group.')
                    continue

                # Lazy generator: scenes are only built when consumed.
                group_scenes = (
                    build_scene(scene_id) for scene_id in group_ids)
                evaluator = e.build(
                    class_config, scene_group=(group_name, group_scenes))

                log.info(f'Running {type(evaluator).__name__} on '
                         f'scene group "{group_name}"...')
                try:
                    evaluator.process(group_scenes, self.tmp_dir)
                except FileNotFoundError:
                    # Best effort: a group with missing files is skipped
                    # rather than aborting the whole evaluation.
                    log.warning(
                        f'Skipping scene group "{group_name}". '
                        'Either labels or predictions are missing for '
                        'some scene.')

    def bundle(self):
        """Save a model bundle with whatever is needed to make predictions.

        The model bundle is a zip file and it is used by the Predictor and
        predict CLI subcommand.
        """
        with tempfile.TemporaryDirectory(dir=self.tmp_dir) as tmp_dir:
            bundle_dir = join(tmp_dir, 'bundle')
            make_dir(bundle_dir)

            # Files the backend asks to have bundled, fetched from train_uri.
            for fn in self.config.backend.get_bundle_filenames():
                path = download_if_needed(
                    join(self.config.train_uri, fn), tmp_dir)
                shutil.copy(path, join(bundle_dir, fn))

            # Output of the analyze command, when it exists.
            if file_exists(self.config.analyze_uri, include_dir=True):
                sync_to_dir(self.config.analyze_uri,
                            join(bundle_dir, 'analyze'))

            # The pipeline config, stored under a fixed name in the bundle.
            path = download_if_needed(self.config.get_config_uri(), tmp_dir)
            shutil.copy(path, join(bundle_dir, 'pipeline-config.json'))

            # Zip while the temporary directory still exists, then publish.
            model_bundle_uri = self.config.get_model_bundle_uri()
            model_bundle_path = get_local_path(model_bundle_uri, self.tmp_dir)
            zipdir(bundle_dir, model_bundle_path)
            upload_or_copy(model_bundle_path, model_bundle_uri)