from typing import TYPE_CHECKING, Any, Iterable
import geopandas as gpd
from rastervision.core.data.label import ChipClassificationLabels
from rastervision.core.data.label_source.label_source import LabelSource
from rastervision.core.box import Box
if TYPE_CHECKING:
from rastervision.core.data import (ChipClassificationLabelSourceConfig,
CRSTransformer, VectorSource)
def infer_cells(cells: list[Box], labels_df: gpd.GeoDataFrame,
                ioa_thresh: float, use_intersection_over_cell: bool,
                pick_min_class_id: bool,
                background_class_id: int) -> ChipClassificationLabels:
    """Infer ChipClassificationLabels for a set of cells from label polygons.

    For each cell, the problem is to infer the class_id that best captures the
    content of the cell. This is non-trivial since there can be multiple
    polygons of differing classes overlapping with the cell. Any polygon that
    sufficiently overlaps with the cell is in the running for setting the
    class_id. If there are none in the running, the cell is assigned
    ``background_class_id``.

    ``labels_df`` is not modified; the helper column needed for the spatial
    join is added to a copy.

    Args:
        cells: Cells whose class_ids are to be inferred.
        labels_df: Label polygons. Must have a ``class_id`` column.
        ioa_thresh: the minimum IOA of a polygon and cell for that
            polygon to be a candidate for setting the class_id
        use_intersection_over_cell: If ``True``, then use the area of the
            cell as the denominator in the IOA. Otherwise, use the area of the
            polygon.
        pick_min_class_id: If ``True``, the class_id for a cell is the minimum
            class_id of the candidate polygons in that cell. Otherwise, pick
            the class_id of the polygon with the largest IOA.
        background_class_id: class_id assigned to cells that have no
            candidate polygon. The final class_ids are cast to ``int``, so a
            ``None`` here will fail the cast whenever some cell has no
            candidate.

    Returns:
        Labels with one entry per cell; scores are set to ``None``.
    """
    # Nothing to infer; also avoids running a spatial join on an empty frame.
    if len(cells) == 0:
        return ChipClassificationLabels({})

    cells_df = gpd.GeoDataFrame(
        data={'cell_id': range(len(cells))},
        geometry=[c.to_shapely() for c in cells])

    # Duplicate geometry columns so that they are retained after the join.
    cells_df.loc[:, 'geometry_cell'] = cells_df.geometry
    # ``assign`` returns a new frame, so the caller's dataframe does not gain
    # a ``geometry_label`` column as a side effect of this call.
    labels_df = labels_df.assign(geometry_label=labels_df.geometry)

    # Left-join cells to label polygons based on intersection. The result is a
    # table with each cell matched to all polygons that intersect it; i.e.,
    # there will be a row for each unique (cell, polygon) combination. Cells
    # that didn't match any labels will have missing values as their class_ids.
    df: gpd.GeoDataFrame = cells_df.sjoin(
        labels_df, how='left', predicate='intersects')

    df.loc[:, 'geometry_intersection'] = df['geometry_cell'].intersection(
        df['geometry_label'])
    if use_intersection_over_cell:
        ioa = (df['geometry_intersection'].area / df['geometry_cell'].area)
    else:
        # intersection over label-polygon
        ioa = (df['geometry_intersection'].area / df['geometry_label'].area)
    # Undefined IOAs (e.g. unmatched cells) become -1.
    df.loc[:, 'ioa'] = ioa.fillna(-1)

    # A row is a candidate only if its polygon overlaps enough and actually
    # carries a class_id. Every other row (unmatched cells and sub-threshold
    # matches) falls back to the background class.
    is_candidate = (df['ioa'] >= ioa_thresh) & df['class_id'].notna()
    df.loc[~is_candidate, 'class_id'] = background_class_id

    # break ties (i.e. one cell matched to multiple label polygons)
    if pick_min_class_id:
        # Rank real candidates ahead of background fallbacks so that a
        # sub-threshold polygon cannot override a qualifying one merely
        # because background_class_id happens to be numerically smaller.
        df.loc[:, '_is_candidate'] = is_candidate.to_numpy()
        df = df.sort_values(
            ['_is_candidate', 'class_id'], ascending=[False, True])
        df = df.drop_duplicates(['cell_id'], keep='first')
    else:
        # largest IOA
        df = df.sort_values('ioa').drop_duplicates(['cell_id'], keep='last')

    boxes = [Box.from_shapely(c).to_int() for c in df['geometry_cell']]
    class_ids = df['class_id'].astype(int)
    cells_to_class_id = {
        cell: (class_id, None)
        for cell, class_id in zip(boxes, class_ids)
    }
    return ChipClassificationLabels(cells_to_class_id)
