from typing import (TYPE_CHECKING, Any, Iterable, Sequence)
from abc import abstractmethod
import numpy as np
from rasterio.features import rasterize
from shapely.ops import transform
from rastervision.core.box import Box
from rastervision.core.data.label import Labels
from rastervision.core.data.label.utils import discard_prediction_edges
if TYPE_CHECKING:
from typing import Self
from shapely.geometry import Polygon
from rastervision.core.data import (ClassConfig, CRSTransformer,
VectorOutputConfig)
class SemanticSegmentationLabels(Labels):
    """Representation of Semantic Segmentation labels.

    Base class that defines the window-based interface (get / set / delete
    labels for a :class:`.Box`) and implements the logic that is shared by
    the concrete label representations: AOI filtering, class masks, window
    generation and population from model predictions.
    """

    def __init__(self, extent: Box, num_classes: int, dtype: np.dtype):
        """Constructor.

        Args:
            extent: The extent of the region to which the labels belong,
                in global coordinates.
            num_classes: Number of classes.
            dtype: dtype that subclasses use for their underlying array.
        """
        self.extent = extent
        self.num_classes = num_classes
        # Box.to_xywh() yields (xmin, ymin, width, height); unpack in that
        # order so that xmin and ymin are not swapped.
        self.xmin, self.ymin, self.width, self.height = extent.to_xywh()
        self.dtype = dtype

    @abstractmethod
    def __add__(self, other: 'Self') -> 'Self':
        """Merge self with other labels."""

    def __setitem__(self, window: Box, values: np.ndarray) -> None:
        """Set labels for the given window.

        Delegates to :meth:`add_window`.
        """
        self.add_window(window, values)

    @abstractmethod
    def __delitem__(self, window: Box) -> None:
        """Delete labels for the given window."""

    @abstractmethod
    def __getitem__(self, window: Box) -> np.ndarray:
        """Get labels for the given window."""

    @abstractmethod
    def add_window(self, window: Box, values: np.ndarray) -> None:
        """Set labels for the given window."""

    @abstractmethod
    def get_label_arr(self, window: Box,
                      null_class_id: int = -1) -> np.ndarray:
        """Get labels as a 2D array of class IDs.

        Note: The returned array is not guaranteed to be the same size as the
        input window.
        """

    @abstractmethod
    def get_score_arr(self, window: Box,
                      null_class_id: int = -1) -> np.ndarray:
        """Get (C, H, W) array of pixel scores."""

    def get_class_mask(self,
                       window: Box,
                       class_id: int,
                       threshold: float | None = None) -> np.ndarray:
        """Get a binary mask representing all pixels of a class.

        Args:
            window: Window for which to compute the mask.
            class_id: ID of the class whose pixels should be ON in the mask.
            threshold: Minimum score for a pixel to be ON. If ``None``,
                ``1 / num_classes`` is used. Defaults to ``None``.

        Returns:
            Boolean array that is True wherever the score of ``class_id`` is
            greater than or equal to the threshold.
        """
        scores = self.get_score_arr(window)
        if threshold is None:
            threshold = 1 / self.num_classes
        return scores[class_id] >= threshold

    def get_windows(self, **kwargs) -> list[Box]:
        """Generate sliding windows over the local extent.

        The keyword args are passed to :meth:`.Box.get_windows` and can
        therefore be used to control the specifications of the windows.

        If the keyword args do not contain size, a list of length 1,
        containing the full extent is returned.

        Args:
            **kwargs: Extra args for :meth:`.Box.get_windows`.
        """
        size: int | None = kwargs.pop('size', None)
        if size is None:
            return [self.extent]
        return self.extent.get_windows(size, size, **kwargs)

    def filter_by_aoi(self, aoi_polygons: list['Polygon'], null_class_id: int,
                      **kwargs) -> 'Self':
        """Keep only the values that lie inside the AOI.

        This is an inplace operation. If ``aoi_polygons`` is empty, the
        labels are returned untouched.

        Args:
            aoi_polygons: AOI polygons to filter by, in pixel coordinates.
            null_class_id: Class ID to assign to pixels falling outside
                the AOI polygons.
            **kwargs: Extra args for
                :meth:`.SemanticSegmentationLabels.get_windows`.

        Returns:
            ``self``, after filtering.
        """
        if not aoi_polygons:
            return self
        for window in self.get_windows(**kwargs):
            self._filter_window_by_aoi(window, aoi_polygons, null_class_id)
        return self

    @abstractmethod
    def mask_fill(self, window: Box, mask: np.ndarray,
                  fill_value: Any) -> None:
        """Given a window and a binary mask, set all the pixels in the window
        for which the mask is ON to the fill_value.
        """

    def _filter_window_by_aoi(self, window: Box, aoi_polygons: list['Polygon'],
                              null_class_id: int) -> None:
        """Apply the AOI filter to a single window, in place.

        Pixels of the window that fall outside every AOI polygon are filled
        with ``null_class_id``. If the window does not intersect any AOI
        polygon, the labels for the whole window are deleted.
        """
        window_geom = window.to_shapely()
        # Only the spatial shape of this array is used below.
        label_arr = self[window]

        def to_window_frame(x, y, z=None):
            # Shift coordinates so that the window's top-left is the origin.
            return (x - window.xmin, y - window.ymin)

        # Clip each AOI polygon to the window and express it in the window's
        # frame of reference.
        window_aois = []
        for aoi in aoi_polygons:
            window_aoi = aoi.intersection(window_geom)
            if not window_aoi.is_empty:
                window_aois.append(transform(to_window_frame, window_aoi))

        if not window_aois:
            # The window lies entirely outside the AOI: drop its labels.
            del self[window]
            return

        # Burn 0 inside the AOI polygons and 1 everywhere else, so that the
        # mask is ON exactly for the pixels outside the AOI.
        mask = rasterize(
            [(p, 0) for p in window_aois],
            out_shape=label_arr.shape[-2:],
            fill=1,
            dtype=np.uint8)
        mask = mask.astype(bool)
        self.mask_fill(window, mask, null_class_id)

    @classmethod
    def make_empty(cls, extent: Box, num_classes: int,
                   smooth: bool = False) -> 'Self':
        """Instantiate an empty instance.

        Args:
            extent: The extent of the region to which the labels belong,
                in global coordinates.
            num_classes: Number of classes.
            smooth: If True, creates a SemanticSegmentationSmoothLabels
                object. If False, creates a
                SemanticSegmentationDiscreteLabels object.
                Defaults to False.

        Returns:
            If smooth=True, returns a SemanticSegmentationSmoothLabels.
            Otherwise, a SemanticSegmentationDiscreteLabels.
        """
        if smooth:
            return SemanticSegmentationSmoothLabels.make_empty(
                extent=extent, num_classes=num_classes)
        return SemanticSegmentationDiscreteLabels.make_empty(
            extent=extent, num_classes=num_classes)

    @classmethod
    def from_predictions(cls,
                         windows: Iterable['Box'],
                         predictions: Iterable[Any],
                         extent: Box,
                         num_classes: int,
                         smooth: bool = False,
                         crop_sz: int | None = None) -> 'Self':
        """Instantiate from windows and their corresponding predictions.

        Args:
            windows: Boxes in pixel coords, specifying chips in the raster.
            predictions: The model predictions for each chip specified by the
                windows.
            extent: The extent of the region to which the labels belong, in
                global coordinates.
            num_classes: Number of classes.
            smooth: If ``True``, creates a ``SemanticSegmentationSmoothLabels``
                object. If ``False``, creates a
                ``SemanticSegmentationDiscreteLabels`` object.
                Defaults to ``False``.
            crop_sz: Number of rows/columns of pixels from the
                edge of prediction windows to discard. This is useful because
                predictions near edges tend to be lower quality and can result
                in very visible artifacts near the edges of chips. This should
                only be used if the given windows represent a sliding-window
                grid over the scene extent with overlap between adjacent
                windows. Defaults to None.

        Returns:
            If smooth=True, returns a SemanticSegmentationSmoothLabels.
            Otherwise, a SemanticSegmentationDiscreteLabels.
        """
        labels = cls.make_empty(extent, num_classes, smooth=smooth)
        labels.add_predictions(windows, predictions, crop_sz=crop_sz)
        return labels

    def add_predictions(self,
                        windows: Iterable['Box'],
                        predictions: Iterable[Any],
                        crop_sz: int | None = None) -> None:
        """Populate predictions.

        Args:
            windows: Boxes in pixel coords, specifying chips in the raster.
            predictions: The model predictions for each chip specified by the
                windows.
            crop_sz: Number of rows/columns of pixels from the
                edge of prediction windows to discard. This is useful because
                predictions near edges tend to be lower quality and can result
                in very visible artifacts near the edges of chips. This should
                only be used if the given windows represent a sliding-window
                grid over the scene extent with overlap between adjacent
                windows. Defaults to None.
        """
        if crop_sz is not None:
            windows, predictions = discard_prediction_edges(
                windows, predictions, crop_sz)
        # If predictions is tqdm-wrapped, it needs to be the first arg to zip()
        # or the progress bar won't terminate with the correct count.
        for prediction, window in zip(predictions, windows):
            self[window] = prediction
