# -*- coding: utf-8 -*-
"""**RosenPy: An Open Source Python Framework for Complex-Valued Neural Networks**.
*Copyright © A. A. Cruz, K. S. Mayer, D. S. Arantes*.
*License*
This file is part of RosenPy.
RosenPy is an open source framework distributed under the terms of the GNU General
Public License, as published by the Free Software Foundation, either version 3 of
the License, or (at your option) any later version. For additional information on
license terms, please open the Readme.md file.
RosenPy is distributed in the hope that it will be useful to every user, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with RosenPy. If not, see <http://www.gnu.org/licenses/>.
"""
from rosenpy.utils import reg_func, init_func, act_func, decay_func
from .rp_layer import Layer
from .rp_nn import NeuralNetwork
from . import rp_optimizer as opt
class PTRBFNN(NeuralNetwork):
    """
    Specification for the Deep Phase Transmittance Radial Basis Function Neural
    Network to be passed to the model in construction.
    This includes the feedforward, backpropagation, and layer-adding methods
    specific to this architecture. This class derives from the NeuralNetwork class.
    """
def _matrix_c(self, phi, weights, trans_type):
"""
Generates the coupling matrix C responsible for converting the linear combination
from the fully connected operation into a convolutional operation.
Parameters:
-----------
phi : array-like
Array containing the values of the basis functions.
weights : array-like
Array containing the values of the weights.
trans_type : int
Type of transformation (Transient and steady-state: 1; or steady-state: 0).
Returns:
--------
array-like
The coupling matrix C.
"""
xp = self.xp # Use cupy or numpy depending on the backend
c_list = []
if trans_type == 1: # Transient and steady-state
for phi_row in range(phi.shape[0]):
m = phi.shape[1]
n = weights.shape[1]
c_matrix = xp.zeros((m + n - 1, n), dtype=phi.dtype)
for row in range(m + n - 1):
for col in range(n):
if 0 <= row - col < m:
c_matrix[row, col] = phi[phi_row, row - col]
c_list.append(c_matrix)
else: # steady-state
for phi_row in range(phi.shape[0]):
m = phi.shape[1]
n = weights.shape[1]
if m > n:
c_matrix = xp.zeros((m - n + 1, n), dtype=phi.dtype)
for row in range(m - n + 1):
for col in range(n):
c_matrix[row, col] = phi[phi_row, row + n - 1 - col]
else:
c_matrix = xp.zeros((n - m + 1, n), dtype=phi.dtype)
for row in range(n - m + 1):
for col in range(n):
if 0 <= n - m + row - col - 1 < m:
c_matrix[row, col] = phi[phi_row, n - m + row - col - 1]
c_list.append(c_matrix)
return xp.stack(c_list)
def _matrix_k(self, phi, weights, trans_type):
"""
Generates the coupling matrix K responsible for transforming between
transient and steady-state or steady-state operations.
Parameters:
-----------
phi : array-like
Array containing the values of the basis functions.
weights : array-like
Array containing the values of the weights.
trans_type : int
Type of transformation (Transient and steady-state: 1; or steady-state: 0).
Returns:
--------
array-like
The coupling matrix K.
"""
xp = self.xp # Use cupy or numpy depending on the backend
k_list = []
if trans_type == 1: # Transient and steady-state
for phi_row in range(phi.shape[0]):
m = phi.shape[1]
n = weights.shape[1]
k_matrix = xp.zeros((m, m + n - 1), dtype=xp.complex128)
for row in range(n):
for col in range(m + n - 1):
if 0 <= col - row < n:
k_matrix[row, col] = weights[0, col - row]
k_list.append(k_matrix)
else: # steady-state
for phi_row in range(phi.shape[0]):
m = phi.shape[1]
n = weights.shape[1]
if m > n:
k_matrix = xp.zeros((m, m - n + 1), dtype=phi.dtype)
for row in range(m):
for col in range(m - n + 1):
if 0 <= col - row + n - 1 < n:
k_matrix[row, col] = weights[0, col - row + n - 1]
else:
k_matrix = xp.zeros((m, n - m + 1), dtype=phi.dtype)
for row in range(m):
for col in range(n - m + 1):
if 0 <= m - row + col - 1 < n:
k_matrix[row, col] = weights[0, m - row + col - 1]
k_list.append(k_matrix)
return xp.stack(k_list)
def _fully_feedforward(self, y_pred, layer):
"""
Performs the feedforward operation specific to a fully connected layer.
Parameters:
-----------
y_pred : array-like
The input data to be fed into the fully connected layer.
layer : FullyConnectedLayer
The fully connected layer object.
Returns:
--------
array-like
The output of the fully connected layer after the feedforward operation.
"""
layer.kern = y_pred[:, self.xp.newaxis, :].repeat(layer.neurons, axis=1) - layer.gamma
layer.seuc = (self.xp.sum(layer.kern.real ** 2, axis=2) / layer.sigma.real +
1j * self.xp.sum(layer.kern.imag ** 2, axis=2) / layer.sigma.imag)
layer.phi = self.xp.exp(-layer.seuc.real) + 1j * self.xp.exp(-layer.seuc.imag)
layer.activ_out = self.xp.dot(layer.phi, layer.weights) + layer.biases
return layer.activ_out
def _conv_feedforward_tp(self, x, layer):
"""
Performs the feedforward operation specific to a convolutional layer.
Parameters:
-----------
x : array-like
The input data to be fed into the convolutional layer.
layer : ConvLayer
The convolutional layer object.
Returns:
--------
array-like
The output of the convolutional layer after the feedforward operation.
"""
layer.input = self.xp.transpose(self.xp.tile(x, (layer.neurons, 1, 1)), axes=[1, 0, 2])
layer.kern = layer.input - self.xp.tile(layer.gamma, (layer.input.shape[0], 1, 1))
aux_r = self.xp.sum(layer.kern.real * layer.kern.real, axis=2)
aux_i = self.xp.sum(layer.kern.imag * layer.kern.imag, axis=2)
seuc_r = aux_r / layer.sigma.real
seuc_i = aux_i / layer.sigma.imag
layer.seuc = seuc_r + 1j * seuc_i
layer.phi = self.xp.exp(-seuc_r) + 1j * self.xp.exp(-seuc_i)
layer.C = self._matrix_c(layer.phi, layer.weights, layer.category)
aux = self.xp.dot(layer.weights, self.xp.transpose(layer.C, (0, 2, 1)))
layer.activ_out = self.xp.squeeze(aux) + layer.biases
return layer.activ_out
def feedforward(self, x):
    """
    Run the feedforward pass through every layer of the network.

    Parameters
    ----------
    x : array-like
        The input data to be fed into the neural network.

    Returns
    -------
    array-like
        The network output after the feedforward operation.

    Raises
    ------
    ValueError
        If a fully connected layer appears both before and after a
        convolutional layer (invalid layer ordering).
    """
    saw_conv = False
    fully_before_conv = False
    for layer in self.layers:
        if layer.layer_type == "Conv":
            saw_conv = True
            x = self._conv_feedforward_tp(x, layer)
        elif layer.layer_type == "Fully":
            if not saw_conv:
                fully_before_conv = True
            elif fully_before_conv:
                raise ValueError("If there are convolutional layers, the last layer must be fully connected.")
            x = self._fully_feedforward(x, layer)
    return x
