Skip to content

corrections module

hydrafloods.corrections

illumination_correction(image, elevation, model='rotation', scale=90, sensor='LC8')

This function applies a terrain correction to optical imagery based on solar and viewing geometry

Parameters:

Name Type Description Default
image ee.Image

Optical image to perform correction on

required
elevation ee.Image

Input DEM to calculate illumination corrections from

required
model str

correction model to be applied. Options are 'cosine', 'c', 'scsc', or 'rotation' default = rotation

'rotation'
scale int

reduction scale to process satellite heading compared to ground. Increasing will reduce chance of OOM errors but reduce local scale correction accuracy. default = 90

90
sensor str

name of sensor to correct. options are 'LC8' or 'S2' (lower case also accepted). default = LC8

'LC8'

Returns:

Type Description
ee.Image

illumination corrected optical imagery

Exceptions:

Type Description
NotImplementedError

when keyword sensor is not of 'LC8' or 'S2'

NotImplementedError

when keyword model is not of 'cosine', 'c', 'scsc', or 'rotation'

Source code in hydrafloods/corrections.py
@decorators.keep_attrs
def illumination_correction(image, elevation, model="rotation", scale=90, sensor="LC8"):
    """This function applies a terrain correction to optical imagery based on solar and viewing geometry

    args:
        image (ee.Image): Optical image to perform correction on
        elevation (ee.Image): Input DEM to calculate illumination corrections from
        model (str, optional): correction model to be applied. Options are 'cosine', 'c', 'scsc', or 'rotation'
            default = rotation
        scale (int, optional): reduction scale to process satellite heading compared to ground. Increasing will reduce
            chance of OOM errors but reduce local scale correction accuracy. default = 90
        sensor (str, optional): name of sensor to correct. options are 'LC8' or 'S2' (lower case also accepted).
            default = LC8

    returns:
        ee.Image: illumination corrected optical imagery

    raises:
        NotImplementedError: when keyword sensor is not of 'LC8' or 'S2'
        NotImplementedError: when keyword model is not of 'cosine', 'c', 'scsc', or 'rotation'
    """

    def _get_band_coeffs(band_name):
        """Closure function to find illumination correction fit across the different bands

        args:
            band_name (str | ee.String): band name to find correction coefficients for
        """

        #  Create the image to apply the linear regression.The first band
        # is the cosi and the second band is the response variable, the reflectance (the bands).
        # L (y) = a + b*cosi(x); a = intercept, b = slope
        #  Dependent: Reflectance
        y = image.select([band_name])
        #  create an image with the three variables by concatenating them
        reg_image = ee.Image.cat([cosi, one, y])
        #  specify the linear regression reducer
        lr_reducer = ee.Reducer.linearRegression(numX=2, numY=1)
        #  fit the model
        fit = reg_image.reduceRegion(
            reducer=lr_reducer, geometry=image.geometry(), scale=scale, maxPixels=1e10
        )

        #  Get the coefficients as a nested list, cast it to an array, and get
        #  just the selected column
        slope = ee.Array(fit.get("coefficients")).get([0, 0])
        intercept = ee.Array(fit.get("coefficients")).get([1, 0])

        return ee.List([slope, intercept])

    if sensor.lower() == "lc8":
        sz_property = "SOLAR_ZENITH_ANGLE"
        sa_property = "SOLAR_AZIMUTH_ANGLE"
    elif sensor.lower() == "s2":
        sz_property = "MEAN_SOLAR_ZENITH_ANGLE"
        sa_property = "MEAN_SOLAR_AZIMUTH_ANGLE"
    else:
        raise NotImplementedError(
            f"Selected sensor, {sensor}, is not available. Options are 'LC8' or 'S2' (lower case also accepted)"
        )

    # value convert angle to radians
    to_radians = ee.Number((math.pi / 180))
    # constand image of 1
    one = ee.Image.constant(1).rename("one")

    # calculate terrain info from elevation data
    terrain = ee.Algorithms.Terrain(elevation)
    # Extract slope in radians for each pixel in the image
    p = terrain.select(["slope"]).multiply(to_radians)
    # Extract aspect in radians for each pixel in the image
    o = terrain.select(["aspect"]).multiply(to_radians)
    # Extract solar zenith angle from the image
    z = ee.Image.constant(ee.Number(image.get(sz_property)).multiply(to_radians))
    # Extract solar azimuth from the image
    az = ee.Image.constant(ee.Number(image.get(sa_property)).multiply(to_radians))

    cosao = (o.subtract(az)).cos()
    # cos(ϕa−ϕo)
    # Calculate the cosine of the local solar incidence for every pixel in the image in radians (cosi=cosp*cosz+sinp*sinz*cos(ϕa−ϕo)
    cosi = image.expression(
        "((cosp * cosz) + (sinp * sinz * cosao))",
        {
            "cosp": p.cos(),
            "cosz": z.cos(),
            "sinp": p.sin(),
            "sinz": z.sin(),
            "cosao": cosao,
        },
    )

    if model == "cosine":
        # if cosine model correction, return early as we don't need to do extra processing
        return image.expression(
            "((image * cosz) / cosi) ", {"image": image, "cosz": z.cos(), "cosi": cosi}
        )

    bnames = image.bandNames()
    ab = ee.Array(bnames.map(_get_band_coeffs))

    # get the coefficients as images
    a = ee.Image(ee.Array(ab.slice(1, 0, 1))).arrayProject([0]).arrayFlatten([bnames])
    b = ee.Image(ee.Array(ab.slice(1, 1, 2))).arrayProject([0]).arrayFlatten([bnames])
    C = b.divide(a)

    if model == "c":
        newimage = image.expression(
            "((image * (cosz + C)) / (cosi + C))",
            {"image": image, "cosz": z.cos(), "cosi": cosi, "C": C},
        )

    elif model == "scsc":
        newimage = image.expression(
            "((image * ((cosp * cosz) + C))/(cosi + C))",
            {"image": image, "cosp": p.cos(), "cosz": z.cos(), "cosi": cosi, "C": C},
        )

    elif model == "rotation":
        # Apply the empirical rotation model
        newimage = image.expression(
            "image - a * (cosi - cosz)",
            {"image": image, "cosz": z.cos(), "cosi": cosi, "a": a},
        )

    else:
        raise NotImplementedError(
            f"Defined model, {model}, has not been implemented. Options are 'cosine', 'c', 'scsc', or 'rotation'"
        )

    return newimage

