Source code for rastervision.core.rv_pipeline.rv_pipeline

from typing import TYPE_CHECKING
from os.path import join
import logging
import tempfile
import shutil
from functools import lru_cache

import click
import numpy as np

from rastervision.pipeline.pipeline import Pipeline
from rastervision.core.box import Box
from rastervision.core.data_sample import DataSample
from rastervision.core.data import Scene, Labels
from rastervision.core.backend import Backend
from rastervision.pipeline.file_system.utils import (
    download_if_needed, zipdir, get_local_path, upload_or_copy, make_dir,
    sync_from_dir, file_exists)

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from rastervision.core.rv_pipeline import RVPipelineConfig

ALL_COMMANDS = ['analyze', 'chip', 'train', 'predict', 'eval', 'bundle']
SPLITTABLE_COMMANDS = ['chip', 'predict']
GPU_COMMANDS = ['train', 'predict']


class RVPipeline(Pipeline):
    """Base class of all Raster Vision Pipelines.

    This can be subclassed to implement Pipelines for different computer
    vision tasks over geospatial imagery. The commands and what they produce
    include:

    - analyze: metrics on the imagery and labels
    - chip: small training and validation images taken from larger scenes
    - train: model trained on chips
    - predict: predictions over entire validation and test scenes
    - eval: evaluation metrics for predictions generated by model
    - bundle: bundle containing model and any other files needed to make
      predictions using the Predictor.
    """

    def __init__(self, config: 'RVPipelineConfig', tmp_dir: str):
        super().__init__(config, tmp_dir)
        self.backend: 'Backend | None' = None
        self.config: 'RVPipelineConfig'

    @property
    def commands(self):
        commands = ALL_COMMANDS[:]
        if len(self.config.analyzers) == 0 and 'analyze' in commands:
            commands.remove('analyze')
            click.secho(
                "Skipping 'analyze' command...", fg='green', bold=True)
        commands = self.config.backend.filter_commands(commands)
        return commands

    @property
    def split_commands(self):
        return self.config.backend.filter_commands(SPLITTABLE_COMMANDS)

    @property
    def gpu_commands(self):
        return self.config.backend.filter_commands(GPU_COMMANDS)

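    # For example, with no analyzers configured, `commands` evaluates to
    # ['chip', 'train', 'predict', 'eval', 'bundle'] (ALL_COMMANDS minus
    # 'analyze'), possibly narrowed further by the backend's filter_commands.
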
    def analyze(self):
        """Run each analyzer over training scenes."""
        dataset = self.config.dataset
        class_config = dataset.class_config
        scene_id_to_cfg = {s.id: s for s in dataset.all_scenes}

        @lru_cache(maxsize=len(dataset.all_scenes))
        def build_scene(scene_id: str) -> Scene:
            cfg = scene_id_to_cfg[scene_id]
            scene = cfg.build(
                class_config, self.tmp_dir, use_transformers=False)
            return scene

        # build and run each AnalyzerConfig for each scene group
        for a in self.config.analyzers:
            for group_name, group_ids in dataset.scene_groups.items():
                if len(group_ids) == 0:
                    log.info(f'Skipping scene group "{group_name}". '
                             'Empty scene group.')
                    continue
                group_scenes = (build_scene(id) for id in group_ids)
                analyzer = a.build(scene_group=(group_name, group_scenes))
                log.info(f'Running {type(analyzer).__name__} on '
                         f'scene group "{group_name}"...')
                analyzer.process(group_scenes, self.tmp_dir)

    def get_train_windows(self, scene: Scene) -> list[Box]:
        """Return the training windows for a Scene.

        Each training window represents the spatial extent of a training chip
        to generate.

        Args:
            scene: Scene to generate windows for.
        """
        raise NotImplementedError()

    def get_train_labels(self, window: Box, scene: Scene) -> Labels:
        """Return the training labels in a window for a scene.

        Returns:
            Labels that lie within window.
        """
        raise NotImplementedError()

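    # A sketch of how a task-specific subclass might implement the two hooks
    # above. This is illustrative only: the fixed 300-pixel chip size and the
    # sliding-window call are assumptions, not the strategy of any concrete
    # pipeline in the library.
    #
    #     def get_train_windows(self, scene: Scene) -> list[Box]:
    #         chip_sz = 300  # hypothetical fixed chip size
    #         extent = scene.raster_source.extent
    #         return extent.get_windows(chip_sz, stride=chip_sz)
    #
    #     def get_train_labels(self, window: Box, scene: Scene) -> Labels:
    #         return scene.label_source.get_labels(window=window)
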
    def chip(self, split_ind: int = 0, num_splits: int = 1):
        """Save training and validation chips."""
        cfg = self.config
        log.info(f'Chip options: {cfg.chip_options}')
        dataset = cfg.dataset.get_split_config(split_ind, num_splits)
        if not dataset.train_scenes and not dataset.validation_scenes:
            return
        backend = cfg.backend.build(cfg, self.tmp_dir)
        backend.chip_dataset(dataset, cfg.chip_options)

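    # Since 'chip' is in SPLITTABLE_COMMANDS, the work can be partitioned
    # across workers: e.g. two workers calling chip(split_ind=0, num_splits=2)
    # and chip(split_ind=1, num_splits=2) each chip half of the dataset.
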
    def train(self):
        """Train a model and save it."""
        backend = self.config.backend.build(self.config, self.tmp_dir)
        backend.train(source_bundle_uri=self.config.source_bundle_uri)

    def post_process_sample(self, sample: DataSample) -> DataSample:
        """Post-process sample in a pipeline-specific way.

        This should be called before writing a sample during chipping.
        """
        return sample

    def post_process_batch(self, windows: list[Box], chips: np.ndarray,
                           labels: Labels) -> Labels:
        """Post-process a batch of predictions."""
        return labels

    def post_process_predictions(self, labels: Labels,
                                 scene: Scene) -> Labels:
        """Post-process all labels at the end of prediction."""
        return labels

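    # A sketch of a pipeline-specific override of the hook above; the
    # prune_duplicates() helper is hypothetical and stands in for whatever
    # cleanup a task needs (e.g. NMS-style box pruning in object detection):
    #
    #     def post_process_predictions(self, labels: Labels,
    #                                  scene: Scene) -> Labels:
    #         return prune_duplicates(labels)  # hypothetical helper
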
    def predict(self, split_ind=0, num_splits=1):
        """Make predictions over each validation and test scene.

        This uses a sliding window.
        """
        if self.backend is None:
            self.build_backend()
        class_config = self.config.dataset.class_config
        dataset = self.config.dataset.get_split_config(split_ind, num_splits)
        for scene_config in (dataset.validation_scenes + dataset.test_scenes):
            scene = scene_config.build(class_config, self.tmp_dir)
            labels = self.predict_scene(scene)
            scene.label_store.save(labels)

    def predict_scene(self, scene: Scene) -> Labels:
        """Run inference on a single scene and post-process the output."""
        if self.backend is None:
            self.build_backend()
        labels = self.backend.predict_scene(
            scene, predict_options=self.config.predict_options)
        labels = self.post_process_predictions(labels, scene)
        return labels

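    # Example one-off inference with predict_scene(), assuming `pipeline` was
    # built from an RVPipelineConfig and `scene_cfg` is a SceneConfig (this
    # mirrors the loop in predict() above):
    #
    #     scene = scene_cfg.build(class_config, pipeline.tmp_dir)
    #     labels = pipeline.predict_scene(scene)
    #     scene.label_store.save(labels)
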
    def eval(self):
        """Evaluate predictions against ground truth."""
        dataset = self.config.dataset
        class_config = dataset.class_config

        # it might make sense to make excluded_groups a field in an
        # EvalConfig in the future
        excluded_groups = ['train_scenes']

        scene_id_to_cfg = {s.id: s for s in dataset.all_scenes}

        @lru_cache(maxsize=len(dataset.all_scenes))
        def build_scene(scene_id: str) -> Scene:
            cfg = scene_id_to_cfg[scene_id]
            scene = cfg.build(
                class_config, self.tmp_dir, use_transformers=True)
            return scene

        # build and run each EvaluatorConfig for each scene group
        for e in self.config.evaluators:
            for group_name, group_ids in dataset.scene_groups.items():
                if group_name in excluded_groups:
                    continue
                if len(group_ids) == 0:
                    log.info(f'Skipping scene group "{group_name}". '
                             'Empty scene group.')
                    continue
                group_scenes = (build_scene(id) for id in group_ids)
                evaluator = e.build(
                    class_config, scene_group=(group_name, group_scenes))
                log.info(f'Running {type(evaluator).__name__} on '
                         f'scene group "{group_name}"...')
                try:
                    evaluator.process(group_scenes, self.tmp_dir)
                except FileNotFoundError:
                    log.warning(
                        f'Skipping scene group "{group_name}". '
                        'Either labels or predictions are missing for '
                        'some scene.')

    def bundle(self):
        """Save a model bundle with whatever is needed to make predictions.

        The model bundle is a zip file and it is used by the Predictor and
        predict CLI subcommand.
        """
        with tempfile.TemporaryDirectory(dir=self.tmp_dir) as tmp_dir:
            bundle_dir = join(tmp_dir, 'bundle')
            make_dir(bundle_dir)

            for fn in self.config.backend.get_bundle_filenames():
                path = download_if_needed(
                    join(self.config.train_uri, fn), tmp_dir)
                shutil.copy(path, join(bundle_dir, fn))

            if file_exists(self.config.analyze_uri, include_dir=True):
                analyze_dst = join(bundle_dir, 'analyze')
                sync_from_dir(self.config.analyze_uri, analyze_dst)

            path = download_if_needed(self.config.get_config_uri(), tmp_dir)
            shutil.copy(path, join(bundle_dir, 'pipeline-config.json'))

            model_bundle_uri = self.config.get_model_bundle_uri()
            model_bundle_path = get_local_path(model_bundle_uri, self.tmp_dir)
            zipdir(bundle_dir, model_bundle_path)
            upload_or_copy(model_bundle_path, model_bundle_uri)

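    # Based on the code above, the resulting zip has this layout:
    #
    #     bundle/
    #         <backend bundle files>   # e.g. model weights
    #         analyze/                 # analyzer output, only if present
    #         pipeline-config.json     # serialized pipeline configuration
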
    def build_backend(self, uri: str | None = None) -> None:
        """Build the backend and load its model, optionally from uri."""
        self.backend = self.config.backend.build(self.config, self.tmp_dir)
        self.backend.load_model(uri)
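
# A minimal usage sketch (not part of the module): commands are normally run
# via the Raster Vision CLI and runners rather than called directly, but a
# full pipeline run conceptually reduces to the following, assuming `cfg` is
# an already-validated RVPipelineConfig and `tmp_dir` is a scratch directory:
#
#     pipeline = cfg.build(tmp_dir)
#     for command in pipeline.commands:
#         getattr(pipeline, command)()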