def backprop(self, y, y_pred, epoch):
    """
    Run backpropagation through the network, from output layer to input.

    Parameters
    ----------
    y : array-like
        The true labels or target values.
    y_pred : array-like
        The predicted values from the neural network.
    epoch : int
        The current epoch number (used for learning-rate/regularization decay).

    Returns
    -------
    None
        Layer parameters are updated in place by the per-layer handlers.
    """
    handlers = {
        "Conv": self._conv_backprop_tp,
        "Fully": self._fully_backprop,
    }
    # Carried state between layers: previous kernel and sensitivity terms.
    head = True  # True only for the output layer
    carry_k = carry_r = carry_i = 0
    for layer in reversed(self.layers):
        handler = handlers.get(layer.layer_type)
        if handler is not None:
            carry_k, head, carry_r, carry_i = handler(
                y, y_pred, epoch, layer, carry_k, head, carry_r, carry_i)
def _conv_backprop_tp(self, y, y_pred, epoch, layer, aux_k, last, aux_r, aux_i):
    """
    Backpropagation step for a convolutional PT-RBF layer.

    Parameters
    ----------
    y, y_pred : array-like
        Targets and predictions (unused here; kept for a uniform handler
        signature shared with _fully_backprop).
    epoch : int
        Current epoch, forwarded to the regularization and optimizer updates.
    layer : ConvLayer
        The layer whose parameters are updated in place.
    aux_k : array-like
        Kernel (input minus gamma) cached by the downstream layer.
    last : bool
        Whether this is the output layer (a conv layer is never treated as
        such here — psi is always derived from the carried sensitivities).
    aux_r, aux_i : array-like
        Real/imaginary sensitivity terms carried from the downstream layer.

    Returns
    -------
    tuple
        (aux_k, last, aux_r, aux_i) updated for the next (upstream) layer.
    """
    xp = self.xp
    # Local error: project the downstream kernel onto the carried
    # real/imaginary sensitivities (split-complex chain rule).
    psi = -xp.sum(
        xp.matmul(xp.transpose(aux_k.real, (0, 2, 1)), aux_r[:, :, xp.newaxis])
        + 1j * xp.matmul(xp.transpose(aux_k.imag, (0, 2, 1)), aux_i[:, :, xp.newaxis]),
        axis=2,
    )
    aux_k = layer.kern
    k_matrix = self._matrix_k(layer.phi, layer.weights, layer.category)
    epsilon = xp.einsum('ij,ikj->ik', psi, xp.conj(k_matrix))
    psi_expanded = xp.transpose(xp.expand_dims(psi, axis=-1), axes=[1, 0, 2])
    beta_r = layer.phi.real / layer.sigma.real
    beta_i = layer.phi.imag / layer.sigma.imag
    aux_r = epsilon.real * beta_r
    aux_i = epsilon.imag * beta_i
    reg_l2 = reg_func.l2_regularization(xp, layer.lambda_init, layer.reg_strength, epoch)
    penalty = reg_l2 if layer.reg_strength else 0
    # Gradients for weights, biases, sigma, and gamma (L2-penalized).
    grad_w = (xp.tensordot(psi_expanded, xp.conj(layer.C), axes=([0, 1], [1, 0])) / layer.C.shape[0]
              - penalty * layer.weights)
    grad_b = xp.mean(psi, axis=0) - penalty * layer.biases
    sigma_term = xp.multiply(aux_r, layer.seuc.real) + 1j * xp.multiply(aux_i, layer.seuc.imag)
    grad_s = xp.mean(sigma_term, axis=0) - penalty * layer.sigma
    gamma_term = (xp.multiply(aux_r[:, :, xp.newaxis], layer.kern.real)
                  + 1j * xp.multiply(aux_i[:, :, xp.newaxis], layer.kern.imag))
    grad_g = xp.mean(gamma_term, axis=0) - penalty * layer.gamma
    (layer.weights, layer.biases, layer.sigma, layer.gamma,
     layer.mt, layer.vt, layer.ut) = self.optimizer.update_parameters(
        [layer.weights, layer.biases, layer.sigma, layer.gamma],
        [grad_w, grad_b, grad_s, grad_g],
        layer.learning_rates,
        epoch, layer.mt, layer.vt, layer.ut,
    )
    # Floor sigma to keep the Gaussian widths strictly positive.
    layer.sigma = xp.maximum(layer.sigma, 0.0001)
    return aux_k, last, aux_r, aux_i