slope_correction(image, elevation, model='volume', buffer=0, scale=1000, in_units='db', out_units='same')

This function applies the slope correction on a Sentinel-1 image. Function based on https:# doi.org/10.3390/rs12111867. Adapted from https:# github.com/ESA-PhiLab/radiometric-slope-correction/blob/master/notebooks/1%20-%20Generate%20Data.ipynb

Parameters:

Name Type Description Default
image ee.Image

Sentinel-1 to perform correction on

required
elevation ee.Image

Input DEM to calculate slope corrections from

required
model str

physical reference model to be applied. Options are 'volume' or 'surface'. default = volume

'volume'
buffer int

buffer in meters for layover/shadow mask. If zero then no buffer will be applied. default = 0

0
scale int

reduction scale to process satellite heading compared to ground. Increasing will reduce chance of OOM errors but reduce local scale correction accuracy. default = 1000

1000

Returns:

Type Description
ee.Image

slope corrected SAR imagery with look and local incidence angle bands

Exceptions:

Type Description
NotImplementedError

when keyword model is not of 'volume' or 'surface'

Source code in hydrafloods/corrections.py
@decorators.keep_attrs
def slope_correction(
    image,
    elevation,
    model="volume",
    buffer=0,
    scale=1000,
    in_units="db",
    out_units="same",
):
    """This function applies the slope correction on a Sentinel-1 image.
    Function based on https:# doi.org/10.3390/rs12111867.
    Adapted from https:# github.com/ESA-PhiLab/radiometric-slope-correction/blob/master/notebooks/1%20-%20Generate%20Data.ipynb

    args:
        image (ee.Image): Sentinel-1 to perform correction on
        elevation (ee.Image): Input DEM to calculate slope corrections from
        model (str, optional): physical reference model to be applied. Options are 'volume' or 'surface'.
            default = volume
        buffer (int, optional): buffer in meters for layover/shadow mask. If zero then no buffer will be applied. default = 0
        scale (int, optional): reduction scale to process satellite heading compared to ground. Increasing will reduce
            chance of OOM errors but reduce local scale correction accuracy. default = 1000

    returns:
        ee.Image: slope corrected SAR imagery with look and local incidence angle bands

    raises:
        NotImplementedError: when keyword model is not of 'volume' or 'surface'
    """

    def _volumetric_model_SCF(theta_iRad, alpha_rRad):
        """Closure funnction for calculation of volumetric model SCF

        args:
            theta_iRad (ee.Image): incidence angle in radians
            alpha_rRad (ee.Image): slope steepness in range

        returns:
            ee.Image
        """

        # model
        nominator = (ninetyRad.subtract(theta_iRad).add(alpha_rRad)).tan()
        denominator = (ninetyRad.subtract(theta_iRad)).tan()
        return nominator.divide(denominator)

    def _surface_model_SCF(theta_iRad, alpha_rRad, alpha_azRad):
        """Closure funnction for calculation of direct model SCF

        args:
            theta_iRad (ee.Image): incidence angle in radians
            alpha_rRad (ee.Image): slope steepness in range
            alpha_azRad (ee.Image): slope steepness in azimuth

        returns:
            ee.Image
        """

        # model
        nominator = (ninetyRad.subtract(theta_iRad)).cos()
        denominator = alpha_azRad.cos().multiply(
            (ninetyRad.subtract(theta_iRad).add(alpha_rRad)).cos()
        )

        return nominator.divide(denominator)

    def _erode(image, distance):
        """Closure function to buffer raster values

        args:
            image (ee.Image): image that should be buffered
            distance (int): distance of buffer in meters

        returns:
            ee.Image
        """

        d = (
            image.Not()
            .unmask(1)
            .fastDistanceTransform(10)
            .sqrt()
            .multiply(ee.Image.pixelArea().sqrt())
        )

        return image.updateMask(d.gt(distance))

    def _masking(alpha_rRad, theta_iRad, buffer):
        """Closure function for masking of layover and shadow

        args:
            alpha_rRad (ee.Image): slope steepness in range
            theta_iRad (ee.Image): incidence angle in radians
            buffer (int): buffer in meters

        returns:
            ee.Image
        """
        # layover, where slope > radar viewing angle
        layover = alpha_rRad.lt(theta_iRad).rename("layover")

        # shadow
        shadow = alpha_rRad.gt(
            ee.Image.constant(-1).multiply(ninetyRad.subtract(theta_iRad))
        ).rename("shadow")

        # add buffer to layover and shadow
        if buffer > 0:
            layover = _erode(layover, buffer)
            shadow = _erode(shadow, buffer)

        # combine layover and shadow
        no_data_mask = layover.And(shadow).rename("no_data_mask")

        return no_data_mask

    # get the image geometry and projection
    geom = image.geometry(scale)
    proj = image.select(1).projection()
    angle_band = image.select("angle")

    # image to convert angle to radians
    to_radians = ee.Image.constant((math.pi / 180))
    # create a 90 degree image in radians
    ninetyRad = ee.Image.constant(90).multiply(to_radians)

    # calculate the look direction
    heading = (
        ee.Terrain.aspect(image.select("angle"))
        .reduceRegion(ee.Reducer.mean(), geom, scale)
        .get("aspect")
    )

    if in_units not in ["db", "power"]:
        raise ValueError(
            "could not understand input units. needs to be either 'db' or 'power'"
        )

    if out_units not in ["same", "other", None]:
        raise ValueError(
            "could not understand output units option. needs to be either 'same' or 'other'"
        )

    # Sigma0 to Power of input image
    if in_units == "db":
        sigma0Pow = geeutils.db_to_power(image)
    else:
        sigma0Pow = image

    # the numbering follows the article chapters
    # 2.1.1 Radar geometry
    theta_iRad = image.select("angle").multiply(to_radians)
    phi_iRad = ee.Image.constant(heading).multiply(to_radians)

    # 2.1.2 Terrain geometry
    alpha_sRad = (
        ee.Terrain.slope(elevation)
        .select("slope")
        .multiply(to_radians)
        .setDefaultProjection(proj)
    )

    phi_sRad = (
        ee.Terrain.aspect(elevation)
        .select("aspect")
        .multiply(to_radians)
        .setDefaultProjection(proj)
    )

    # 2.1.3 Model geometry
    # reduce to 3 angle
    phi_rRad = phi_iRad.subtract(phi_sRad)

    # slope steepness in range (eq. 2)
    alpha_rRad = (alpha_sRad.tan().multiply(phi_rRad.cos())).atan()

    # slope steepness in azimuth (eq 3)
    alpha_azRad = (alpha_sRad.tan().multiply(phi_rRad.sin())).atan()

    # local incidence angle (eq. 4)
    theta_liaRad = (
        alpha_azRad.cos().multiply((theta_iRad.subtract(alpha_rRad)).cos())
    ).acos()
    theta_liaDeg = theta_liaRad.multiply(180 / math.pi)

    # 2.2
    # Gamma_nought
    gamma0 = sigma0Pow.divide(theta_iRad.cos())
    gamma0dB = geeutils.db_to_power(gamma0)

    if model == "volume":
        scf = _volumetric_model_SCF(theta_iRad, alpha_rRad)

    elif model == "surface":
        scf = _surface_model_SCF(theta_iRad, alpha_rRad, alpha_azRad)

    else:
        raise NotImplementedError(
            f"Defined model, {model}, has not been implemented. Options are 'volume' or 'surface'"
        )

    # apply model for Gamm0_f
    gamma0_flat = gamma0.divide(scf)

    if out_units == "same" and in_units == "db":
        gamma0_flatDB = geeutils.power_to_db(gamma0_flat).select(["^(V).*"])

    elif out_units != "same" and in_units == "power":
        gamma0_flatDB = geeutils.power_to_db(gamma0_flat).select(["^(V).*"])

    else:
        gamma0_flatDB = gamma0_flat.select(["^(V).*"])

    # calculate layover and shadow mask
    masks = _masking(alpha_rRad, theta_iRad, buffer)

    return (
        gamma0_flatDB.updateMask(masks)
        .addBands(angle_band)
        .addBands(theta_liaDeg.rename("local_inc_angle"))
    )