Source code for museotoolbox.stats

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# =============================================================================
# ___  ___                       _____           _______
# |  \/  |                      |_   _|         | | ___ \
# | .  . |_   _ ___  ___  ___     | | ___   ___ | | |_/ / _____  __
# | |\/| | | | / __|/ _ \/ _ \    | |/ _ \ / _ \| | ___ \/ _ \ \/ /
# | |  | | |_| \__ \  __/ (_) |   | | (_) | (_) | | |_/ / (_) >  <
# \_|  |_/\__,_|___/\___|\___/    \_/\___/ \___/|_\____/ \___/_/\_\
#
# @author:  Nicolas Karasiak
# @site:    www.karasiak.net
# @git:     www.github.com/nkarasiak/MuseoToolBox
# =============================================================================
"""
The :mod:`museotoolbox.stats` module gathers stats functions and classes.
"""
import numpy as np
from museotoolbox.processing import RasterMath, extract_ROI, _add_vector_unique_fid

from scipy.ndimage.filters import generic_filter  # to compute moran


def zonal_stats(in_image, in_vector, unique_id,
                stats=('mean', 'median', 'std'), verbose=False):
    """
    Extract zonal stats according to a predefined id.

    Parameters
    -----------
    in_image : str.
        Path of the raster file where the vector file will be rasterized.
    in_vector : str.
        Path of the vector file to rasterize.
    unique_id : str or False.
        If False, MuseoToolBox will create a field called 'unique_id' using
        the function :func:`_add_vector_unique_fid`.
    stats : sequence of str, optional (default=('mean', 'median', 'std')).
        Each str must be a function available from numpy.
        For example ['var'] will output the variance per band and per unique id.
    verbose : bool or int, optional (default=False).
        The higher the int verbose, the more information is returned.

    Returns
    --------
    stats : list of np.ndarray
        Returns as many np.ndarray as number of stats asked.
        Stats ordered by bands.
        For example (each line corresponds to the unique_id ordered asc) :

        +-------------+---------------+-------------+
        | mean band_1 + mean band_2   | mean band_3 |
        +-------------+---------------+-------------+
        | mean band_1 + mean band_2   | mean band_3 |
        +-------------+---------------+-------------+

    Examples
    ---------
    >>> raster,vector = mtb.datasets.load_historical_data()
    >>> mean,var = mtb.stats.zonal_stats(raster,vector,'uniquefid',stats=['mean','var'])
    >>> mean.shape
    (17, 3)
    >>> mean[0,:] # mean of the first unique_id
    array([117.75219446, 109.80958812,  79.64213369])
    >>> var[0,:]
    array([1302.29983482, 1250.59980003, 1015.76659747])
    """
    if unique_id is False:
        # No id field given: create one on the vector file and use it.
        _add_vector_unique_fid(in_vector, 'unique_id', verbose=verbose)
        unique_id = 'unique_id'

    X, y = extract_ROI(in_image, in_vector, unique_id)

    # Hoisted: np.unique(y) was recomputed for every stat in the original.
    labels = np.unique(y)
    n_unique_id = len(labels)
    n_bands = X.shape[1]

    out_stats = [np.zeros([n_unique_id, n_bands]) for _ in range(len(stats))]

    for idx_stat, stat in enumerate(stats):
        # Resolve the numpy function by name on the already-imported module
        # (the original re-imported numpy via __import__ for each stat).
        stat_function = getattr(np, stat)
        for pos, label in enumerate(labels):
            out_stats[idx_stat][pos, :] = stat_function(X[y == label], axis=0)

    return out_stats