def _fully_backprop(self, y, y_pred, epoch, layer, aux_k, last, aux_r, aux_i):
    """
    Backpropagation step for a fully connected PT-RBF layer.

    Parameters
    ----------
    y, y_pred : array-like
        Targets and predictions; their difference is the local error when
        this is the output layer.
    epoch : int
        Current epoch, forwarded to the regularization and optimizer updates.
    layer : FullyConnectedLayer
        The layer whose parameters are updated in place.
    aux_k : array-like
        Kernel (input minus gamma) cached by the downstream layer.
    last : bool
        True when this layer is the network's output layer.
    aux_r, aux_i : array-like
        Real/imaginary sensitivity terms carried from the downstream layer.

    Returns
    -------
    tuple
        (aux_k, last, aux_r, aux_i) updated for the next (upstream) layer.
    """
    xp = self.xp
    error = y - y_pred
    if last:
        psi = error
    else:
        # Project the downstream kernel onto the carried sensitivities.
        psi = -xp.sum(
            xp.matmul(xp.transpose(aux_k.real, (0, 2, 1)), aux_r[:, :, xp.newaxis])
            + 1j * xp.matmul(xp.transpose(aux_k.imag, (0, 2, 1)), aux_i[:, :, xp.newaxis]),
            axis=2,
        )
    last = False
    aux_k = layer.kern
    epsilon = xp.dot(psi, xp.conj(layer.weights.T))
    beta_r = layer.phi.real / layer.sigma.real
    beta_i = layer.phi.imag / layer.sigma.imag
    aux_r = epsilon.real * beta_r
    aux_i = epsilon.imag * beta_i
    reg_l2 = reg_func.l2_regularization(xp, layer.lambda_init, layer.reg_strength, epoch)
    penalty = reg_l2 if layer.reg_strength else 0
    # Gradients for weights, biases, sigma, and gamma (L2-penalized).
    grad_w = xp.dot(xp.conj(layer.phi.T), psi) - penalty * layer.weights
    grad_b = xp.mean(psi, axis=0) - penalty * layer.biases
    sigma_term = xp.multiply(aux_r, layer.seuc.real) + 1j * xp.multiply(aux_i, layer.seuc.imag)
    grad_s = xp.mean(sigma_term, axis=0) - penalty * layer.sigma
    gamma_term = (xp.multiply(aux_r[:, :, xp.newaxis], layer.kern.real)
                  + 1j * xp.multiply(aux_i[:, :, xp.newaxis], layer.kern.imag))
    grad_g = xp.mean(gamma_term, axis=0) - penalty * layer.gamma
    (layer.weights, layer.biases, layer.sigma, layer.gamma,
     layer.mt, layer.vt, layer.ut) = self.optimizer.update_parameters(
        [layer.weights, layer.biases, layer.sigma, layer.gamma],
        [grad_w, grad_b, grad_s, grad_g],
        layer.learning_rates,
        epoch, layer.mt, layer.vt, layer.ut,
    )
    # Floor sigma to keep the Gaussian widths strictly positive.
    layer.sigma = xp.maximum(layer.sigma, 0.0001)
    return aux_k, last, aux_r, aux_i