def read_labels(labels_df: gpd.GeoDataFrame,
                bbox: Box | None = None) -> ChipClassificationLabels:
    """Convert ``GeoDataFrame`` to ``ChipClassificationLabels``.

    If the ``GeoDataFrame`` already contains a grid of cells, then
    ``ChipClassificationLabels`` can be constructed in a straightforward manner
    without having to infer the class of cells.

    If ``bbox`` is given, only labels that intersect with it are returned.

    Args:
        labels_df: Dataframe with one geometry per cell and a ``class_id``
            column. If a ``scores`` column is present, its values are stored
            alongside the class_ids; otherwise scores are ``None``.
        bbox: Box in pixel coords. Defaults to ``None``.

    Returns:
        Labels mapping each (integer-coordinate) cell to its class_id and
        scores.
    """
    boxes = [Box.from_shapely(g).to_int() for g in labels_df.geometry]
    class_ids = labels_df['class_id'].astype(int)
    if 'scores' in labels_df.columns:
        scores = labels_df['scores']
    else:
        scores = [None] * len(class_ids)

    # Filter on bbox while zipping so each surviving cell keeps the class_id
    # and scores from its own row. Filtering the boxes on their own first
    # would shift them relative to the unfiltered class_ids and scores.
    cells_to_class_id = {
        cell: (class_id, class_scores)
        for cell, class_id, class_scores in zip(boxes, class_ids, scores)
        if bbox is None or cell.intersects(bbox)
    }
    return ChipClassificationLabels(cells_to_class_id)