class Moran:
    """Compute Moran's I spatial autocorrelation index per band of a raster.

    Results are accumulated in ``self.scores`` (dict with keys 'lag', 'I',
    'band', 'EI'); ``self.I`` / ``self.EI`` hold the values of the last
    band/lag processed.
    """

    def __init__(self, in_image, in_image_mask=False, lag=1, weights=False):
        """
        Compute Moran's I for raster.

        Parameters
        ----------
        in_image : str.
            A filename or path of a raster file.
            It could be any file that GDAL can open.
        in_image_mask : str or False, optional (default=False).
            Optional mask image forwarded to :class:`RasterMath`.
        lag : int or list of int, optional (default=1)
            The distance from the cell.
        weights : False or array-like, optional (default=False).
            Weights with the same shape as the square size.
            NOTE(review): this parameter is overwritten inside the loop
            below before being read — confirm whether it should be honored.
        """
        # One entry per (band, lag) combination.
        self.scores = dict(lag=[], I=[], band=[], EI=[])
        self.lags = []
        rM = RasterMath(
            in_image,
            in_image_mask=in_image_mask,
            return_3d=True,
            verbose=False)
        for band, arr in enumerate(rM.read_band_per_band()):
            # Normalize a scalar lag to a one-element list.
            if isinstance(lag, int):
                lag = [lag]
            for l in lag:
                # Neighborhood window of side 2*lag + 1 centered on the cell.
                squareSize = l * 2 + 1
                footprint = np.ones((squareSize, squareSize))
                # Border-only weight ring (overwrites the `weights` argument).
                weights = np.zeros((footprint.shape[0], footprint.shape[1]))
                weights[:, 0] = 1
                weights[0, :] = 1
                weights[-1, :] = 1
                weights[:, -1] = 1
                # n = number of unmasked cells (arr is a masked array here).
                n = arr.count()
                arr = arr.astype(np.float64)
                # convert masked to nan for generic_filter
                # NOTE(review): the result of np.where below is discarded, so
                # this statement has no effect — presumably it was meant to be
                # assigned back to `arr`. Confirm before changing: `arr.mask`
                # is still used further down and would be lost on a plain
                # ndarray.
                np.where(arr.mask, np.nan, arr.data)
                # Global mean of the band, ignoring NaN cells.
                x_ = np.nanmean(arr)
                # Numerator of Moran's I, computed per cell over the window.
                num = generic_filter(
                    arr,
                    self._compute_view_for_global_moran,
                    footprint=footprint,
                    mode='constant',
                    cval=np.nan,
                    extra_keywords=dict(
                        x_=x_,
                        footprint=footprint,
                        weights=weights,
                        transform='r'))
                # Denominator: squared deviations from the global mean.
                den = (arr - x_)**2
                self.z = arr - x_
                # need to mask z/den/num/neighbors
                den[arr.mask] = np.nan
                # local = np.nansum(num) / np.nansum(den)
                # Mask out invalid cells before the global sums.
                l1 = np.where(arr.mask, np.nan, num)
                l2 = np.where(arr.mask, np.nan, den)
                local = np.nansum(l1) / np.nansum(l2)
                self.I = local
                # Expected value of I under the null hypothesis: -1/(n-1).
                self.EI = -1 / (n - 1)
                self.lags.append(l)

                self.scores['lag'].append(l)
                self.scores['band'].append(band + 1)
                self.scores['I'].append(self.I)
                self.scores['EI'].append(self.EI)

    def get_n_neighbors(self, array, footprint, weights):
        """
        Get number of neighbors according to your array and to a footprint.

        Parameters
        -----------
        array : array-like, shape = [X,Y]
            The input array.
        footprint : array-like, shape = [X,Y]
            The footprint array.
        weights : array-like, shape = [X,Y]
            The weight for each cell.

        Returns
        ---------
        w : int
            The number of neighbors.
        """
        # Reshape the flattened window back to the footprint's 2D shape.
        b = np.reshape(
            array, (footprint.shape[0], footprint.shape[1])).astype(np.float64)
        xCenter = int((footprint.shape[0] - 1) / 2)
        yCenter = int((footprint.shape[1] - 1) / 2)
        # The center cell is not its own neighbor.
        b[xCenter, yCenter] = np.nan
        w = np.count_nonzero(~np.isnan(b))
        if w == 0:
            w = np.nan
        if weights is not False:
            # Weighted neighbor count: sum of weights over valid cells.
            weightsWindow = weights * footprint
            weightsNAN = weightsWindow[~np.isnan(b)]
            w = np.nansum(weightsNAN)

        return w

    def _compute_view_for_global_moran(
            self, a, x_, footprint, weights, transform='r'):
        """Per-window callback for generic_filter: numerator contribution of
        one cell to global Moran's I, or NaN when the center cell is NaN.
        """
        xSize, ySize = footprint.shape
        a = np.reshape(a, (xSize, ySize))

        xCenter = int((xSize - 1) / 2)
        yCenter = int((ySize - 1) / 2)
        # xi is the center cell's value.
        xi = a[xCenter, yCenter]
        if np.isnan(xi):
            return np.nan
        else:
            # Exclude the center from its own neighborhood.
            a[xCenter, yCenter] = np.nan
            w = np.count_nonzero(~np.isnan(a))
            if w == 0:
                num = np.nan
            else:
                # NOTE(review): `w` from the neighbor count above is discarded
                # and replaced by the weights array here; with transform='r'
                # (the only value used by __init__) it is then row-standardized
                # to 1/number-of-neighbors — confirm other transforms are
                # intended to use raw `weights`.
                w = np.copy(weights)
                if transform == 'r':
                    w = 1 / np.count_nonzero(~np.isnan(a))
                num = np.nansum(w * (a - x_) * (xi - x_))
        return num
