Source code for ellalgo.oracles.lowpass_oracle

"""
Lowpass Oracle

This code implements a Lowpass Oracle, which is used to design a low-pass filter for signal processing. A low-pass filter allows low-frequency signals to pass through while attenuating high-frequency signals. The main purpose of this code is to help optimize the design of such a filter by providing a way to assess whether a given set of filter coefficients meets certain specifications.

The code defines a class called LowpassOracle that takes several inputs when initialized:

1. ndim: The number of filter coefficients
2. wpass: The end of the passband (frequencies that should pass through)
3. wstop: The end of the stopband (frequencies that should be attenuated)
4. lp_sq: The lower bound for the squared magnitude response in the passband
5. up_sq: The upper bound for the squared magnitude response in the passband
6. sp_sq: The upper bound for the squared magnitude response in the stopband

The main outputs of this code are produced by two methods: assess_feas and assess_optim. These methods take a set of filter coefficients as input and determine whether they meet the specified requirements or how close they are to meeting them.

The LowpassOracle achieves its purpose through a series of checks on the frequency response of the filter. It uses a pre-computed spectrum matrix to efficiently calculate the frequency response at different points. The code then checks if the response falls within the specified bounds for the passband and stopband.

The important logic flow in this code involves iterating through different frequency points and checking the filter's response at each point. If any violations of the specifications are found, the code returns information about the violation, which can be used to adjust the filter coefficients.

A key data transformation happening in this code is the conversion from filter coefficients to frequency response. This is done using the pre-computed spectrum matrix, which allows for efficient calculation of the response at many frequency points.

The code also includes a helper function called create_lowpass_case, which sets up a specific instance of the LowpassOracle with predefined parameters. This function can be used to quickly create a standard test case for filter design.

Overall, this code provides a tool for iteratively designing and optimizing low-pass filters by giving feedback on how well a set of coefficients meets the desired specifications. It's part of a larger optimization process where the coefficients would be adjusted based on the feedback from this oracle until a satisfactory filter design is achieved.
"""

from math import floor
from typing import Optional, Tuple, Union

import numpy as np

from ellalgo.ell_typing import OracleOptim

Arr = np.ndarray
ParallelCut = Tuple[Arr, Union[float, Tuple[float, float]]]


# Modified from CVX code by Almir Mutapcic in 2006.
# Adapted in 2010 for impulse response peak-minimization by convex iteration
# by Christine Law.
#
# "FIR Filter Design via Spectral Factorization and Convex Optimization"
# by S.-P. Wu, S. Boyd, and L. Vandenberghe
#
# Designs an FIR lowpass filter using spectral factorization method with
# constraint on maximum passband ripple and stopband attenuation:
#
#   minimize   max |H(w)|                      for w in stopband
#       s.t.   1/delta <= |H(w)| <= delta      for w in passband
#
# We change variables via spectral factorization method and get:
#
#   minimize   max R(w)                          for w in stopband
#       s.t.   (1/delta)**2 <= R(w) <= delta**2  for w in passband
#              R(w) >= 0                         for all w
#
# where R(w) is squared magnitude frequency response
# (and Fourier transform of autocorrelation coefficients r).
# Variables are coeffients r and gra = hh' where h is impulse response.
# delta is allowed passband ripple.
# This is a convex problem (can be formulated as an SDP after sampling).


