Source code for rastervision.core.data.label_source.chip_classification_label_source

from typing import TYPE_CHECKING, Any, Iterable

import geopandas as gpd

from rastervision.core.data.label import ChipClassificationLabels
from rastervision.core.data.label_source.label_source import LabelSource
from rastervision.core.box import Box

if TYPE_CHECKING:
    from rastervision.core.data import (ChipClassificationLabelSourceConfig,
                                        CRSTransformer, VectorSource)


def infer_cells(cells: list[Box], labels_df: gpd.GeoDataFrame,
                ioa_thresh: float, use_intersection_over_cell: bool,
                pick_min_class_id: bool,
                background_class_id: int) -> ChipClassificationLabels:
    """Infer ChipClassificationLabels for cells from label polygons.

    Given polygons associated with class_ids, infer the class_id that best
    captures the contents of each cell. This is non-trivial since there can
    be multiple polygons of differing classes overlapping with a cell. Any
    polygon that sufficiently overlaps with the cell is in the running for
    setting the class_id. If there are none in the running, the cell is
    assigned ``background_class_id``.

    The input ``labels_df`` is not modified.

    Args:
        cells: Cells to infer class_ids for.
        labels_df: Label polygons. Must have a ``class_id`` column.
        ioa_thresh: The minimum IOA of a polygon and cell for that polygon
            to be a candidate for setting the class_id.
        use_intersection_over_cell: If ``True``, then use the area of the
            cell as the denominator in the IOA. Otherwise, use the area of
            the polygon.
        pick_min_class_id: If ``True``, the class_id for a cell is the
            minimum class_id among the candidate polygons of that cell.
            Otherwise, pick the class_id of the polygon with the largest
            IOA.
        background_class_id: class_id to use for cells that have no
            candidate polygon.
            NOTE(review): if this is ``None``, such cells keep a missing
            class_id and the integer cast at the end is expected to fail;
            confirm that callers always supply a value.

    Returns:
        Labels mapping each cell (with integer coordinates) to its inferred
        class_id. Scores are set to ``None``.
    """
    if not cells:
        # Nothing to infer; skip the spatial join entirely.
        return ChipClassificationLabels({})

    cells_df = gpd.GeoDataFrame(
        data={'cell_id': range(len(cells))},
        geometry=[c.to_shapely() for c in cells])

    # Duplicate geometry columns so that they are retained after the join.
    cells_df.loc[:, 'geometry_cell'] = cells_df.geometry
    # assign() returns a new frame, so the caller's dataframe does not gain
    # an extra column as a side effect of calling this function.
    labels_df = labels_df.assign(geometry_label=labels_df.geometry)

    # Left-join cells to label polygons based on intersection. The result is
    # a table with each cell matched to all polygons that intersect it; i.e.,
    # there will be a row for each unique (cell, polygon) combination. Cells
    # that didn't match any labels will have missing values as their
    # class_ids.
    df: gpd.GeoDataFrame = cells_df.sjoin(
        labels_df, how='left', predicate='intersects')

    df.loc[:, 'geometry_intersection'] = df['geometry_cell'].intersection(
        df['geometry_label'])

    if use_intersection_over_cell:
        denominator = df['geometry_cell'].area
    else:
        # intersection over label-polygon
        denominator = df['geometry_label'].area
    ioa = df['geometry_intersection'].area / denominator
    # Unmatched cells have no intersection; give them an IOA of -1.
    df.loc[:, 'ioa'] = ioa.fillna(-1)

    # Labels with IOA below threshold cannot contribute their class_id.
    df.loc[df['ioa'] < ioa_thresh, 'class_id'] = None

    # Break ties (i.e. one cell matched to multiple label polygons). This
    # must happen BEFORE the background class is filled in: otherwise a
    # below-threshold row carrying a small background_class_id could beat a
    # genuine candidate when picking the minimum class_id.
    if pick_min_class_id:
        # Rows without a class_id sort last, so they are only kept for a
        # cell when it has no candidate at all.
        df = df.sort_values(
            'class_id', na_position='last').drop_duplicates(
                ['cell_id'], keep='first')
    else:
        # Largest IOA. Rows below the threshold always have a smaller IOA
        # than any candidate row, so a candidate wins whenever one exists.
        df = df.sort_values('ioa').drop_duplicates(['cell_id'], keep='last')

    # Assign background_class_id to cells w/o a class_id. This includes both
    # unmatched cells and ones whose IOA fell below the ioa_thresh.
    df.loc[df['class_id'].isna(), 'class_id'] = background_class_id

    boxes = [Box.from_shapely(c).to_int() for c in df['geometry_cell']]
    class_ids = df['class_id'].astype(int)
    cells_to_class_id = {
        cell: (class_id, None)
        for cell, class_id in zip(boxes, class_ids)
    }
    return ChipClassificationLabels(cells_to_class_id)
