Source code for

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# =============================================================================
# ___  ___                       _____           _______
# |  \/  |                      |_   _|         | | ___ \
# | .  . |_   _ ___  ___  ___     | | ___   ___ | | |_/ / _____  __
# | |\/| | | | / __|/ _ \/ _ \    | |/ _ \ / _ \| | ___ \/ _ \ \/ /
# | |  | | |_| \__ \  __/ (_) |   | | (_) | (_) | | |_/ / (_) >  <
# \_|  |_/\__,_|___/\___|\___/    \_/\___/ \___/|_\____/ \___/_/\_\
# @author:  Nicolas Karasiak
# @site:
# @git:
# =============================================================================
The :mod:`` module gathers artificial intelligence tools.
from joblib import Parallel, delayed
import os
import numpy as np
from sklearn import metrics
from sklearn.base import clone
from ..processing import RasterMath, get_gdt_from_minmax_values, convert_dt, _reshape_ndim
from ..internal_tools import ProgressBar, push_feedback

[docs]class SuperLearner:
[docs] def __init__(self, classifier, param_grid=None, n_jobs=1, verbose=False): """ SuperLearner, shortname for Supervised Learning, ease the way to learn a model via an array or a raster using Scikit-Learn algorithm. After learning a model via :func:`fit`, you can predict via :func:`predict_image` or :func:`predict_array`. Parameters ----------- classifier : algorithm compatible with scikit-learn. For example ``RandomForestClassifier(n_estimators=100)`` from ``from sklearn.ensemble import RandomForestClassifier`` param_grid : False or dict, optional (default=False). param_grid for the grid_search. E.g. for RandomForestClassifier : ``param_grid=dict(n_estimators=[10,100],max_features=[1,3])`` n_jobs : int, default 1. Number of cores to be used by ``sklearn`` in grid-search. verbose : bool or int, optional (default=False) The higher it is the more sequential will show progression. Examples --------- >>> import museotoolbox as mtb >>> from sklearn.ensemble import RandomForestClassifier >>> X,y = mtb.datasets.load_historical_data(return_X_y=True) >>> RS50 = mtb.cross_validation.RandomStratifiedKFold(n_splits=2,n_repeats=5, random_state=12,verbose=False) >>> classifier = RandomForestClassifier() >>> SL =,classifier=classifier) >>>,y,cv=RS50,param_grid=dict(n_estimators=[100,200])) Fitting 10 folds for each of 2 candidates, totalling 20 fits best score : 0.966244859222 best n_estimators : 200 >>> for kappa in SL.get_stats_from_cv(confusion_matrix=False,kappa=True): print(kappa) [Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers. {'kappa': 0.94145803865870303} {'kappa': 0.94275572196698443} {'kappa': 0.94566553229314054} {'kappa': 0.94210064101370472} {'kappa': 0.94566137634353153} {'kappa': 0.94085890364956737} {'kappa': 0.94136385707385184} {'kappa': 0.9383201352573155} {'kappa': 0.93887726891376944} {'kappa': 0.94450020549861891} [Parallel(n_jobs=-1)]: Done 10 out of 10 | eSLsed: 8.7s finished >>> SL.predict_image(raster,'/tmp/classification.tif') Total number of blocks : 15 Prediction... [########################################]100% Saved /tmp/classification.tif using function predictArray """ self.n_jobs = n_jobs self.verbose = verbose if self.verbose <= 1 or self.verbose is False: self.verbose_gridsearch = 0 else: self.verbose_gridsearch = self.verbose - 1 self.classifier = classifier self.param_grid = param_grid self.xFunction = False self.standardize = False self._is_standardized = False self._array_is_customized = False self.xKwargs = {} self.CV = False self.cloneModel = False
[docs] def standardize_array(self, X=None): """ Scale X data using StandardScaler from ``sklearn``. If X is None, initialize StandardScaler. Parameters ---------- X : np.ndarray, or None, optional (default=None). The array to scale the data from. need_transformation : bool, default True If you used function to transform your array. """ from sklearn.preprocessing import StandardScaler try: self.StandardScaler except BaseException: self.StandardScaler = StandardScaler() if X is not None: if self._is_standardized is False: self._is_standardized = True Xt = self.StandardScaler.transform(X) return Xt
[docs] def fit( self, X, y, group=None, standardize=True, cv=None, scoring='accuracy', refit=True, **gridSearchCVParams): """ Fit model from array. Parameters ---------- X : np.ndarray. Array with values of each label variable. y : np.ndarray. Array with labels only. group : str or False. If you use a cross-validation which needs group-splitting. strandardize : Bool, default True. If True, will standardize features by removing the mean and scaling to unit variance. cv : Cross-Validation or int or None. Default None. if cv, choose one from cross_validation. if int, uses :class:`museotoolbox.cross_validation.RandomStratifiedKFold` with K = the int value. """ self.y = y = group if self._array_is_customized: X = self.xFunction(X, **self.xKwargs) X = _reshape_ndim(X) self.X = X self.standardize = standardize if standardize: self.standardize_array() self.X = self.standardize_array(X) self._fit( self.X, self.y,, self.classifier, self.param_grid, cv, scoring, refit, **gridSearchCVParams)
def _fit( self, X, y, groups, classifier, param_grid, cv, scoring='accuracy', refit=True, **gridSearchCVParams): if isinstance(cv, int) and cv: from ..cross_validation import RandomStratifiedKFold cv = RandomStratifiedKFold(n_splits=cv) if cv is not None and cv is not False: if isinstance(cv, list): self.CV = cv else: self.CV = [] if groups is None: for tr, vl in (cv for cv in cv.split( X, y) if cv is not None): self.CV.append((tr, vl)) else: for tr, vl in (cv for cv in cv.split( X, y, groups) if cv is not None): self.CV.append((tr, vl)) from sklearn.model_selection import GridSearchCV if isinstance(param_grid, dict) and cv is not False: if gridSearchCVParams == {}: self.model = GridSearchCV( self.classifier, param_grid=param_grid, cv=cv, scoring=scoring, refit=refit, n_jobs=self.n_jobs, verbose=self.verbose) else: self.model = GridSearchCV( self.classifier, param_grid=param_grid, cv=cv, scoring=scoring, refit=refit, n_jobs=self.n_jobs, verbose=self.verbose, **gridSearchCVParams) if groups is None:, y) else:, y, groups=groups) # self.model = self.gS.best_estimator_ self.cloneModel = clone(self.model.best_estimator_), y, groups) if self.verbose: push_feedback('best score : ' + str(self.model.best_score_)) for key in self.param_grid.keys(): message = 'best ' + key + ' : ' + \ str(self.model.best_params_[key]) push_feedback(message) else: if cv is not False or param_grid is not False: raise ValueError( 'Cannot fit model because a CV or a param_grid is given and and no param_grid was defined?\ If you want to fit your mode with no param_grid, please set cv=False and param_grid=False.') if groups is None: self.model =, y) else: self.model =, y, groups)
[docs] def save_model(self, path): """ Save model 'myModel.npz' to be loaded later via `SuperLearner.load_model(path)` Parameters ---------- path : str. If path ends with npz, perfects, else will add '.npz' after your fileName. Returns ------- path : str. Path and filename with mtb extension. """ if not path.endswith('npz'): path += '.npz' np.savez_compressed(path, SL=self.__dict__) return path
[docs] def load_model(self, path): """ Load model previously saved with `SuperLearner.save_model(path)`. Parameters ---------- path : str. If path ends with npy, perfects, else will add '.npy' after your fileName. """ if not path.endswith('npz'): path += '.npz' model = np.load(path, allow_pickle=True) self.__dict__.update(model['SL'].tolist())
def _convert_array(self, X): if self._array_is_customized is True: X = self.xFunction(X, **self.xKwargs) if self.standardize: if if X.mask.ndim == 1: X = X.reshape(-1, 1) tmpMask = X.mask[:, 0] X = _reshape_ndim(X) X = self.StandardScaler.transform(X) if tmpMask = np.repeat(tmpMask.reshape(-1, 1), X.shape[-1], axis=1) X =, tmpMask) X = _reshape_ndim(X) return X
[docs] def predict_array(self, X): """ Predict label from array. Parameters ---------- X : np.ndarray (n_size,). The array to predict. Must have the same number of bands of the initial array/raster. **kwargs : Xfunction : a custom function to modify directly the array from the raster. """ X = self._convert_array(X) self.Xpredict = self.model.predict(X) return self.Xpredict
[docs] def predict_confidence_per_class(self, X): """ Predict confidence for each class. Parameters ---------- X : np.ndarray. The array to predict proba. Must have the same number of bands of the initial array/raster. Returns ---------- Xpredict : np.ndarray (n_size,n_class). The probability from 0 to 100. """ X = self._convert_array(X) Xpredict_proba = self.model.predict_proba(X) * 100 # share prediction in class in case of confidence for only predicted # class self.Xpredict_proba = Xpredict_proba return Xpredict_proba
[docs] def predict_higher_confidence(self, X): """ Get confidence of the predicted label. Parameters ---------- X : np.ndarray. The array to predict proba. Must have the same number of bands of the initial array/raster. Returns ---------- Xpredict : np.ndarray (n_size,). The probability from 0 to 100. """ Xpredict_proba = np.amax( self.model.predict_proba( self._convert_array( X)) * 100, axis=1) return Xpredict_proba
[docs] def predict_image( self, in_image, out_image, confidence_per_class=False, higher_confidence=False, in_image_mask=False, out_nodata=0, compress=True, n_jobs=1): """ Predict label from raster using previous learned model. This function will call self.predictArray(X). Parameters ---------- in_image : str. A filename or path of a raster file. It could be any file that GDAL can open. out_image : str. A geotiff extension filename corresponding to a raster image to create. confidence_per_class : str or bool, optional (default=False) A path to a geotiff extension filename to store each confidence per class (one band = one label). higher_confidence: str or bool, optional (default=False). A path to a geotiff extension filename to store the max confidence from all classes. in_image_mask : str or False, optional (default=False). Path of the raster where 0 is mask and value above are no mask. outNumpyDT : numpy datatype, default will get the datatype according to your maximum class value. Get numpy datatype throught : convert_dt(get_gdt_from_minmax_values(maximumClassValue))) out_nodata : int, optional (default=0). Value of no data only for the out_image. """ rM = RasterMath(in_image, in_image_mask, message='Prediction...') numpyDT = convert_dt( get_gdt_from_minmax_values(np.amax(self.model.classes_))) rM.add_function( self.predict_array, out_image, out_n_bands=1, out_np_dt=numpyDT, out_nodata=out_nodata, compress=compress) if confidence_per_class: rM.add_function( self.predict_confidence_per_class, confidence_per_class, out_n_bands=False, out_np_dt=np.int16, out_nodata=np.iinfo(np.int16).min, compress=compress) if higher_confidence: rM.add_function( self.predict_higher_confidence, higher_confidence, out_n_bands=1, out_np_dt=np.int16, out_nodata=np.iinfo(np.int16).min, compress=compress)
def _get_stats_from_each_cv(self, trvl, confusion_matrix=True, kappa=False, OA=False, F1=False, nTrain=False, resampler = False): """ Compute stats per each fold """ X_train, X_test = self.X[trvl[0]], self.X[trvl[1]] Y_train, Y_test = self.y[trvl[0]], self.y[trvl[1]] if resampler : X_train, Y_train = resampler.fit_resample(X_train,Y_train) empty_model = clone(self.model.best_estimator_), Y_train) X_pred = empty_model.predict(X_test) accuracies = {} if confusion_matrix: accuracies['confusion_matrix'] = metrics.confusion_matrix( Y_test, X_pred) if kappa: accuracies['kappa'] = metrics.cohen_kappa_score(Y_test, X_pred) if OA: accuracies['OA'] = metrics.accuracy_score(Y_test, X_pred) if F1: accuracies['F1'] = metrics.f1_score(Y_test, X_pred, average=None) if nTrain: accuracies['nTrain'] = np.unique(Y_train, return_counts=True)[1] return accuracies
[docs] def save_cm_from_cv(self, savePath, prefix='', header=True, resampler = False, n_jobs=1): """ Save each confusion matrix (csv format) from cross-validation. For each matrix, will save as header : - The number of training samples per class, - The F1-score per class, - Overall Accuracy, - Kappa. Example of confusion matrix saved as csv : +-------------------------+------------+ | # Training samples : 90,80 | +-------------------------+------------+ | # F1 : 91.89,90.32 | +-------------------------+------------+ | # OA : 91.18 | +-------------------------+------------+ | # Kappa : 82.23 | +-------------------------+------------+ | 85 | 5 | +-------------------------+------------+ | 10 | 70 | +-------------------------+------------+ - **In X (columns)** : prediction (95 predicted labels for class 1). - **In Y (lines)** : reference (90 labels from class 1). Parameters ---------- savePath : str. The path where to save the different csv. If not exists, will be created prefix : str, default ''. If prefix, will add this prefix before the csv name (i.e. 0.csv) header : boolean, default True. If True, will save F1, OA, Kappa and number of training samples. If False, will only save confusion matrix resampler : imblearn class or False, optional Default is False If class is given, each fold will be retreated resampler.fit_resample(X,y) method. Returns ------- None Examples -------- After having learned with :mod:`` : >>> SL.saveCMFromCV('/tmp/testMTB/',prefix='RS50_') [Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers. [Parallel(n_jobs=-1)]: Done 10 out of 10 | eSLsed: 3.4s finished >>> np.loadtxt('/tmp/testMTB/RS50_0.csv') array([[85, 5], [10, 70]]) """ def _compute_stats_per_cv(statsidx, trvl, savePath, prefix, header, resampler = False): outFile = savePath + '/' + prefix + str(statsidx) + '.csv' dictStats = self._get_stats_from_each_cv(trvl, True, header, header, header, header, resampler = resampler) if header: np_header = 'Training samples : ' + ','.join(str(tr) for tr in dictStats['nTrain']) +\ '\nF1 : ' + ','.join(str(np.round(f * 100, 2)) for f in dictStats['F1']) +\ '\nOA : {}'.format(np.round(dictStats['OA'] * 100), 2) +\ '\nKappa : {}'.format( np.round(dictStats['kappa'] * 100), 2) else: np_header = '' np.savetxt( outFile, dictStats['confusion_matrix'], header=np_header, fmt='%0.d') if not os.path.exists(savePath): os.makedirs(savePath) # ============================================================================= # Parallel(n_jobs=n_jobs, # verbose=self.verbose + 1)(delayed(_compute_stats_per_cv)(statsidx, # trvl, # savePath, # prefix, # header, # resampler=resampler) for statsidx, # trvl in enumerate(self.CV)) # # # ============================================================================= from sklearn.model_selection import cross_val_score for statsidx,trvl in enumerate(self.CV) : _compute_stats_per_cv( statsidx, trvl, savePath, prefix, header, resampler=resampler)
[docs] def get_stats_from_cv( self, confusion_matrix=True, kappa=False, OA=False, F1=False, nTrain=False, resampler=False): """ Extract statistics from the Cross-Validation. If Cross-Validation is 5-fold, getStatsFromCV will return 5 confusion matrix, 5 kappas... Parameters ----------- confusion_matrix : bool, default True. If True, will return first the Confusion Matrix. kappa : bool, default False. If True, will return in kappa. OA : bool, default False. If True, will return Overall Accuracy/ F1 : bool, default False. If True, will return F1 Score per class. nTrain : bool, default False. If True, will return number of train samples ordered asc. per label. resampler : imblearn class or False, optional Default is False If class is given, each fold will be retreated resampler.fit_resample(X,y) method. Returns ------- Accuracies : dict A dictionary of each statistic asked. Examples -------- After having learned with :mod:`` : >>> for stats in SL.get_stats_from_cv(confusion_matrix=False,kappa=True): >>> stats['kappa'] 0.942560083148 0.94227598585 0.942560083148 ... """ def _computeStatsPerCV(statsidx, trvl, **kwargs): dictStats = self._get_stats_from_each_cv(trvl, **kwargs) return dictStats statsCV = Parallel( n_jobs=self.n_jobs, verbose=self.verbose)( delayed(_computeStatsPerCV)( statsidx, trvl, confusion_matrix=confusion_matrix, kappa=kappa, OA=OA, F1=F1, nTrain=nTrain, resampler=resampler) for statsidx, trvl in enumerate( self.CV)) return statsCV
[docs] def customize_array(self, xFunction, **kwargs): self._array_is_customized = True self.xFunction = xFunction self.xKwargs = kwargs
[docs]class SequentialFeatureSelection: """ Sequential Feature Selection Parameters ---------- classifier : class. Classifier from scikit-learn. param_grid : np.ndarray. param_grid for hyperparameters of the classifier. path_to_save_models : False or str, optional (default=False). If False, will store best model per combination in memory. If str, must be path to save each model and accuracy per feature. n_comp : int, optional (default=1). The number of component per feature. If 4, each feature has 4 columns. verbose : bool or int, optional (default=False) The higher it is the more sequential will show progression. """
[docs] def __init__(self, classifier, param_grid, path_to_save_models=False, n_comp=1, verbose=False): # share args self.n_comp = n_comp self.classifier = classifier self.param_grid = param_grid self.verbose = verbose if self.verbose < 1: self.verbose_gridsearch = 0 else: self.verbose_gridsearch = self.verbose - 1 self.xFunction = False self.xKwargs = False self.path_to_save_models = path_to_save_models
[docs] def fit(self, X, y, group=None, cv=5, scoring='accuracy', standardize=True, max_features=False, resampler = False, recompute_best = False, n_jobs=1, **kwargs): """ Parameters ---------- X : np.ndarray shape of np.ndarray is (n_size,n_bands). y : np.ndarray Size of X.shape[0]. group : None, optional group for cross-validation cv : int, or cross_validation method, optional (default=5). Default will use scoring : str or class, optional (default='accuracy'). default is 'accuracy'. See sklearn.metrics.make_scorer from scikit-learn. standardize : optional Default True. max_features : int or bool. Default False, if value int. resampler : imblearn class or False, optional Default is False If class is given, each fold will be retreated resampler.fit_resample(X,y) method. recompute_best : bool, default : False. If True, will recompute the model with the best already found variables. This can be usefull if you changed algorithms, parameters, cross-validation... n_jobs : int. Number of job to compute cross-validation. """ self.X = X self.X_ = np.copy(X) self.y = y = group = cv self.scoring = scoring self.models_path_ = [] if self.xFunction: self.X = self.xFunction(X, **self.xKwargs) self.X = _reshape_ndim(self.X) xSize = self.X.shape[1] self.n_features = int(xSize / self.n_comp) self.max_features = self.n_features if max_features is not False: if max_features < self.n_features: self.max_features = max_features totalIter = np.sum( [self.n_features - i for i in range(self.max_features)]) if self.verbose: pB = ProgressBar(totalIter, message='SFFS:') self.mask = np.ones(xSize, dtype=bool) self.models_, self.best_scores_, self.best_features_ = [[], [], []] self.subsets_ = dict() for j in range(self.max_features): resPerFeatures = list() need_fit = True n_features_to_test = int( self.X[:, self.mask].shape[1] / self.n_comp) if self.path_to_save_models: all_scores_file = os.path.join( self.path_to_save_models, 'all_scores_{}.csv'.format(j)) if os.path.exists(all_scores_file): need_fit = False push_feedback('Feature {} already computed'.format(j)) scores = np.loadtxt(all_scores_file, delimiter=',') if scores.ndim == 1: all_scores = [scores[1]] best_candidate = 0 else: all_scores = scores[:, 1] best_candidate = np.argmax(scores[:, 1]) SL = SuperLearner( classifier=self.classifier, param_grid=self.param_grid, n_jobs=n_jobs, verbose=self.verbose_gridsearch) SL.load_model( os.path.join( self.path_to_save_models, 'model_{}.npz'.format(j))) self.models_path_.append( os.path.join( self.path_to_save_models, 'model_{}.npz'.format(j))) SL.scoring = scoring """ if recompute_best is True: SL.save_cm_from_cv( os.path.join( self.path_to_save_models, str(j)), resampler=resampler, n_jobs=n_jobs ) """ if need_fit is True: for idx in range( n_features_to_test): # at each loop, remove best candidate if self.verbose: pB.add_position() SL = SuperLearner( classifier=self.classifier, param_grid=self.param_grid, n_jobs=n_jobs, verbose=self.verbose_gridsearch) curX = self._transform_in_fit(self.X, idx) if standardize is False: scale = False else: scale = True curX, y, group=group, standardize=scale, scoring=self.scoring, resPerFeatures.append(SL) all_scores = [np.amax(SL.model.best_score_) for SL in resPerFeatures] best_candidate = np.argmax(all_scores) # self._bestSLs.append(resPerFeatures[best_candidate]) SL = resPerFeatures[best_candidate] if self.path_to_save_models: if not os.path.exists(os.path.join( self.path_to_save_models, str(j))): os.makedirs(os.path.join( self.path_to_save_models, str(j))) SL.save_model( os.path.join( self.path_to_save_models, 'model_{}.npz'.format(j))) SL.save_cm_from_cv( os.path.join( self.path_to_save_models, str(j)), resampler=resampler, n_jobs=n_jobs) if self.n_comp == 1: bandidx = np.where(self.mask == 1)[0].reshape(-1, 1) else: bandidx = np.arange( 0, self.mask.shape[0], self.n_comp).reshape(-1, 1) bandidx = np.int32( bandidx[np.in1d(self.mask[bandidx], 1)] / self.n_comp) scoreWithIdx = np.hstack((bandidx, np.asarray( all_scores, dtype=np.float32).reshape(-1, 1))) if self.path_to_save_models: np.savetxt(all_scores_file, scoreWithIdx, fmt='%0.d,%.4f') self.models_path_.append( os.path.join( self.path_to_save_models, 'model_{}.npz'.format(j))) else: self.models_.append(resPerFeatures[best_candidate]) # store results best_feature_id = int( self._get_feature_id(best_candidate) / self.n_comp) self.best_scores_.append(all_scores[best_candidate]) self.best_features_.append(best_feature_id) self.best_idx_ = np.argmax(self.best_scores_) if self.verbose: push_feedback( '\nBest feature with %s feature(s) : %s' % (j + 1, best_feature_id)) push_feedback('Best mean score : %s' % np.amax(all_scores)) self.subsets_[ str(j)] = dict( avg_score=np.amax(all_scores), feature_idx=self.best_features_.copy(), cv_score=SL.model.cv_results_, best_score_=np.amax(all_scores), best_feature_=best_feature_id) self._maskIdx(best_candidate)
[docs] def predict(self, X, idx): """ Predict in raster using the best features. Parameters ----------- X : np.ndarray. The array to predict. Must have the same number of bands of the initial array/raster. idx : int. The combination (from 0). """ self._reset_mask() if idx == 'best': idx = self.best_idx_ if self.path_to_save_models is False: SL = self.models_[idx] else: SL = SuperLearner( classifier=self.classifier, param_grid=self.param_grid, n_jobs=1, verbose=self.verbose_gridsearch) SL.load_model(self.models_path_[idx]) SL.customize_array(self.transform, idx=idx, customizeX=True) return SL.predict_array(X)
[docs] def predict_best_combination( self, in_image, out_image, in_image_mask=False, higher_confidence=False): """ Predict in raster using the best features. Parameters ----------- in_image : str. A filename or path of a raster file. It could be any file that GDAL can open. out_image : str. A geotiff extension filename corresponding to a raster image to create. in_image_mask : str or False, optional (default=False). Path to a geotiff extension filename corresponding to a raster image to create. higher_confidence : str or False, optional (default=False). Path to a geotiff extension filename corresponding to a raster image to create. """ self._reset_mask() push_feedback('Predict with combination ' + str(self.best_idx_)) if self.path_to_save_models is False: SL = self.models_[self.best_idx_] else: SL = SuperLearner(classifier=False, n_jobs=1, verbose=self.verbose) SL.load_model(self.models_path_[self.best_idx_]) SL.customize_array(self.transform, idx=self.best_idx_, customizeX=True) SL.predict_image(in_image, out_image, in_image_mask=in_image_mask, higher_confidence=higher_confidence)
[docs] def predict_images(self, in_image, out_image_prefix, in_image_mask=False, higher_confidence=False): """ Predict each best found features with,y). Parameters ---------- in_image : str. Path of the raster to predict. out_image_prefix : str. Prefix of each raster to save. Will add in suffix the iteration number then .tif. E.g. outRasterPrefix = `classification_`, will give `classification_0.tif` for the first prediction. in_image_mask : str or False, optional (default=False). Path to the image mask where 0 values are masked data. higher_confidence : False or str. Default False. If str, same as outRasterPrefix. """ self._reset_mask() for idx, model in enumerate(self.models_): if self.path_to_save_models is False: SL = self.models_[idx] else: SL = SuperLearner(n_jobs=1, verbose=self.verbose) SL.load_model(model) SL.customize_array(self.transform, idx=idx, customizeX=True) out_image = out_image_prefix + str(idx) + '.tif' SL.predict_image( in_image, out_image, higher_confidence=higher_confidence, in_image_mask=in_image_mask)
[docs] def customize_array(self, xFunction, **kwargs): self.xFunction = xFunction self.xKwargs = kwargs
def _transform_in_fit(self, X, idx=0, customizeX=False): mask = np.copy(self.mask) if customizeX is False: fieldsToKeep = self._convertIdxToNComp(idx) mask[fieldsToKeep] = 0 X = X[:, ~mask] if customizeX is True: self.mask[self.best_features_[idx]] = 0 X = X[:, ~self.mask] X = _reshape_ndim(X) return X
[docs] def transform(self, X, idx=0, customizeX=False): """ Parameters ---------- idx : int, or str with 'best'. The idx to return X array """ self._reset_mask() self.best_idx_ = np.argmax(self.best_scores_) if idx == 'best': idx = self.best_idx_ if self.n_comp > 1: for candidate in range(idx + 1): if candidate <= idx: idxToMask = [ self.best_features_[candidate] * self.n_comp + i for i in range( self.n_comp)] self.mask[idxToMask] = 0 else: self.mask[self.best_features_[:idx + 1]] = 0 if self.xFunction: X = self.xFunction(X, **self.xKwargs) if customizeX is False: # fieldsToKeep = self.__convertIdxToNComp(idx) X = X[:, ~self.mask] # X = np.hstack((X[:, ~self.mask], xToStack)) if customizeX is True: # if self.n_comp>1: # idxToMask = [self.best_features_[candidate]*self.n_comp + i for i in range(self.n_comp)] # self.mask[idxToMask] = 0 # else: # self.mask[self.best_features_[idx]] = 0 X = X[:, ~self.mask] X = _reshape_ndim(X) return X
def _get_feature_id(self, candidate): """ """ return np.where(self.mask == 1)[0][candidate * self.n_comp] def _convertIdxToNComp(self, idx): """ """ idxUnmask = self._get_feature_id(idx) n_features_to_get = [idxUnmask + j for j in range(self.n_comp)] return n_features_to_get def _maskIdx(self, idx): """ Add the idx to the mask """ self.mask[self._convertIdxToNComp(idx)] = 0 def _reset_mask(self): """ """ self.mask[:] = 1
[docs] def get_best_model(self, clone=False): self.best_idx_ = np.argmax(self.best_scores_) if self.path_to_save_models: SL = SuperLearner(classifier=None, n_jobs=1, verbose=self.verbose) SL.load_model(self.models_path_[self.best_idx_]) else: SL = self.models_[self.best_idx_] return SL