def normalize_data(self, input_data, mean, std_dev):
    """
    Normalize the input data.

    Standardizes with the given mean and standard deviation, then scales by
    1/sqrt(n_features) (where n_features = input_data.shape[1]).

    Args:
        input_data (cupy/numpy.ndarray): Input data to be normalized.
        mean: Mean used for standardization.
        std_dev: Standard deviation used for standardization.

    Returns:
        cupy/numpy.ndarray: Normalized input data.
    """
    scale = 1 / self.xp.sqrt(input_data.shape[1])
    return (input_data - mean) / std_dev * scale
def denormalize_outputs(self, normalized_output_data, mean, std_dev):
    """
    Denormalize the output data.

    Inverse of normalize_data: undoes the 1/sqrt(n_features) scaling, then
    the standardization with the given mean and standard deviation.

    Args:
        normalized_output_data (cupy/numpy.ndarray): Normalized output data
            to be denormalized.
        mean: Mean used during normalization.
        std_dev: Standard deviation used during normalization.

    Returns:
        cupy/numpy.ndarray: Denormalized output data.
    """
    scale = 1 / self.xp.sqrt(normalized_output_data.shape[1])
    return normalized_output_data * std_dev / scale + mean
def add_layer(self, neurons, ishape=0, oshape=0, weights_initializer=init_func.opt_ptrbf_weights,
              bias_initializer=init_func.zeros, sigma_initializer=init_func.ones, gamma_initializer=init_func.opt_ptrbf_gamma,
              reg_strength=0.0, lambda_init=0.1, weights_rate=0.001, biases_rate=0.001, gamma_rate=0.01, sigma_rate=0.01,
              lr_decay_method=decay_func.none_decay, lr_decay_rate=0.0, lr_decay_steps=1,
              kernel_initializer=init_func.opt_ptrbf_gamma, kernel_size=3,
              module=None, category=1,
              layer_type="Fully"):
    """
    Append a layer to the neural network.

    The layer can be fully connected ("Fully") or convolutional ("Conv").
    Input size is taken from `ishape` for the first layer and chained from
    the previous layer's output shape afterwards; the output size defaults
    to `neurons` when `oshape` is 0.

    Parameters
    ----------
    neurons : int
        Number of neurons in the layer.
    ishape : int, optional
        Number of input features; only used for the first layer. Default 0.
    oshape : int, optional
        Number of output neurons; defaults to `neurons` when 0.
    weights_initializer, bias_initializer, sigma_initializer, gamma_initializer : function, optional
        Initializers for the layer's parameters.
    reg_strength : float, optional
        L2 regularization strength. Default 0.0 (no regularization).
    lambda_init : float, optional
        Initial value for the regularization term. Default 0.1.
    weights_rate, biases_rate, gamma_rate, sigma_rate : float, optional
        Per-parameter learning rates.
    lr_decay_method : function, optional
        Learning-rate decay method. Default `decay_func.none_decay`.
    lr_decay_rate : float, optional
        Learning-rate decay rate. Default 0.0.
    lr_decay_steps : int, optional
        Steps between learning-rate decays. Default 1.
    kernel_initializer : function, optional
        Kernel initializer for convolutional layers.
    kernel_size : int, optional
        Convolutional kernel size. Default 3.
    module : object, optional
        Accepted for interface compatibility; the backend actually passed to
        the layer is `self.xp` (set when the NeuralNetwork was constructed).
    category : int, optional
        Convolution type: 1 transient and steady-state, 0 steady-state only.
    layer_type : str, optional
        "Fully" or "Conv". Default "Fully".

    Returns
    -------
    None
        The new layer is appended to `self.layers`.
    """
    # First layer takes `ishape`; later layers chain from the previous output.
    input_dim = self.layers[-1].oshape if self.layers else ishape
    # Output shape defaults to the neuron count when not given.
    output_dim = oshape if oshape else neurons
    self.layers.append(Layer(
        input_dim, neurons, output_dim,
        weights_initializer=weights_initializer,
        bias_initializer=bias_initializer,
        sigma_initializer=sigma_initializer,
        gamma_initializer=gamma_initializer,
        reg_strength=reg_strength,
        lambda_init=lambda_init,
        weights_rate=weights_rate,
        biases_rate=biases_rate,
        sigma_rate=sigma_rate,
        gamma_rate=gamma_rate,
        cvnn=4,  # presumably tags the PT-RBF network variant for Layer — TODO confirm
        lr_decay_method=lr_decay_method,
        lr_decay_rate=lr_decay_rate,
        lr_decay_steps=lr_decay_steps,
        kernel_initializer=kernel_initializer,
        kernel_size=kernel_size,
        module=self.xp,
        category=category,
        layer_type=layer_type,
    ))