ml module
hydrafloods.ml
apply_fcnn(image, project_name, model_name, model_kwargs=None, output_probas=False, output_names=None)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image | ee.Image | input image for the FCNN model; must contain all of the model's features as bands | required |
project_name | str | cloud project name used to reference the model | required |
model_name | str | AI Platform model name | required |
model_kwargs | dict | dictionary of keyword arguments to pass to ee.Model. default = None | None |
output_probas | bool | flag to output the image as class probabilities. If False, the output is a one-band classification image. default = False | False |
output_names | Iterable | list of band names to set for the output image. default = None | None |
Source code in hydrafloods/ml.py
@decorators.keep_attrs
def apply_fcnn(
image,
project_name,
model_name,
model_kwargs=None,
output_probas=False,
output_names=None,
):
"""
args:
image (ee.Image): input image for FCNN model, must have all of the features as bands
project_name (str): cloud project name to reference the model
model_name (str): ai platform model name
model_kwargs (dict, optional): dictionary of keyword arguments to pass to ee.Model. default = None
output_probas (bool, optional): flag to set the output image as class probabilities. If False
then the output will be a one band output of the classification. default = False
output_names (Iterable, optional): list of band names to set for the output image. default = None
"""
if model_kwargs is None:
model_kwargs = OrderedDict(projectName=project_name, modelName=model_name)
else:
positional = OrderedDict(projectName=project_name, modelName=model_name)
model_kwargs = OrderedDict({**positional, **model_kwargs})
# Load the trained model and use it for prediction.
model = ee.Model.fromAiPlatformPredictor(**model_kwargs)
# run the predictions
predictions = model.predictImage(image.toFloat().toArray())
if output_probas:
if output_names is None:
raise ValueError(
"please provide `output_names` when `ouput_probas` is set to True"
)
output = predictions.arrayFlatten([output_names])
else:
output_names = "classification" if output_names is None else output_names
# find highest probability class
output = predictions.arrayArgmax().arrayFlatten([[output_names]])
return output
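A minimal usage sketch; the cloud project, model name, band configuration, and `model_kwargs` values below are hypothetical and would need to match how the model was actually trained and deployed (any extra keys in `model_kwargs` are passed straight through to `ee.Model.fromAiPlatformPredictor`):

```python
import ee
from hydrafloods import ml

ee.Initialize()

# Sentinel-1 scene whose bands match the features the model was trained on
s1 = (
    ee.ImageCollection("COPERNICUS/S1_GRD")
    .filterDate("2019-01-01", "2019-01-10")
    .first()
    .select(["VV", "VH"])
)

probas = ml.apply_fcnn(
    s1,
    project_name="my-gcp-project",        # hypothetical cloud project
    model_name="surface-water-fcnn",      # hypothetical AI Platform model
    model_kwargs=dict(version="v1", inputTileSize=[144, 144]),  # assumed predictor options
    output_probas=True,
    output_names=["not_water", "water"],
)
```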
apply_feature_pca(fc, eigen_vecs, names, center=None)
Applies Principal component decomposition on features
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fc | ee.FeatureCollection | feature collection to calculate principal components from | required |
eigen_vecs | ee.Array | eigenvectors of the PCA used to transform the features | required |
names | list[str] | property names to use as features in the PCA | required |
center | ee.Array \| None | Array of mean values used to center the features. If None then no centering is applied. default = None | None |
Returns:
Type | Description |
---|---|
ee.FeatureCollection | feature collection with new properties within each feature being the principal components |
Source code in hydrafloods/ml.py
def apply_feature_pca(fc, eigen_vecs, names, center=None):
"""Applies Principal component decomposition on features
args:
fc (ee.FeatureCollection): feature collection to calculate principal components from
eigen_vecs (ee.Array): eigen vectors of PCA to transform features
names (list[str]): property names to use as features in PCA
center (ee.Array | None, optional): Array of mean values to center features. If None then no
centering is applied. default = None
returns:
ee.FeatureCollection: feature collection with new properties within each feature being the principal components
"""
array_ = ee.Array(
ee.List(
fc.makeArray(names)
.aggregate_array("array")
.map(lambda x: ee.Array(x).toList())
)
)
if center is not None:
centered = array_.subtract(
ee.Array.cat([center], 1).transpose().repeat(0, array_.length().get([0]))
)
else:
centered = array_
pca_arr = eigen_vecs.matrixMultiply(centered.transpose()).transpose()
out_band_names = [f"pc_{i}" for i in range(len(names))]
fc_size = fc.size()
fc_list = fc.toList(fc_size)
fc_pca = ee.FeatureCollection(
ee.List.sequence(0, fc_size.subtract(1)).map(
lambda x: ee.Feature(fc_list.get(x)).set(
ee.Dictionary.fromLists(
out_band_names,
pca_arr.slice(0, x, ee.Number(x).add(1), 1).project([1]).toList(),
)
)
)
)
return fc_pca
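A small end-to-end sketch on a hand-built feature collection (the property names and values are made up); the eigenvectors and centering array come from `calc_feature_pca`:

```python
import ee
from hydrafloods import ml

ee.Initialize()

# toy features with two numeric properties
fc = ee.FeatureCollection([
    ee.Feature(None, {"x1": 0.1, "x2": 0.9}),
    ee.Feature(None, {"x1": 0.4, "x2": 0.5}),
    ee.Feature(None, {"x1": 0.8, "x2": 0.2}),
])
names = ["x1", "x2"]

# derive the rotation from the samples themselves
eigen_vecs, eigen_vals, center = ml.calc_feature_pca(fc, names)

# adds pc_0, pc_1 properties to each feature
fc_pca = ml.apply_feature_pca(fc, eigen_vecs, names, center=center)
print(fc_pca.first().getInfo())
```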
apply_image_pca(img, eigen_vecs, names, center=None)
Applies Principal component decomposition on image
Parameters:
Name | Type | Description | Default |
---|---|---|---|
img | ee.Image | image to calculate principal components from | required |
eigen_vecs | ee.Array | eigenvectors of the PCA used to transform the features | required |
names | list[str] | band names to use as features in the PCA | required |
center | ee.Array \| None | Array of mean values used to center the features. If None then no centering is applied. default = None | None |
Returns:
Type | Description |
---|---|
ee.Image | principal components calculated from image |
Source code in hydrafloods/ml.py
@decorators.keep_attrs
def apply_image_pca(img, eigen_vecs, names, center=None):
"""Applies Principal component decomposition on image
args:
img (ee.Image): image to calculate principal components from
eigen_vecs (ee.Array): eigen vectors of PCA to transform features
names (list[str]): band names to use as features in PCA
center (ee.Array | None, optional): Array of mean values to center features. If None then no
centering is applied. default = None
returns:
ee.Image: principal components calculated from image
"""
if center is not None:
arrayImage = (
img.select(names)
.subtract(ee.Image.constant(center.toList()))
.toArray()
.toArray(1)
)
else:
arrayImage = img.select(names).toArray().toArray(1)
principalComponents = ee.Image(eigen_vecs).matrixMultiply(arrayImage)
out_band_names = [f"pc_{i}" for i in range(len(names))]
pcaImage = (
principalComponents
# Throw out an unneeded dimension, [[]] -> [].
.arrayProject([0])
# Make the one band array image a multi-band image, [] -> image.
.arrayFlatten([out_band_names])
)
return pcaImage
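A sketch of the intended pairing with `calc_feature_pca`: estimate the rotation from sampled pixels, then project the full image onto it (the dataset, date range, and scale are placeholders):

```python
import ee
from hydrafloods import ml

ee.Initialize()

names = ["B2", "B3", "B4", "B5"]
img = (
    ee.ImageCollection("LANDSAT/LC08/C02/T1_TOA")
    .filterDate("2020-06-01", "2020-06-15")
    .first()
    .select(names)
)

# sample pixels to estimate the PCA rotation
samples = img.sample(region=img.geometry(), scale=90, numPixels=500)
eigen_vecs, eigen_vals, center = ml.calc_feature_pca(samples, names)

# image with pc_0 ... pc_3 bands
pca_img = ml.apply_image_pca(img, eigen_vecs, names, center=center)
```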
calc_feature_pca(fc, names, is_centered=False, method='svd')
Principal component decomposition of features
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fc | ee.FeatureCollection | feature collection to calculate the PCA from | required |
names | list[str] | property names to use as features in the PCA | required |
is_centered | bool | flag indicating whether the features are already centered. If False, centering is applied before the PCA. default = False | False |
method | str | the decomposition method for obtaining the eigenvectors and values. Options are 'svd' or 'eigendecomp'. Note: svd is usually faster as it does not need to compute the covariance matrix of the input features. default = 'svd' | 'svd' |
Returns:
Type | Description |
---|---|
ee.Array | eigen vectors of PCA |
ee.Array | eigen values of PCA |
ee.Array | mean values of each feature |
Source code in hydrafloods/ml.py
def calc_feature_pca(fc, names, is_centered=False, method="svd"):
"""Principal component decomposition of features
args:
fc (ee.FeatureCollection): feature collection to calculate PCA from
names (list[str]): property names to use as features in PCA
is_centered (bool, optional): boolean to identify if features need to be centered before PCA.
False means apply centering. default = False
method (str, optional): the decomposition method for obtaining the eigen vectors and values
options are 'svd' or 'eigendecomp'. note: svd is usually faster as it does not need to
compute the covariance matrix of input features. default = 'svd'
returns:
ee.Array: eigen vectors of PCA
ee.Array: eigen values of PCA
ee.Array: mean values of each feature
"""
array_ = ee.Array(
ee.List(
fc.makeArray(names)
.aggregate_array("array")
.map(lambda x: ee.Array(x).toList())
)
)
center = array_.reduce(ee.Reducer.mean(), [0]).repeat(0, array_.length().get([0]))
if not is_centered:
centered = array_.subtract(center)
else:
centered = array_
if method == "svd":
svd = centered.matrixSingularValueDecomposition()
eigen_vecs = ee.Array(svd.get("V")).transpose()
eigen_vals = ee.Array(svd.get("S")).matrixDiagonal()
elif method == "eigendecomp":
# Compute the covariance of the bands within the region.
covar = centered.transpose().matrixMultiply(centered)
# Perform an eigen analysis and slice apart the values and vectors.
eigens = covar.eigen()
eigen_vecs = eigens.slice(1, 1)
eigen_vals = eigens.slice(1, 0, 1)
else:
raise ValueError(
"could not understand provided method keyword. Options are 'svd' or 'eigendecomp'"
)
out_band_names = [f"pc_{i}" for i in range(len(names))]
return eigen_vecs, eigen_vals, center.slice(0, 0, 1).project([1])
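A quick sketch of computing the decomposition and inspecting the eigenvalues on toy properties (`method="eigendecomp"` is shown only to illustrate the alternative to the default SVD path):

```python
import ee
from hydrafloods import ml

ee.Initialize()

fc = ee.FeatureCollection([
    ee.Feature(None, {"x1": 1.0, "x2": 2.0, "x3": 0.5}),
    ee.Feature(None, {"x1": 2.0, "x2": 4.1, "x3": 0.4}),
    ee.Feature(None, {"x1": 3.0, "x2": 6.2, "x3": 0.6}),
    ee.Feature(None, {"x1": 4.0, "x2": 7.9, "x3": 0.5}),
])

eigen_vecs, eigen_vals, center = ml.calc_feature_pca(
    fc, ["x1", "x2", "x3"], method="eigendecomp"
)
print(eigen_vals.getInfo())  # relative importance of each component
```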
calc_image_pca(image, region=None, scale=90, max_pixels=1000000000.0, method='svd')
Principal component analysis decomposition of image bands
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image | ee.Image | image to apply the PCA to | required |
region | ee.Geometry \| None | region to sample values for the covariance matrix. If set to None, the image's geometry is used. default = None | None |
scale | int | scale at which to perform reduction operations; setting this higher will prevent OOM errors. default = 90 | 90 |
max_pixels | int | maximum number of pixels to use in reduction operations. default = 1e9 | 1000000000.0 |
method | str | the decomposition method for obtaining the eigenvectors and values. Options are 'svd' or 'eigendecomp'. Note: svd is usually faster as it does not need to compute the covariance matrix of the input features. default = 'svd' | 'svd' |
Returns:
Type | Description |
---|---|
ee.Image | principal components scaled by eigen values |
Source code in hydrafloods/ml.py
def calc_image_pca(image, region=None, scale=90, max_pixels=1e9, method="svd"):
"""Principal component analysis decomposition of image bands
args:
image (ee.Image): image to apply pca to
region (ee.Geometry | None, optional): region to sample values for covariance matrix,
if set to `None` will use img.geometry(). default = None
scale (int, optional): scale at which to perform reduction operations, setting higher will prevent OOM errors. default = 90
max_pixels (int, optional): maximum number of pixels to use in reduction operations. default = 1e9
method (str, optional): the decomposition method for obtaining the eigen vectors and values
options are 'svd' or 'eigendecomp'. note: svd is usually faster as it does not need to
compute the covariance matrix of input features. default = 'svd'
returns:
ee.Image: principal components scaled by eigen values
"""
bandNames = image.bandNames()
out_band_names = ee.List.sequence(1, bandNames.length()).map(
lambda x: ee.String("pc_").cat(ee.Number(x).int())
)
# Mean center the data to enable a faster covariance reducer
# and an SD stretch of the principal components.
meanDict = image.reduceRegion(
reducer=ee.Reducer.mean(), geometry=region, scale=scale, maxPixels=max_pixels
)
means = ee.Image.constant(meanDict.values(bandNames))
centered = image.subtract(means)
# Collapse the bands of the image into a 1D array per pixel.
arrays = centered.toArray()
if method == "svd":
svd = arrays.toArray(1).matrixSingularValueDecomposition()
eigen_vecs = svd.select("V") # .arrayTranspose()
eigen_vals = svd.select("S").matrixDiagonal()
elif method == "eigendecomp":
# Compute the covariance of the bands within the region.
covar = arrays.reduceRegion(
reducer=ee.Reducer.centeredCovariance(),
geometry=region,
scale=scale,
maxPixels=max_pixels,
)
# Get the 'array' covariance result and cast to an array.
# This represents the band-to-band covariance within the region.
covarArray = ee.Array(covar.get("array"))
# Perform an eigen analysis and slice apart the values and vectors.
eigens = covarArray.eigen()
# This is a P-length vector of Eigenvalues.
eigenValues = eigens.slice(1, 0, 1)
# This is a PxP matrix with eigenvectors in rows.
eigenVectors = eigens.slice(1, 1)
# Convert the array image to 2D arrays for matrix computations.
arrayImage = arrays.toArray(1)
# Left multiply the image array by the matrix of eigenvectors.
principalComponents = ee.Image(eigenVectors).matrixMultiply(arrayImage)
# Turn the square roots of the Eigenvalues into a P-band image.
sdImage = (
ee.Image(eigenValues.sqrt()).arrayProject([0]).arrayFlatten([out_band_names])
)
# Turn the PCs into a P-band image, normalized by SD.
return (
principalComponents
# Throw out an unneeded dimension, [[]] -> [].
.arrayProject([0])
# Make the one band array image a multi-band image, [] -> image.
.arrayFlatten([out_band_names])
# Normalize the PCs by their SDs.
.divide(sdImage)
)
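A minimal sketch of image-space PCA (the dataset, date range, region, and scale are placeholders); the output bands are named pc_1 ... pc_N and normalized by their standard deviations:

```python
import ee
from hydrafloods import ml

ee.Initialize()

img = (
    ee.ImageCollection("LANDSAT/LC08/C02/T1_TOA")
    .filterDate("2020-06-01", "2020-06-15")
    .first()
    .select(["B2", "B3", "B4", "B5", "B6", "B7"])
)

# pc_1 ... pc_6 bands, normalized by their standard deviations
pca = ml.calc_image_pca(img, region=img.geometry(), scale=90, method="eigendecomp")
```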
gradient_boosting_ee(n_trees, feature_collection, feature_names, label, scaling=None, mode='classification', shrinkage=0.01, loss='LeastAbsoluteDeviation')
Helper function to scale feature collection and train gradient tree boosting model
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_trees | int | number of trees for the gradient boosting model | required |
feature_collection | ee.FeatureCollection | features to train the gradient boosting model | required |
feature_names | list[str] | names of feature columns to use in the gradient boosting model (x values) | required |
label | str | name of the feature column to fit the gradient boosting model to (y value) | required |
scaling | str \| None | name of scaling to apply before training. One of: "minmax", "standard", None. default = None | None |
mode | str | The output mode of the gradient boosting model. One of: "classification", "regression", "probability". default = "classification" | 'classification' |
shrinkage | float | The shrinkage parameter in (0, 1] that controls the learning rate of the procedure. default = 0.01 | 0.01 |
loss | str | Loss function to be optimized. default = "LeastAbsoluteDeviation" | 'LeastAbsoluteDeviation' |
Source code in hydrafloods/ml.py
def gradient_boosting_ee(
n_trees,
feature_collection,
feature_names,
label,
scaling=None,
mode="classification",
shrinkage=0.01,
loss="LeastAbsoluteDeviation",
):
"""Helper function to scale feature collection and train gradient tree boosting model
args:
n_trees (int): number of trees for gradient boosting model
feature_collection (ee.FeatureCollection): features to train gradient boosting model
feature_names (list[str]): names of feature columns to use in gradient boosting model (x values)
label (str): name of feature column to fit gradient boosting model (y value)
scaling (str | None, optional): name of scaling to apply before training. One of: "minmax", "standard", `None`.
default = `None`
mode (str, optional): The output mode of the gradient boosting model. One of: "classification", "regression",
"probability". default = "classification"
shrinkage (float, optional): The shrinkage parameter in (0, 1] controls the learning rate of the procedure. default = 0.01
loss (str, optional): Loss function to be optimized. default = "LeastAbsoluteDeviation"
"""
if scaling == "minmax":
scaling_dict = minmax_scaling_dict(feature_collection, feature_names)
fc_norm = minmax_feature_scaling(
feature_collection, scaling_dict, feature_names
)
elif scaling == "standard":
scaling_dict = standard_scaling_dict(feature_collection, feature_names)
fc_norm = standard_feature_scaling(
feature_collection, scaling_dict, feature_names
)
elif scaling is None:
scaling_dict = None
fc_norm = feature_collection
else:
raise ValueError(
"Could not determine scaling option. Options are ['minmax', 'standard', or None]"
)
classifier = (
ee.Classifier.smileGradientTreeBoost(
numberOfTrees=n_trees, shrinkage=shrinkage, loss=loss
)
.setOutputMode(mode.upper())
.train(fc_norm, label, feature_names)
)
return classifier, scaling_dict
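A training sketch (the sample asset and column names are hypothetical). The returned scaling dictionary should be reused, e.g. with `minmax_image_scaling`, so prediction inputs match the scaled training features:

```python
import ee
from hydrafloods import ml

ee.Initialize()

samples = ee.FeatureCollection("users/example/water_training_samples")  # hypothetical asset
features = ["VH", "VV", "ndwi"]

classifier, scaling_dict = ml.gradient_boosting_ee(
    n_trees=100,
    feature_collection=samples,
    feature_names=features,
    label="water",
    scaling="minmax",
    mode="classification",
    shrinkage=0.05,
)
```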
hist_matching(samples, predictor, target, n_estimators=50)
Trains classifiers to perform histogram matching
Parameters:
Name | Type | Description | Default |
---|---|---|---|
samples | ee.FeatureCollection | feature collection with samples for histogram matching | required |
predictor | str | column name of values to transform | required |
target | str | column name of values to match | required |
n_estimators | int | number of trees to create random forest models from. default = 50 | 50 |
Returns:
Type | Description |
---|---|
list[ee.Classifier] | list of classifiers; the first element maps values to probabilities (val_to_proba) and the second maps probabilities to values (proba_to_val) |
Source code in hydrafloods/ml.py
def hist_matching(samples, predictor, target, n_estimators=50):
"""Trains classifiers to perform histogram matching
args:
samples (ee.FeatureCollection): feature collection with samples for histogram matching
predictor (str): column name of values to transform
target (str): column name of values to match
n_estimators (int, optional): number of trees to create random forest models from. default = 50
returns:
list[ee.Classifier]: list of classifiers with first element being the val to proba and second being proba to val classifiers
"""
def get_cdf(fc, column):
def array_to_features(l):
return ee.Feature(
None, {column: ee.List(l).get(0), "probability": ee.List(l).get(1)}
)
# Histogram equalization start:
histo = ee.Dictionary(
fc.reduceColumns(
ee.Reducer.histogram(
maxBuckets=2 ** 12,
),
[column],
).get("histogram")
)
valsList = ee.List(histo.get("bucketMeans"))
freqsList = ee.List(histo.get("histogram"))
cdfArray = ee.Array(freqsList).accum(0)
total = cdfArray.get([-1])
normalizedCdf = cdfArray.divide(total)
array = ee.Array.cat([valsList, normalizedCdf], 1)
return ee.FeatureCollection(array.toList().map(array_to_features))
pred_cdf = get_cdf(samples, predictor)
target_cdf = get_cdf(samples, target)
proba_to_val = (
ee.Classifier.smileRandomForest(n_estimators)
.setOutputMode("REGRESSION")
.train(
features=target_cdf, classProperty=target, inputProperties=["probability"]
)
)
val_to_proba = (
ee.Classifier.smileRandomForest(n_estimators)
.setOutputMode("REGRESSION")
.train(
features=pred_cdf, classProperty="probability", inputProperties=[predictor]
)
)
return val_to_proba, proba_to_val
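A sketch of matching one band to another using paired samples (the asset IDs and column names are hypothetical). The two returned regressors are chained: values are mapped to CDF probabilities, then back to values on the target distribution:

```python
import ee
from hydrafloods import ml

ee.Initialize()

# paired samples with the band to transform ("VV_asc") and the band to match ("VV_desc")
samples = ee.FeatureCollection("users/example/paired_backscatter_samples")  # hypothetical asset

val_to_proba, proba_to_val = ml.hist_matching(samples, "VV_asc", "VV_desc", n_estimators=50)

img = ee.Image("users/example/s1_ascending_scene")  # hypothetical image with a "VV_asc" band
matched = (
    img.select("VV_asc")
    .classify(val_to_proba, "probability")   # value -> CDF probability
    .classify(proba_to_val, "VV_matched")    # probability -> matched value
)
```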
minmax_feature_scaling(fc, scaling_dict, feature_names)
Function to apply min/max scaling to feature collection
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fc | ee.FeatureCollection | feature collection to scale | required |
scaling_dict | ee.Dictionary | dictionary of min/max values to scale to | required |
feature_names | list[str] | names of feature columns to apply scaling to | required |
Returns:
Type | Description |
---|---|
ee.FeatureCollection | scaled feature collection |
Source code in hydrafloods/ml.py
def minmax_feature_scaling(fc, scaling_dict, feature_names):
"""Function to apply min/max scaling to feature collection
args:
fc (ee.FeatureCollection): feature collection to scale
scaling_dict (ee.Dictionary): dictionary of min/max values to scale to
feature_names (list[str]): names of feature columns to apply scaling to
returns:
ee.FeatureCollection: scaled feature collection
"""
def feature_scaling(feature):
"""Nested closure function to apply scaling on each column in each feature"""
def iter_cols(i):
"""Loops through feature columns"""
i = ee.String(i)
v = ee.Number(feature.get(i))
minv = ee.Number(scaling_dict.get(i.cat("_min")))
maxv = ee.Number(scaling_dict.get(i.cat("_max")))
return v.subtract(minv).divide(maxv.subtract(minv))
# apply scaling on each column of feature
scaled = ee_feature_names.map(iter_cols)
# get a dictionary of new values with old feature names
newVals = ee.Dictionary.fromLists(ee_feature_names, scaled)
# set feature columns new values
return feature.set(newVals)
# force ee types
fc = ee.FeatureCollection(fc)
ee_feature_names = ee.List(feature_names)
# normalize the features in the entire featureCollection
fc_norm = fc.map(feature_scaling)
return fc_norm
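A small sketch pairing this with `minmax_scaling_dict` (toy properties):

```python
import ee
from hydrafloods import ml

ee.Initialize()

fc = ee.FeatureCollection([
    ee.Feature(None, {"VV": -18.0, "VH": -25.0}),
    ee.Feature(None, {"VV": -10.0, "VH": -17.0}),
    ee.Feature(None, {"VV": -6.0, "VH": -12.0}),
])
names = ["VH", "VV"]

scaling_dict = ml.minmax_scaling_dict(fc, names)
fc_scaled = ml.minmax_feature_scaling(fc, scaling_dict, names)  # column values rescaled to 0-1
```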
minmax_image_scaling(image, scaling_dict, feature_names)
Function to scale an image between min/max values. Expects that the scaling_dict keys match the band names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image | ee.Image | image to scale | required |
scaling_dict | ee.Dictionary | dictionary of min/max values to scale to | required |
feature_names | list[str] | names of image bands to select and apply scaling to | required |
Returns:
Type | Description |
---|---|
ee.Image | scaled image |
Source code in hydrafloods/ml.py
@decorators.keep_attrs
def minmax_image_scaling(image, scaling_dict, feature_names):
"""Function to scale image between min/max values
Expects that scaling_dict keys match bands
args:
image (ee.Image): image to scale
scaling_dict (ee.Dictionary): dictionary of min/max values to scale to
feature_names (list[str]): names of image bands to select and apply scaling to
returns:
ee.Image: scaled image
"""
# get dict as image
scaling_img = scaling_dict.toImage()
# extract the min/max values per band
min_img = scaling_img.select(".*_min")
max_img = scaling_img.select(".*_max")
# apply scaling
return (
image.select(sorted(feature_names))
.subtract(min_img)
.divide(max_img.subtract(min_img))
.float()
)
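A sketch of scaling an image at prediction time with a dictionary computed from training samples (the asset ID is hypothetical). Note the function selects `sorted(feature_names)`, so passing the names in sorted order keeps band order and dictionary order consistent:

```python
import ee
from hydrafloods import ml

ee.Initialize()

samples = ee.FeatureCollection("users/example/training_samples")  # hypothetical asset
names = ["VH", "VV"]

scaling_dict = ml.minmax_scaling_dict(samples, names)

img = (
    ee.ImageCollection("COPERNICUS/S1_GRD")
    .filterDate("2019-01-01", "2019-01-10")
    .first()
    .select(names)
)
img_scaled = ml.minmax_image_scaling(img, scaling_dict, names)
```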
minmax_scaling_dict(fc, feature_names)
Function to calculate the minimum and maximum values of features in a collection. Expects that fc has all feature names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fc | ee.FeatureCollection | feature collection with the features used to calculate min/max values | required |
feature_names | list[str] | names of feature columns to calculate min/max values from | required |
Returns:
Type | Description |
---|---|
ee.Dictionary | dictionary of minimum and maximum values for each feature name |
Source code in hydrafloods/ml.py
def minmax_scaling_dict(fc, feature_names):
"""Function to calculate the minimum and maximum values of feautures in a collection
Expects that fc has all feature names
args:
fc (ee.FeatureCollection): feature collection with the features used to calculate min/max value
feature_names (list[str]): names of feature columns to calculat min/max values from
returns
ee.Dictionary: dictionary of minimum and maximum values for each feature name
"""
# force ee types
fc = ee.FeatureCollection(fc)
ee_feature_names = ee.List(feature_names)
# apply reducer on each feature column
feature_min_max = fc.reduceColumns(
ee.Reducer.minMax().repeat(ee_feature_names.length()), ee_feature_names
)
# min/max feature names
names = ee_feature_names.map(
lambda x: ee.List([ee.String(x).cat("_min"), ee.String(x).cat("_max")])
).flatten()
# get the min/max values for each feature
# used to scale values from 0-1
min_max_dict = ee.Dictionary.fromLists(
names,
ee.List(feature_min_max.get("min")).zip(feature_min_max.get("max")).flatten(),
)
return min_max_dict
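A sketch showing the key naming convention the dictionary uses (toy values):

```python
import ee
from hydrafloods import ml

ee.Initialize()

fc = ee.FeatureCollection([
    ee.Feature(None, {"VV": -18.0, "VH": -25.0}),
    ee.Feature(None, {"VV": -6.0, "VH": -12.0}),
])

scaling_dict = ml.minmax_scaling_dict(fc, ["VV", "VH"])
print(scaling_dict.getInfo())
# keys follow the "<feature>_min" / "<feature>_max" pattern,
# e.g. {"VV_min": -18.0, "VV_max": -6.0, "VH_min": -25.0, "VH_max": -12.0}
```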
onehot_feature_encoding(fc, column_name, classes, class_names=None)
Function to calculate one-hot encoded columns from categorical columns where each new column equals 1 where the class value is the column index
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fc | ee.FeatureCollection | Feature collection with categorical data to encode | required |
column_name | str \| ee.String | name of the categorical column to encode | required |
classes | list[int] \| ee.List | list of class values to encode | required |
class_names | ee.List | list of names for the output columns. If None then columns will be named b0, b1, ..., bn. default = None | None |
Returns:
Type | Description |
---|---|
ee.FeatureCollection | Feature collection with one-hot encoded columns, one new column per class |
Source code in hydrafloods/ml.py
def onehot_feature_encoding(fc, column_name, classes, class_names=None):
"""Function to calculate one-hot encoded columns from categorial columns
where each new column equals 1 where the class value is the column index
args:
fc (ee.FeatureCollection): Feature collection with categorial data to encode
column_name (str | ee.String): name of column that is categorial to encode
classes (list[int] | ee.List): list of class values to encode
kwargs:
class_names (ee.List, optional): list of names to rename output bands.
if None then bands will be named b0, b1, ...,bn. default = None
returns:
ee.Image: Feature collection with one-hot encoded columns with n new column as n classes
"""
def feature_encoding(feature):
c = ee.Number(feature.get(column_name))
encoded = classes.map(lambda x: c.eq(ee.Number(x)))
new_cols = ee.Dictionary.fromLists(class_names, encoded)
return feature.set(new_cols)
if class_names is None:
class_names = ee.List.sequence(0, classes.length()).map(
lambda x: ee.String("b").cat(ee.String(x))
)
fc_encoded = fc.map(feature_encoding)
return fc_encoded
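A sketch of encoding a categorical column (the class values and names are made up). Since `classes` is used server-side as an `ee.List`, it is passed as one here:

```python
import ee
from hydrafloods import ml

ee.Initialize()

fc = ee.FeatureCollection([
    ee.Feature(None, {"landcover": 1}),
    ee.Feature(None, {"landcover": 3}),
    ee.Feature(None, {"landcover": 2}),
])

encoded = ml.onehot_feature_encoding(
    fc,
    "landcover",
    ee.List([1, 2, 3]),
    class_names=ee.List(["water", "forest", "urban"]),
)
# each feature now has water/forest/urban properties equal to 0 or 1
```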
onehot_image_encoding(img, classes, class_names=None, band=None)
Function to convert a categorical image to a one-hot encoded image where each new band equals 1 where the class value is the band index
Parameters:
Name | Type | Description | Default |
---|---|---|---|
img | ee.Image | categorical image to encode | required |
classes | list[int] \| ee.List | list of class values to encode | required |
class_names | ee.List | list of names to rename output bands. If None then bands will be named b0, b1, ..., bn. default = None | None |
band | str \| ee.String | name of the band from the input image to encode. If None then the first band is used. default = None | None |
Returns:
Type | Description |
---|---|
ee.Image | one-hot encoded image with n bands as n classes |
Source code in hydrafloods/ml.py
@decorators.keep_attrs
def onehot_image_encoding(img, classes, class_names=None, band=None):
"""Function to convert an categorial image image to one-hot encoded image
where each new band equals 1 where the class value is the band index
args:
img (ee.Image): categorical image to encode
classes (list[int] | ee.List): list of class values to encode
kwargs:
class_names (ee.List, optional): list of names to rename output bands.
if None then bands will be named b0, b1, ...,bn. default = None
band (str | ee.String, optional): name of band from input image to encode.
if None then the first band is used. default = None
returns:
ee.Image: one-hot encoded image with n bands as n classes
"""
classes = ee.List(classes)
if class_names is None:
class_names = ee.List.sequence(0, classes.length()).map(
lambda x: ee.String("b").cat(ee.String(x))
)
if band is None:
img = img.select([0])
else:
img = img.select(band)
encoded_imgs = classes.map(lambda x: img.eq(ee.Number(x)))
return (
ee.ImageCollection.fromImages(ee.List(encoded_imgs))
.toBands()
.rename(class_names)
)
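A sketch using a public land cover product; the selected class values and names below are illustrative and should be checked against the product's class table:

```python
import ee
from hydrafloods import ml

ee.Initialize()

lc = ee.ImageCollection("MODIS/006/MCD12Q1").first()

encoded = ml.onehot_image_encoding(
    lc,
    classes=[11, 12, 17],
    class_names=ee.List(["wetland", "cropland", "water"]),
    band="LC_Type1",
)
# three-band image; each band is 1 where the pixel belongs to that class
```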
random_forest_ee(n_trees, feature_collection, feature_names, label, scaling=None, mode='classification', min_samples_leaf=1)
Helper function to scale feature collection and train random forest model
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_trees | int | number of trees for the random forest model | required |
feature_collection | ee.FeatureCollection | features to train the random forest model | required |
feature_names | list[str] | names of feature columns to use in the random forest model (x values) | required |
label | str | name of the feature column to fit the random forest model to (y value) | required |
scaling | str \| None | name of scaling to apply before training. One of: "minmax", "standard", None. default = None | None |
mode | str | The output mode of the random forest model. One of: "classification", "regression", "probability". default = "classification" | 'classification' |
min_samples_leaf | int | minimum number of samples per leaf node (passed to the classifier as minLeafPopulation). default = 1 | 1 |
Source code in hydrafloods/ml.py
def random_forest_ee(
n_trees,
feature_collection,
feature_names,
label,
scaling=None,
mode="classification",
min_samples_leaf=1,
):
"""Helper function to scale feature collection and train random forest model
args:
n_trees (int): number of trees for random forest model
feature_collection (ee.FeatureCollection): features to train random forest model
feature_names (list[str]): names of feature columns to use in random forest model (x values)
label (str): name of feature column to fit random forest model (y value)
scaling (str | None, optional): name of scaling to apply before training. One of: "minmax", "standard", `None`.
default = `None`
mode (str, optional): The output mode of the random forest model. One of: "classification", "regression",
"probability". default = "classification"
"""
if scaling == "minmax":
scaling_dict = minmax_scaling_dict(feature_collection, feature_names)
fc_norm = minmax_feature_scaling(
feature_collection, scaling_dict, feature_names
)
elif scaling == "standard":
scaling_dict = standard_scaling_dict(feature_collection, feature_names)
fc_norm = standard_feature_scaling(
feature_collection, scaling_dict, feature_names
)
elif scaling is None:
scaling_dict = None
fc_norm = feature_collection
else:
raise ValueError(
"Could not determine scaling option. Options are ['minmax', 'standard', or None]"
)
classifier = (
ee.Classifier.smileRandomForest(n_trees, minLeafPopulation=min_samples_leaf)
.setOutputMode(mode.upper())
.train(fc_norm, label, feature_names)
)
return classifier, scaling_dict
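A training-and-prediction sketch (the sample asset and column names are hypothetical). The scaling dictionary returned alongside the classifier is applied to imagery with `standard_image_scaling` so prediction inputs match the scaled training features:

```python
import ee
from hydrafloods import ml

ee.Initialize()

samples = ee.FeatureCollection("users/example/water_training_samples")  # hypothetical asset
features = ["VH", "VV"]

classifier, scaling_dict = ml.random_forest_ee(
    n_trees=200,
    feature_collection=samples,
    feature_names=features,
    label="water",
    scaling="standard",
    mode="probability",
)

img = (
    ee.ImageCollection("COPERNICUS/S1_GRD")
    .filterDate("2019-01-01", "2019-01-10")
    .first()
    .select(features)
)
water_proba = ml.standard_image_scaling(img, scaling_dict, features).classify(classifier)
```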
standard_feature_scaling(fc, scaling_dict, feature_names)
Function to apply standard (Z-score) scaling to feature collection
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fc | ee.FeatureCollection | feature collection to scale | required |
scaling_dict | ee.Dictionary | dictionary of mean/std dev values for scaling | required |
feature_names | list[str] | names of feature columns to apply scaling to | required |
Returns:
Type | Description |
---|---|
ee.FeatureCollection | scaled feature collection |
Source code in hydrafloods/ml.py
def standard_feature_scaling(fc, scaling_dict, feature_names):
"""Function to apply standard (Z-score) scaling to feature collection
args:
fc (ee.FeatureCollection): feature collection to scale
scaling_dict (ee.Dictionary): dictionary of mean/std dev values for scaling
feature_names (list[str]): names of feature columns to apply scaling to
returns:
ee.FeatureCollection: scaled feature collection
"""
def feature_scaling(feature):
"""Nested closure function to apply scaling on each column in each feature"""
def iter_cols(i):
"""Loops through feature columns"""
i = ee.String(i)
v = ee.Number(feature.get(i))
mean = ee.Number(scaling_dict.get(i.cat("_mean")))
stddev = ee.Number(scaling_dict.get(i.cat("_stdDev")))
return v.subtract(mean).divide(stddev)
# apply scaling on each column of feature
scaled = ee_feature_names.map(iter_cols)
# get a dictionary of new values with old feature names
newVals = ee.Dictionary.fromLists(ee_feature_names, scaled)
# set feature columns new values
return feature.set(newVals)
# force ee types
fc = ee.FeatureCollection(fc)
ee_feature_names = ee.List(feature_names)
# normalize the features in the entire featureCollection
fc_norm = fc.map(feature_scaling)
return fc_norm
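A sketch pairing this with `standard_scaling_dict` (toy properties):

```python
import ee
from hydrafloods import ml

ee.Initialize()

fc = ee.FeatureCollection([
    ee.Feature(None, {"VV": -18.0, "VH": -25.0}),
    ee.Feature(None, {"VV": -10.0, "VH": -17.0}),
    ee.Feature(None, {"VV": -6.0, "VH": -12.0}),
])
names = ["VH", "VV"]

scaling_dict = ml.standard_scaling_dict(fc, names)
fc_scaled = ml.standard_feature_scaling(fc, scaling_dict, names)  # z-scored columns
```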
standard_image_scaling(image, scaling_dict, feature_names)
Function to apply z-score scaling to an image. Expects that the scaling_dict keys match the band names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image | ee.Image | image to scale | required |
scaling_dict | ee.Dictionary | dictionary of mean/std dev values to scale to | required |
feature_names | list[str] | names of image bands to select and apply scaling to | required |
Returns:
Type | Description |
---|---|
ee.Image | scaled image |
Source code in hydrafloods/ml.py
@decorators.keep_attrs
def standard_image_scaling(image, scaling_dict, feature_names):
"""Function to apply z-score scaling to image
Expects that scaling_dict keys match bands
args:
image (ee.Image): image to scale
scaling_dict (ee.Dictionary): dictionary of mean/std dev values to scale to
feature_names (list[str]): names of image bands to select and apply scaling to
returns:
ee.Image: scaled image
"""
# get dict as image
scaling_img = scaling_dict.toImage()
# extract the mean/std dev values per band
mean_img = scaling_img.select(".*_mean")
stddev_img = scaling_img.select(".*_stdDev")
# apply scaling
return (
image.select(sorted(feature_names))
.subtract(mean_img)
.divide(stddev_img)
.float()
)
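A sketch of z-scoring image bands with statistics computed from training samples (the asset ID is hypothetical); as with the min/max version, bands are selected as `sorted(feature_names)`:

```python
import ee
from hydrafloods import ml

ee.Initialize()

samples = ee.FeatureCollection("users/example/training_samples")  # hypothetical asset
names = ["B3", "B8"]

scaling_dict = ml.standard_scaling_dict(samples, names)

img = (
    ee.ImageCollection("COPERNICUS/S2_SR")
    .filterDate("2020-06-01", "2020-06-05")
    .first()
    .select(names)
)
img_scaled = ml.standard_image_scaling(img, scaling_dict, names)
```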
standard_scaling_dict(fc, feature_names)
Function to calculate the mean and standard deviation values of features in a collection. Expects that fc has all feature names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fc | ee.FeatureCollection | feature collection with the features used to calculate mean/std dev values | required |
feature_names | list[str] | names of feature columns to calculate mean/std dev values from | required |
Returns:
Type | Description |
---|---|
ee.Dictionary | dictionary of mean and standard deviation values for each feature name |
Source code in hydrafloods/ml.py
def standard_scaling_dict(fc, feature_names):
"""Function to calculate the mean and standard deviation values of feautures in a collection
Expects that fc has all feature names
args:
fc (ee.FeatureCollection): feature collection with the features used to calculate mean/std dev value
feature_names (list[str]): names of feature columns to calculat mean/std dev values from
returns
ee.Dictionary: dictionary of mean and standard deviation values for each feature name
"""
# force ee types
fc = ee.FeatureCollection(fc)
ee_feature_names = ee.List(feature_names)
# get a combined reducer for calculating mean and standard dev
mean_stddev = ee.Reducer.mean().combine(ee.Reducer.stdDev(), None, True)
# apply reducer on each feature column
feature_mean_stddev = fc.reduceColumns(
mean_stddev.repeat(ee_feature_names.length()), ee_feature_names
)
# mean / std dev feature names
names = ee_feature_names.map(
lambda x: ee.List([ee.String(x).cat("_mean"), ee.String(x).cat("_stdDev")])
).flatten()
# get the mean / std dev values for each feature
# used to scale values from ~ -3 to 3
mean_stddev_dict = ee.Dictionary.fromLists(
names,
ee.List(feature_mean_stddev.get("mean"))
.zip(feature_mean_stddev.get("stdDev"))
.flatten(),
)
return mean_stddev_dict
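A sketch showing the key naming convention (toy values):

```python
import ee
from hydrafloods import ml

ee.Initialize()

fc = ee.FeatureCollection([
    ee.Feature(None, {"VV": -18.0}),
    ee.Feature(None, {"VV": -10.0}),
    ee.Feature(None, {"VV": -6.0}),
])

scaling_dict = ml.standard_scaling_dict(fc, ["VV"])
print(scaling_dict.getInfo())
# keys follow the "<feature>_mean" / "<feature>_stdDev" pattern
```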
unsupervised_rf(n_trees, samples, features=None, rank_feature=None, ranking='min')
Unsupervised machine learning workflow to classify water. Methods similar to: https://doi.org/10.1016/j.rse.2020.112209
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_trees | int | number of trees to create the random forest model for class generalization | required |
samples | ee.FeatureCollection | input samples to create the water classifier from | required |
features | list \| ee.List | property names from samples to use for the semi-supervised classification. If None then all properties are used. default = None | None |
rank_feature | str | property name used to rank which unsupervised class is water. If None then the first name in `features` is used. default = None | None |
ranking | str | method to rank the classes by `rank_feature`. Options are 'min' or 'max'. If 'min', then the lowest class mean is considered water. default = 'min' | 'min' |
Returns:
Type | Description |
---|---|
ee.Classifier.RandomForest | random forest classifier to estimate probability that a pixel is water |
Source code in hydrafloods/ml.py
def unsupervised_rf(
n_trees,
samples,
features=None,
rank_feature=None,
ranking="min",
):
"""Unserpersived machine learning workflow to classify water
Methods similar to: https://doi.org/10.1016/j.rse.2020.112209
args:
n_trees (int): number of trees to creat random forest model for class generalization
samples (ee.FeatureCollection): input samples to create water classifier for
features (list | ee.List): property names from samples to use for the semi supervised classification, If none then all properties are used. default = None
rank_feature (str, optional): property name used to rank which unserpervised class is water. If None then first band name in `bands` is used. default = None
ranking (str, optional): method to rank the classes by `rank_band`. Options are 'min' or 'max'. If 'min', then the lowest class mean is considered water. default = 'min'
returns:
ee.Classifier.RandomForest: random forest classifier to estimate probability that a pixel is water
"""
def _cluster_center(x):
return (
feature_arr.mask(classes.eq(ee.Number(x))).reduce(ee.Reducer.mean(), [0])
).get([0])
if features is None:
features = samples.first().propertyNames()
else:
features = ee.List(features)
if rank_feature is None:
rank_feature = ee.String(features.get(0))
clusterer = ee.Clusterer.wekaXMeans(3, 12, 5).train(samples, features)
samples = samples.cluster(clusterer, "init_classes")
classes = samples.aggregate_array("init_classes")
unique = classes.distinct().sort()
classes = ee.Array(classes)
feature_arr = ee.Array(samples.aggregate_array(rank_feature))
class_means = unique.map(_cluster_center)
if ranking == "min":
ranker = ee.Reducer.min()
elif ranking == "max":
ranker = ee.Reducer.max()
else:
raise NotImplementedError(
"ranking selection is not implemented. options are 'min' or 'max'"
)
ranked_mean = class_means.reduce(ranker)
water_class = class_means.indexOf(ranked_mean)
binary_samples = samples.map(
lambda x: (
ee.Feature(x).set(
"init_classes", ee.Number(x.get("init_classes")).eq(water_class)
)
)
)
classifier = (
ee.Classifier.smileRandomForest(numberOfTrees=n_trees)
.setOutputMode("PROBABILITY")
.train(binary_samples, "init_classes", features)
)
return classifier
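A sketch of the unsupervised water-classification workflow on Sentinel-1 samples (the date range, sampling region, and scale are placeholders); backscatter is lowest over water, hence `ranking="min"` on the VV band:

```python
import ee
from hydrafloods import ml

ee.Initialize()

s1 = (
    ee.ImageCollection("COPERNICUS/S1_GRD")
    .filterDate("2019-08-01", "2019-08-10")
    .first()
    .select(["VV", "VH"])
)

# sample pixels to feed the clustering / generalization workflow
samples = s1.sample(region=s1.geometry(), scale=90, numPixels=1000)

classifier = ml.unsupervised_rf(
    n_trees=100,
    samples=samples,
    features=["VV", "VH"],
    rank_feature="VV",
    ranking="min",
)
water_proba = s1.classify(classifier)
```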