# *********************************************************************
# filter specs (for a low-pass filter)
# *********************************************************************
# number of FIR coefficients (including zeroth)
[docs] class LowpassOracle(OracleOptim): # more_alt: bool = True idx1: int = 0 def __init__( self, ndim: int, wpass: float, wstop: float, lp_sq: float, up_sq: float, sp_sq: float, ): """ Initializes a LowpassOracle object with the given parameters. Args: ndim (int): The number of FIR coefficients (including the zeroth). wpass (float): The end of the passband. wstop (float): The end of the stopband. lp_sq (float): The lower bound on the squared magnitude frequency response in the passband. up_sq (float): The upper bound on the squared magnitude frequency response in the passband. sp_sq (float): The upper bound on the squared magnitude frequency response in the stopband. Attributes: spectrum (np.ndarray): The matrix used to compute the power spectrum. nwpass (int): The index of the end of the passband. nwstop (int): The index of the end of the stopband. lp_sq (float): The lower bound on the squared magnitude frequency response in the passband. up_sq (float): The upper bound on the squared magnitude frequency response in the passband. sp_sq (float): The upper bound on the squared magnitude frequency response in the stopband. idx1 (int): The current index for the passband. idx2 (int): The current index for the stopband. idx3 (int): The current index for the stopband. fmax (float): The maximum value of the squared magnitude frequency response. kmax (int): The index of the maximum value of the squared magnitude frequency response. """ # ********************************************************************* # optimization parameters # ********************************************************************* # rule-of-thumb discretization (from Cheney's Approximation Theory) mdim = 15 * ndim # Number of frequency points to evaluate w = np.linspace(0, np.pi, mdim) # omega (frequency points from 0 to π) # spectrum is the matrix used to compute the power spectrum # spectrum(w,:) = [1 2*cos(w) 2*cos(2*w) ... 2*cos(mdim*w)] # This creates a matrix where each row corresponds to a frequency point, # and each column contains the cosine terms for that frequency temp = 2 * np.cos(np.outer(w, np.arange(1, ndim))) self.spectrum = np.concatenate((np.ones((mdim, 1)), temp), axis=1) # Convert normalized frequency bounds to array indices self.nwpass: int = floor(wpass * (mdim - 1)) + 1 # end of passband self.nwstop: int = floor(wstop * (mdim - 1)) + 1 # end of stopband # Store the squared magnitude bounds self.lp_sq = lp_sq # Lower bound for passband (squared) self.up_sq = up_sq # Upper bound for passband (squared) self.sp_sq = sp_sq # Upper bound for stopband (squared) # Initialize indices for round-robin checking of frequency points self.idx1 = 0 # Current index for passband checking self.idx2 = self.nwpass # Current index for transition band checking self.idx3 = self.nwstop # Current index for stopband checking # Variables to track maximum response in stopband self.fmax = float("-inf") # Maximum response value found self.kmax = 0 # Index where maximum response occurs
[docs] def assess_feas(self, x: Arr) -> Optional[ParallelCut]: """ Assess whether the given filter coefficients meet the design specifications. This method checks the frequency response at various points in three bands: 1. Passband (0 to nwpass): Checks if response is within [lp_sq, up_sq] 2. Stopband (nwstop to end): Checks if response is below sp_sq and non-negative 3. Transition band (nwpass to nwstop): Checks if response is non-negative Uses a round-robin approach to check different frequency points on each call to distribute the computational load across multiple iterations. Args: x (Arr): The filter coefficients (autocorrelation coefficients) Returns: Optional[ParallelCut]: - None if all specifications are met - A tuple containing: * The gradient of the violating constraint * The violation amount (or tuple of lower/upper violations) """ # Get dimensions of the spectrum matrix mdim, ndim = self.spectrum.shape # Check passband frequencies (0 to nwpass) for _ in range(self.nwpass): self.idx1 += 1 if self.idx1 == self.nwpass: self.idx1 = 0 # round robin - wrap around to start col_k = self.spectrum[self.idx1, :] # Get frequency point coefficients v = col_k.dot(x) # Compute response at this frequency # Check upper bound violation if v > self.up_sq: f = (v - self.up_sq, v - self.lp_sq) return col_k, f # Return gradient and violation amounts # Check lower bound violation if v < self.lp_sq: f = (-v + self.lp_sq, -v + self.up_sq) return -col_k, f # Return negative gradient and violation amounts # Initialize tracking for stopband maximum response self.fmax = float("-inf") self.kmax = 0 # Check stopband frequencies (nwstop to end) for _ in range(self.nwstop, mdim): self.idx3 += 1 if self.idx3 == mdim: self.idx3 = self.nwstop # round robin - wrap around to start col_k = self.spectrum[self.idx3, :] v = col_k.dot(x) # Check upper bound violation in stopband if v > self.sp_sq: return col_k, (v - self.sp_sq, v) # Check non-negativity constraint if v < 0: return -col_k, (-v, -v + self.sp_sq) # Track maximum response in stopband (for optimization) if v > self.fmax: self.fmax = v self.kmax = self.idx3 # Check transition band frequencies (nwpass to nwstop) # Only need to ensure non-negativity here for _ in range(self.nwpass, self.nwstop): self.idx2 += 1 if self.idx2 == self.nwstop: self.idx2 = self.nwpass # round robin - wrap around to start col_k = self.spectrum[self.idx2, :] v = col_k.dot(x) # Check non-negativity constraint if v < 0: return -col_k, -v # Return single cut for non-negativity # Additional check: First coefficient should be non-negative if x[0] < 0: grad = np.zeros(ndim) grad[0] = -1.0 return grad, -x[0] # If all checks pass, return None (no violations) return None
[docs] def assess_optim(self, xc: Arr, gamma: float): """ Assess the optimality of the current filter coefficients for the stopband. First checks feasibility using assess_feas. If feasible, returns information about the maximum response in the stopband which can be used to further optimize the filter design. Args: xc (Arr): The filter coefficients (autocorrelation coefficients) gamma (float): The current best stopband attenuation value to beat Returns: tuple: A tuple containing: - A tuple of (gradient, (lower, upper)) for the maximum stopband response - The maximum stopband response value (or None if not feasible) """ # Update the stopband bound self.sp_sq = gamma # First check feasibility if cut := self.assess_feas(xc): return cut, None # Return feasibility cut and no objective value # If feasible, return information about the maximum stopband response return (self.spectrum[self.kmax, :], (0.0, self.fmax)), self.fmax
# ********************************************************************* # filter specs (for a low-pass filter) # ********************************************************************* # number of FIR coefficients (including zeroth)
[docs] def create_lowpass_case(ndim=48): """ Creates a standard low-pass filter design case with typical parameters. Sets up a LowpassOracle instance with commonly used specifications: - Passband edge at 0.12π - Stopband edge at 0.20π - Passband ripple of ±0.025 dB - Stopband attenuation of 0.125 Args: ndim (int, optional): Number of filter coefficients. Defaults to 48. Returns: LowpassOracle: An initialized LowpassOracle instance with standard parameters """ # Define normalized frequency tolerances delta0_wpass = 0.025 # Passband ripple tolerance delta0_wstop = 0.125 # Stopband attenuation tolerance # Convert to dB scale for calculations delta1 = 20 * np.log10(1 + delta0_wpass) # Passband ripple in dB delta2 = 20 * np.log10(delta0_wstop) # Stopband attenuation in dB # Convert dB specifications to linear scale low_pass = pow(10, -delta1 / 20) # Lower passband bound up_pass = pow(10, +delta1 / 20) # Upper passband bound stop_pass = pow(10, +delta2 / 20) # Stopband bound # Square the bounds for use with squared magnitude response lp_sq = low_pass * low_pass up_sq = up_pass * up_pass sp_sq = stop_pass * stop_pass # Create and return LowpassOracle instance with these parameters return LowpassOracle(ndim, 0.12, 0.20, lp_sq, up_sq, sp_sq)