class SemanticSegmentationDiscreteLabels(SemanticSegmentationLabels):
    """Vote-counts for each pixel belonging to each class.

    Maintains a num_classes x H x W array where value_{ijk} represents how
    many times pixel_{jk} has been classified as class i. A label array can be
    obtained from this by argmax'ing along the class dimension. Can also be
    turned into a score converting counts to probabilities.

    A separate H x W boolean ``hit_mask`` records which pixels have received
    any data at all, so that "no data" can be told apart from "class 0".
    """

    def __init__(self, extent: Box, num_classes: int, dtype: Any = np.uint8):
        """Constructor.

        Args:
            extent: The extent of the region to which the labels belong, in
                global coordinates.
            num_classes: Number of classes.
            dtype: dtype of the counts array. Defaults to np.uint8.
        """
        super().__init__(extent, num_classes, dtype)
        self.pixel_counts = np.zeros(
            (self.num_classes, self.height, self.width), dtype=self.dtype)
        # track which pixels have been hit at all
        self.hit_mask = np.zeros((self.height, self.width), dtype=bool)

    def __add__(self, other: 'Self') -> 'Self':
        """Merge self with other labels by adding the pixel counts.

        This modifies and returns ``self``.

        Raises:
            ValueError: If the two label objects have different extents.
        """
        if self.extent != other.extent:
            raise ValueError('Cannot add labels with unqeual extents.')
        self.pixel_counts += other.pixel_counts
        # A pixel has data after the merge if either operand had data for it;
        # without this, pixels seen only in `other` would still be reported
        # as null by get_label_arr().
        self.hit_mask |= other.hit_mask
        return self

    def __eq__(self, other: 'Self') -> bool:
        """Check equality of extent, hit mask and pixel counts."""
        if not isinstance(other, SemanticSegmentationDiscreteLabels):
            return False
        if self.extent != other.extent:
            return False
        # array_equal() returns a plain False on shape mismatch (e.g. a
        # different number of classes) instead of attempting to broadcast.
        if not np.array_equal(self.hit_mask, other.hit_mask):
            return False
        return bool(np.array_equal(self.pixel_counts, other.pixel_counts))

    def __delitem__(self, window: Box) -> None:
        """Reset counts to zero for pixels in the window."""
        y0, x0, y1, x1 = window.intersection(self.extent)
        self.pixel_counts[..., y0:y1, x0:x1] = 0
        self.hit_mask[y0:y1, x0:x1] = False

    def __getitem__(self, window: Box) -> np.ndarray:
        """Get the label array for the window (see :meth:`get_label_arr`)."""
        return self.get_label_arr(window)

    def add_window(self, window: Box, pixel_class_ids: np.ndarray) -> None:
        """Add one vote per pixel for the class IDs predicted in the window.

        Only the part of the window that overlaps the extent is used.

        Args:
            window: Window that ``pixel_class_ids`` corresponds to.
            pixel_class_ids: Array of class IDs whose last two dimensions
                cover the window.
        """
        # sub-window in self.extent coords to write to
        window_dst = window.intersection(self.extent)
        # sub-window in pixel_class_ids coords to read from
        window_src = window_dst.to_global_coords(
            self.extent).to_local_coords(window)

        # read sub-window from source array; crop first so that only the
        # pixels that are actually used get cast
        src_yslice, src_xslice = window_src.to_slices()
        pixel_class_ids = pixel_class_ids[..., src_yslice, src_xslice]
        pixel_class_ids = pixel_class_ids.astype(self.dtype)

        # write sub-window in destination array
        dst_yslice, dst_xslice = window_dst.to_slices()
        window_pixel_counts = self.pixel_counts[:, dst_yslice, dst_xslice]
        # each channel is a view into self.pixel_counts, so the in-place
        # increment updates the stored counts
        for ch_class_id, ch in enumerate(window_pixel_counts):
            ch[pixel_class_ids == ch_class_id] += 1
        self.hit_mask[dst_yslice, dst_xslice] = True

    def get_label_arr(self, window: Box,
                      null_class_id: int = -1) -> np.ndarray:
        """Get labels as array of class IDs.

        Returns null_class_id for pixels for which there is no data.
        """
        y0, x0, y1, x1 = window.intersection(self.extent)
        label_arr = self.pixel_counts[..., y0:y1, x0:x1].argmax(axis=0)
        hit_mask = self.hit_mask[y0:y1, x0:x1]
        return np.where(hit_mask, label_arr, null_class_id)

    def get_score_arr(self, window: Box) -> np.ndarray:
        """Get array of pixel scores.

        Scores are the per-class counts divided by the total count at each
        pixel. Pixels whose counts are all zero yield NaN (0 / 0).
        """
        y0, x0, y1, x1 = window.intersection(self.extent)
        class_counts = self.pixel_counts[..., y0:y1, x0:x1]
        scores = class_counts / class_counts.sum(axis=0)
        return scores

    def mask_fill(self, window: Box, mask: np.ndarray,
                  fill_value: Any) -> None:
        """Set fill_value'th class ID's count to 1 and all others to zero.

        Args:
            window: Window that the mask corresponds to.
            mask: Boolean mask; pixels where it is ON are filled.
            fill_value: Class ID to assign to the masked pixels.
        """
        class_id = fill_value
        y0, x0, y1, x1 = window.intersection(self.extent)
        h, w = y1 - y0, x1 - x0
        # crop the mask to the part of the window that lies within the extent
        mask = mask[:h, :w]
        self.pixel_counts[:, y0:y1, x0:x1][..., mask] = 0
        self.pixel_counts[class_id, y0:y1, x0:x1][mask] = 1
        # The filled pixels now carry data; mark them as hit so that
        # get_label_arr() reports the fill class rather than "no data".
        self.hit_mask[y0:y1, x0:x1][mask] = True

    @classmethod
    def make_empty(cls, extent: Box, num_classes: int) -> 'Self':
        """Instantiate an empty instance."""
        return cls(extent=extent, num_classes=num_classes)

    @classmethod
    def from_predictions(cls,
                         windows: Iterable['Box'],
                         predictions: Iterable[Any],
                         extent: Box,
                         num_classes: int,
                         crop_sz: int | None = None) -> 'Self':
        """Instantiate from windows and their corresponding predictions.

        Args:
            windows: Boxes in pixel coords, specifying chips in the raster.
            predictions: The model predictions for each chip specified by the
                windows.
            extent: The extent of the region to which the labels belong, in
                global coordinates.
            num_classes: Number of classes.
            crop_sz: Number of rows/columns of pixels from the edge of
                prediction windows to discard. Defaults to None.
        """
        labels = cls.make_empty(extent, num_classes)
        labels.add_predictions(windows, predictions, crop_sz=crop_sz)
        return labels

    def save(self,
             uri: str,
             crs_transformer: 'CRSTransformer',
             class_config: 'ClassConfig',
             bbox: Box | None = None,
             tmp_dir: str | None = None,
             save_as_rgb: bool = False,
             raster_output: bool = True,
             rasterio_block_size: int = 512,
             vector_outputs: 'Sequence[VectorOutputConfig] | None' = None,
             profile_overrides: dict | None = None) -> None:
        """Save labels as a raster and/or vectors.

        If URI is remote, all files will be first written locally and then
        uploaded to the URI.

        Args:
            uri: URI of directory in which to save all output files.
            crs_transformer: CRSTransformer to configure CRS and affine
                transform of the output GeoTiff.
            class_config: The ClassConfig.
            bbox: User-specified crop of the extent. Must be provided if the
                corresponding RasterSource has ``bbox != extent``.
            tmp_dir: Temporary directory to use. If None, will be
                auto-generated. Defaults to ``None``.
            save_as_rgb: If ``True``, Saves labels as an RGB image, using the
                class-color mapping in the class_config. Defaults to ``False``.
            raster_output: If ``True``, saves labels as a raster of class IDs
                (one band). Defaults to ``True``.
            rasterio_block_size: Value to set ``blockxsize`` and
                ``blockysize`` to. Defaults to ``512``.
            vector_outputs: List of VectorOutputConfig's containing
                vectorization configuration information. Only classes for which
                a ``VectorOutputConfig`` is specified will be saved as vectors.
                If ``None``, no vector outputs will be produced.
                Defaults to ``None``.
            profile_overrides: This can be used to arbitrarily override
                properties in the profile used to create the output GeoTiff.
                Defaults to ``None``.
        """
        # imported here rather than at module level
        from rastervision.core.data import SemanticSegmentationLabelStore

        label_store = SemanticSegmentationLabelStore(
            uri=uri,
            crs_transformer=crs_transformer,
            class_config=class_config,
            bbox=bbox,
            tmp_dir=tmp_dir,
            save_as_rgb=save_as_rgb,
            discrete_output=raster_output,
            smooth_output=False,
            rasterio_block_size=rasterio_block_size,
            vector_outputs=vector_outputs)
        label_store.save(self, profile=profile_overrides)
