import logging
from os.path import join
import tempfile
import shutil
from typing import TYPE_CHECKING, Optional, List
from functools import lru_cache
import click
import numpy as np
from tqdm.auto import tqdm
from rastervision.pipeline.pipeline import Pipeline
from rastervision.core.box import Box
from rastervision.core.data_sample import DataSample
from rastervision.core.data import Scene, Labels
from rastervision.core.backend import Backend
from rastervision.core.rv_pipeline import TRAIN, VALIDATION
from rastervision.pipeline.file_system.utils import (
download_if_needed, zipdir, get_local_path, upload_or_copy, make_dir,
sync_to_dir, file_exists)
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from rastervision.core.rv_pipeline import RVPipelineConfig
# Names of all commands an RVPipeline can run, in canonical execution order.
ALL_COMMANDS = ['analyze', 'chip', 'train', 'predict', 'eval', 'bundle']
# Commands that may be sharded into parallel splits (see split_commands).
SPLITTABLE_COMMANDS = ['chip', 'predict']
# Commands that benefit from running on a GPU (see gpu_commands).
GPU_COMMANDS = ['train', 'predict']
class RVPipeline(Pipeline):
    """Abstract base class for all Raster Vision pipelines.

    Subclasses implement pipelines for different computer vision tasks over
    geospatial imagery. The commands and what they produce are:

    - analyze: metrics on the imagery and labels
    - chip: small training and validation images taken from larger scenes
    - train: model trained on chips
    - predict: predictions over entire validation and test scenes
    - eval: evaluation metrics for predictions generated by model
    - bundle: bundle containing model and any other files needed to make
      predictions using the Predictor.
    """

    def __init__(self, config: 'RVPipelineConfig', tmp_dir: str):
        super().__init__(config, tmp_dir)
        # Built lazily and cached (see predict()) so repeated calls are fast.
        self.backend: Optional['Backend'] = None
        # Narrow the type of the config attribute inherited from Pipeline.
        self.config: 'RVPipelineConfig'
@property
def commands(self):
    """Names of the commands this pipeline will run.

    Drops 'analyze' when no analyzers are configured, then lets the
    backend config filter the remaining commands.
    """
    cmds = list(ALL_COMMANDS)
    if not self.config.analyzers and 'analyze' in cmds:
        cmds.remove('analyze')
        click.secho("Skipping 'analyze' command...", fg='green', bold=True)
    return self.config.backend.filter_commands(cmds)
@property
def split_commands(self):
    """Splittable commands, as filtered by the backend config."""
    backend_cfg = self.config.backend
    return backend_cfg.filter_commands(SPLITTABLE_COMMANDS)
@property
def gpu_commands(self):
    """GPU-eligible commands, as filtered by the backend config."""
    backend_cfg = self.config.backend
    return backend_cfg.filter_commands(GPU_COMMANDS)
def analyze(self):
    """Run each analyzer over each non-empty scene group.

    Each AnalyzerConfig in the pipeline config is built and run once per
    scene group in the dataset; empty scene groups are skipped with a log
    message.
    """
    dataset = self.config.dataset
    class_config = dataset.class_config
    scene_id_to_cfg = {s.id: s for s in dataset.all_scenes}

    # Cache built scenes so a scene is only built once even if it appears
    # in multiple scene groups (or is seen by multiple analyzers).
    @lru_cache(maxsize=len(dataset.all_scenes))
    def build_scene(scene_id: str) -> Scene:
        cfg = scene_id_to_cfg[scene_id]
        scene = cfg.build(
            class_config, self.tmp_dir, use_transformers=False)
        return scene

    # build and run each AnalyzerConfig for each scene group
    for a in self.config.analyzers:
        for group_name, group_ids in dataset.scene_groups.items():
            if len(group_ids) == 0:
                log.info(f'Skipping scene group "{group_name}". '
                         'Empty scene group.')
                continue
            # Use a list rather than a generator: the same collection is
            # handed to both a.build() and analyzer.process(), and a shared
            # generator would be exhausted by whichever consumer iterates
            # it first, leaving the other with no scenes.
            group_scenes = [build_scene(id) for id in group_ids]
            analyzer = a.build(scene_group=(group_name, group_scenes))
            log.info(f'Running {type(analyzer).__name__} on '
                     f'scene group "{group_name}"...')
            analyzer.process(group_scenes, self.tmp_dir)
