Source code for rastervision.core.data.label_source.chip_classification_label_source

from typing import TYPE_CHECKING, Any, Iterable, List, Optional

import numpy as np
import geopandas as gpd

from rastervision.core.data.label import ChipClassificationLabels
from rastervision.core.data.label_source.label_source import LabelSource
from rastervision.core.box import Box

if TYPE_CHECKING:
    from rastervision.core.data import (ChipClassificationLabelSourceConfig,
                                        VectorSource)


[docs]def infer_cells(cells: List[Box], labels_df: gpd.GeoDataFrame, ioa_thresh: float, use_intersection_over_cell: bool, pick_min_class_id: bool, background_class_id: int) -> ChipClassificationLabels: """Infer ChipClassificationLabels grid from GeoJSON containing polygons. Given GeoJSON with polygons associated with class_ids, infer a grid of cells and class_ids that best captures the contents of each cell. For each cell, the problem is to infer the class_id that best captures the content of the cell. This is non-trivial since there can be multiple polygons of differing classes overlapping with the cell. Any polygons that sufficiently overlaps with the cell are in the running for setting the class_id. If there are none in the running, the cell is either considered null or background. Args: ioa_thresh: (float) the minimum IOA of a polygon and cell for that polygon to be a candidate for setting the class_id use_intersection_over_cell: (bool) If true, then use the area of the cell as the denominator in the IOA. Otherwise, use the area of the polygon. background_class_id: (None or int) If not None, class_id to use as the background class; ie. the one that is used when a window contains no boxes. pick_min_class_id: If true, the class_id for a cell is the minimum class_id of the boxes in that cell. Otherwise, pick the class_id of the box covering the greatest area. Returns: ChipClassificationLabels """ cells_df = gpd.GeoDataFrame( data={'cell_id': range(len(cells))}, geometry=[c.to_shapely() for c in cells]) # duplicate geometry columns so that they are retained after the join cells_df.loc[:, 'geometry_cell'] = cells_df.geometry labels_df.loc[:, 'geometry_label'] = labels_df.geometry # Left-join cells to label polygons based on intersection. The result is a # table with each cell matched to all polygons that intersect it; i.e., # there will be a row for each unique (cell, polygon) combination. Cells # that didn't match any labels will have missing values as their class_ids. df: gpd.GeoDataFrame = cells_df.sjoin( labels_df, how='left', predicate='intersects') df.loc[:, 'geometry_intersection'] = df['geometry_cell'].intersection( df['geometry_label']) if use_intersection_over_cell: ioa = (df['geometry_intersection'].area / df['geometry_cell'].area) else: # intersection over label-polygon ioa = (df['geometry_intersection'].area / df['geometry_label'].area) df.loc[:, 'ioa'] = ioa.fillna(-1) # labels with IOA below threshold cannot contribute their class_id df.loc[df['ioa'] < ioa_thresh, 'class_id'] = None # Assign background_class_id to cells /wo a class_id. This includes both # unmatched cells and ones whose ioa fell below the ioa_thresh. df.loc[df['class_id'].isna(), 'class_id'] = background_class_id # break ties (i.e. one cell matched to multiple label polygons) if pick_min_class_id: df = df.sort_values('class_id').drop_duplicates( ['cell_id'], keep='first') else: # largest IOA df = df.sort_values('ioa').drop_duplicates(['cell_id'], keep='last') boxes = [Box.from_shapely(c).to_int() for c in df['geometry_cell']] class_ids = df['class_id'].astype(int) cells_to_class_id = { cell: (class_id, None) for cell, class_id in zip(boxes, class_ids) } labels = ChipClassificationLabels(cells_to_class_id) return labels
[docs]def read_labels(labels_df: gpd.GeoDataFrame, extent: Optional[Box] = None) -> ChipClassificationLabels: """Convert GeoDataFrame to ChipClassificationLabels. If the GeoDataFrame already contains a grid of cells, then ChipClassificationLabels can be constructed in a straightforward manner without having to infer the class of cells. If extent is given, only labels that intersect with it are returned. Args: geojson: dict in normalized GeoJSON format (see VectorSource) extent: Box in pixel coords Returns: ChipClassificationLabels """ if extent is not None: extent_polygon = extent.to_shapely() labels_df = labels_df[labels_df.intersects(extent_polygon)] boxes = np.array([ Box.from_shapely(c).to_int().shift_origin(extent) for c in labels_df.geometry ]) else: boxes = np.array( [Box.from_shapely(c).to_int() for c in labels_df.geometry]) class_ids = labels_df['class_id'].astype(int) cells_to_class_id = { cell: (class_id, None) for cell, class_id in zip(boxes, class_ids) } labels = ChipClassificationLabels(cells_to_class_id) return labels
[docs]class ChipClassificationLabelSource(LabelSource): """A source of chip classification labels. Ideally the vector_source contains a square for each cell in the grid. But in reality, it can be difficult to label imagery in such an exhaustive way. So, this can also handle sources with non-overlapping polygons that do not necessarily cover the entire extent. It infers the grid of cells and associated class_ids using the extent and options if infer_cells is set to True. """
[docs] def __init__(self, label_source_config: 'ChipClassificationLabelSourceConfig', vector_source: 'VectorSource', extent: Box = None, lazy: bool = False): """Constructs a LabelSource for chip classification. Args: label_source_config (ChipClassificationLabelSourceConfig): Config for class inference. vector_source (VectorSource): Source of vector labels. extent (Box): Box used to filter the labels by extent or compute grid. lazy (bool, optional): If True, labels are not populated during initialization. Defaults to False. """ self.cfg = label_source_config self._extent = extent self.labels_df = vector_source.get_dataframe() self.validate_labels(self.labels_df) self.labels = ChipClassificationLabels.make_empty() if not lazy: self.populate_labels()
[docs] def populate_labels(self, cells: Optional[Iterable[Box]] = None) -> None: """Populate self.labels by either reading or inferring. If cfg.infer_cells is True or specific cells are given, the labels are inferred. Otherwise, they are read from the geojson. """ if self.cfg.infer_cells or cells is not None: self.labels = self.infer_cells(cells=cells) else: self.labels = read_labels(self.labels_df, extent=self.extent)
[docs] def infer_cells(self, cells: Optional[Iterable[Box]] = None ) -> ChipClassificationLabels: """Infer labels for a list of cells. Only cells whose labels are not already known are inferred. Args: cells (Optional[Iterable[Box]], optional): Cells whose labels are to be inferred. Defaults to None. Returns: ChipClassificationLabels: labels """ cfg = self.cfg if cells is None: if cfg.cell_sz is None: raise ValueError('cell_sz is not set.') cells = self.extent.get_windows(cfg.cell_sz, cfg.cell_sz) else: cells = [cell.shift_origin(self.extent) for cell in cells] known_cells = [c for c in cells if c in self.labels] unknown_cells = [c for c in cells if c not in self.labels] labels = infer_cells( cells=unknown_cells, labels_df=self.labels_df, ioa_thresh=cfg.ioa_thresh, use_intersection_over_cell=cfg.use_intersection_over_cell, pick_min_class_id=cfg.pick_min_class_id, background_class_id=cfg.background_class_id) for cell in known_cells: class_id = self.labels.get_cell_class_id(cell) labels.set_cell(cell, class_id) return labels
[docs] def get_labels(self, window: Optional[Box] = None) -> ChipClassificationLabels: if window is None: return self.labels window = window.shift_origin(self.extent) return self.labels.get_singleton_labels(window)
def __getitem__(self, key: Any) -> int: """Return label for a window, inferring it if it is not already known. """ if isinstance(key, Box): window = key if window not in self.labels: self.labels += self.infer_cells(cells=[window]) return self.labels[window].class_id else: return super().__getitem__(key)
[docs] def validate_labels(self, df: gpd.GeoDataFrame) -> None: geom_types = set(df.geom_type) if 'Point' in geom_types or 'LineString' in geom_types: raise ValueError( 'LineStrings and Points are not supported ' 'in ChipClassificationLabelSource. Use BufferTransformer ' 'to buffer them into Polygons.') if 'class_id' not in df.columns: raise ValueError('All label polygons must have a class_id.')
@property def extent(self) -> Box: return self._extent