def read_labels(labels_df: gpd.GeoDataFrame,
                bbox: Box | None = None) -> ChipClassificationLabels:
    """Convert ``GeoDataFrame`` to ``ChipClassificationLabels``.

    If the ``GeoDataFrame`` already contains a grid of cells, then
    ``ChipClassificationLabels`` can be constructed in a straightforward
    manner without having to infer the class of cells.

    If ``bbox`` is given, only labels that intersect with it are returned.

    Args:
        labels_df: Dataframe with one cell geometry per row, a ``class_id``
            column and, optionally, a ``scores`` column.
        bbox: If given, only cells intersecting this box are kept.

    Returns:
        Labels mapping each cell (with integer coordinates) to its class_id
        and scores (``None`` if there is no ``scores`` column).
    """
    boxes = [Box.from_shapely(g).to_int() for g in labels_df.geometry]
    class_ids = labels_df['class_id'].astype(int)
    if 'scores' in labels_df.columns:
        scores = labels_df['scores']
    else:
        scores = [None] * len(class_ids)

    rows = zip(boxes, class_ids, scores)
    if bbox is not None:
        # Filter whole rows rather than just the boxes, so that each
        # surviving cell stays paired with its own class_id and scores.
        rows = (row for row in rows if row[0].intersects(bbox))

    cells_to_class_id = {
        cell: (class_id, class_scores)
        for cell, class_id, class_scores in rows
    }
    return ChipClassificationLabels(cells_to_class_id)
