Source code for rastervision.core.raster_stats

from typing import TYPE_CHECKING, Iterator, Optional, Sequence
import json

import numpy as np
from tqdm.auto import tqdm

from rastervision.pipeline.file_system import str_to_file, file_to_str

if TYPE_CHECKING:
    from rastervision.core.box import Box
    from rastervision.core.data import RasterSource

chip_sz = 300


[docs]def parallel_variance(mean_a, count_a, var_a, mean_b, count_b, var_b): """Compute the variance based on stats from two partitions of the data. See "Parallel Algorithm" in https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Args: mean_a: the mean of partition a count_a: the number of elements in partition a var_a: the variance of partition a mean_b: the mean of partition b count_b: the number of elements in partition b var_b: the variance of partition b Return: the variance of the two partitions if they were combined """ delta = mean_b - mean_a m_a = var_a * (count_a - 1) m_b = var_b * (count_b - 1) M2 = m_a + m_b + delta**2 * count_a * count_b / (count_a + count_b) var = M2 / (count_a + count_b - 1) return var
[docs]def parallel_mean(mean_a, count_a, mean_b, count_b): """Compute the mean based on stats from two partitions of the data. See "Parallel Algorithm" in https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Args: mean_a: the mean of partition a count_a: the number of elements in partition a mean_b: the mean of partition b count_b: the number of elements in partition b Return: the mean of the two partitions if they were combined """ mean = (count_a * mean_a + count_b * mean_b) / (count_a + count_b) return mean
[docs]class RasterStats():
[docs] def __init__(self): self.means = None self.stds = None
[docs] def compute(self, raster_sources: Sequence['RasterSource'], sample_prob: Optional[float] = None) -> None: """Compute the mean and stds over all the raster_sources. This ignores NODATA values. If sample_prob is set, then a subset of each scene is used to compute stats which speeds up the computation. Roughly speaking, if sample_prob=0.5, then half the pixels in the scene will be used. More precisely, the number of chips is equal to sample_prob * (width * height / 300^2), or 1, whichever is greater. Each chip is uniformly sampled from the scene with replacement. Otherwise, it uses a sliding window over the entire scene to compute stats. Args: raster_sources: list of RasterSource sample_prob: (float or None) between 0 and 1 """ stride = chip_sz nb_channels = raster_sources[0].num_channels_raw def get_chip(raster_source: 'RasterSource', window: 'Box') -> Optional[np.ndarray]: """Return chip or None if all values are NODATA.""" chip = raster_source.get_raw_chip(window).astype(float) # Convert shape from [h,w,c] to [c,h*w] chip = np.reshape(np.transpose(chip, [2, 0, 1]), (nb_channels, -1)) # Ignore NODATA values. chip[chip == 0.0] = np.nan if np.any(~np.isnan(chip)): return chip return None def sliding_chip_stream() -> Iterator[np.ndarray]: """Get stream of chips using a sliding window of size 300.""" for raster_source in raster_sources: windows = raster_source.extent.get_windows(chip_sz, stride) for window in windows: chip = get_chip(raster_source, window) if chip is not None: yield chip def random_chip_stream() -> Iterator[np.ndarray]: """Get random stream of chips.""" for raster_source in raster_sources: extent = raster_source.extent num_pixels = extent.area num_chips = round(sample_prob * (num_pixels / (chip_sz**2))) num_chips = max(1, num_chips) for _ in range(num_chips): window = raster_source.extent.make_random_square(chip_sz) chip = get_chip(raster_source, window) if chip is not None: yield chip # For each chip, compute the mean and var of that chip and then update the # running mean and var. count = 0 mean = np.zeros((nb_channels, )) var = np.zeros((nb_channels, )) chip_stream = (sliding_chip_stream() if sample_prob is None else random_chip_stream()) with tqdm(chip_stream, desc='Analyzing chips') as bar: for chip in bar: chip_means = np.nanmean(chip, axis=1) chip_vars = np.nanvar(chip, axis=1) chip_count = np.sum(chip[0] != np.nan) var = parallel_variance(chip_means, chip_count, chip_vars, mean, count, var) mean = parallel_mean(chip_means, chip_count, mean, count) count += chip_count self.means = mean self.stds = np.sqrt(var)
[docs] def save(self, stats_uri: str) -> None: # Ensure lists means = list(self.means) stds = list(self.stds) stats = {'means': means, 'stds': stds} str_to_file(json.dumps(stats), stats_uri)
[docs] @staticmethod def load(stats_uri: str) -> None: stats_json = json.loads(file_to_str(stats_uri)) stats = RasterStats() stats.means = stats_json['means'] stats.stds = stats_json['stds'] return stats