class ChipClassificationLabelSource(LabelSource):
    """A source of chip classification labels.

    Ideally the vector_source contains a square for each cell in the grid. But
    in reality, it can be difficult to label imagery in such an exhaustive way.
    So, this can also handle sources with non-overlapping polygons that
    do not necessarily cover the entire extent. It infers the grid of cells
    and associated class_ids using the extent and options if infer_cells is
    set to True.
    """

    def __init__(self,
                 label_source_config: 'ChipClassificationLabelSourceConfig',
                 vector_source: 'VectorSource',
                 bbox: Box | None = None,
                 lazy: bool = False):
        """Constructor.

        Args:
            label_source_config: Config for class inference.
            vector_source: Source of vector labels.
            bbox: User-specified crop of the extent. If ``None``, the full
                extent available in the source file is used.
            lazy: If ``True``, labels are not populated during initialization.
                Defaults to ``False``.

        Raises:
            ValueError: If the labels fail :meth:`validate_labels`.
        """
        self.cfg = label_source_config
        self.vector_source = vector_source
        if bbox is None:
            bbox = vector_source.extent
        self._bbox = bbox
        self.lazy = lazy

        self.labels_df = vector_source.get_dataframe()
        self.validate_labels(self.labels_df)

        # Start empty; labels are filled in eagerly below or on demand later.
        self.labels = ChipClassificationLabels.make_empty()
        if not self.lazy:
            self.populate_labels()

    def populate_labels(self, cells: Iterable[Box] | None = None) -> None:
        """Populate ``self.labels`` by either reading or inferring.

        If cfg.infer_cells is True or specific cells are given, the labels are
        inferred. Otherwise, they are read from the geojson.

        Args:
            cells: Cells (in ``bbox`` coords) to infer labels for. Defaults to
                ``None``.
        """
        if self.cfg.infer_cells or cells is not None:
            self.labels = self.infer_cells(cells=cells)
        else:
            self.labels = read_labels(self.labels_df, bbox=self.bbox)

    def infer_cells(self, cells: Iterable[Box] | None = None
                    ) -> ChipClassificationLabels:
        """Infer labels for a list of cells.

        Cells are assumed to be in ``bbox`` coords as opposed to global coords
        and are converted to global coords before inference. The returned
        labels are in global coords. Only cells whose
        labels are not already known are inferred.

        Args:
            cells: Cells (in ``bbox`` coords) whose labels are to be inferred.
                If ``None``, cells are assumed to be sliding windows of size
                and stride ``cell_sz`` (specified in
                :class:`.ChipClassificationLabelSourceConfig`).
                Defaults to ``None``.

        Returns:
            Labels (in global coords).

        Raises:
            ValueError: If ``cells`` is ``None`` and ``cfg.cell_sz`` is not
                set.
        """
        if cells is None:
            cell_sz = self.cfg.cell_sz
            if cell_sz is None:
                raise ValueError('cell_sz is not set.')
            cells = self.extent.get_windows(cell_sz, cell_sz)
        cells = [cell.to_global_coords(self.bbox) for cell in cells]
        return self._infer_cells(cells)

    def _infer_cells(self, cells: Iterable[Box]) -> ChipClassificationLabels:
        """Infer labels for a list of cells.

        Cells are assumed to be in global coords as opposed to ``bbox`` coords.
        Only cells whose labels are not already known are inferred.

        Args:
            cells: Cells (in global coords) whose labels are to be inferred.

        Returns:
            Labels (in global coords).
        """
        cfg = self.cfg
        # Materialize once: ``cells`` is traversed twice below, which would
        # silently drop every unknown cell if it were a one-shot iterable.
        cells = list(cells)
        known_cells = [c for c in cells if c in self.labels]
        unknown_cells = [c for c in cells if c not in self.labels]

        if unknown_cells:
            labels = infer_cells(
                cells=unknown_cells,
                labels_df=self.labels_df,
                ioa_thresh=cfg.ioa_thresh,
                use_intersection_over_cell=cfg.use_intersection_over_cell,
                pick_min_class_id=cfg.pick_min_class_id,
                background_class_id=cfg.background_class_id)
        else:
            # Everything is already known; skip the spatial join entirely.
            labels = ChipClassificationLabels.make_empty()

        # Carry over the class_ids of cells that were already labeled.
        for cell in known_cells:
            class_id = self.labels.get_cell_class_id(cell)
            labels.set_cell(cell, class_id)
        return labels

    def _to_labeled_global_window(self, window: Box) -> Box:
        """Convert window to global coords and infer its label if missing."""
        window = window.to_global_coords(self.bbox)
        if window not in self.labels:
            self.labels += self._infer_cells(cells=[window])
        return window

    def get_labels(self,
                   window: Box | None = None) -> ChipClassificationLabels:
        """Return label for a window, inferring it if not already known.

        If window is ``None``, returns all labels.

        Args:
            window: Window in ``bbox`` coords. Defaults to ``None``.
        """
        if window is None:
            return self.labels
        window = self._to_labeled_global_window(window)
        return self.labels.get_singleton_labels(window)

    def __getitem__(self, key: Any) -> int:
        """Return class ID for a window, inferring it if not already known.

        Keys that are not a :class:`Box` are delegated to the parent class.
        """
        if isinstance(key, Box):
            window = self._to_labeled_global_window(key)
            return self.labels[window].class_id
        return super().__getitem__(key)

    def validate_labels(self, df: gpd.GeoDataFrame) -> None:
        """Check that the label dataframe can be used by this label source.

        Args:
            df: Label dataframe to check.

        Raises:
            ValueError: If ``df`` contains Point or LineString geometries, or
                if it has no ``class_id`` column.
        """
        geom_types = set(df.geom_type)
        if 'Point' in geom_types or 'LineString' in geom_types:
            raise ValueError(
                'LineStrings and Points are not supported '
                'in ChipClassificationLabelSource. Use BufferTransformer '
                'to buffer them into Polygons.')

        if 'class_id' not in df.columns:
            raise ValueError('All label polygons must have a class_id.')

    @property
    def bbox(self) -> Box:
        """Bounding box applied to the source."""
        return self._bbox

    @property
    def crs_transformer(self) -> 'CRSTransformer':
        """CRS transformer of the underlying vector source."""
        return self.vector_source.crs_transformer

    def set_bbox(self, bbox: 'Box') -> None:
        """Set the bbox and, unless lazy, re-populate the labels.

        Args:
            bbox: The new bbox.
        """
        self._bbox = bbox
        if not self.lazy:
            self.populate_labels()