class ChipClassificationLabelSource(LabelSource):
    """A source of chip classification labels.

    Ideally the vector_source contains a square for each cell in the grid.
    But in reality, it can be difficult to label imagery in such an
    exhaustive way. So, this can also handle sources with non-overlapping
    polygons that do not necessarily cover the entire extent. It infers the
    grid of cells and associated class_ids using the extent and options if
    infer_cells is set to True.
    """

    def __init__(self,
                 label_source_config: 'ChipClassificationLabelSourceConfig',
                 vector_source: 'VectorSource',
                 bbox: Box | None = None,
                 lazy: bool = False):
        """Constructor.

        Args:
            label_source_config: Config for class inference.
            vector_source: Source of vector labels.
            bbox: User-specified crop of the extent. If ``None``, the full
                extent available in the source file is used.
            lazy: If ``True``, labels are not populated during
                initialization. Defaults to ``False``.

        Raises:
            ValueError: If the labels fail :meth:`validate_labels`.
        """
        self.cfg = label_source_config
        self.vector_source = vector_source
        if bbox is None:
            bbox = vector_source.extent
        self._bbox = bbox
        self.lazy = lazy
        self.labels_df = vector_source.get_dataframe()
        self.validate_labels(self.labels_df)

        # Start empty; labels are filled in now or on demand (if lazy).
        self.labels = ChipClassificationLabels.make_empty()
        if not self.lazy:
            self.populate_labels()

    def populate_labels(self, cells: Iterable[Box] | None = None) -> None:
        """Populate ``self.labels`` by either reading or inferring.

        If cfg.infer_cells is True or specific cells are given, the labels
        are inferred. Otherwise, they are read from the labels dataframe.

        Args:
            cells: Cells (in ``bbox`` coords) to infer labels for. Defaults
                to ``None``.
        """
        if self.cfg.infer_cells or cells is not None:
            self.labels = self.infer_cells(cells=cells)
        else:
            self.labels = read_labels(self.labels_df, bbox=self.bbox)

    def infer_cells(self, cells: Iterable[Box] | None = None
                    ) -> ChipClassificationLabels:
        """Infer labels for a list of cells.

        Cells are assumed to be in ``bbox`` coords as opposed to global
        coords and are converted to global coords before inference. The
        returned labels are in global coords.

        Only cells whose labels are not already known are inferred.

        Args:
            cells: Cells (in ``bbox`` coords) whose labels are to be
                inferred. If ``None``, cells are assumed to be sliding
                windows of size and stride ``cell_sz`` (specified in
                :class:`.ChipClassificationLabelSourceConfig`).
                Defaults to ``None``.

        Returns:
            Labels (in global coords).

        Raises:
            ValueError: If ``cells`` is ``None`` and ``cell_sz`` is not set
                in the config.
        """
        if cells is None:
            cell_sz = self.cfg.cell_sz
            if cell_sz is None:
                raise ValueError('cell_sz is not set.')
            cells = self.extent.get_windows(cell_sz, cell_sz)
        cells = [cell.to_global_coords(self.bbox) for cell in cells]
        labels = self._infer_cells(cells)
        return labels

    def _infer_cells(self, cells: Iterable[Box]) -> ChipClassificationLabels:
        """Infer labels for a list of cells.

        Cells are assumed to be in global coords as opposed to ``bbox``
        coords.

        Only cells whose labels are not already known are inferred. For
        known cells, the existing class_id is copied into the result.

        Args:
            cells: Cells (in global coords) whose labels are to be inferred.

        Returns:
            Labels (in global coords).
        """
        cfg = self.cfg

        # Partition in a single pass so that one-shot iterables (e.g.
        # generators) are handled correctly.
        known_cells: list[Box] = []
        unknown_cells: list[Box] = []
        for cell in cells:
            if cell in self.labels:
                known_cells.append(cell)
            else:
                unknown_cells.append(cell)

        # Refers to the module-level infer_cells() function, not the method.
        labels = infer_cells(
            cells=unknown_cells,
            labels_df=self.labels_df,
            ioa_thresh=cfg.ioa_thresh,
            use_intersection_over_cell=cfg.use_intersection_over_cell,
            pick_min_class_id=cfg.pick_min_class_id,
            background_class_id=cfg.background_class_id)

        for cell in known_cells:
            class_id = self.labels.get_cell_class_id(cell)
            labels.set_cell(cell, class_id)
        return labels

    def get_labels(self,
                   window: Box | None = None) -> ChipClassificationLabels:
        """Return label for a window, inferring it if not already known.

        If window is ``None``, returns all labels.

        Args:
            window: Window in ``bbox`` coords. Defaults to ``None``.
        """
        if window is None:
            return self.labels
        window = window.to_global_coords(self.bbox)
        if window not in self.labels:
            # Cache the newly inferred label for subsequent lookups.
            self.labels += self._infer_cells(cells=[window])
        labels = self.labels.get_singleton_labels(window)
        return labels

    def __getitem__(self, key: Any) -> int:
        """Return class ID for a window, inferring it if not already known.

        If ``key`` is not a :class:`.Box`, the lookup is delegated to the
        parent class.
        """
        if isinstance(key, Box):
            window = key
            window = window.to_global_coords(self.bbox)
            if window not in self.labels:
                # Cache the newly inferred label for subsequent lookups.
                self.labels += self._infer_cells(cells=[window])
            class_id = self.labels[window].class_id
            return class_id
        else:
            return super().__getitem__(key)

    def validate_labels(self, df: gpd.GeoDataFrame) -> None:
        """Check that the labels can be used for chip classification.

        Args:
            df: Dataframe of label geometries.

        Raises:
            ValueError: If ``df`` contains ``Point`` or ``LineString``
                geometries, or has no ``class_id`` column.
        """
        geom_types = set(df.geom_type)
        if 'Point' in geom_types or 'LineString' in geom_types:
            raise ValueError(
                'LineStrings and Points are not supported '
                'in ChipClassificationLabelSource. Use BufferTransformer '
                'to buffer them into Polygons.')

        if 'class_id' not in df.columns:
            raise ValueError('All label polygons must have a class_id.')

    @property
    def bbox(self) -> Box:
        """The bounding box set at construction or via :meth:`set_bbox`."""
        return self._bbox

    @property
    def crs_transformer(self) -> 'CRSTransformer':
        """The CRS transformer of the underlying vector source."""
        return self.vector_source.crs_transformer

    def set_bbox(self, bbox: 'Box') -> None:
        """Set the bounding box and, unless lazy, re-populate the labels.

        Args:
            bbox: The new bounding box.
        """
        self._bbox = bbox
        if not self.lazy:
            self.populate_labels()