def get_train_windows(self, scene: Scene) -> List[Box]:
    """Return the training windows for a Scene.

    Each window defines the spatial extent of one training chip to be
    generated. Subclasses must override this.

    Args:
        scene: Scene to generate windows for.

    Raises:
        NotImplementedError: always; this is an abstract method.
    """
    raise NotImplementedError()
def get_train_labels(self, window: Box, scene: Scene) -> Labels:
    """Return the training labels in a window for a scene.

    Subclasses must override this.

    Args:
        window: Box delimiting the region of interest.
        scene: Scene whose labels should be queried.

    Returns:
        Labels that lie within the window.

    Raises:
        NotImplementedError: always; this is an abstract method.
    """
    raise NotImplementedError()
def chip(self, split_ind: int = 0, num_splits: int = 1):
    """Save training and validation chips.

    Builds the backend's sample writer and, for every scene in this
    split, cuts chips out of the scene's raster source (one per training
    window) and writes them as samples.
    """
    cfg = self.config
    backend = cfg.backend.build(cfg, self.tmp_dir)
    dataset = cfg.dataset.get_split_config(split_ind, num_splits)
    if not dataset.train_scenes and not dataset.validation_scenes:
        # Nothing to chip in this split.
        return
    class_cfg = dataset.class_config

    with backend.get_sample_writer() as writer:

        def write_chips_for_scene(scene, split):
            # The window determines both the image chip and its labels.
            windows = self.get_train_windows(scene)
            desc = f'Making {split} chips from scene {scene.id}'
            with tqdm(windows, desc=desc, mininterval=0.5) as bar:
                for window in bar:
                    sample = DataSample(
                        chip=scene.raster_source.get_chip(window),
                        window=window,
                        labels=self.get_train_labels(window, scene),
                        scene_id=str(scene.id),
                        is_train=split == TRAIN)
                    writer.write_sample(self.post_process_sample(sample))

        for split, scene_cfgs in ((TRAIN, dataset.train_scenes),
                                  (VALIDATION, dataset.validation_scenes)):
            for scene_cfg in scene_cfgs:
                write_chips_for_scene(
                    scene_cfg.build(class_cfg, self.tmp_dir), split)
def train(self):
    """Train a model and save it.

    Delegates entirely to the backend; an optional source bundle URI in
    the config allows training to start from a previous model bundle.
    """
    cfg = self.config
    backend = cfg.backend.build(cfg, self.tmp_dir)
    backend.train(source_bundle_uri=cfg.source_bundle_uri)
def post_process_sample(self, sample: DataSample) -> DataSample:
    """Post-process a sample in a pipeline-specific way.

    Called on each sample before it is written during chipping. The base
    implementation returns the sample unchanged.
    """
    return sample
def post_process_batch(self, windows: List[Box], chips: np.ndarray,
                       labels: Labels) -> Labels:
    """Post-process a batch of predictions.

    The base implementation returns the labels unchanged.
    """
    return labels
def post_process_predictions(self, labels: Labels, scene: Scene) -> Labels:
    """Post-process all of a scene's labels at the end of prediction.

    The base implementation returns the labels unchanged.
    """
    return labels
def predict(self, split_ind=0, num_splits=1):
    """Make predictions over each validation and test scene.

    This uses a sliding window.
    """
    # Build the backend once and keep it so subsequent calls are faster.
    # This is useful for the Predictor.
    if self.backend is None:
        self.backend = self.config.backend.build(self.config, self.tmp_dir)
        self.backend.load_model()

    class_config = self.config.dataset.class_config
    dataset = self.config.dataset.get_split_config(split_ind, num_splits)
    scene_cfgs = dataset.validation_scenes + dataset.test_scenes
    for scene_cfg in scene_cfgs:
        scene = scene_cfg.build(class_config, self.tmp_dir)
        raw_labels = self.predict_scene(scene, self.backend)
        final_labels = self.post_process_predictions(raw_labels, scene)
        scene.label_store.save(final_labels)