class SemanticSegmentationSmoothLabels(SemanticSegmentationLabels):
    """Per-pixel, per-class membership scores.

    Holds a (num_classes, H, W) array of accumulated scores together with an
    (H, W) array counting how many times each pixel has been written to.
    Dividing the former by the latter gives the average score of each pixel
    for each class; taking the argmax of that over the class dimension gives
    a discrete label array.
    """

    def __init__(self,
                 extent: Box,
                 num_classes: int,
                 dtype: Any = np.float16,
                 dtype_hits: Any = np.uint8):
        """Constructor.

        Args:
            extent: The extent of the region to which the labels belong,
                in global coordinates.
            num_classes: Number of classes.
            dtype: ``dtype`` of the scores array. Defaults to ``np.float16``.
            dtype_hits: ``dtype`` of the hits array. Defaults to ``np.uint8``.
        """
        super().__init__(extent, num_classes, dtype)
        scores_shape = (self.num_classes, self.height, self.width)
        hits_shape = (self.height, self.width)
        self.pixel_scores = np.zeros(scores_shape, dtype=self.dtype)
        self.pixel_hits = np.zeros(hits_shape, dtype=dtype_hits)

    def __add__(self, other: 'Self') -> 'Self':
        """Accumulate the scores and hits of ``other`` into ``self``.

        Raises:
            ValueError: If the two label objects have different extents.
        """
        if self.extent != other.extent:
            raise ValueError('Cannot add labels with unqeual extents.')
        self.pixel_scores += other.pixel_scores
        self.pixel_hits += other.pixel_hits
        return self

    def __eq__(self, other: 'Self') -> bool:
        """Compare type, extent, scores (approximately) and hits (exactly)."""
        if not isinstance(other, SemanticSegmentationSmoothLabels):
            return False
        if self.extent != other.extent:
            return False
        return (np.allclose(self.pixel_scores, other.pixel_scores)
                and np.array_equal(self.pixel_hits, other.pixel_hits))

    def __delitem__(self, window: Box) -> None:
        """Zero out the scores and hits of the pixels in the window."""
        y0, x0, y1, x1 = window.intersection(self.extent)
        rows, cols = slice(y0, y1), slice(x0, x1)
        self.pixel_scores[..., rows, cols] = 0
        self.pixel_hits[..., rows, cols] = 0

    def __getitem__(self, window: Box) -> np.ndarray:
        """Return the averaged scores for the window."""
        return self.get_score_arr(window)

    def add_window(self, window: Box, pixel_class_scores: np.ndarray) -> None:
        """Accumulate the given scores into the part of the window that
        overlaps the extent and bump the hit count of those pixels by one.
        """
        # destination region, in the coordinates of the stored arrays
        dst_box = window.intersection(self.extent)
        # the same region, expressed relative to the incoming array
        src_box = dst_box.to_global_coords(self.extent).to_local_coords(
            window)

        src_rows, src_cols = src_box.to_slices()
        cropped_scores = pixel_class_scores.astype(
            self.dtype)[..., src_rows, src_cols]

        dst_rows, dst_cols = dst_box.to_slices()
        self.pixel_scores[..., dst_rows, dst_cols] += cropped_scores
        self.pixel_hits[dst_rows, dst_cols] += 1

    def get_score_arr(self, window: Box) -> np.ndarray:
        """Return the accumulated scores divided by the hit counts.

        Pixels that have no scores and no hits come out as NaN (0 / 0).
        """
        y0, x0, y1, x1 = window.intersection(self.extent)
        score_sums = self.pixel_scores[..., y0:y1, x0:x1]
        hit_counts = self.pixel_hits[y0:y1, x0:x1]
        return score_sums / hit_counts

    def get_label_arr(self, window: Box,
                      null_class_id: int = -1) -> np.ndarray:
        """Return the argmax class ID of each pixel in the window.

        Pixels whose averaged score is NaN are assigned ``null_class_id``.
        """
        scores = self.get_score_arr(window)
        best_class = np.argmax(scores, axis=0)
        # NaN-ness is checked on the first class channel only
        no_data = np.isnan(scores[0])
        return np.where(no_data, null_class_id, best_class)

    def mask_fill(self, window: Box, mask: np.ndarray,
                  fill_value: Any) -> None:
        """For pixels where the mask is ON, set the score of the
        ``fill_value``'th class to 1, all other scores to 0, and the hit
        count to 1.
        """
        y0, x0, y1, x1 = window.intersection(self.extent)
        # crop the mask to the part of the window that lies within the extent
        mask = mask[:(y1 - y0), :(x1 - x0)]
        self.pixel_scores[..., y0:y1, x0:x1][..., mask] = 0
        self.pixel_scores[fill_value, y0:y1, x0:x1][mask] = 1
        self.pixel_hits[y0:y1, x0:x1][mask] = 1

    @classmethod
    def make_empty(cls, extent: Box, num_classes: int) -> 'Self':
        """Create an instance with all scores and hits set to zero."""
        return cls(extent=extent, num_classes=num_classes)

    @classmethod
    def from_predictions(cls,
                         windows: Iterable['Box'],
                         predictions: Iterable[Any],
                         extent: Box,
                         num_classes: int,
                         crop_sz: int | None = None) -> 'Self':
        """Create an empty instance and populate it with the predictions.

        Args:
            windows: Boxes in pixel coords, specifying chips in the raster.
            predictions: The model predictions for each chip specified by the
                windows.
            extent: The extent of the region to which the labels belong, in
                global coordinates.
            num_classes: Number of classes.
            crop_sz: Number of rows/columns of pixels from the edge of
                prediction windows to discard. Defaults to None.
        """
        new_labels = cls.make_empty(extent, num_classes)
        new_labels.add_predictions(windows, predictions, crop_sz=crop_sz)
        return new_labels

    def save(self,
             uri: str,
             crs_transformer: 'CRSTransformer',
             class_config: 'ClassConfig',
             bbox: Box | None = None,
             tmp_dir: str | None = None,
             save_as_rgb: bool = False,
             discrete_output: bool = True,
             smooth_output: bool = True,
             smooth_as_uint8: bool = False,
             rasterio_block_size: int = 512,
             vector_outputs: 'Sequence[VectorOutputConfig] | None' = None,
             profile_overrides: dict | None = None) -> None:
        """Save labels as rasters and/or vectors.

        Builds a ``SemanticSegmentationLabelStore`` from the arguments and
        saves ``self`` through it. If URI is remote, all files will be first
        written locally and then uploaded to the URI.

        Args:
            uri: URI of directory in which to save all output files.
            crs_transformer: CRSTransformer to configure CRS and affine
                transform of the output GeoTiff(s).
            class_config: The ClassConfig.
            bbox: User-specified crop of the extent. Must be provided if the
                corresponding RasterSource has ``bbox != extent``.
            tmp_dir: Temporary directory to use. If ``None``, will be
                auto-generated. Defaults to ``None``.
            save_as_rgb: If ``True``, saves labels as an RGB image, using the
                class-color mapping in the ``class_config``.
                Defaults to ``False``.
            discrete_output: If ``True``, saves labels as a raster of class IDs
                (one band). Defaults to ``True``.
            smooth_output: If ``True``, saves labels as a raster of class
                scores (one band for each class). Defaults to ``True``.
            smooth_as_uint8: If ``True``, stores smooth class scores as
                ``np.uint8`` (0-255) values rather than as floating-point
                values, to help save memory/disk space.
                Defaults to ``False``.
            rasterio_block_size: Value to set ``blockxsize`` and ``blockysize``
                to. Defaults to ``512``.
            vector_outputs: List of VectorOutputConfig's containing
                vectorization configuration information. Only classes for which
                a ``VectorOutputConfig`` is specified will be saved as vectors.
                If ``None``, no vector outputs will be produced.
                Defaults to ``None``.
            profile_overrides: This can be used to arbitrarily override
                properties in the profile used to create the output GeoTiff(s).
                Defaults to ``None``.
        """
        from rastervision.core.data import SemanticSegmentationLabelStore

        store_kwargs = dict(
            uri=uri,
            crs_transformer=crs_transformer,
            class_config=class_config,
            bbox=bbox,
            tmp_dir=tmp_dir,
            save_as_rgb=save_as_rgb,
            discrete_output=discrete_output,
            smooth_output=smooth_output,
            smooth_as_uint8=smooth_as_uint8,
            rasterio_block_size=rasterio_block_size,
            vector_outputs=vector_outputs)
        store = SemanticSegmentationLabelStore(**store_kwargs)
        store.save(self, profile=profile_overrides)