from .base_frequency_loss import FrequencyLossFunctionWrapper1D, FrequencyLossFunctionWrapper2D, squeeze_or_expand_to_same_rank
from keras import KerasTensor
from keras import ops
from keras import saving
@saving.register_keras_serializable(package="CustomLosses", name="normalized_error")
def normalized_error(y_true, y_pred):
"""
Normalized error (Perlin & Bustamante, cf. https://doi.org/10.1007/s10665-016-9849-7)
Definition of the Surface Similarity Parameter (SSP) for discrete signals
SSP = norm(y_true - y_pred) / (norm(y_true) + norm(y_pred))
Parameters
----------
y_true : KerasTensor
Ground truth signal
y_pred : KerasTensor
Predicted signal
Returns
-------
ssp : KerasTensor
normalized error (SSP) between 'y_true' and 'y_pred'
"""
numerator = ops.linalg.norm(y_true - y_pred, axis=-1)
denuminator = ops.add(
ops.linalg.norm(y_true, axis=-1),
ops.linalg.norm(y_pred, axis=-1)
)
return ops.divide_no_nan(
numerator,
denuminator
)
[docs]
@saving.register_keras_serializable(package="CustomLosses", name="SSP1D")
class SSP1D(FrequencyLossFunctionWrapper1D):
"""
Surface Similarity Parameter for 1-D signals with an optional lowpass filter.
The frequency filter helps the model to focus on the relevant frequency range without the need to, e.g., remove HF noise in additional preprocessing steps.
There are two lowpass filters to choose from, the `"static"` and the `"adaptive"` lowpass.
The `"static"` lowpass defines a global cut-off frequency at `f_filter`.
The `"adaptive"` lowpass analyzes the ground truth data, extracts the peak frequency, and sets a dynamic cut-off frequency at for each sample.
The parameter `f_filter` becomes a multiplier for the peak frequency, after which the frequency components are suppressed.
The definition of a lowpass {`"static"`, `"adaptive"`} requires a frequency range `f`.
It enables an additional step in the loss calculation, where
(1) the ground truth is transformed via 1-D FFT,
(2) a hard binary lowpass filter is applied to the Fourier spectrum to set all frequencies `f>f_filter` to (0+0j),
(3) the filtered ground truth is transformed back to its initial space.
If `lowpass is None`, the FFT calculation is skipped, and no `f` is required.
This class inherits from keras.losses.Loss and can thus be used directly in keras.Model.compile()
Parameters
----------
lowpass : str, optional {`None`, `"static"`, `"adaptive"`}
Lowpass filter that is applied to the ground truth in order to suppress the higher frequency range `f>f_filter`.
Defaults to `None`.
f : KerasTensor, optional
Frequency range for the data.
Is required once a lowpass {`"static"`, `"adaptive"`} is used.
Defaults to `None`.
f_filter : float, optional
Threshold for the lowpass filter.
With the static lowpass, the ground truth spectrum is set to 0+j0 for `f>f_filter`.
With the adaptive lowpass, the ground truth spectrum is set to 0+j0 for `f>f_filter*f_p`,
where `f_p` is the peak frequency that is automatically derived from the ground truth spectrum.
Defaults to 6.0.
f_min : float, optional
Cap for the lowest peak frequency for cases when the automatic estimation of the peak frequency fails (estimated `f_p<0` or `f_p` is Nan).
Defaults to 0.0.
p : float, optional
Exponent to weigh the spectrum towards the peak frequency (for the estimation of the peak frequency), c.f.
Mansard & Funke, "On the fitting of parametric models to measured wave spectra" (1988), and
Sobey & Young, "Hurricane Wind Waves---A discrete spectral model" (1986), https://ascelibrary.org/doi/10.1061/%28ASCE%290733-950X%281986%29112%3A3%28370%29.
Defaults to 7.0.
decay_start : int, optional
Epoch from which on the lowpass filter is linearly decreased from 0 to `f_filter`.
Defaults to 0.
**Requires `UseLossLowpassDecay` callback to work, cf. Notes**
decay_epochs : int, optional
Number of epochs over which the lowpass filter is linearly decreased from 0 to `f_filter`.
Defaults to 50.
**Requires `UseLossLowpassDecay` callback to work, cf. Notes**
data_format : str, optional {`"channels_last"`, `"channels_first"`}
The ordering of the dimensions in the inputs:
`"channels_last"` corresponds to inputs with shape `(batch_size, *dims, channels)`,
`"channels_first"` corresponds to inputs with shape `(batch_size, channels, *dims)`.
Defaults to `"channels_last"`.
reduction : str, optional {`"sum_over_batch_size"`, `None`, `"auto"`, `"sum"`}
Type of reduction to apply to the loss.
In almost all cases this should be `"sum_over_batch_size"`.
Supported options are `"sum"`, `"sum_over_batch_size"` or `None`.
name : str, optional
Name of the loss function. The name is inhereted from class name if `name=None`.
Defaults to `None`.
Notes
-----
Both the `"adaptive"` and `"static"` lowpass filter can be linearly increased from 0 to `f_filter` over `decay_epoch` epochs, starting at epoch `decay_start`.
For this to work, the training has to be conducted using the `UseLossLowpassDecay` callback, which sets the class variable `self.epoch` to the current training epoch.
Examples
--------
>>> from ssp.keras import SSP1D
>>> from keras import ops
>>> from math import pi
>>> t = ops.arange(0, 2 * pi, 2 * pi / 512)
>>> y1 = ops.sin(t)
>>> y2 = ops.sin(t + pi / 16) # sin with phase shift
>>> ssp = SSP1D()
>>> ops.convert_to_numpy(ssp(y1, y2))
array(0.09801717, dtype=float32)
Usage in an ML training
>>> from ssp.keras import SSP1D
>>> from keras import ops, Sequential, layers
>>> from math import pi
>>> t = ops.arange(0, 2 * pi, 2 * pi / 512)
>>> y = ops.expand_dims(ops.sin(t), axis=0) # shape (1, 512)
>>> x = ops.ones((1, 32)) # some input data with shape (1, 32)
>>> model = Sequential([layers.Dense(64), layers.Dense(512)])
>>> model.build(input_shape=x.shape)
>>> model.compile(optimizer="adam", loss=SSP1D())
>>> model.fit(x=x, y=y, epochs=1)
Usage in an ML training with static lowpass *(note that the lowpass does not make sense since we have a harmonic sine as input)*
>>> from ssp.keras import SSP1D
>>> from ssp.keras.ops import fftfreq
>>> from keras import ops, Sequential, layers
>>> from math import pi
>>> t = ops.arange(0, 2 * pi, 2 * pi / 512)
>>> f = fftfreq(n=512, d=2 * pi / 512)
>>> y = ops.expand_dims(ops.sin(t), axis=0) # shape (1, 512)
>>> x = ops.ones((1, 32)) # some input data with shape (1, 32)
>>> model = Sequential([layers.Dense(64), layers.Dense(512)])
>>> model.build(input_shape=x.shape)
>>> model.compile(optimizer="adam", loss=SSP1D(lowpass="static", f=f, f_filter=2.0))
>>> model.fit(x=x, y=y, epochs=1)
Usage in an ML training with adaptive lowpass *(note that the lowpass does not make sense since we have a harmonic sine as input)*
>>> from ssp.keras import SSP1D
>>> from ssp.keras.ops import fftfreq
>>> from keras import ops, Sequential, layers
>>> from math import pi
>>> t = ops.arange(0, 2 * pi, 2 * pi / 512)
>>> f = fftfreq(n=512, d=2 * pi / 512)
>>> y = ops.expand_dims(ops.sin(t), axis=0) # shape (1, 512)
>>> x = ops.ones((1, 32)) # some input data with shape (1, 32)
>>> model = Sequential([layers.Dense(64), layers.Dense(512)])
>>> model.build(input_shape=x.shape)
>>> model.compile(optimizer="adam", loss=SSP1D(lowpass="adaptive", f=f, f_filter=6.0))
>>> model.fit(x=x, y=y, epochs=1)
Usage in an ML training with adaptive lowpass and decay callback *(note that the lowpass does not make sense since we have a harmonic sine as y)*
>>> from ssp.keras import SSP1D
>>> from ssp.keras.callbacks import UseLossLowpassDecay
>>> from ssp.keras.ops import fftfreq
>>> from keras import ops, Sequential, layers
>>> from math import pi
>>> t = ops.arange(0, 2 * pi, 2 * pi / 512)
>>> f = fftfreq(n=512, d=2 * pi / 512)
>>> y = ops.expand_dims(ops.sin(t), axis=0) # shape (1, 512)
>>> x = ops.ones((1, 32)) # some input data with shape (1, 32)
>>> model = Sequential([layers.Dense(64), layers.Dense(512)])
>>> model.build(input_shape=x.shape)
>>> model.compile(optimizer="adam", loss=SSP1D(lowpass="adaptive", f=f, f_filter=6.0, decay_start=0, decay_epochs=5))
>>> model.fit(x=x, y=y, epochs=10, callbacks=[UseLossLowpassDecay()])
"""
def __init__(
self,
lowpass=None,
f=None,
f_filter=6,
f_min=0,
p=7,
decay_epochs=50,
decay_start=0,
data_format="channels_last",
reduction="sum_over_batch_size",
name=None
):
super().__init__(
fn=normalized_error,
lowpass=lowpass,
f=f,
f_filter=f_filter,
f_min=f_min,
p=p,
decay_epochs=decay_epochs,
decay_start=decay_start,
data_format=data_format,
reduction=reduction,
name=name
)
[docs]
def call(self, y_true, y_pred):
"""
Call method of SSP1D
Parameters
----------
y_true : KerasTensor
Ground truth signal
y_pred : KerasTensor
Predicted signal
Returns
-------
y : KerasTensor
Surface Similarity Parameter between arrays `y_true` and `y_pred`.
The shape is determined by `self.reduction`.
Notes
-----
The call method of `FrequencyLossFunctionWrapper1D` is overwritten, since with the SSP, we can remain in frequency domain and omit the ifft.
Magically, it is the same result if the SSP is applied in time- or frequency domain!
"""
y_true = ops.convert_to_tensor(y_true) # shape = (batch, x, ch) if data_format == 'channels_last' else (batch, x) or (batch, 1, x)
y_pred = ops.convert_to_tensor(y_pred)
# squeeze along channel axis
y_true, y_pred = squeeze_or_expand_to_same_rank(y_true, y_pred, axis=-1 if self.data_format == 'channels_last' else 1) # this adds channel dimension to y_true if there is none
# make channels first for FFT application
y_true = self.transpose_to_channels_first(y_true) # shape = (batch, ch, x) or (batch, x)
y_pred = self.transpose_to_channels_first(y_pred)
if self.lowpass: # i.e. lowpass is not None
# we are working with tuples, since keras has no complex dtype
y_true_real, y_true_imag = self.fft(y_true)
y_pred_real, y_pred_imag = self.fft(y_pred)
y_true_real, y_true_imag = self.apply_filter(real=y_true_real, imag=y_true_imag)
# the ssp loss is taking the L2 norm of the
y_true = ops.concatenate((y_true_real, y_true_imag), axis=1)
y_pred = ops.concatenate((y_pred_real, y_pred_imag), axis=1)
return self.fn(y_true=y_true, y_pred=y_pred)
[docs]
@saving.register_keras_serializable(package="CustomLosses", name="SSP2D")
class SSP2D(FrequencyLossFunctionWrapper2D):
"""
Surface Similarity Parameter for 2-D signals with an optional lowpass filter.
The frequency filter helps the model to focus on the relevant frequency range without the need to, e.g., remove HF noise in additional preprocessing steps.
There are two lowpass filters to choose from, the `"static"` and the `"adaptive"` lowpass.
The `"static"` lowpass defines a global cut-off frequency at `f_filter`.
The `"adaptive"` lowpass analyzes the ground truth data, extracts the peak frequency, and sets a dynamic cut-off frequency at for each sample.
The parameter `f_filter` becomes a multiplier for the peak frequency, after which the frequency components are suppressed.
The definition of a lowpass {`"static"`, `"adaptive"`} requires a frequency range `f`.
It enables an additional step in the loss calculation, where
(1) the ground truth is transformed via 1-D FFT,
(2) a hard binary lowpass filter is applied to the Fourier spectrum to set all frequencies `f>f_filter` to (0+0j),
(3) the filtered ground truth is transformed back to its initial space.
If `lowpass is None`, the FFT calculation is skipped, and no `f` is required.
This class inherits from keras.losses.Loss and can thus be used directly in keras.Model.compile()
Parameters
----------
lowpass : str, optional {`None`, `"static"`, `"adaptive"`}
Lowpass filter that is applied to the ground truth in order to suppress the higher frequency range `f>f_filter`.
Defaults to `None`.
f : KerasTensor, optional
Frequency range for the data.
Is required once a lowpass {`"static"`, `"adaptive"`} is used.
A 1-D `f` is automatically casted to a 2-D grid.
Defaults to `None`.
f_filter : float, optional
Threshold for the lowpass filter.
With the static lowpass, the ground truth spectrum is set to 0+j0 for `f>f_filter`.
With the adaptive lowpass, the ground truth spectrum is set to 0+j0 for `f>f_filter*f_p`,
where `f_p` is the peak frequency that is automatically derived from the ground truth spectrum.
Defaults to 6.0.
f_min : float, optional
Cap for the lowest peak frequency for cases when the automatic estimation of the peak frequency fails (estimated `f_p<0` or `f_p` is Nan).
Defaults to 0.0.
p : float, optional
Exponent to weigh the spectrum towards the peak frequency (for the estimation of the peak frequency), c.f.
Mansard & Funke, "On the fitting of parametric models to measured wave spectra" (1988), and
Sobey & Young, "Hurricane Wind Waves---A discrete spectral model" (1986), https://ascelibrary.org/doi/10.1061/%28ASCE%290733-950X%281986%29112%3A3%28370%29.
Defaults to 7.0.
decay_start : int, optional
Epoch from which on the lowpass filter is linearly decreased from 0 to `f_filter`.
Defaults to 0.
**Requires `UseLossLowpassDecay` callback to work, cf. Notes**
decay_epochs : int, optional
Number of epochs over which the lowpass filter is linearly decreased from 0 to `f_filter`.
Defaults to 50.
**Requires `UseLossLowpassDecay` callback to work, cf. Notes**
data_format : str, optional {`"channels_last"`, `"channels_first"`}
The ordering of the dimensions in the inputs:
`"channels_last"` corresponds to inputs with shape `(batch_size, *dims, channels)`,
`"channels_first"` corresponds to inputs with shape `(batch_size, channels, *dims)`.
Defaults to `"channels_last"`.
reduction : str, optional {`"sum_over_batch_size"`, `None`, `"auto"`, `"sum"`}
Type of reduction to apply to the loss.
In almost all cases this should be `"sum_over_batch_size"`.
Supported options are `"sum"`, `"sum_over_batch_size"` or `None`.
name : str, optional
Name of the loss function. The name is inhereted from class name if `name=None`.
Defaults to `None`.
Notes
-----
Both the `"adaptive"` and `"static"` lowpass filter can be linearly increased from 0 to `f_filter` over `decay_epoch` epochs, starting at epoch `decay_start`.
For this to work, the training has to be conducted using the `UseLossLowpassDecay` callback, which sets the class variable `self.epoch` to the current training epoch.
Examples
--------
>>> from ssp.keras import SSP2D
>>> from keras import ops
>>> from math import pi
>>> x = ops.arange(-2 * pi, 2 * pi, 4 * pi / 512)
>>> xx, yy = ops.meshgrid(x, x) # just a simple square domain from -2pi to 2pi in x and y
>>> y1 = ops.sin(xx) + ops.cos(yy)
>>> y2 = ops.sin(xx + pi / 16) + ops.cos(yy + pi / 16) # sin with phase shift in x and y direction
>>> ssp = SSP2D()
>>> ops.convert_to_numpy(ssp(y1, y2))
array(0.10495476, dtype=float32)
Usage in an ML training
>>> from ssp.keras import SSP2D
>>> from keras import ops, Sequential, layers
>>> from math import pi
>>> x = ops.arange(-2 * pi, 2 * pi, 4 * pi / 512)
>>> xx, yy = ops.meshgrid(x, x) # just a simple square domain from -2pi to 2pi in x and y
>>> y = ops.sin(xx) + ops.cos(yy)
>>> y = ops.expand_dims(y, axis=0)
>>> inputs = ops.ones((1, 512, 512))
>>> model = Sequential([layers.Conv2D(filters=1, kernel_size=3, padding="same"), layers.Reshape(target_shape=(512, 512))])
>>> model.build(input_shape=inputs.shape)
>>> model.compile(optimizer="adam", loss=SSP2D())
>>> model.fit(x=inputs, y=y, epochs=1)
Usage in an ML training with static lowpass
>>> from ssp.keras import SSP2D
>>> from ssp.keras.ops import fftfreq
>>> from keras import ops, Sequential, layers
>>> from math import pi
>>> x = ops.arange(-2 * pi, 2 * pi, 4 * pi / 512)
>>> xx, yy = ops.meshgrid(x, x) # just a simple square domain from -2pi to 2pi in x and y
>>> y = ops.sin(xx) + ops.cos(yy)
>>> y = ops.expand_dims(y, axis=0)
>>> inputs = ops.ones((1, 512, 512))
>>> model = Sequential([layers.Conv2D(filters=1, kernel_size=3, padding="same"), layers.Reshape(target_shape=(512, 512))])
>>> model.build(input_shape=inputs.shape)
>>> model.compile(optimizer="adam", loss=SSP2D(lowpass="static", f=f, f_filter=2.0))
>>> model.fit(x=inputs, y=y, epochs=1)
Usage in an ML training with adaptive lowpass
>>> from ssp.keras import SSP2D
>>> from ssp.keras.ops import fftfreq
>>> from keras import ops, Sequential, layers
>>> from math import pi
>>> x = ops.arange(-2 * pi, 2 * pi, 4 * pi / 512)
>>> xx, yy = ops.meshgrid(x, x) # just a simple square domain from -2pi to 2pi in x and y
>>> y = ops.sin(xx) + ops.cos(yy)
>>> y = ops.expand_dims(y, axis=0)
>>> inputs = ops.ones((1, 512, 512, 1))
>>> model = Sequential([layers.Conv2D(filters=1, kernel_size=3, padding="same"), layers.Reshape(target_shape=(512, 512))])
>>> model.build(input_shape=x.shape)
>>> model.compile(optimizer="adam", loss=SSP2D(lowpass="adaptive", f=f, f_filter=6.0))
>>> model.fit(x=inputs, y=y, epochs=1)
Usage in an ML training with adaptive lowpass and decay callback
>>> from src.ssp.keras import SSP2D
>>> from src.ssp.keras.callbacks import UseLossLowpassDecay
>>> from src.ssp.keras.ops import fftfreq
>>> from keras import ops, Sequential, layers
>>> from math import pi
>>> x = ops.arange(-2 * pi, 2 * pi, 4 * pi / 512)
>>> k = fftfreq(n=512, d=4 * pi / 512, rad=True)
>>> xx, yy = ops.meshgrid(x, x) # just a simple square domain from -2pi to 2pi in x and y
>>> y = ops.sin(xx) + ops.cos(yy)
>>> y = ops.expand_dims(y, axis=0)
>>> inputs = ops.ones((1, 512, 512, 1)) # input has to have a channel dimension with data format "channels_last"
>>> model = Sequential([layers.Conv2D(filters=1, kernel_size=3, padding="same"), layers.Reshape(target_shape=(512, 512))])
>>> model.build(input_shape=inputs.shape)
>>> model.compile(optimizer="adam", loss=SSP2D(lowpass="adaptive", f=k, f_filter=6.0, decay_start=0, decay_epochs=5))
>>> model.fit(x=inputs, y=y, epochs=10, callbacks=[UseLossLowpassDecay()])
"""
def __init__(
self,
lowpass=None,
f=None,
f_filter=6,
f_min=0,
p=7,
decay_epochs=50,
decay_start=0,
data_format="channels_last",
reduction="sum_over_batch_size",
name=None
):
super().__init__(
fn=normalized_error,
lowpass=lowpass,
f=f,
f_filter=f_filter,
f_min=f_min,
p=p,
decay_epochs=decay_epochs,
decay_start=decay_start,
data_format=data_format,
reduction=reduction,
name=name
)
[docs]
def call(self, y_true, y_pred):
"""
Call method of SSP2D
Parameters
----------
y_true : KerasTensor
Ground truth signal
y_pred : KerasTensor
Predicted signal
Returns
-------
y : KerasTensor
Surface Similarity Parameter between arrays `y_true` and `y_pred`.
The shape is determined by `self.reduction`.
Notes
-----
The call method of `FrequencyLossFunctionWrapper2D` is overwritten, since with the SSP, we can remain in frequency domain and omit the ifft.
Magically, it is the same result if the SSP is applied in time- or frequency domain!
The 2-D implementation of the SSP uses the same `normalized_error` function as the 1-D implementation.
This is made possible by flattening the Fourier-transformed arrays along all dimensions that are not the batch dimension.
Then, the norm along the feature axis is taken to calculate the SSP2D.
"""
y_true = ops.convert_to_tensor(y_true) # shape = (batch, x, ch) if data_format == 'channels_last' else (batch, x) or (batch, 1, x)
y_pred = ops.convert_to_tensor(y_pred)
# squeeze along channel axis
y_true, y_pred = squeeze_or_expand_to_same_rank(y_true, y_pred, axis=-1 if self.data_format == 'channels_last' else 1) # this adds channel dimension to y_true if there is none
# make channels first for FFT application
y_true = self.transpose_to_channels_first(y_true) # shape = (batch, ch, x) or (batch, x)
y_pred = self.transpose_to_channels_first(y_pred)
# derive batch size for reshape (flatten) later
b = ops.shape(y_true)[0]
if self.lowpass: # i.e. lowpass is not None
y_true_real, y_true_imag = self.fft(y_true)
y_pred_real, y_pred_imag = self.fft(y_pred)
y_true_real, y_true_imag = self.apply_filter(real=y_true_real, imag=y_true_imag)
# the ssp loss is taking the L2 norm of the flattened spectra
# 1st, reshape all arrays, i.e., flatten along feature axes
y_true_real = ops.reshape(y_true_real, newshape=(b, -1))
y_true_imag = ops.reshape(y_true_imag, newshape=(b, -1))
y_pred_real = ops.reshape(y_pred_real, newshape=(b, -1))
y_pred_imag = ops.reshape(y_pred_imag, newshape=(b, -1))
y_true = ops.concatenate((y_true_real, y_true_imag), axis=1)
y_pred = ops.concatenate((y_pred_real, y_pred_imag), axis=1)
else:
# flatten arrays
y_true = ops.reshape(y_true, newshape=(b, -1))
y_pred = ops.reshape(y_pred, newshape=(b, -1))
return self.fn(y_true=y_true, y_pred=y_pred)