def predict_scene(self, scene: Scene, backend: Backend) -> Labels:
    """Predict labels for a single scene with a sliding window.

    The stride equals the chip size, so windows tile the scene without
    overlap.
    """
    chip_sz = self.config.predict_chip_sz
    return backend.predict_scene(scene, chip_sz=chip_sz, stride=chip_sz)
def eval(self):
    """Evaluate predictions against ground truth.

    Each configured evaluator is run over every scene group except the
    excluded ones. A scene group is skipped (with a warning) if labels or
    predictions are missing for some scene in it.
    """
    dataset = self.config.dataset
    class_config = dataset.class_config
    # it might make sense to make excluded_groups a field in an EvalConfig
    # in the future
    excluded_groups = ['train_scenes']

    scene_id_to_cfg = {s.id: s for s in dataset.all_scenes}

    # Cache built scenes so a scene is only built once even if it appears
    # in multiple scene groups (or is seen by multiple evaluators).
    @lru_cache(maxsize=len(dataset.all_scenes))
    def build_scene(scene_id: str) -> Scene:
        cfg = scene_id_to_cfg[scene_id]
        scene = cfg.build(
            class_config, self.tmp_dir, use_transformers=True)
        return scene

    # build and run each EvaluatorConfig for each scene group
    for e in self.config.evaluators:
        for group_name, group_ids in dataset.scene_groups.items():
            if group_name in excluded_groups:
                continue
            if len(group_ids) == 0:
                log.info(f'Skipping scene group "{group_name}". '
                         'Empty scene group.')
                continue
            # Use a list rather than a generator: the same collection is
            # handed to both e.build() and evaluator.process(), and a
            # shared generator would be exhausted by whichever consumer
            # iterates it first, leaving the other with no scenes.
            group_scenes = [build_scene(id) for id in group_ids]
            evaluator = e.build(
                class_config, scene_group=(group_name, group_scenes))
            log.info(f'Running {type(evaluator).__name__} on '
                     f'scene group "{group_name}"...')
            try:
                evaluator.process(group_scenes, self.tmp_dir)
            except FileNotFoundError:
                # Logger.warn() is a deprecated alias; use warning().
                log.warning(f'Skipping scene group "{group_name}". '
                            'Either labels or predictions are missing for '
                            'some scene.')
def bundle(self):
    """Save a model bundle with whatever is needed to make predictions.

    The model bundle is a zip file and it is used by the Predictor and
    predict CLI subcommand.
    """
    # Stage everything in a temp dir so partial bundles are never uploaded.
    with tempfile.TemporaryDirectory(dir=self.tmp_dir) as tmp_dir:
        bundle_dir = join(tmp_dir, 'bundle')
        make_dir(bundle_dir)
        # Copy the backend's bundle files (e.g. model weights) from the
        # train output into the bundle.
        for fn in self.config.backend.get_bundle_filenames():
            path = download_if_needed(
                join(self.config.train_uri, fn), tmp_dir)
            shutil.copy(path, join(bundle_dir, fn))
        # Include the analyze output, if any, under 'analyze/'.
        # NOTE(review): include_dir=True suggests analyze_uri may be a
        # directory — confirm against file_exists semantics.
        if file_exists(self.config.analyze_uri, include_dir=True):
            sync_to_dir(self.config.analyze_uri, join(
                bundle_dir, 'analyze'))
        # Include the serialized pipeline config so the Predictor can
        # reconstruct the pipeline.
        path = download_if_needed(self.config.get_config_uri(), tmp_dir)
        shutil.copy(path, join(bundle_dir, 'pipeline-config.json'))
        # Zip the staged directory and push it to the bundle URI.
        model_bundle_uri = self.config.get_model_bundle_uri()
        model_bundle_path = get_local_path(model_bundle_uri, self.tmp_dir)
        zipdir(bundle_dir, model_bundle_path)
        upload_or_copy(model_bundle_path, model_bundle_uri)