[docs]def commission_omission(table): """ Compute commission and omission from a confusion matrix Parameters ---------- table : array. The confusion matrix (same number of lines and columns) Returns ------- com : commissions (list) om : omission (list) """ com, om = [[], []] for i in range(table.shape[0]): com.append((np.sum(table[i, :]) - table[i, i] ) / float(np.sum(table[i, :])) * 100) om.append((np.sum(table[:, i]) - table[i, i]) / float(np.sum(table[:, i])) * 100) return com, om
class ComputeConfusionMatrix:
    """Confusion matrix (and optional accuracy metrics) from predicted and
    reference labels.

    Attributes set: ``confusion_matrix`` always; ``OA`` if requested;
    ``Kappa`` if requested; ``F1`` always (list of per-class F1 scores).
    """

    def __init__(self, yp, yr, kappa=False, OA=False, F1=False):
        """
        Compute confusion matrix given label predicted and label reality.

        Parameters
        ----------
        yp : array.
            The label predicted.
        yr : array.
            The label truth.
        kappa : bool, default False.
            If true, computes kappa.
        OA : bool, default False.
            If True, computes Overall Accuracy.
        F1 : bool, default False.
            Accepted for backward compatibility; per-class F1 scores are
            always computed (the guard was disabled in the original).
        """
        # Always coerce both inputs; the original only converted them when
        # yp was a list, so an array yp with a list yr crashed on yr.max().
        yp = np.asarray(yp)
        yr = np.asarray(yr)

        n = yp.size
        # Labels are assumed to be 1..C; C is the largest label seen.
        C = max(int(yr.max()), int(yp.max()))
        self.confusion_matrix = np.zeros((C, C), dtype=np.int64)

        # Rows = predicted label, columns = reference label (1-based labels
        # mapped to 0-based indices).
        for i in range(n):
            self.confusion_matrix[int(yp[i]) - 1, int(yr[i]) - 1] += 1

        # Overall accuracy: correctly classified / total.
        if OA:
            self.OA = np.sum(np.diag(self.confusion_matrix)) / n

        # Cohen's kappa from the row/column marginals.
        if kappa:
            nl = np.sum(self.confusion_matrix, axis=1)
            nc = np.sum(self.confusion_matrix, axis=0)
            self.Kappa = ((n**2) * np.sum(np.diag(self.confusion_matrix)
                                          ) / n - np.sum(nc * nl)) / (n**2 - np.sum(nc * nl))

        # Per-class F1 = 2*TP / (2*TP + FP + FN); classes with an all-zero
        # denominator are skipped (matching the original behavior).
        f1_scores = []
        for label in range(C):
            TP = self.confusion_matrix[label, label]
            FN = np.sum(self.confusion_matrix[:, label]) - TP
            FP = np.sum(self.confusion_matrix[label, :]) - TP
            denom = 2 * TP + FP + FN
            if denom != 0:
                f1_scores.append(2 * TP / denom)
        self.F1 = f1_scores
[docs]def retrieve_y_from_confusion_matrix(confusion_matrix): """ Retrieve y_predict and y_truth from confusion matrix. Parameters ----------- confusion_matrix : nd-array of shape [number of labels, number of labels] The confusion matrix Returns -------- yt,yp : two nd-array of shape [sum of confusion matrix,] """ confusion_matrix = np.asarray(confusion_matrix) yp = [] for j in range(confusion_matrix.shape[0]): for i in range(confusion_matrix.shape[1]): yp.extend([i + 1] * confusion_matrix[j, i]) yp = np.asarray(yp) yt = [[i + 1] * np.sum(confusion_matrix[i, :]) for i in range(confusion_matrix.shape[0])] yt = np.asarray([item for sublist in yt for item in sublist]) return yt, yp