diff --git a/src/pathsim/blocks/__init__.py b/src/pathsim/blocks/__init__.py index c8580e25..dd67152b 100644 --- a/src/pathsim/blocks/__init__.py +++ b/src/pathsim/blocks/__init__.py @@ -1,6 +1,7 @@ from .differentiator import * from .integrator import * from .multiplier import * +from .divider import * from .converters import * from .comparator import * from .samplehold import * @@ -20,6 +21,7 @@ from .noise import * from .table import * from .relay import * +from .logic import * from .math import * from .ctrl import * from .lti import * diff --git a/src/pathsim/blocks/converters.py b/src/pathsim/blocks/converters.py index c2726930..b069031c 100644 --- a/src/pathsim/blocks/converters.py +++ b/src/pathsim/blocks/converters.py @@ -12,10 +12,12 @@ from ._block import Block from ..utils.register import Register from ..events.schedule import Schedule +from ..utils.mutable import mutable # MIXED SIGNAL BLOCKS =================================================================== +@mutable class ADC(Block): """Models an ideal Analog-to-Digital Converter (ADC). @@ -104,6 +106,7 @@ def __len__(self): return 0 +@mutable class DAC(Block): """Models an ideal Digital-to-Analog Converter (DAC). diff --git a/src/pathsim/blocks/ctrl.py b/src/pathsim/blocks/ctrl.py index 5f65cdc6..70fa72f6 100644 --- a/src/pathsim/blocks/ctrl.py +++ b/src/pathsim/blocks/ctrl.py @@ -14,10 +14,12 @@ from .dynsys import DynamicalSystem from ..optim.operator import Operator, DynamicOperator +from ..utils.mutable import mutable # LTI CONTROL BLOCKS (StateSpace subclasses) ============================================ +@mutable class PT1(StateSpace): """First-order lag element (PT1). @@ -65,6 +67,7 @@ def __init__(self, K=1.0, T=1.0): ) +@mutable class PT2(StateSpace): """Second-order lag element (PT2). @@ -124,6 +127,7 @@ def __init__(self, K=1.0, T=1.0, d=1.0): ) +@mutable class LeadLag(StateSpace): """Lead-Lag compensator. @@ -180,6 +184,7 @@ def __init__(self, K=1.0, T1=1.0, T2=1.0): ) +@mutable class PID(StateSpace): """Proportional-Integral-Differentiation (PID) controller. @@ -253,6 +258,7 @@ def __init__(self, Kp=0, Ki=0, Kd=0, f_max=100): ) +@mutable class AntiWindupPID(PID): """Proportional-Integral-Differentiation (PID) controller with anti-windup mechanism (back-calculation). diff --git a/src/pathsim/blocks/delay.py b/src/pathsim/blocks/delay.py index 88baad10..6bcbed8b 100644 --- a/src/pathsim/blocks/delay.py +++ b/src/pathsim/blocks/delay.py @@ -1,6 +1,6 @@ ######################################################################################### ## -## TIME DOMAIN DELAY BLOCK +## TIME DOMAIN DELAY BLOCK ## (blocks/delay.py) ## ######################################################################################### @@ -9,68 +9,120 @@ import numpy as np +from collections import deque + from ._block import Block from ..utils.adaptivebuffer import AdaptiveBuffer +from ..events.schedule import Schedule +from ..utils.mutable import mutable # BLOCKS ================================================================================ +@mutable class Delay(Block): - """Delays the input signal by a time constant 'tau' in seconds. + """Delays the input signal by a time constant 'tau' in seconds. + + Supports two modes of operation: - Mathematically this block creates a time delay of the input signal like this: + **Continuous mode** (default, ``sampling_period=None``): + Uses an adaptive interpolating buffer for continuous-time delay. .. math:: - - y(t) = + + y(t) = \\begin{cases} x(t - \\tau) & , t \\geq \\tau \\\\ 0 & , t < \\tau \\end{cases} + **Discrete mode** (``sampling_period`` provided): + Uses a ring buffer with scheduled sampling events for N-sample delay, + where ``N = round(tau / sampling_period)``. + + .. math:: + + y[k] = x[k - N] + Note ---- - The internal adaptive buffer uses interpolation for the evaluation. This is - required to be compatible with variable step solvers. It has a drawback however. - The order of the ode solver used will degrade when this block is used, due to - the interpolation. + In continuous mode, the internal adaptive buffer uses interpolation for + the evaluation. This is required to be compatible with variable step solvers. + It has a drawback however. The order of the ode solver used will degrade + when this block is used, due to the interpolation. + - Note ---- - This block supports vector input, meaning we can have multiple parallel + This block supports vector input, meaning we can have multiple parallel delay paths through this block. Example ------- - The block is initialized like this: + Continuous-time delay: .. code-block:: python - + #5 time units delay D = Delay(tau=5) - + + Discrete-time N-sample delay (10 samples): + + .. code-block:: python + + D = Delay(tau=0.01, sampling_period=0.001) + Parameters ---------- tau : float - delay time constant + delay time constant in seconds + sampling_period : float, None + sampling period for discrete mode, default is continuous mode Attributes ---------- _buffer : AdaptiveBuffer - internal interpolatable adaptive rolling buffer + internal interpolatable adaptive rolling buffer (continuous mode) + _ring : deque + internal ring buffer for N-sample delay (discrete mode) """ - def __init__(self, tau=1e-3): + def __init__(self, tau=1e-3, sampling_period=None): super().__init__() - #time delay in seconds + #time delay in seconds self.tau = tau - #create adaptive buffer - self._buffer = AdaptiveBuffer(self.tau) + #params for sampling + self.sampling_period = sampling_period + + if sampling_period is None: + + #continuous mode: adaptive buffer with interpolation + self._buffer = AdaptiveBuffer(self.tau) + + else: + + #discrete mode: ring buffer with N-sample delay + self._n = max(1, round(self.tau / self.sampling_period)) + self._ring = deque([0.0] * self._n, maxlen=self._n + 1) + + #flag to indicate this is a timestep to sample + self._sample_next_timestep = False + + #internal scheduled event for periodic sampling + def _sample(t): + self._sample_next_timestep = True + + self.events = [ + Schedule( + t_start=0, + t_period=sampling_period, + func_act=_sample + ) + ] def __len__(self): @@ -81,13 +133,18 @@ def __len__(self): def reset(self): super().reset() - #clear the buffer - self._buffer.clear() + if self.sampling_period is None: + #clear the adaptive buffer + self._buffer.clear() + else: + #clear the ring buffer + self._ring.clear() + self._ring.extend([0.0] * self._n) def update(self, t): - """Evaluation of the buffer at different times - via interpolation. + """Evaluation of the buffer at different times + via interpolation (continuous) or ring buffer lookup (discrete). Parameters ---------- @@ -95,13 +152,17 @@ def update(self, t): evaluation time """ - #retrieve value from buffer - y = self._buffer.get(t) - self.outputs.update_from_array(y) + if self.sampling_period is None: + #continuous mode: retrieve value from buffer + y = self._buffer.get(t) + self.outputs.update_from_array(y) + else: + #discrete mode: output the oldest value in the ring buffer + self.outputs[0] = self._ring[0] def sample(self, t, dt): - """Sample input values and time of sampling + """Sample input values and time of sampling and add them to the buffer. Parameters @@ -112,5 +173,11 @@ def sample(self, t, dt): integration timestep """ - #add new value to buffer - self._buffer.add(t, self.inputs.to_array()) \ No newline at end of file + if self.sampling_period is None: + #continuous mode: add new value to buffer + self._buffer.add(t, self.inputs.to_array()) + else: + #discrete mode: only sample on scheduled events + if self._sample_next_timestep: + self._ring.append(self.inputs[0]) + self._sample_next_timestep = False \ No newline at end of file diff --git a/src/pathsim/blocks/divider.py b/src/pathsim/blocks/divider.py new file mode 100644 index 00000000..4e623170 --- /dev/null +++ b/src/pathsim/blocks/divider.py @@ -0,0 +1,218 @@ +######################################################################################### +## +## REDUCTION BLOCKS (blocks/divider.py) +## +## This module defines static 'Divider' block +## +######################################################################################### + +# IMPORTS =============================================================================== + +import numpy as np + +from math import prod + +from ._block import Block +from ..utils.register import Register +from ..optim.operator import Operator +from ..utils.mutable import mutable + + +# MISO BLOCKS =========================================================================== + +_ZERO_DIV_OPTIONS = ("warn", "raise", "clamp") + + +@mutable +class Divider(Block): + """Multiplies and divides input signals (MISO). + + This is the default behavior (multiply all): + + .. math:: + + y(t) = \\prod_i u_i(t) + + and this is the behavior with an operations string: + + .. math:: + + y(t) = \\frac{\\prod_{i \\in M} u_i(t)}{\\prod_{j \\in D} u_j(t)} + + where :math:`M` is the set of inputs with ``*`` and :math:`D` the set with ``/``. + + + Example + ------- + Default initialization multiplies the first input and divides by the second: + + .. code-block:: python + + D = Divider() + + Multiply the first two inputs and divide by the third: + + .. code-block:: python + + D = Divider('**/') + + Raise an error instead of producing ``inf`` when a denominator input is zero: + + .. code-block:: python + + D = Divider('**/', zero_div='raise') + + Clamp the denominator to machine epsilon so the output stays finite: + + .. code-block:: python + + D = Divider('**/', zero_div='clamp') + + + Note + ---- + This block is purely algebraic and its operation (``op_alg``) will be called + multiple times per timestep, each time when ``Simulation._update(t)`` is + called in the global simulation loop. + + + Parameters + ---------- + operations : str, optional + String of ``*`` and ``/`` characters indicating which inputs are + multiplied (``*``) or divided (``/``). Inputs beyond the length of + the string default to ``*``. Defaults to ``'*/'`` (divide second + input by first). + zero_div : str, optional + Behaviour when a denominator input is zero. One of: + + ``'warn'`` *(default)* + Propagates ``inf`` and emits a ``RuntimeWarning`` — numpy's + standard behaviour. + ``'raise'`` + Raises ``ZeroDivisionError``. + ``'clamp'`` + Clamps the denominator magnitude to machine epsilon + (``numpy.finfo(float).eps``), preserving sign, so the output + stays large-but-finite rather than ``inf``. + + + Attributes + ---------- + _ops : dict + Maps operation characters to exponent values (``+1`` or ``-1``). + _ops_array : numpy.ndarray + Exponents (+1 for ``*``, -1 for ``/``) converted to an array. + op_alg : Operator + Internal algebraic operator. + """ + + input_port_labels = None + output_port_labels = {"out": 0} + + def __init__(self, operations="*/", zero_div="warn"): + super().__init__() + + # validate zero_div + if zero_div not in _ZERO_DIV_OPTIONS: + raise ValueError( + f"'zero_div' must be one of {_ZERO_DIV_OPTIONS}, got '{zero_div}'" + ) + self.zero_div = zero_div + + # allowed arithmetic operations mapped to exponents + self._ops = {"*": 1, "/": -1} + self.operations = operations + + if self.operations is None: + + # Default: multiply all inputs — identical to Multiplier + self.op_alg = Operator( + func=prod, + jac=lambda x: np.array([[ + prod(np.delete(x, i)) for i in range(len(x)) + ]]) + ) + + else: + + # input validation + if not isinstance(self.operations, str): + raise ValueError("'operations' must be a string or None") + for op in self.operations: + if op not in self._ops: + raise ValueError( + f"operation '{op}' not in {set(self._ops)}" + ) + + self._ops_array = np.array( + [self._ops[op] for op in self.operations], dtype=float + ) + + # capture for closures + _ops_array = self._ops_array + _zero_div = zero_div + _eps = np.finfo(float).eps + + def _safe_den(d): + """Apply zero_div policy to a denominator value.""" + if d == 0: + if _zero_div == "raise": + raise ZeroDivisionError( + "Divider: denominator is zero. " + "Use zero_div='warn' or 'clamp' to suppress." + ) + elif _zero_div == "clamp": + return _eps + return d + + def prod_ops(X): + n = len(X) + no = len(_ops_array) + ops = np.ones(n) + ops[:min(n, no)] = _ops_array[:min(n, no)] + num = prod(X[i] for i in range(n) if ops[i] > 0) + den = _safe_den(prod(X[i] for i in range(n) if ops[i] < 0)) + return num / den + + def jac_ops(X): + n = len(X) + no = len(_ops_array) + ops = np.ones(n) + ops[:min(n, no)] = _ops_array[:min(n, no)] + X = np.asarray(X, dtype=float) + # Apply zero_div policy to all denominator inputs up front so + # both the direct division and the rest-product stay consistent. + X_safe = X.copy() + for i in range(n): + if ops[i] < 0: + X_safe[i] = _safe_den(float(X[i])) + row = [] + for k in range(n): + rest = np.prod( + np.power(np.delete(X_safe, k), np.delete(ops, k)) + ) + if ops[k] > 0: # multiply: dy/du_k = prod of rest + row.append(rest) + else: # divide: dy/du_k = -rest / u_k^2 + row.append(-rest / X_safe[k] ** 2) + return np.array([row]) + + self.op_alg = Operator(func=prod_ops, jac=jac_ops) + + + def __len__(self): + """Purely algebraic block.""" + return 1 + + + def update(self, t): + """Update system equation. + + Parameters + ---------- + t : float + Evaluation time. + """ + u = self.inputs.to_array() + self.outputs.update_from_array(self.op_alg(u)) diff --git a/src/pathsim/blocks/filters.py b/src/pathsim/blocks/filters.py index ceff9cdc..6015df89 100644 --- a/src/pathsim/blocks/filters.py +++ b/src/pathsim/blocks/filters.py @@ -14,10 +14,12 @@ from .lti import StateSpace from ..utils.register import Register +from ..utils.mutable import mutable # FILTER BLOCKS ========================================================================= +@mutable class ButterworthLowpassFilter(StateSpace): """Direct implementation of a low pass butterworth filter block. @@ -52,6 +54,7 @@ def __init__(self, Fc=100, n=2): super().__init__(omega_c*A, omega_c*B, C, D) +@mutable class ButterworthHighpassFilter(StateSpace): """Direct implementation of a high pass butterworth filter block. @@ -85,6 +88,7 @@ def __init__(self, Fc=100, n=2): super().__init__(omega_c*A, omega_c*B, C, D) +@mutable class ButterworthBandpassFilter(StateSpace): """Direct implementation of a bandpass butterworth filter block. @@ -119,6 +123,7 @@ def __init__(self, Fc=[50, 100], n=2): super().__init__(*tf2ss(num, den)) +@mutable class ButterworthBandstopFilter(StateSpace): """Direct implementation of a bandstop butterworth filter block. @@ -153,6 +158,7 @@ def __init__(self, Fc=[50, 100], n=2): super().__init__(*tf2ss(num, den)) +@mutable class AllpassFilter(StateSpace): """Direct implementation of a first order allpass filter, or a cascade of n 1st order allpass filters diff --git a/src/pathsim/blocks/fir.py b/src/pathsim/blocks/fir.py index 5cdebd66..8db1a8a3 100644 --- a/src/pathsim/blocks/fir.py +++ b/src/pathsim/blocks/fir.py @@ -10,13 +10,15 @@ import numpy as np from collections import deque -from ._block import Block +from ._block import Block from ..utils.register import Register -from ..events.schedule import Schedule +from ..events.schedule import Schedule +from ..utils.mutable import mutable # FIR FILTER BLOCK ====================================================================== +@mutable class FIR(Block): """Models a discrete-time Finite-Impulse-Response (FIR) filter. diff --git a/src/pathsim/blocks/logic.py b/src/pathsim/blocks/logic.py new file mode 100644 index 00000000..31315b6f --- /dev/null +++ b/src/pathsim/blocks/logic.py @@ -0,0 +1,229 @@ +######################################################################################### +## +## COMPARISON AND LOGIC BLOCKS +## (blocks/logic.py) +## +## definitions of comparison and boolean logic blocks +## +######################################################################################### + +# IMPORTS =============================================================================== + +import numpy as np + +from ._block import Block + +from ..optim.operator import Operator + + +# BASE LOGIC BLOCK ====================================================================== + +class Logic(Block): + """Base logic block. + + Note + ---- + This block doesnt implement any functionality itself. + Its intended to be used as a base for the comparison and logic blocks. + Its **not** intended to be used directly! + + """ + + def __len__(self): + """Purely algebraic block""" + return 1 + + + def update(self, t): + """update algebraic component of system equation + + Parameters + ---------- + t : float + evaluation time + """ + u = self.inputs.to_array() + y = self.op_alg(u) + self.outputs.update_from_array(y) + + +# COMPARISON BLOCKS ===================================================================== + +class GreaterThan(Logic): + """Greater-than comparison block. + + Compares two inputs and outputs 1.0 if a > b, else 0.0. + + .. math:: + + y = + \\begin{cases} + 1 & , a > b \\\\ + 0 & , a \\leq b + \\end{cases} + + Attributes + ---------- + op_alg : Operator + internal algebraic operator + """ + + input_port_labels = {"a":0, "b":1} + output_port_labels = {"y":0} + + def __init__(self): + super().__init__() + + self.op_alg = Operator( + func=lambda x: float(x[0] > x[1]), + jac=lambda x: np.zeros((1, 2)) + ) + + +class LessThan(Logic): + """Less-than comparison block. + + Compares two inputs and outputs 1.0 if a < b, else 0.0. + + .. math:: + + y = + \\begin{cases} + 1 & , a < b \\\\ + 0 & , a \\geq b + \\end{cases} + + Attributes + ---------- + op_alg : Operator + internal algebraic operator + """ + + input_port_labels = {"a":0, "b":1} + output_port_labels = {"y":0} + + def __init__(self): + super().__init__() + + self.op_alg = Operator( + func=lambda x: float(x[0] < x[1]), + jac=lambda x: np.zeros((1, 2)) + ) + + +class Equal(Logic): + """Equality comparison block. + + Compares two inputs and outputs 1.0 if |a - b| <= tolerance, else 0.0. + + .. math:: + + y = + \\begin{cases} + 1 & , |a - b| \\leq \\epsilon \\\\ + 0 & , |a - b| > \\epsilon + \\end{cases} + + Parameters + ---------- + tolerance : float + comparison tolerance for floating point equality + + Attributes + ---------- + op_alg : Operator + internal algebraic operator + """ + + input_port_labels = {"a":0, "b":1} + output_port_labels = {"y":0} + + def __init__(self, tolerance=1e-12): + super().__init__() + + self.tolerance = tolerance + + self.op_alg = Operator( + func=lambda x: float(abs(x[0] - x[1]) <= self.tolerance), + jac=lambda x: np.zeros((1, 2)) + ) + + +# BOOLEAN LOGIC BLOCKS ================================================================== + +class LogicAnd(Logic): + """Logical AND block. + + Outputs 1.0 if both inputs are nonzero, else 0.0. + + .. math:: + + y = a \\land b + + Attributes + ---------- + op_alg : Operator + internal algebraic operator + """ + + input_port_labels = {"a":0, "b":1} + output_port_labels = {"y":0} + + def __init__(self): + super().__init__() + + self.op_alg = Operator( + func=lambda x: float(bool(x[0]) and bool(x[1])), + jac=lambda x: np.zeros((1, 2)) + ) + + +class LogicOr(Logic): + """Logical OR block. + + Outputs 1.0 if either input is nonzero, else 0.0. + + .. math:: + + y = a \\lor b + + Attributes + ---------- + op_alg : Operator + internal algebraic operator + """ + + input_port_labels = {"a":0, "b":1} + output_port_labels = {"y":0} + + def __init__(self): + super().__init__() + + self.op_alg = Operator( + func=lambda x: float(bool(x[0]) or bool(x[1])), + jac=lambda x: np.zeros((1, 2)) + ) + + +class LogicNot(Logic): + """Logical NOT block. + + Outputs 1.0 if input is zero, else 0.0. + + .. math:: + + y = \\lnot x + + Attributes + ---------- + op_alg : Operator + internal algebraic operator + """ + + def __init__(self): + super().__init__() + + self.op_alg = Operator( + func=lambda x: float(not bool(x[0])), + jac=lambda x: np.zeros((1, 1)) + ) diff --git a/src/pathsim/blocks/lti.py b/src/pathsim/blocks/lti.py index de85e99c..6a9cd23b 100644 --- a/src/pathsim/blocks/lti.py +++ b/src/pathsim/blocks/lti.py @@ -22,6 +22,7 @@ from ..utils.deprecation import deprecated from ..optim.operator import DynamicOperator +from ..utils.mutable import mutable # LTI BLOCKS ============================================================================ @@ -169,6 +170,7 @@ def step(self, t, dt): return self.engine.step(f, dt) +@mutable class TransferFunctionPRC(StateSpace): """This block defines a LTI (MIMO for pole residue) transfer function. @@ -227,6 +229,7 @@ class TransferFunction(TransferFunctionPRC): pass +@mutable class TransferFunctionZPG(StateSpace): """This block defines a LTI (SISO) transfer function. @@ -281,6 +284,7 @@ def __init__(self, Zeros=[], Poles=[-1], Gain=1.0): super().__init__(sp_SS.A, sp_SS.B, sp_SS.C, sp_SS.D) +@mutable class TransferFunctionNumDen(StateSpace): """This block defines a LTI (SISO) transfer function. diff --git a/src/pathsim/blocks/math.py b/src/pathsim/blocks/math.py index 6a08f927..aa134a28 100644 --- a/src/pathsim/blocks/math.py +++ b/src/pathsim/blocks/math.py @@ -15,6 +15,7 @@ from ..utils.register import Register from ..optim.operator import Operator +from ..utils.mutable import mutable # BASE MATH BLOCK ======================================================================= @@ -574,4 +575,154 @@ def __init__(self, A=np.eye(1)): self.op_alg = Operator( func=lambda u: np.dot(self.A, u), jac=lambda u: self.A + ) + + +class Atan2(Block): + """Two-argument arctangent block. + + Computes the four-quadrant arctangent of two inputs: + + .. math:: + + y = \\mathrm{atan2}(a, b) + + Note + ---- + This block takes exactly two inputs (a, b) and produces one output. + The first input is the y-coordinate, the second is the x-coordinate, + matching the convention of ``numpy.arctan2(y, x)``. + + Attributes + ---------- + op_alg : Operator + internal algebraic operator + """ + + input_port_labels = {"a":0, "b":1} + output_port_labels = {"y":0} + + def __init__(self): + super().__init__() + + def _atan2_jac(x): + a, b = x[0], x[1] + denom = a**2 + b**2 + if denom == 0: + return np.zeros((1, 2)) + return np.array([[b / denom, -a / denom]]) + + self.op_alg = Operator( + func=lambda x: np.arctan2(x[0], x[1]), + jac=_atan2_jac + ) + + + def __len__(self): + """Purely algebraic block""" + return 1 + + + def update(self, t): + """update algebraic component of system equation + + Parameters + ---------- + t : float + evaluation time + """ + u = self.inputs.to_array() + y = self.op_alg(u) + self.outputs.update_from_array(y) + + +@mutable +class Rescale(Math): + """Linear rescaling / mapping block. + + Maps the input linearly from range ``[i0, i1]`` to range ``[o0, o1]``. + Optionally saturates the output to ``[o0, o1]``. + + .. math:: + + y = o_0 + \\frac{(x - i_0) \\cdot (o_1 - o_0)}{i_1 - i_0} + + This block supports vector inputs. + + Parameters + ---------- + i0 : float + input range lower bound + i1 : float + input range upper bound + o0 : float + output range lower bound + o1 : float + output range upper bound + saturate : bool + if True, clamp output to [min(o0,o1), max(o0,o1)] + + Attributes + ---------- + op_alg : Operator + internal algebraic operator + """ + + def __init__(self, i0=0.0, i1=1.0, o0=0.0, o1=1.0, saturate=False): + super().__init__() + + self.i0 = i0 + self.i1 = i1 + self.o0 = o0 + self.o1 = o1 + self.saturate = saturate + + #precompute gain + self._gain = (o1 - o0) / (i1 - i0) + + def _maplin(x): + y = self.o0 + (x - self.i0) * self._gain + if self.saturate: + lo, hi = min(self.o0, self.o1), max(self.o0, self.o1) + y = np.clip(y, lo, hi) + return y + + def _maplin_jac(x): + if self.saturate: + lo, hi = min(self.o0, self.o1), max(self.o0, self.o1) + y = self.o0 + (x - self.i0) * self._gain + mask = (y >= lo) & (y <= hi) + return np.diag(mask.astype(float) * self._gain) + return np.diag(np.full_like(x, self._gain)) + + self.op_alg = Operator( + func=_maplin, + jac=_maplin_jac + ) + + +class Alias(Math): + """Signal alias / pass-through block. + + Passes the input directly to the output without modification. + This is useful for signal renaming in model composition. + + .. math:: + + y = x + + This block supports vector inputs. + + Attributes + ---------- + op_alg : Operator + internal algebraic operator + """ + + def __init__(self): + super().__init__() + + self.op_alg = Operator( + func=lambda x: x, + jac=lambda x: np.eye(len(x)) ) \ No newline at end of file diff --git a/src/pathsim/blocks/samplehold.py b/src/pathsim/blocks/samplehold.py index ae3e5b96..a292877c 100644 --- a/src/pathsim/blocks/samplehold.py +++ b/src/pathsim/blocks/samplehold.py @@ -9,10 +9,12 @@ from ._block import Block from ..events.schedule import Schedule +from ..utils.mutable import mutable # MIXED SIGNAL BLOCKS =================================================================== +@mutable class SampleHold(Block): """Samples the inputs periodically and produces them at the output. diff --git a/src/pathsim/blocks/sources.py b/src/pathsim/blocks/sources.py index 170514b9..a2abfcea 100644 --- a/src/pathsim/blocks/sources.py +++ b/src/pathsim/blocks/sources.py @@ -14,6 +14,7 @@ from ._block import Block from ..utils.register import Register from ..utils.deprecation import deprecated +from ..utils.mutable import mutable from ..events.schedule import Schedule, ScheduleList from .._constants import TOLERANCE @@ -169,6 +170,7 @@ def update(self, t): # SPECIAL CONTINUOUS SOURCE BLOCKS ====================================================== +@mutable class TriangleWaveSource(Source): """Source block that generates an analog triangle wave @@ -214,6 +216,7 @@ def _triangle_wave(self, t, f): return 2 * abs(t*f - np.floor(t*f + 0.5)) - 1 +@mutable class SinusoidalSource(Source): """Source block that generates a sinusoid wave @@ -289,6 +292,7 @@ def _gaussian(self, t, f_max): return np.exp(-(t/tau)**2) +@mutable class SinusoidalPhaseNoiseSource(Block): """Sinusoidal source with cumulative and white phase noise. @@ -703,6 +707,7 @@ class ChirpSource(ChirpPhaseNoiseSource): # SPECIAL DISCRETE SOURCE BLOCKS ======================================================== +@mutable class PulseSource(Block): """Generates a periodic pulse waveform with defined rise and fall times. @@ -909,6 +914,7 @@ class Pulse(PulseSource): pass +@mutable class ClockSource(Block): """Discrete time clock source block. @@ -970,6 +976,7 @@ class Clock(ClockSource): +@mutable class SquareWaveSource(Block): """Discrete time square wave source. diff --git a/src/pathsim/blocks/spectrum.py b/src/pathsim/blocks/spectrum.py index 24268fe9..7b3a0878 100644 --- a/src/pathsim/blocks/spectrum.py +++ b/src/pathsim/blocks/spectrum.py @@ -15,12 +15,14 @@ from ..utils.realtimeplotter import RealtimePlotter from ..utils.deprecation import deprecated +from ..utils.mutable import mutable from .._constants import COLORS_ALL # BLOCKS FOR DATA RECORDING ============================================================= +@mutable class Spectrum(Block): """Block for fourier spectrum analysis (spectrum analyzer). diff --git a/src/pathsim/utils/__init__.py b/src/pathsim/utils/__init__.py index 4e586b8c..ce95ea7c 100644 --- a/src/pathsim/utils/__init__.py +++ b/src/pathsim/utils/__init__.py @@ -1 +1,2 @@ from .deprecation import deprecated +from .mutable import mutable diff --git a/src/pathsim/utils/mutable.py b/src/pathsim/utils/mutable.py new file mode 100644 index 00000000..97888908 --- /dev/null +++ b/src/pathsim/utils/mutable.py @@ -0,0 +1,177 @@ +######################################################################################### +## +## MUTABLE PARAMETER DECORATOR +## (utils/mutable.py) +## +## Class decorator that enables runtime parameter mutation with automatic +## reinitialization. When a decorated parameter is changed, the block's +## __init__ is re-run with updated values while preserving engine state. +## +######################################################################################### + +# IMPORTS =============================================================================== + +import inspect +import functools + +import numpy as np + + +# REINIT HELPER ========================================================================= + +def _do_reinit(block): + """Re-run __init__ with current parameter values, preserving engine state. + + Uses ``type(block).__init__`` to always reinit from the most derived class, + ensuring that subclass overrides (e.g. operator replacements) are preserved. + + Parameters + ---------- + block : Block + the block instance to reinitialize + """ + + actual_cls = type(block) + + # gather current values for ALL init params of the actual class + sig = inspect.signature(actual_cls.__init__) + kwargs = {} + for name in sig.parameters: + if name == "self": + continue + if hasattr(block, name): + kwargs[name] = getattr(block, name) + + # save engine + engine = block.engine if hasattr(block, 'engine') else None + + # re-run init through the wrapped __init__ (handles depth counting) + block._param_locked = False + actual_cls.__init__(block, **kwargs) + # _param_locked is set to True by the outermost new_init wrapper + + # restore engine + if engine is not None: + old_dim = len(engine) + new_dim = len(np.atleast_1d(block.initial_value)) if hasattr(block, 'initial_value') else 0 + + if old_dim == new_dim: + # same dimension - restore the entire engine + block.engine = engine + else: + # dimension changed - create new engine inheriting settings + block.engine = type(engine).create( + block.initial_value, + parent=engine.parent, + ) + block.engine.tolerance_lte_abs = engine.tolerance_lte_abs + block.engine.tolerance_lte_rel = engine.tolerance_lte_rel + + +# DECORATOR ============================================================================= + +def mutable(cls): + """Class decorator that makes all ``__init__`` parameters trigger automatic + reinitialization when changed at runtime. + + Parameters are auto-detected from the ``__init__`` signature. When any parameter + is changed at runtime, the block's ``__init__`` is re-executed with updated values. + The integration engine state is preserved across reinitialization. + + A ``set(**kwargs)`` method is also generated for batched parameter updates that + triggers only a single reinitialization. + + Supports inheritance: if both a parent and child class use ``@mutable``, the init + guard uses a depth counter to ensure reinitialization only triggers after the + outermost ``__init__`` completes. + + Example + ------- + .. code-block:: python + + @mutable + class PT1(StateSpace): + def __init__(self, K=1.0, T=1.0): + self.K = K + self.T = T + super().__init__( + A=np.array([[-1.0 / T]]), + B=np.array([[K / T]]), + C=np.array([[1.0]]), + D=np.array([[0.0]]) + ) + + pt1 = PT1(K=2.0, T=0.5) + pt1.K = 5.0 # auto reinitializes + pt1.set(K=5.0, T=0.3) # single reinitialization + """ + + original_init = cls.__init__ + + # auto-detect all __init__ parameters + params = [ + name for name in inspect.signature(original_init).parameters + if name != "self" + ] + + # -- install property descriptors for all params ------------------------------- + + for name in params: + storage = f"_p_{name}" + + def _make_property(s): + def getter(self): + return getattr(self, s) + + def setter(self, value): + setattr(self, s, value) + if getattr(self, '_param_locked', False): + _do_reinit(self) + + return property(getter, setter) + + setattr(cls, name, _make_property(storage)) + + # -- wrap __init__ with depth counter ------------------------------------------ + + @functools.wraps(original_init) + def new_init(self, *args, **kwargs): + self._init_depth = getattr(self, '_init_depth', 0) + 1 + try: + original_init(self, *args, **kwargs) + finally: + self._init_depth -= 1 + if self._init_depth == 0: + self._param_locked = True + + cls.__init__ = new_init + + # -- generate batched set() method --------------------------------------------- + + def set(self, **kwargs): + """Set multiple parameters and reinitialize once. + + Parameters + ---------- + kwargs : dict + parameter names and their new values + + Example + ------- + .. code-block:: python + + block.set(K=5.0, T=0.3) + """ + self._param_locked = False + for key, value in kwargs.items(): + setattr(self, key, value) + _do_reinit(self) + + cls.set = set + + # -- store metadata for introspection ------------------------------------------ + + existing = getattr(cls, '_mutable_params', ()) + cls._mutable_params = existing + tuple(p for p in params if p not in existing) + + return cls diff --git a/tests/evals/test_logic_system.py b/tests/evals/test_logic_system.py new file mode 100644 index 00000000..9d6f56cf --- /dev/null +++ b/tests/evals/test_logic_system.py @@ -0,0 +1,265 @@ +######################################################################################## +## +## Testing logic and comparison block systems +## +## Verifies comparison (GreaterThan, LessThan, Equal) and boolean logic +## (LogicAnd, LogicOr, LogicNot) blocks in full simulation context. +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim import Simulation, Connection +from pathsim.blocks import ( + Source, + Constant, + Scope, + ) + +from pathsim.blocks.logic import ( + GreaterThan, + LessThan, + Equal, + LogicAnd, + LogicOr, + LogicNot, + ) + + +# TESTCASE ============================================================================= + +class TestComparisonSystem(unittest.TestCase): + """ + Test comparison blocks in a simulation that compares a sine wave + against a constant threshold. + + System: Source(sin(t)) → GT/LT/EQ → Scope + Constant(0) ↗ + + Verify: GT outputs 1 when sin(t) > 0, LT outputs 1 when sin(t) < 0 + """ + + def setUp(self): + + Src = Source(lambda t: np.sin(2 * np.pi * t)) + Thr = Constant(0.0) + + self.GT = GreaterThan() + self.LT = LessThan() + + self.Sco = Scope(labels=["signal", "gt_zero", "lt_zero"]) + + blocks = [Src, Thr, self.GT, self.LT, self.Sco] + + connections = [ + Connection(Src, self.GT["a"], self.LT["a"], self.Sco[0]), + Connection(Thr, self.GT["b"], self.LT["b"]), + Connection(self.GT, self.Sco[1]), + Connection(self.LT, self.Sco[2]), + ] + + self.Sim = Simulation( + blocks, + connections, + dt=0.01, + log=False + ) + + + def test_gt_lt_complementary(self): + """GT and LT should be complementary (sum to 1) away from zero crossings""" + + self.Sim.run(duration=3.0, reset=True) + + time, [sig, gt, lt] = self.Sco.read() + + #away from zero crossings, GT + LT should be 1 (exactly one is true) + mask = np.abs(sig) > 0.1 + result = gt[mask] + lt[mask] + + self.assertTrue(np.allclose(result, 1.0), + "GT and LT should be complementary away from zero crossings") + + + def test_gt_matches_positive(self): + """GT output should be 1 when signal is clearly positive""" + + self.Sim.run(duration=3.0, reset=True) + + time, [sig, gt, lt] = self.Sco.read() + + mask_pos = sig > 0.2 + self.assertTrue(np.all(gt[mask_pos] == 1.0), + "GT should be 1 when signal is positive") + + mask_neg = sig < -0.2 + self.assertTrue(np.all(gt[mask_neg] == 0.0), + "GT should be 0 when signal is negative") + + +class TestLogicGateSystem(unittest.TestCase): + """ + Test logic gates combining two comparison outputs. + + System: Two sine waves at different frequencies compared against 0, + then combined with AND/OR/NOT. + + Verify: Logic truth tables hold across the simulation. + """ + + def setUp(self): + + #two signals with different frequencies so they go in and out of phase + Src1 = Source(lambda t: np.sin(2 * np.pi * 1.0 * t)) + Src2 = Source(lambda t: np.sin(2 * np.pi * 1.5 * t)) + Zero = Constant(0.0) + + GT1 = GreaterThan() + GT2 = GreaterThan() + + self.AND = LogicAnd() + self.OR = LogicOr() + self.NOT = LogicNot() + + self.Sco = Scope(labels=["gt1", "gt2", "and", "or", "not1"]) + + blocks = [Src1, Src2, Zero, GT1, GT2, + self.AND, self.OR, self.NOT, self.Sco] + + connections = [ + Connection(Src1, GT1["a"]), + Connection(Src2, GT2["a"]), + Connection(Zero, GT1["b"], GT2["b"]), + Connection(GT1, self.AND["a"], self.OR["a"], self.NOT, self.Sco[0]), + Connection(GT2, self.AND["b"], self.OR["b"], self.Sco[1]), + Connection(self.AND, self.Sco[2]), + Connection(self.OR, self.Sco[3]), + Connection(self.NOT, self.Sco[4]), + ] + + self.Sim = Simulation( + blocks, + connections, + dt=0.01, + log=False + ) + + + def test_and_gate(self): + """AND should only be 1 when both inputs are 1""" + + self.Sim.run(duration=5.0, reset=True) + + time, [gt1, gt2, and_out, or_out, not_out] = self.Sco.read() + + #where both are 1, AND should be 1 + both_true = (gt1 == 1.0) & (gt2 == 1.0) + if np.any(both_true): + self.assertTrue(np.all(and_out[both_true] == 1.0)) + + #where either is 0, AND should be 0 + either_false = (gt1 == 0.0) | (gt2 == 0.0) + if np.any(either_false): + self.assertTrue(np.all(and_out[either_false] == 0.0)) + + + def test_or_gate(self): + """OR should be 1 when either input is 1""" + + self.Sim.run(duration=5.0, reset=True) + + time, [gt1, gt2, and_out, or_out, not_out] = self.Sco.read() + + #where both are 0, OR should be 0 + both_false = (gt1 == 0.0) & (gt2 == 0.0) + if np.any(both_false): + self.assertTrue(np.all(or_out[both_false] == 0.0)) + + #where either is 1, OR should be 1 + either_true = (gt1 == 1.0) | (gt2 == 1.0) + if np.any(either_true): + self.assertTrue(np.all(or_out[either_true] == 1.0)) + + + def test_not_gate(self): + """NOT should invert its input""" + + self.Sim.run(duration=5.0, reset=True) + + time, [gt1, gt2, and_out, or_out, not_out] = self.Sco.read() + + #NOT should be inverse of GT1 + self.assertTrue(np.allclose(not_out + gt1, 1.0), + "NOT should invert its input") + + +class TestEqualSystem(unittest.TestCase): + """ + Test Equal block detecting when two signals are close. + + System: Source(sin(t)) → Equal ← Source(sin(t + small_offset)) + """ + + def test_equal_detects_match(self): + """Equal should output 1 when signals match within tolerance""" + + Src1 = Constant(3.14) + Src2 = Constant(3.14) + + Eq = Equal(tolerance=0.01) + Sco = Scope() + + Sim = Simulation( + blocks=[Src1, Src2, Eq, Sco], + connections=[ + Connection(Src1, Eq["a"]), + Connection(Src2, Eq["b"]), + Connection(Eq, Sco), + ], + dt=0.1, + log=False + ) + + Sim.run(duration=1.0, reset=True) + + time, [eq_out] = Sco.read() + + self.assertTrue(np.all(eq_out == 1.0), + "Equal should output 1 for identical signals") + + + def test_equal_detects_mismatch(self): + """Equal should output 0 when signals differ""" + + Src1 = Constant(1.0) + Src2 = Constant(2.0) + + Eq = Equal(tolerance=0.01) + Sco = Scope() + + Sim = Simulation( + blocks=[Src1, Src2, Eq, Sco], + connections=[ + Connection(Src1, Eq["a"]), + Connection(Src2, Eq["b"]), + Connection(Eq, Sco), + ], + dt=0.1, + log=False + ) + + Sim.run(duration=1.0, reset=True) + + time, [eq_out] = Sco.read() + + self.assertTrue(np.all(eq_out == 0.0), + "Equal should output 0 for different signals") + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/evals/test_rescale_delay_system.py b/tests/evals/test_rescale_delay_system.py new file mode 100644 index 00000000..60c97be4 --- /dev/null +++ b/tests/evals/test_rescale_delay_system.py @@ -0,0 +1,277 @@ +######################################################################################## +## +## Testing Rescale, Atan2, Alias, and discrete Delay systems +## +## Verifies new math blocks and discrete delay mode in full simulation context. +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim import Simulation, Connection +from pathsim.blocks import ( + Source, + SinusoidalSource, + Constant, + Delay, + Scope, + ) + +from pathsim.blocks.math import Atan2, Rescale, Alias + + +# TESTCASE ============================================================================= + +class TestRescaleSystem(unittest.TestCase): + """ + Test Rescale block mapping a sine wave from [-1, 1] to [0, 10]. + + System: Source(sin(t)) → Rescale → Scope + Verify: output is linearly mapped to target range + """ + + def setUp(self): + + Src = SinusoidalSource(amplitude=1.0, frequency=1.0) + + self.Rsc = Rescale(i0=-1.0, i1=1.0, o0=0.0, o1=10.0) + self.Sco = Scope(labels=["input", "rescaled"]) + + blocks = [Src, self.Rsc, self.Sco] + + connections = [ + Connection(Src, self.Rsc, self.Sco[0]), + Connection(self.Rsc, self.Sco[1]), + ] + + self.Sim = Simulation( + blocks, + connections, + dt=0.01, + log=False + ) + + + def test_rescale_range(self): + """Output should be in [0, 10] for input in [-1, 1]""" + + self.Sim.run(duration=3.0, reset=True) + + time, [inp, rsc] = self.Sco.read() + + #check output stays within target range (with small tolerance) + self.assertTrue(np.all(rsc >= -0.1), "Rescaled output below lower bound") + self.assertTrue(np.all(rsc <= 10.1), "Rescaled output above upper bound") + + + def test_rescale_linearity(self): + """Output should be linear mapping of input""" + + self.Sim.run(duration=3.0, reset=True) + + time, [inp, rsc] = self.Sco.read() + + #expected: 5 + 5 * sin(t) + expected = 5.0 + 5.0 * inp + error = np.max(np.abs(rsc - expected)) + + self.assertLess(error, 0.01, f"Rescale linearity error: {error:.4f}") + + +class TestRescaleSaturationSystem(unittest.TestCase): + """ + Test Rescale with saturation enabled. + + System: Source(ramp) → Rescale(saturate=True) → Scope + Verify: output is clamped to target range + """ + + def test_saturation_clamps_output(self): + + #ramp from -2 to 2 over 4 seconds, mapped [0,1] -> [0,10] + Src = Source(lambda t: t - 2.0) + Rsc = Rescale(i0=0.0, i1=1.0, o0=0.0, o1=10.0, saturate=True) + Sco = Scope(labels=["input", "rescaled"]) + + Sim = Simulation( + blocks=[Src, Rsc, Sco], + connections=[ + Connection(Src, Rsc, Sco[0]), + Connection(Rsc, Sco[1]), + ], + dt=0.01, + log=False + ) + + Sim.run(duration=4.0, reset=True) + + time, [inp, rsc] = Sco.read() + + #output should never exceed [0, 10] + self.assertTrue(np.all(rsc >= -0.01), "Saturated output below 0") + self.assertTrue(np.all(rsc <= 10.01), "Saturated output above 10") + + #input in valid range [0, 1] should map normally + mask_valid = (inp >= 0.0) & (inp <= 1.0) + if np.any(mask_valid): + expected = 10.0 * inp[mask_valid] + error = np.max(np.abs(rsc[mask_valid] - expected)) + self.assertLess(error, 0.1) + + +class TestAtan2System(unittest.TestCase): + """ + Test Atan2 block computing the angle of a rotating vector. + + System: Source(sin(t)) → Atan2 ← Source(cos(t)) + Verify: output recovers the angle t (mod 2pi) + """ + + def setUp(self): + + self.SrcY = Source(lambda t: np.sin(t)) + self.SrcX = Source(lambda t: np.cos(t)) + + self.At2 = Atan2() + self.Sco = Scope(labels=["angle"]) + + blocks = [self.SrcY, self.SrcX, self.At2, self.Sco] + + connections = [ + Connection(self.SrcY, self.At2["a"]), + Connection(self.SrcX, self.At2["b"]), + Connection(self.At2, self.Sco), + ] + + self.Sim = Simulation( + blocks, + connections, + dt=0.01, + log=False + ) + + + def test_atan2_recovers_angle(self): + """atan2(sin(t), cos(t)) should equal t for t in [0, pi)""" + + self.Sim.run(duration=3.0, reset=True) + + time, [angle] = self.Sco.read() + + #check in first half period where atan2 is monotonic + mask = time < np.pi - 0.1 + expected = time[mask] + actual = angle[mask] + + error = np.max(np.abs(actual - expected)) + self.assertLess(error, 0.02, + f"Atan2 angle recovery error: {error:.4f}") + + +class TestAliasSystem(unittest.TestCase): + """ + Test Alias block as a transparent pass-through. + + System: Source(sin(t)) → Alias → Scope + Verify: output is identical to input + """ + + def test_alias_transparent(self): + + Src = SinusoidalSource(amplitude=1.0, frequency=2.0) + Als = Alias() + Sco = Scope(labels=["input", "alias"]) + + Sim = Simulation( + blocks=[Src, Als, Sco], + connections=[ + Connection(Src, Als, Sco[0]), + Connection(Als, Sco[1]), + ], + dt=0.01, + log=False + ) + + Sim.run(duration=2.0, reset=True) + + time, [inp, als] = Sco.read() + + self.assertTrue(np.allclose(inp, als), + "Alias output should be identical to input") + + +class TestDiscreteDelaySystem(unittest.TestCase): + """ + Test discrete-time delay using sampling_period parameter. + + System: Source(ramp) → Delay(tau, sampling_period) → Scope + Verify: output is a staircase-delayed version of input + """ + + def setUp(self): + + self.tau = 0.1 + self.T = 0.01 + + Src = Source(lambda t: t) + self.Dly = Delay(tau=self.tau, sampling_period=self.T) + self.Sco = Scope(labels=["input", "delayed"]) + + blocks = [Src, self.Dly, self.Sco] + + connections = [ + Connection(Src, self.Dly, self.Sco[0]), + Connection(self.Dly, self.Sco[1]), + ] + + self.Sim = Simulation( + blocks, + connections, + dt=0.001, + log=False + ) + + + def test_discrete_delay_offset(self): + """Delayed signal should trail input by approximately tau""" + + self.Sim.run(duration=1.0, reset=True) + + time, [inp, delayed] = self.Sco.read() + + #after initial fill (t > tau + settling), check delay offset + mask = time > self.tau + 0.2 + t_check = time[mask] + delayed_check = delayed[mask] + + #the delayed ramp should be approximately (t - tau) + #with staircase quantization from sampling + expected = t_check - self.tau + error = np.mean(np.abs(delayed_check - expected)) + + self.assertLess(error, self.T + 0.01, + f"Discrete delay mean error: {error:.4f}") + + + def test_discrete_delay_zero_initial(self): + """Output should be zero during initial fill period""" + + self.Sim.run(duration=0.5, reset=True) + + time, [inp, delayed] = self.Sco.read() + + #during first tau seconds, output should be 0 + mask = time < self.tau * 0.5 + early_output = delayed[mask] + + self.assertTrue(np.all(early_output == 0.0), + "Discrete delay output should be zero before buffer fills") + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/pathsim/blocks/test_delay.py b/tests/pathsim/blocks/test_delay.py index 5a98236a..f4a86cec 100644 --- a/tests/pathsim/blocks/test_delay.py +++ b/tests/pathsim/blocks/test_delay.py @@ -105,6 +105,97 @@ def test_update(self): self.assertEqual(D.outputs[0], max(0, t-10.5)) +class TestDelayDiscrete(unittest.TestCase): + """ + Test the discrete-time (sampling_period) mode of the 'Delay' block class + """ + + def test_init_discrete(self): + + D = Delay(tau=0.01, sampling_period=0.001) + + self.assertEqual(D._n, 10) + self.assertEqual(len(D._ring), 10) + self.assertTrue(hasattr(D, 'events')) + self.assertEqual(len(D.events), 1) + + + def test_n_computation(self): + + #exact multiple + D = Delay(tau=0.05, sampling_period=0.01) + self.assertEqual(D._n, 5) + + #rounding + D = Delay(tau=0.015, sampling_period=0.01) + self.assertEqual(D._n, 2) + + #minimum of 1 + D = Delay(tau=0.001, sampling_period=0.01) + self.assertEqual(D._n, 1) + + + def test_len(self): + + D = Delay(tau=0.01, sampling_period=0.001) + + #no passthrough + self.assertEqual(len(D), 0) + + + def test_reset(self): + + D = Delay(tau=0.01, sampling_period=0.001) + + #push some values + D._sample_next_timestep = True + D.inputs[0] = 42.0 + D.sample(0, 0.001) + + D.reset() + + #ring buffer should be all zeros + self.assertTrue(all(v == 0.0 for v in D._ring)) + self.assertEqual(len(D._ring), D._n) + + + def test_discrete_delay(self): + + n = 3 + D = Delay(tau=0.003, sampling_period=0.001) + + self.assertEqual(D._n, n) + + #push values through the ring buffer + outputs = [] + for k in range(10): + D.inputs[0] = float(k) + D._sample_next_timestep = True + D.sample(k * 0.001, 0.001) + D.update(k * 0.001) + outputs.append(D.outputs[0]) + + #first n outputs should be 0 (initial buffer fill) + for k in range(n): + self.assertEqual(outputs[k], 0.0, f"output[{k}] should be 0.0") + + #after that, output should be delayed by n samples + for k in range(n, 10): + self.assertEqual(outputs[k], float(k - n), f"output[{k}] should be {k-n}") + + + def test_no_sample_without_flag(self): + + D = Delay(tau=0.003, sampling_period=0.001) + + #push a value without the flag set + D.inputs[0] = 42.0 + D.sample(0, 0.001) + + #ring buffer should be unchanged (all zeros) + self.assertTrue(all(v == 0.0 for v in D._ring)) + + # RUN TESTS LOCALLY ==================================================================== if __name__ == '__main__': diff --git a/tests/pathsim/blocks/test_divider.py b/tests/pathsim/blocks/test_divider.py new file mode 100644 index 00000000..a60b7edc --- /dev/null +++ b/tests/pathsim/blocks/test_divider.py @@ -0,0 +1,305 @@ +######################################################################################## +## +## TESTS FOR +## 'blocks.divider.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim.blocks.divider import Divider + +from tests.pathsim.blocks._embedding import Embedding + + +# TESTS ================================================================================ + +class TestDivider(unittest.TestCase): + """ + Test the implementation of the 'Divider' block class + """ + + def test_init(self): + + # default initialization + D = Divider() + self.assertEqual(D.operations, "*/") + + # valid ops strings + for ops in ["*", "/", "*/", "/*", "**/", "/**"]: + D = Divider(ops) + self.assertEqual(D.operations, ops) + + # non-string types are rejected + for bad in [0.4, 3, [1, -1], True]: + with self.assertRaises(ValueError): + Divider(bad) + + # strings with invalid characters are rejected + for bad in ["+/", "*-", "a", "**0", "+-"]: + with self.assertRaises(ValueError): + Divider(bad) + + + def test_embedding(self): + """Test algebraic output against reference via Embedding.""" + + # default: '*/' — u0 * u2 * ... / u1 + D = Divider() + + def src(t): return t + 1, np.cos(t) + 2, 3.0 + def ref(t): return (t + 1) * 3.0 / (np.cos(t) + 2) + + E = Embedding(D, src, ref) + for t in range(10): self.assertEqual(*E.check_MIMO(t)) + + # '**/' : multiply first two, divide by third + D = Divider('**/') + + def src(t): return np.cos(t) + 2, np.sin(t) + 2, t + 1 + def ref(t): return (np.cos(t) + 2) * (np.sin(t) + 2) / (t + 1) + + E = Embedding(D, src, ref) + for t in range(10): self.assertEqual(*E.check_MIMO(t)) + + # '*/' : u0 / u1 + D = Divider('*/') + + def src(t): return t + 1, np.cos(t) + 2 + def ref(t): return (t + 1) / (np.cos(t) + 2) + + E = Embedding(D, src, ref) + for t in range(10): self.assertEqual(*E.check_MIMO(t)) + + # ops string shorter than number of inputs: extra inputs default to '*' + # '/' with 3 inputs → y = u1 * u2 / u0 + D = Divider('/') + + def src(t): return t + 1, np.cos(t) + 2, 3.0 + def ref(t): return (np.cos(t) + 2) * 3.0 / (t + 1) + + E = Embedding(D, src, ref) + for t in range(10): self.assertEqual(*E.check_MIMO(t)) + + # single input, default: passes through unchanged + D = Divider() + + def src(t): return np.cos(t) + def ref(t): return np.cos(t) + + E = Embedding(D, src, ref) + for t in range(10): self.assertEqual(*E.check_SISO(t)) + + + def test_linearization(self): + """Test linearize / delinearize round-trip.""" + + # default ('*/') — nonlinear, so only check at linearization point + D = Divider() + + def src(t): return np.cos(t) + 2, t + 1 + def ref(t): return (np.cos(t) + 2) / (t + 1) + + E = Embedding(D, src, ref) + + for t in range(10): self.assertEqual(*E.check_MIMO(t)) + + # linearize at the current operating point (inputs set to src(9) by the loop) + D.linearize(t) + a, b = E.check_MIMO(t) + self.assertAlmostEqual(np.linalg.norm(a - b), 0, 8) + + D.delinearize() + for t in range(10): self.assertEqual(*E.check_MIMO(t)) + + # with ops string + D = Divider('**/') + + def src(t): return np.cos(t) + 2, np.sin(t) + 2, t + 1 + def ref(t): return (np.cos(t) + 2) * (np.sin(t) + 2) / (t + 1) + + E = Embedding(D, src, ref) + + for t in range(10): self.assertEqual(*E.check_MIMO(t)) + + D.linearize(t) + a, b = E.check_MIMO(t) + self.assertAlmostEqual(np.linalg.norm(a - b), 0, 8) + + D.delinearize() + for t in range(10): self.assertEqual(*E.check_MIMO(t)) + + + def test_update_single(self): + + D = Divider() + + D.inputs[0] = 5.0 + D.update(None) + + self.assertEqual(D.outputs[0], 5.0) + + + def test_update_multi(self): + + # default '*/' with 3 inputs: ops=[*, /, *] → (u0 * u2) / u1 + D = Divider() + + D.inputs[0] = 6.0 + D.inputs[1] = 2.0 + D.inputs[2] = 3.0 + D.update(None) + + self.assertEqual(D.outputs[0], 9.0) + + + def test_update_ops(self): + + # '**/' : 2 * 3 / 4 = 1.5 + D = Divider('**/') + D.inputs[0] = 2.0 + D.inputs[1] = 3.0 + D.inputs[2] = 4.0 + D.update(None) + self.assertAlmostEqual(D.outputs[0], 1.5) + + # '*/' : 6 / 2 = 3 + D = Divider('*/') + D.inputs[0] = 6.0 + D.inputs[1] = 2.0 + D.update(None) + self.assertAlmostEqual(D.outputs[0], 3.0) + + # '/' with extra inputs: 2 * 3 / 4 = 1.5 (u0 divides, u1 u2 multiply) + D = Divider('/') + D.inputs[0] = 4.0 + D.inputs[1] = 2.0 + D.inputs[2] = 3.0 + D.update(None) + self.assertAlmostEqual(D.outputs[0], 1.5) + + # '/**' : u1 * u2 / u0 + D = Divider('/**') + D.inputs[0] = 4.0 + D.inputs[1] = 2.0 + D.inputs[2] = 3.0 + D.update(None) + self.assertAlmostEqual(D.outputs[0], 1.5) + + + def test_jacobian(self): + """Verify analytical Jacobian against central finite differences.""" + + eps = 1e-6 + + def numerical_jac(func, u): + n = len(u) + J = np.zeros((1, n)) + for k in range(n): + u_p = u.copy(); u_p[k] += eps + u_m = u.copy(); u_m[k] -= eps + J[0, k] = (func(u_p) - func(u_m)) / (2 * eps) + return J + + # default (all multiply) + D = Divider() + u = np.array([2.0, 3.0, 4.0]) + np.testing.assert_allclose( + D.op_alg.jac(u), + numerical_jac(D.op_alg._func, u), + rtol=1e-5, + ) + + # '**/' : u0 * u1 / u2 + D = Divider('**/') + u = np.array([2.0, 3.0, 4.0]) + np.testing.assert_allclose( + D.op_alg.jac(u), + numerical_jac(D.op_alg._func, u), + rtol=1e-5, + ) + + # '*/' : u0 / u1 + D = Divider('*/') + u = np.array([6.0, 2.0]) + np.testing.assert_allclose( + D.op_alg.jac(u), + numerical_jac(D.op_alg._func, u), + rtol=1e-5, + ) + + # '/**' : u1 * u2 / u0 + D = Divider('/**') + u = np.array([4.0, 2.0, 3.0]) + np.testing.assert_allclose( + D.op_alg.jac(u), + numerical_jac(D.op_alg._func, u), + rtol=1e-5, + ) + + # ops shorter than inputs: '/' with 3 inputs → u1 * u2 / u0 + D = Divider('/') + u = np.array([4.0, 2.0, 3.0]) + np.testing.assert_allclose( + D.op_alg.jac(u), + numerical_jac(D.op_alg._func, u), + rtol=1e-5, + ) + + + def test_zero_div(self): + + # 'warn' (default): produces inf, no exception + D = Divider('*/', zero_div='warn') + D.inputs[0] = 6.0 + D.inputs[1] = 0.0 + import warnings + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + D.update(None) + self.assertTrue(np.isinf(D.outputs[0])) + + # 'raise': ZeroDivisionError on zero denominator + D = Divider('*/', zero_div='raise') + D.inputs[0] = 6.0 + D.inputs[1] = 0.0 + with self.assertRaises(ZeroDivisionError): + D.update(None) + + # 'raise': no error when denominator is nonzero + D = Divider('*/', zero_div='raise') + D.inputs[0] = 6.0 + D.inputs[1] = 2.0 + D.update(None) + self.assertAlmostEqual(D.outputs[0], 3.0) + + # 'clamp': output is large-but-finite + D = Divider('*/', zero_div='clamp') + D.inputs[0] = 1.0 + D.inputs[1] = 0.0 + D.update(None) + self.assertTrue(np.isfinite(D.outputs[0])) + self.assertGreater(abs(D.outputs[0]), 1.0) + + # 'raise' invalid zero_div value + with self.assertRaises(ValueError): + Divider('*/', zero_div='ignore') + + # Jacobian: 'raise' on zero denominator input + D = Divider('*/', zero_div='raise') + with self.assertRaises(ZeroDivisionError): + D.op_alg.jac(np.array([6.0, 0.0])) + + # Jacobian: 'clamp' stays finite + D = Divider('*/', zero_div='clamp') + J = D.op_alg.jac(np.array([1.0, 0.0])) + self.assertTrue(np.all(np.isfinite(J))) + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/pathsim/blocks/test_logic.py b/tests/pathsim/blocks/test_logic.py new file mode 100644 index 00000000..e684205c --- /dev/null +++ b/tests/pathsim/blocks/test_logic.py @@ -0,0 +1,217 @@ +######################################################################################## +## +## TESTS FOR +## 'blocks.logic.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim.blocks.logic import ( + GreaterThan, + LessThan, + Equal, + LogicAnd, + LogicOr, + LogicNot, +) + +from tests.pathsim.blocks._embedding import Embedding + + +# TESTS ================================================================================ + +class TestGreaterThan(unittest.TestCase): + """ + Test the implementation of the 'GreaterThan' block class + """ + + def test_embedding(self): + """test algebraic components via embedding""" + + B = GreaterThan() + + #test a > b + def src(t): return t, 5.0 + def ref(t): return float(t > 5.0) + E = Embedding(B, src, ref) + + for t in range(10): self.assertTrue(np.allclose(*E.check_MIMO(t))) + + def test_equal_values(self): + """test that equal values return 0""" + + B = GreaterThan() + B.inputs[0] = 5.0 + B.inputs[1] = 5.0 + B.update(0) + self.assertEqual(B.outputs[0], 0.0) + + def test_less_than(self): + """test that a < b returns 0""" + + B = GreaterThan() + B.inputs[0] = 3.0 + B.inputs[1] = 5.0 + B.update(0) + self.assertEqual(B.outputs[0], 0.0) + + +class TestLessThan(unittest.TestCase): + """ + Test the implementation of the 'LessThan' block class + """ + + def test_embedding(self): + """test algebraic components via embedding""" + + B = LessThan() + + #test a < b + def src(t): return t, 5.0 + def ref(t): return float(t < 5.0) + E = Embedding(B, src, ref) + + for t in range(10): self.assertTrue(np.allclose(*E.check_MIMO(t))) + + def test_equal_values(self): + """test that equal values return 0""" + + B = LessThan() + B.inputs[0] = 5.0 + B.inputs[1] = 5.0 + B.update(0) + self.assertEqual(B.outputs[0], 0.0) + + +class TestEqual(unittest.TestCase): + """ + Test the implementation of the 'Equal' block class + """ + + def test_equal(self): + """test that equal values return 1""" + + B = Equal() + B.inputs[0] = 5.0 + B.inputs[1] = 5.0 + B.update(0) + self.assertEqual(B.outputs[0], 1.0) + + def test_not_equal(self): + """test that different values return 0""" + + B = Equal() + B.inputs[0] = 5.0 + B.inputs[1] = 6.0 + B.update(0) + self.assertEqual(B.outputs[0], 0.0) + + def test_tolerance(self): + """test tolerance parameter""" + + B = Equal(tolerance=0.1) + B.inputs[0] = 5.0 + B.inputs[1] = 5.05 + B.update(0) + self.assertEqual(B.outputs[0], 1.0) + + B.inputs[1] = 5.2 + B.update(0) + self.assertEqual(B.outputs[0], 0.0) + + +class TestLogicAnd(unittest.TestCase): + """ + Test the implementation of the 'LogicAnd' block class + """ + + def test_truth_table(self): + """test all combinations of boolean inputs""" + + B = LogicAnd() + + cases = [ + (0.0, 0.0, 0.0), + (0.0, 1.0, 0.0), + (1.0, 0.0, 0.0), + (1.0, 1.0, 1.0), + ] + + for a, b, expected in cases: + B.inputs[0] = a + B.inputs[1] = b + B.update(0) + self.assertEqual(B.outputs[0], expected, f"AND({a}, {b}) should be {expected}") + + def test_nonzero_is_true(self): + """test that nonzero values are treated as true""" + + B = LogicAnd() + B.inputs[0] = 5.0 + B.inputs[1] = -3.0 + B.update(0) + self.assertEqual(B.outputs[0], 1.0) + + +class TestLogicOr(unittest.TestCase): + """ + Test the implementation of the 'LogicOr' block class + """ + + def test_truth_table(self): + """test all combinations of boolean inputs""" + + B = LogicOr() + + cases = [ + (0.0, 0.0, 0.0), + (0.0, 1.0, 1.0), + (1.0, 0.0, 1.0), + (1.0, 1.0, 1.0), + ] + + for a, b, expected in cases: + B.inputs[0] = a + B.inputs[1] = b + B.update(0) + self.assertEqual(B.outputs[0], expected, f"OR({a}, {b}) should be {expected}") + + +class TestLogicNot(unittest.TestCase): + """ + Test the implementation of the 'LogicNot' block class + """ + + def test_true_to_false(self): + """test that nonzero input gives 0""" + + B = LogicNot() + B.inputs[0] = 1.0 + B.update(0) + self.assertEqual(B.outputs[0], 0.0) + + def test_false_to_true(self): + """test that zero input gives 1""" + + B = LogicNot() + B.inputs[0] = 0.0 + B.update(0) + self.assertEqual(B.outputs[0], 1.0) + + def test_nonzero_is_true(self): + """test that arbitrary nonzero values are treated as true""" + + B = LogicNot() + B.inputs[0] = 42.0 + B.update(0) + self.assertEqual(B.outputs[0], 0.0) + + +# RUN TESTS LOCALLY ==================================================================== + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/pathsim/blocks/test_math.py b/tests/pathsim/blocks/test_math.py index a9c950b6..075e831f 100644 --- a/tests/pathsim/blocks/test_math.py +++ b/tests/pathsim/blocks/test_math.py @@ -544,6 +544,132 @@ def ref(t): +class TestAtan2(unittest.TestCase): + """ + Test the implementation of the 'Atan2' block class + """ + + def test_embedding(self): + """test algebraic components via embedding""" + + B = Atan2() + + def src(t): return np.sin(t + 0.1), np.cos(t + 0.1) + def ref(t): return np.arctan2(np.sin(t + 0.1), np.cos(t + 0.1)) + E = Embedding(B, src, ref) + + for t in range(10): self.assertTrue(np.allclose(*E.check_MIMO(t))) + + def test_quadrants(self): + """test all four quadrants""" + + B = Atan2() + + cases = [ + ( 1.0, 1.0, np.arctan2(1.0, 1.0)), + ( 1.0, -1.0, np.arctan2(1.0, -1.0)), + (-1.0, -1.0, np.arctan2(-1.0, -1.0)), + (-1.0, 1.0, np.arctan2(-1.0, 1.0)), + ] + + for a, b, expected in cases: + B.inputs[0] = a + B.inputs[1] = b + B.update(0) + self.assertAlmostEqual(B.outputs[0], expected) + + +class TestRescale(unittest.TestCase): + """ + Test the implementation of the 'Rescale' block class + """ + + def test_default_identity(self): + """test default mapping [0,1] -> [0,1] is identity""" + + B = Rescale() + + def src(t): return t * 0.1 + def ref(t): return t * 0.1 + E = Embedding(B, src, ref) + + for t in range(10): self.assertEqual(*E.check_SISO(t)) + + def test_custom_mapping(self): + """test custom linear mapping""" + + B = Rescale(i0=0.0, i1=10.0, o0=0.0, o1=100.0) + + def src(t): return float(t) + def ref(t): return float(t) * 10.0 + E = Embedding(B, src, ref) + + for t in range(10): self.assertAlmostEqual(*E.check_SISO(t)) + + def test_saturate(self): + """test saturation clamping""" + + B = Rescale(i0=0.0, i1=1.0, o0=0.0, o1=10.0, saturate=True) + + #input beyond range + B.inputs[0] = 2.0 + B.update(0) + self.assertEqual(B.outputs[0], 10.0) + + #input below range + B.inputs[0] = -1.0 + B.update(0) + self.assertEqual(B.outputs[0], 0.0) + + def test_no_saturate(self): + """test that without saturation, output can exceed range""" + + B = Rescale(i0=0.0, i1=1.0, o0=0.0, o1=10.0, saturate=False) + + B.inputs[0] = 2.0 + B.update(0) + self.assertEqual(B.outputs[0], 20.0) + + def test_vector_input(self): + """test with vector inputs""" + + B = Rescale(i0=0.0, i1=10.0, o0=-1.0, o1=1.0) + + def src(t): return float(t), float(t) * 2 + def ref(t): return -1.0 + float(t) * 0.2, -1.0 + float(t) * 2 * 0.2 + E = Embedding(B, src, ref) + + for t in range(5): self.assertTrue(np.allclose(*E.check_MIMO(t))) + + +class TestAlias(unittest.TestCase): + """ + Test the implementation of the 'Alias' block class + """ + + def test_passthrough_siso(self): + """test that input passes through unchanged""" + + B = Alias() + + def src(t): return float(t) + def ref(t): return float(t) + E = Embedding(B, src, ref) + + for t in range(10): self.assertEqual(*E.check_SISO(t)) + + def test_passthrough_mimo(self): + """test that vector input passes through unchanged""" + + B = Alias() + + def src(t): return float(t), float(t) * 2 + def ref(t): return float(t), float(t) * 2 + E = Embedding(B, src, ref) + + for t in range(10): self.assertTrue(np.allclose(*E.check_MIMO(t))) + + # RUN TESTS LOCALLY ==================================================================== if __name__ == '__main__': unittest.main(verbosity=2) \ No newline at end of file diff --git a/tests/pathsim/utils/test_mutable.py b/tests/pathsim/utils/test_mutable.py new file mode 100644 index 00000000..e43d36fd --- /dev/null +++ b/tests/pathsim/utils/test_mutable.py @@ -0,0 +1,267 @@ +######################################################################################## +## +## TESTS FOR +## 'utils.mutable.py' +## +######################################################################################## + +# IMPORTS ============================================================================== + +import unittest +import numpy as np + +from pathsim.blocks._block import Block +from pathsim.blocks.lti import StateSpace +from pathsim.blocks.ctrl import PT1, PT2, LeadLag, PID, AntiWindupPID +from pathsim.blocks.lti import TransferFunctionNumDen, TransferFunctionZPG +from pathsim.blocks.filters import ButterworthLowpassFilter +from pathsim.blocks.sources import SinusoidalSource, ClockSource +from pathsim.blocks.delay import Delay +from pathsim.blocks.fir import FIR +from pathsim.blocks.samplehold import SampleHold + +from pathsim.utils.mutable import mutable + + +# TESTS FOR DECORATOR ================================================================== + +class TestMutableDecorator(unittest.TestCase): + """Test the @mutable decorator mechanics.""" + + def test_basic_construction(self): + """Block construction should work as before.""" + pt1 = PT1(K=2.0, T=0.5) + self.assertEqual(pt1.K, 2.0) + self.assertEqual(pt1.T, 0.5) + np.testing.assert_array_almost_equal(pt1.A, [[-2.0]]) + np.testing.assert_array_almost_equal(pt1.B, [[4.0]]) + + def test_param_mutation_triggers_reinit(self): + """Changing a mutable param should update derived state.""" + pt1 = PT1(K=1.0, T=1.0) + np.testing.assert_array_almost_equal(pt1.A, [[-1.0]]) + np.testing.assert_array_almost_equal(pt1.B, [[1.0]]) + + pt1.K = 5.0 + np.testing.assert_array_almost_equal(pt1.A, [[-1.0]]) + np.testing.assert_array_almost_equal(pt1.B, [[5.0]]) + + pt1.T = 0.5 + np.testing.assert_array_almost_equal(pt1.A, [[-2.0]]) + np.testing.assert_array_almost_equal(pt1.B, [[10.0]]) + + def test_batched_set(self): + """set() should update multiple params with a single reinit.""" + pt1 = PT1(K=1.0, T=1.0) + pt1.set(K=3.0, T=0.2) + self.assertEqual(pt1.K, 3.0) + self.assertEqual(pt1.T, 0.2) + np.testing.assert_array_almost_equal(pt1.A, [[-5.0]]) + np.testing.assert_array_almost_equal(pt1.B, [[15.0]]) + + def test_mutable_params_introspection(self): + """_mutable_params should list all init params.""" + self.assertEqual(PT1._mutable_params, ("K", "T")) + self.assertEqual(PT2._mutable_params, ("K", "T", "d")) + self.assertEqual(PID._mutable_params, ("Kp", "Ki", "Kd", "f_max")) + + def test_mutable_params_inherited(self): + """AntiWindupPID should accumulate parent and own params.""" + self.assertIn("Kp", AntiWindupPID._mutable_params) + self.assertIn("Ks", AntiWindupPID._mutable_params) + self.assertIn("limits", AntiWindupPID._mutable_params) + # no duplicates + self.assertEqual( + len(AntiWindupPID._mutable_params), + len(set(AntiWindupPID._mutable_params)) + ) + + def test_no_reinit_during_construction(self): + """Properties should not trigger reinit during __init__.""" + # If this doesn't hang or error, the init guard works + pt1 = PT1(K=2.0, T=0.5) + self.assertTrue(pt1._param_locked) + + +# TESTS FOR ENGINE PRESERVATION ========================================================= + +class TestEnginePreservation(unittest.TestCase): + """Test that engine state is preserved across reinit.""" + + def test_engine_preserved_same_dimension(self): + """Engine should be preserved when state dimension doesn't change.""" + from pathsim.solvers.euler import EUF + + pt1 = PT1(K=1.0, T=1.0) + pt1.set_solver(EUF, None) + pt1.engine.state = np.array([42.0]) + + # Mutate parameter + pt1.K = 5.0 + + # Engine should be preserved with same state + self.assertIsNotNone(pt1.engine) + np.testing.assert_array_equal(pt1.engine.state, [42.0]) + + def test_engine_recreated_on_dimension_change(self): + """Engine should be recreated when state dimension changes.""" + from pathsim.solvers.euler import EUF + + filt = ButterworthLowpassFilter(Fc=100, n=2) + filt.set_solver(EUF, None) + + old_state_dim = len(filt.engine) + self.assertEqual(old_state_dim, 2) + + # Change filter order -> dimension change + filt.n = 4 + + # Engine should exist but with new dimension + self.assertIsNotNone(filt.engine) + self.assertEqual(len(filt.engine), 4) + + +# TESTS FOR INHERITANCE ================================================================= + +class TestInheritance(unittest.TestCase): + """Test that @mutable works with class hierarchies.""" + + def test_antiwinduppid_construction(self): + """AntiWindupPID should construct correctly with both decorators.""" + awpid = AntiWindupPID(Kp=2, Ki=0.5, Kd=0.1, f_max=1e3, Ks=10, limits=[-5, 5]) + self.assertEqual(awpid.Kp, 2) + self.assertEqual(awpid.Ks, 10) + + def test_antiwinduppid_parent_param_mutation(self): + """Mutating inherited param should reinit from most derived class.""" + awpid = AntiWindupPID(Kp=2, Ki=0.5, Kd=0.1, f_max=1e3, Ks=10, limits=[-5, 5]) + + # Mutate inherited param + awpid.Kp = 5.0 + + # op_dyn should still be the antiwindup version (not plain PID) + x = np.array([0.0, 0.0]) + u = np.array([1.0]) + result = awpid.op_dyn(x, u, 0) + # For AntiWindupPID with these params, dx1 = f_max*(u-x1), dx2 = u - w + self.assertEqual(len(result), 2) + + def test_antiwinduppid_own_param_mutation(self): + """Mutating AntiWindupPID's own param should work.""" + awpid = AntiWindupPID(Kp=2, Ki=0.5, Kd=0.1, f_max=1e3, Ks=10, limits=[-5, 5]) + awpid.Ks = 20 + self.assertEqual(awpid.Ks, 20) + + +# TESTS FOR SPECIFIC BLOCKS ============================================================= + +class TestSpecificBlocks(unittest.TestCase): + """Test @mutable on various block types.""" + + def test_pt2(self): + pt2 = PT2(K=1.0, T=1.0, d=0.5) + A_before = pt2.A.copy() + pt2.d = 0.7 + # A matrix should have changed + self.assertFalse(np.allclose(pt2.A, A_before)) + + def test_leadlag(self): + ll = LeadLag(K=1.0, T1=0.5, T2=0.1) + ll.K = 2.0 + self.assertEqual(ll.K, 2.0) + # C and D should reflect new K + np.testing.assert_array_almost_equal(ll.D, [[2.0 * 0.5 / 0.1]]) + + def test_transfer_function_numden(self): + tf = TransferFunctionNumDen(Num=[1], Den=[1, 1]) + np.testing.assert_array_almost_equal(tf.A, [[-1.0]]) + tf.Den = [1, 2] + np.testing.assert_array_almost_equal(tf.A, [[-2.0]]) + + def test_transfer_function_dimension_change(self): + """Changing denominator order should change state dimension.""" + tf = TransferFunctionNumDen(Num=[1], Den=[1, 1]) + self.assertEqual(tf.A.shape, (1, 1)) + tf.Den = [1, 3, 2] # second order + self.assertEqual(tf.A.shape, (2, 2)) + + def test_sinusoidal_source(self): + s = SinusoidalSource(frequency=10, amplitude=2, phase=0.5) + self.assertAlmostEqual(s._omega, 2*np.pi*10) + s.frequency = 20 + self.assertAlmostEqual(s._omega, 2*np.pi*20) + + def test_delay(self): + d = Delay(tau=0.01) + self.assertEqual(d._buffer.delay, 0.01) + d.tau = 0.05 + self.assertEqual(d._buffer.delay, 0.05) + + def test_clock_source(self): + c = ClockSource(T=1.0, tau=0.0) + self.assertEqual(c.events[0].t_period, 1.0) + c.T = 2.0 + self.assertEqual(c.events[0].t_period, 2.0) + + def test_fir(self): + f = FIR(coeffs=[0.5, 0.5], T=0.1) + self.assertEqual(f.T, 0.1) + f.T = 0.2 + self.assertEqual(f.T, 0.2) + self.assertEqual(f.events[0].t_period, 0.2) + + def test_samplehold(self): + sh = SampleHold(T=0.5, tau=0.0) + sh.T = 1.0 + self.assertEqual(sh.T, 1.0) + + def test_butterworth_filter_mutation(self): + filt = ButterworthLowpassFilter(Fc=100, n=2) + A_before = filt.A.copy() + filt.Fc = 200 + # Matrices should change + self.assertFalse(np.allclose(filt.A, A_before)) + + def test_butterworth_filter_order_change(self): + filt = ButterworthLowpassFilter(Fc=100, n=2) + self.assertEqual(filt.A.shape, (2, 2)) + filt.n = 4 + self.assertEqual(filt.A.shape, (4, 4)) + + +# INTEGRATION TEST ====================================================================== + +class TestMutableInSimulation(unittest.TestCase): + """Test parameter mutation in an actual simulation context.""" + + def test_pt1_mutation_mid_simulation(self): + """Mutating PT1 gain mid-simulation should affect output.""" + from pathsim import Simulation, Connection + from pathsim.blocks.sources import Constant + + src = Constant(value=1.0) + pt1 = PT1(K=1.0, T=0.1) + + sim = Simulation( + blocks=[src, pt1], + connections=[Connection(src, pt1)], + dt=0.01 + ) + + # Run for a bit + sim.run(duration=1.0) + output_before = pt1.outputs[0] + + # Change gain + pt1.K = 5.0 + + # Run more + sim.run(duration=1.0) + output_after = pt1.outputs[0] + + # With K=5 and enough settling time, output should approach 5.0 + self.assertGreater(output_after, output_before) + + +if __name__ == "__main__": + unittest.main()