Source code for memspectrum.memspectrum

"""
Core module of `memspectrum`. Implements class :class:`MESA`, which computes the spectrum of a given time-series using the Burg algorithm, plus some helpers.
"""

import sys
import numpy as np
import warnings
warnings.simplefilter('once', RuntimeWarning)
from scipy.signal import correlate, convolve

#############DEBUG LINE PROFILING
try:
	from line_profiler import LineProfiler

	def do_profile(follow=[]):
		def inner(func):
			def profiled_func(*args, **kwargs):
				try:
					profiler = LineProfiler()
					profiler.add_function(func)
					for f in follow:
						profiler.add_function(f)
					profiler.enable_by_count()
					return func(*args, **kwargs)
				finally:
					profiler.print_stats()
			return profiled_func
		return inner
except:
	pass

#pip install line_profiler
#add decorator @do_profile(follow=[]) before any function you need to track

#################

class loss_function:

	def __init__(self, method):
		"""
		Implements various method to choose the best recursive order for Burg's Algorithm
		Avilable methods are "FPE", "OBD", "CAT", "AIC", "Fixed". The most representative order is 
		chosen to be the one that minimized the related function. 
		
		Parameters
		----------
		method : 'str'
			Selects the method to be used to estimate the best order between "FPE",
			"OBD", "CAT", "AIC", "LL", "Fixed"

		"""
		self.method = method
		self.data_autocorr = None
		self._forward_error = None
		self._backward_error = None

	def __call__(self, *args): #Stefano: what are args? We should specify them and call them by name...
		if self.method == 'FPE':
			return self._FPE(args[0], args[2], args[3])
		if self.method == 'MDL':
			return self._FPE(args[0], args[2], args[3])
		elif self.method == 'CAT':
			return self._CAT(args[0], args[2], args[3])
		elif self.method == 'CAT_OLD':
			return self._CAT_OLD(args[0], args[2], args[3])
		elif self.method == 'OBD':
			return self._OBD(args[0], args[1], args[2], args[3])
		elif self.method =='AIC':
			return self._AIC(args[0], args[2], args[3])
		elif self.method =='LL':
			return self._LL(args[3], args[4])
		elif self.method == 'VM':
			return self._VM(args[2], args[3], args[5])
		elif self.method == 'Fixed':
			return self._Fixed(args[3])
		else:
			raise ValueError("{} is not a an available method! Valid choices are 'FPE', 'AIC', 'CAT', 'OBD' and 'Fixed'.".format(self.method))
	
	def _set_data(self, data):
		"""
		Computes the data auto-correlation in frequency domain and stores it. It is only useful for LL loss function
		
		Parameters
		----------
		data : :class:`~numpy:numpy.ndarray`
			The data to estimate the autocorrelation from
		"""
		data_tilde = np.fft.fft(data)
		self.data_autocorr = np.multiply(data_tilde, np.conj(data_tilde)).real #(N,)
		return
		
	
	def _FPE(self, P, N, m):
		"""
		Implements Akaike Final prediction Error to estimate the recursive 
		order 
		
		Parameters
		----------
		P : float
			The estimate of the variance for the white noise component.
		N : 'np.int'
			The length of the dataset.
		m : 'np.int'
			The recursive order.

		Returns
		-------
		float
			The value of FPE loss function.

		"""
		return P[-1] * (N + m + 1) / (N - m - 1)
	
	def _MDL(self, P, N, m):
		"""
		Implements Minimum Description Length to estimate the recursive 
		order 
		
		Parameters
		----------
		P : float
			The estimate of the variance for the white noise component.
		N : 'np.int'
			The length of the dataset.
		m : 'np.int'
			The recursive order.

		Returns
		-------
		float
			The value of MDL loss function.

		"""
		return N * np.log(P[-1]) + m * np.log(N)
    
	def _AIC(self, P, N, m):
		"""
		Implements Akaike information criterion to estimate the recursive 
		order 
		
		Parameters
		----------
		P : float
			The estimate of the variance for the white noise component.
		N : 'np.int'
			The length of the dataset.
		m : 'np.int'
			The recursive order.

		Returns
		-------
		float
			The value of AIC loss function.

		"""
		return np.log(P[-1]) + (2 * m) / N
	
	def _LL(self, m, spectrum):
		"""
		Implements data log-likelihood as a loss function
		Log-likelihood is composed of two terms A = 0.5*|x_f|**2/S(f) and B = 0.5*N*log(S(f))
		A wants to be large to agree with data; |B| wants to be small to keep the model simple
		The loss function to minimize is the balance of the two: L =  -A - B
		
		#WARNING: Untested: the performance of this loss function are not studied
		
		Parameters
		----------
		spectrum : :class:`~numpy:numpy.ndarray`
			The spectrum (PSD) for computing the log-likelihood (also with negative frequencies)
		data_tilde : :class:`~numpy:numpy.ndarray`
			Fourier transform of the data

		Returns
		-------
		float
			The value of the log-likelihood loss function

		"""
		if self.data_autocorr is None:
			raise ValueError("Autocorrelation of data shall be set, before computing the LL loss function")
		assert spectrum.shape == self.data_autocorr.shape, "Log-likelihood calculation: shape of the spectrum and of data do not match"
		T = len(spectrum)*4096
		autocorr = 0.5* np.sum(np.divide(self.data_autocorr, spectrum))
		det_S = 0.5*len(spectrum)*np.sum(np.log(spectrum))
		LL =  - autocorr/T - det_S
		#LL -= m*np.log(2)+np.log(m)
		#print(autocorr, det_S, m*np.log(2)+np.log(m), LL) #DEBUG
		return -LL #this is a loss function, we want to minimize it (maybe)
		
	
	def _CAT(self, P, N, m):
	 """
	 Implements Parzen's criterion on autoregressive transfer function
	 to estimate the recursive order 
	 
	 Parameters
	 ----------
	 P : float
		 The estimate of the variance for the white noise component.
	 N : 'np.int'
		 The length of the dataset.
	 m : 'np.int'
		 The recursive order.

	 Returns
	 -------
	 float
		 The value of CAT loss function.

	 """
	 if m == 0:
		 return np.inf
	 P = np.array(P[1:])
	 k = np.linspace(1, m, m)
	 PW_k = (N - k) / (N * P)
	 return PW_k.sum() / N - PW_k[-1]
	
	def _CAT_OLD(self, P, N, m):
		"""
		OUTDATED FOR ERRORS
		Implements Parzen's criterion on autoregressive transfer function
		to estimate the recursive order 
		
		Parameters
		----------
		P : float
			The estimate of the variance for the white noise component.
		N : 'np.int'
			The length of the dataset.
		m : 'np.int'
			The recursive order.

		Returns
		-------
		float
			The value of CAT loss function.

		"""
		if m == 0:
			return np.inf
		P = np.array(P[1:])
		k = np.linspace(1, m, m)
		PW_k = N / (N - k) * P
		return 1 / (N * PW_k.sum())- (1 / PW_k[-1])
	
	def _OBD(self, P, a_k, N, m):
		"""
		Implement Rao's Optimum Bayes Decision rule to estimate the recursive
		order

		Parameters
		----------
		 P : float
			The estimate of the variance for the white noise component.
		a_k : 'np.array'
			The values for the final prediction error coefficients for the 
			considered recursive order.
		N : 'np.int'
			The length of the dataset.
		m : 'np.int'
			The recursive order.

		Returns
		-------
		float
			The value of OBD loss function.

		"""
		P_m = P[-1]
		P = np.array(P[:-1])
		return (N - m - 2)*np.log(P_m) + m*np.log(N) + np.log(P).sum() + (a_k**2).sum()
	
	def _VM(self, N, m, k):
		"""
		Implement Variance Minimum (VM) method to estimate the recursive
		order

		Parameters
		----------
		N : 'np.int'
			 The length of the dataset.
		m : 'np.int'
			The recursive order.
		k : float
			The m-th order reflection coefficient

		Returns
		-------
		float
			The value of VM loss function.
		"""
		forward_error = self._forward_error[1:] + k * self._backward_error[:-1]
		backward_error = self._backward_error[:-1] + k * self._forward_error[1:]
		VM = np.sum(forward_error ** 2) / (N - 2 * m)
		self._forward_error, self._backward_error = forward_error, backward_error
		return VM
		
	def _Fixed(self, m):
		"""
		Returns a fixed recursive order m. Is implemented via a monotonically
		decreasing loss function

		Parameters
		----------
		m : 'np.int':
			The selected recursive order

		Returns
		-------
		float
			The value of the loss functiona at order m

		"""
		return 1./(m+1)

[docs] class MESA(object): """ Class the implement the reproduces the Maximum Entropy Spectrum of a given time-series. """ def __init__(self, filename = None, *args, **kwargs): """ Class that implements Burg method to estimate power spectral densitiy of time series. Parameters ---------- filename: str Name of file from which the model is loaded. If `None`, the model is not initialized """ self.P = None self.a_k = None #If a_k and P are None, the model is not already fitted self.N = None self.mu = None self.optimization = None if isinstance(filename,str): self.load(filename)
[docs] def save(self,filename): """ Save the class to file (if the spectral density analysis is already performed). The output file can be used to load the class with method load() File is a 1D array with the format: [P, N, mu, a_k]. The header holds the shapes for each array Parameters ---------- filename: str Name of the file to save the data at """ if self.P is None or self.a_k is None: raise RuntimeError("PSD analysis is not performed yet: unable to save the model. You should call solve() before saving to file") to_save = np.concatenate([[self.P], [float(self.N)],[self.mu], self.a_k]) header = "(1,1,1,{})".format(len(self.a_k)) np.savetxt(filename, to_save, header = header) return
[docs] def load(self,filename): """ Load the class from a given file. The file shall be the same format produced by save(). Parameters ---------- filename: str Name of the file to load the data from """ data = np.loadtxt(filename, dtype = np.complex128) if np.all(np.abs(data.imag) < 1e-40): data = data.real if data.ndim != 1: raise ValueError("Wrong format for the file: unable to load the model") #reading the first line with open(filename) as f: first_line = f.readline() shapes = eval(first_line.translate({ord('#'): None, ord(' '): None})) #checking for the header if not isinstance(shapes, tuple): if len(shapes) != 4 or not np.all([isinstance(s, int) for s in shapes]): raise ValueError("Wrong format for the header: unable to load the model") #assigning values self.P, self.N, self.mu, self.a_k = np.split(data, np.cumsum(shapes)[:-1]) self.N = int(self.N.real) return
[docs] def save_spectrum(self, filename, dt, frequencies = None): """ Saves the power spectral density computed by the model to a txt file. The PSD is evaluated on a user given grid of frequencies. If None, a standard grid is used (as computed by np.fft.fftfreq). The spectrum is saved as a 2D array: [f, PSD(f)] Parameters ---------- filename: str Name of the file to save the PSD at dt: float Sampling rate for the time series frequencies: :class:`~numpy:numpy.ndarray` Frequencies to compute the PSD at. If None, a standard grid will be computed """ if isinstance(frequencies, np.ndarray): PSD = self.spectrum(dt,frequencies) elif frequencies is None: PSD, frequencies = self.spectrum(dt) else: raise ValueError("Wrong type for input frequencies: expected \`np.ndarray\` or None, got {} instead".format(type(frequencies))) to_save = np.column_stack([frequencies, PSD]) np.savetxt(filename, to_save, header = "dt = {} s".format(dt)) return
def _spectrum(self, dt, N, P, a_k): """ Method that compted the spectrum of the time series on sampling frequencies Parameters ---------- dt: float Sampling rate for the time series N: float Length of the frequency grid P: float P parameter of the analytical formula (i.e. variance of the white noise) a_k: :class:`~numpy:numpy.ndarray` a_ks of the analytical formula (i.e. autoregressive coefficients) Returns ------- spectrum: :class:`~numpy:numpy.ndarray` PSD of the model, including both positive and negative frequencies freq: :class:`~numpy:numpy.ndarray` Frequencies at which spectrum is evaluated (as provided by np.fft.fftfreq) """ den = np.fft.fft(a_k, n=N) spectrum = dt * P / (np.abs(den) ** 2) return np.fft.fftfreq(N,dt), spectrum
[docs] def spectrum(self, dt = 1., frequencies = None, onesided = False): """ Computes the power spectral density of the model. Default returns power spectral density and frequency array automatically computed by sampling theory. It can also be computed on a user-given frequency grid passing proper frequency array Parameters ---------- dt: float Sampling interval frequencies: :class:`~numpy:numpy.ndarray` (positive) frequencies to evaluate the spectrum at If None, a equally spaced frequency grid is used (and returned) onesided: bool Whether the one sided PSD (only positive frequencies) shall be returned. It has effect only if a frequency array is not given Returns ------- frequencies: :class:`~numpy:numpy.ndarray` Frequencies at which power spectral density is evaluated (**only returned** if ``frequencies = None``) spectrum: :class:`~numpy:numpy.ndarray` Power spectral density """ if self.a_k is None: raise RuntimeError("Model is not initialized: unable to compute spectrum. Call MESA.solve() or load a model from file to initialize the model") f_ny = .5 / dt f_spec, spec = self._spectrum(dt, self.N, self.P, self.a_k) if frequencies is None: if onesided: return f_spec[:self.N//2], spec[:self.N//2] * 2 else: return f_spec, spec elif isinstance(frequencies, np.ndarray): if np.max(frequencies) > f_ny * 1.01: warnings.warn("Some of the required frequencies are higher than the Nyquist frequency ({} Hz): a zero PSD is returned for f>Nyquist".format(f_ny), UserWarning) f_interp = np.interp(frequencies, f_spec[:int(self.N/2+0.5)], spec.real[:int(self.N/2+0.5)], left = 0., right = 0.) return f_interp else: raise ValueError("Type of frequencies not understood: expected to be None or np.ndarray but given {} insted".format(type(frequencies))) return
[docs] def compute_autocorrelation(self, dt, normalize = True, scipy_convention = False): """ Compute the autocovariance :math:`C(\\tau)` of the data based on the autoregressive coefficients. The autocovariance is defined as: .. math:: C(\\tau) = E_t[(x_t - \mu)(x_{t+\\tau} -\mu)] where :math:`\mu` is the mean value of the timeseries. It amounts to the inverse Fourier transform of the PSD. If option ``scipy_convention`` is set, we center the autocorrelation, following :func:`~scipy:scipy.signal.correlate` conventions. .. math:: C(\\tau) = E_t[(x_t - \mu)(x_{t-\\tau+T} -\mu)] where :math:`T` is the segment length Parameters ---------- dt: float Sampling interval normalize: bool Whether the autocovariance should be normalized s.t. :math:`C(\\tau = 0) = 1` Returns ------- autocorr: :class:`~numpy:numpy.ndarray` Autocorrelation of the model """ f, spec = self.spectrum(dt) spec = spec[:int(self.N/2)] f = f[:int(self.N/2)] autocorr = np.fft.irfft(spec) #or there is a +1 in there... #autocov -= np.square(self.mu) #print(self.mu) if normalize: #autocorr /= np.max(autocorr) autocorr /= autocorr[0] if scipy_convention: autocorr = np.concatenate([autocorr[len(autocorr)//2+1:], autocorr[:len(autocorr)//2]]) return autocorr
[docs] def solve(self, data, m = None, optimisation_method = "FPE", method = "Fast", regularisation = 1.0, early_stop = True, verbose = False): """ Computes the power spectral density of the given data data by solving the Levinson recursion for the autoregressive coefficients a_k and the noise variance P. It uses the standard Burg method recursive and a Faster (but less stable) version. Default is 'Fast'. Parameters ---------- data: :class:`~numpy:numpy.ndarray` One dimensional array with the time series data for the spectrum calculation m : int Maximum number of recursions for the computation of the power spectral density. Maximum autoregressive order is p = m-1 Default is None, that means m = 2N / log(2N) optimisation_method: 'str' Method used to select the best recursive order. The order is chosen minimizing the corresponding method. Available methods are "FPE", "OBD", "CAT", "AIC", "Fixed". If optimisation_method is "Fixed", the autoregressive order is always set to m, without looking for a minimum. Deafult is "FPE". method: 'str' Can be "standard" or "Fast". Selects the algorithm used to compute the power spectral density with. Default is "Fast" regularisation: float Implement Tikhonov regularisation. Should be a number slightly larger than one. Default is 1, which means no regularisation early_stop: bool Default is True. Breaks the iteration if there is no new global maximum after 100 iterations. Recommended for every optimisation method but CAT. Has no effect with "Fixed" loss function. verbose: bool Whether to print the status of the mesa solution Returns ------- P: float Variance of white noise for the associated autoregressive process a_k: :class:`~numpy:numpy.ndarray` The coefficient used to compute the power spectral density optimization: :class:`~numpy:numpy.ndarray` The values of the chosen optimisation_method at every iteration """ data = np.array(data) data = np.squeeze(data) if isinstance(data.flatten()[0],np.number): assert data.ndim == 1, ValueError("Wrong number of dimension for data: 1 dim expcted but got {} dims".format(data.ndim)) else: raise ValueError("Type of data should np.ndarray: given {} instead. ".format(type(data))) self.data = data self.N = len(data) self.ref_coefficients = [] self.mu = np.mean(data) self.regularisation = regularisation self.early_stop = early_stop self.verbose = verbose if m is None: self.mmax = int(2*self.N/np.log(2.*self.N)) else: self.mmax = m if optimisation_method == 'Fixed': self.early_stop = False if optimisation_method == 'LL': warnings.warn('The performance of the \'LL\' loss function are not studied. There is no guarantee of stable and meaningful results') if method.lower() == "fast": self._method = self._FastBurg elif method.lower() == "standard": self._method = self._Burg else: print("Method {0} unknown! Valid choices are 'Fast' and 'Standard'".format(method)) return self._loss_function = loss_function(optimisation_method) self.P, self.a_k, self.optimization = self._method() del self.data return self.P, self.a_k, self.optimization
#@do_profile(follow=[]) def _FastBurg(self): """ Uses the Fast version of Burg Algorithm to compute the power spectral density. The order is selected by the minimization of the chosen method Returns ------- P: float Variance of white noise for the associated autoregressive process a_k: :class:`~numpy:numpy.ndarray` The coefficient used to compute the power spectral density optimization: :class:`~numpy:numpy.ndarray` The values of the chosen optimisation_method at every iteration """ #setting data for the loss function, if required spec = None if self._loss_function.method == 'LL': #computing data autocorrelation self._loss_function._set_data(self.data- np.mean(self.data)) #FIXME: make sure that here you need to compute the covariance!! spec = np.zeros(self.data.shape)+1e-200 if self._loss_function.method == 'VM': #Initialise loss function data attribute self._loss_function._forward_error, self._loss_function._backward_error\ = self.data, self.data #initialise forward and backward errors for computation of VM c = correlate(self.data, self.data)[self.N-1:self.N+self.mmax+1] #(M,) #very fast scipy correlation!! c[0] *= self.regularisation #Initialize variables a = [np.array([1])] P = [c[0] / self.N] r = 2 * c[1] g = np.array([2 * c[0] - self.data[0] * self.data[0].conj() - self.data[-1] * self.data[-1].conj(), r]) #Initialize lists to save arrays optimization = [] idx = None old_idx = 0 #Loop for the FastBurg Algorithm for i in range(self.mmax): if self.verbose: sys.stderr.write('\r\tIteration {0} of {1}'.format(i + 1, self.mmax)) #Update prediction error filter and reflection error coefficient k, new_a = self._updateCoefficients(a[-1], g) #Update variables. Check paper for indices at j-th loop. r = self._updateR(i, r, c[i + 2]) #Construct the array in two different, equivalent ways. DrA = self._constructDr2(i, new_a) #Update last coefficient g = self._updateG(g, k, r, new_a, DrA) #Append values to respective lists a.append(new_a) self.ref_coefficients.append(k) P.append(P[-1] * (1 - k * k.conj())) #Compute loss function value for chosen method if spec is not None: _, spec = self._spectrum(1.,len(self.data), P[-1], a[-1]) #_LL optimization.append(self._loss_function(P, a[-1], self.N, i + 1, spec, k)) is_nan = np.isnan(new_a).any() #checking for nans if np.abs(k)>1 or is_nan: warnings.warn("There is a numerical stability issue. Results might not be what you expect", RuntimeWarning) #dealing with stopping of the iteration #checking if there is a minimum (every some iterations) if early_stop option is on if is_nan and not self.early_stop: idx = np.nanargmin(optimization) break if ((i % 100 == 0 and i !=0) or (i >= self.mmax-1)) and self.early_stop: idx = np.nanargmin(optimization) if old_idx < idx: #if True, an improvement is made old_idx = idx else: break if not self.early_stop: idx = np.nanargmin(optimization) if self.verbose: sys.stderr.write('\n') return P[idx], a[idx], np.array(optimization) def _updateCoefficients(self, a, g): """ Updates the forward prediction error coefficients (needed to compute to compute the spectrum) from order i to order i + 1 i + 1 Parameters ---------- a : :class:`~numpy:numpy.ndarray` The i'th order forward prediction error coefficients. (Shape (i,)) g : :class:`~numpy:numpy.ndarray` Array used to update the forward prediction error (Shape (i+1, )). Returns ------- k : float The reflection coefficient. aUpd : :class:`~numpy:numpy.ndarray` The (i+1)'th order forward predicition error coefficients (Shape (i+1,)). """ a = np.concatenate((a, np.zeros(1))) #regularizing? Doesn't really work unfortunately... # a -> a/alpha # g -> g/gamma # N = len(a) # alpha = sqrt(N)*median_magnitude_of_a # a*a ~ 1 # median_magnitude_of_a = a[0] k = - (np.dot(a.conj(), g[::-1])) / (np.dot(a, g)) aUpd = a + k * a[::-1].conj() return k, aUpd def _updateR(self, i, r, aCorr): rUp = np.array([2 * aCorr]) rDown = r - self.data[: i + 1] * self.data[i + 1].conj() - \ self.data[self.N - i - 1 : ].conj()[::-1] * self.data[self.N - i - 2] return np.concatenate((rUp, rDown)) def _constructDr(self, i, a): data1 = self.data[ : i + 2][::-1] data2 = self.data[self.N - i - 2 :] d1 = -np.outer(data1, data1.conj()) d2 = -np.outer(data2.conj(), data2) return d1 + d2 def _constructDr2(self, i, a): data1 = self.data[ : i + 2][::-1] data2 = self.data[self.N - i - 2 :] d1 = - data1 * np.dot(data1, a).conj() d2 = - data2.conj() * np.dot(data2, a.conj()) return d1 + d2 def _updateG(self, g, k, r, a, Dra): gUp = g + (k * g[::-1]).conj() + Dra gDown = np.array([np.dot(r, a.conj())]) return np.concatenate((gUp, gDown)) def _Burg(self, **kwargs): """ Uses the Standard version of Burg Algorithm to compute the power spectral density. The order is selected by the minimization of the chosen method Returns ------- P: float Variance of white noise for the associated autoregressive process a_k: :class:`~numpy:numpy.ndarray` The coefficient used to compute the power spectral density optimization: :class:`~numpy:numpy.ndarray` The values of the chosen optimisation_method at every iteration """ #setting data for the loss function, if required spec = None if self._loss_function.method == 'LL': #computing data autocorrelation self._loss_function._set_data(self.data- np.mean(self.data)) spec = np.zeros(self.data.shape) if self._loss_function.method == 'VM': #Initialise loss function data attribute self._loss_function._forward_error, self._loss_function._backward_error\ = self.data, self.data #initialise forward and backward errors for computation of VM #initialization of variables P_0 = np.var(self.data)#(self.data ** 2).mean() P = [P_0] a_0 = 1 a_k = [np.array([a_0])] _f = np.array(self.data) _b = np.array(self.data) optimization = [] early_stop_step = 100 idx = None old_idx = 0 #Burg's recursion for i in range(self.mmax): if self.verbose: sys.stderr.write('\r\tIteration {0} of {1}'.format(i + 1, self.mmax)) f = _f[1:] b = _b[:-1] den = convolve(f,f[::-1], 'valid')[0] + convolve(b,b[::-1], 'valid')[0] k = - 2 * convolve(f,b[::-1], 'valid')[0] / den a_k.append(self._updatePredictionCoefficient(a_k[i], k)) P.append(P[i] * (1 - k * k.conj())) self.ref_coefficients.append(k) _f = f + k * b _b = b + k * f #print('P: ', P, '\nak: ', a_k[-1]) if spec is not None: spec = self.spectrum()[1] optimization.append(self._loss_function(P, a_k[-1], self.N, i + 1, spec, k)) #checking if there is a minimum (every some iterations) if early_stop option is on if ((i % early_stop_step == 0 and i !=0) or (i >= self.mmax-1)) and self.early_stop: idx = np.argmin(optimization) #+ 1 #print(i, optimization[idx]) #DEBUG if old_idx < idx and optimization[idx]*1.01 < optimization[old_idx]: old_idx = idx else: old_idx = idx break if not self.early_stop: idx = np.argmin(optimization) #+ 1 if self.verbose: sys.stderr.write('\n') return P[idx], a_k[idx], optimization def _updatePredictionCoefficient(self, x, reflectionCoefficient): """ Uses the Levinson recursion to update the prediction error coefficients Parameters ---------- x : :class:`~numpy:numpy.ndarray` The i'th order forward prediction error coefficients (Shape (i,)). reflectionCoefficient : float The i'th order reflection coefficients, used to update the forward prediction error coefficients via the solution of Levinson Recursion. Returns ------- :class:`~numpy:numpy.ndarray` The updatet forward prediction error coefficients at order i + 1 (Shape (i + 1,)). """ new_x = np.concatenate((x, np.zeros(1))) return new_x + reflectionCoefficient * new_x[::-1]
[docs] def get_p(self): """ Returns the order of the autoregressive process that defines the PSD. Returns ------- p : int Order of the autoregressive process that define the PSD """ return self.a_k.size - 1
@property def p(self): return self.a_k.size - 1
[docs] def forecast(self, data, length, number_of_simulations = 1, P = None, include_data = False, seed = None, verbose = False): """ Forecasting on an observed process for a total number of points given by length. It computes number_of_simulations realization of the forecast time series. This method can only be used if a_k coefficients have been computed already. Use solve method before forecasting. Parameters ---------- data: :class:`~numpy:numpy.ndarray` data for the spectrum calculation length : 'np.int' Number of future points to be predicted number_of_simulations : 'np.int' Total number of simulations of the process P : float Variance of white noise for the autoregressive process. Default is None and uses the estimate obtained with Burg's algorithm. include_data: `bool` Whether to prepend to the output the input time series seed: `int` Seed for the random generator. If is None, no initialization of the seed is done and the authomatic numpy setting is used. verbose: `bool` Whether to print the status of the forecasting Returns ------- predictions : :class:`~numpy:numpy.ndarray` Array containing the forecasted points for every simulation of the process (Shape (number_of_simulations, length)) """ #FIXME: this function does not work when len(a.k) ==1. Fix this problem if self.P is None or self.a_k is None: raise RuntimeError("PSD analysis is not performed yet: unable to forecast the data. You should call solve() before forecasting") if P is None: P = self.P p = self.a_k.size - 1 predictions = np.zeros((number_of_simulations, p + length)) data = np.array(data) if data.ndim > 1: data = np.squeeze(data) if isinstance(data.flatten()[0],np.number): assert data.ndim == 1, ValueError("Wrong number of dimension for data: 1 dim expcted but got {} dims".format(data.ndim)) if len(data) >= p > 0: predictions[:,:p] = data[-p:] elif p!=0: raise ValueError("Data are not long enough for forecasting") else: pass else: raise ValueError("Type of data should np.ndarray: given {} instead. ".format(type(data))) if isinstance(seed, int): np.random.seed(seed) elif seed is not None: warnings.warn("Invalid seed given for the generator. Default one used",UserWarning) coef = - self.a_k[1:][::-1] for i in range(length): if verbose: sys.stderr.write('\r {0} of {1}'.format(i + 1, length)) predictions[:, p + i] = predictions[:, i: p + i] @ coef +\ np.random.normal(0, np.sqrt(P), size = number_of_simulations) if verbose: sys.stderr.write('\n') if not include_data: return predictions[:,p:] return predictions
[docs] def generate_noise(self, T, sampling_rate = 1., fmin = None, fmax = None, N_series = 1): """ Generate some noise starting from the internal :math:`AR(p)` model. The noise generated has the same features as the data given in input to the mesa. Parameters ---------- T: float Length (in seconds) of the signal to generate sampling_rate: float Sampling rate fmin: float Minimum frequency in the signal (if None, is equal to zero) fmax: float Maximum frequency in the signal (if None, Nyquist frequency is used: f_Ny = 0.5*sampling_rate) N_series: float Number of time series to generate Returns ------- times: :class:`~numpy:numpy.ndarray` Time grid at which the noise time series is evaluated at times_series: :class:`~numpy:numpy.ndarray` Time series (shape (N_series, sampling_rate*T) ) frequencies: :class:`~numpy:numpy.ndarray` Frequency grid at which the noise frequency series is evaluated at frequency_series: :class:`~numpy:numpy.ndarray` Frequency series from which the times_series is computed (shape (N_series, sampling_rate*T) ) psd: :class:`~numpy:numpy.ndarray` PSD of the model, including both positive and negative frequencies """ df = 1 / T N = int(sampling_rate * T) times = np.linspace(0., T , N) if fmin == None: fmin = 0 if fmax == None: fmax = (N / 2) / T # filter out the bad bits kmin = np.int(fmin/df) kmax = np.int(fmax/df) + 1 # generate the FD noise frequencies = df * np.linspace(kmin, kmax, int(N / 2 + 1)) #(D,) #df * N / 2 is Ny frequency, + 1 needed because arange cuts last term psd = self.spectrum(1/sampling_rate, frequencies) sigma = np.sqrt(psd / df * .5) #(D,) phi = np.random.uniform(0, 2 * np.pi, len(sigma)) frequency_series = np.einsum('ij,j -> ij',np.random.normal(0, 1, (N_series,len(sigma))) + 1j * np.random.normal(0, 1, (N_series,len(sigma)) ), sigma) #(N_series,D) # inverse FFT to return the TD strain time_series = np.fft.irfft(frequency_series) * df * N ##(N_series, N ) return times, np.squeeze(time_series), frequencies, np.squeeze(frequency_series), psd
[docs] def entropy_rate(self, dt): """ Compute the entropy gain :math:`\Delta H` for a given power spectrum: .. math:: \Delta H = \int_{- Ny}^{Ny}\log S(f) df It is described in Eq (84) in `Papoulis <https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=1163713>`_ or Eq (10) in `Martini et al. <https://arxiv.org/pdf/2106.09499.pdf>`_ Parameters ---------- dt: float Sampling interval Returns ------- rate: float Entropy rate """ f, psd = self.spectrum(dt = dt) df = np.diff(f)[0] return np.sum(np.log(psd))*df/(4*f.max())+0.5*np.log(2.0*np.pi*np.e)
[docs] def logL(self, data, dt): """ Compute the log likelihood given the current spectrum data smust be in the time domain. Parameters ---------- data: :class:`~numpy:numpy.ndarray` Stretch of data to compute the likelihood of dt: float Sampling interval Returns ------- white_data: :class:`~numpy:numpy.ndarray` Stretch of (trimmed) whitened data """ N = len(data) f = np.fft.rfftfreq(N) psd = self.spectrum(frequencies = f, dt = dt, onesided=True) d = np.fft.rfft(data)*dt TwoDeltaTOverN = 2*dt/N return -TwoDeltaTOverN*np.vdot(d, d/(psd*dt**2)).real-0.5*np.sum(np.log(0.5*np.pi*N*dt*psd))
[docs] def whiten(self, data, trim = None, zero_phase = False): """ Whiten the data by convolving with the autoregressive coeffiecients ``a_k`` Parameters ---------- data: :class:`~numpy:numpy.ndarray` Stretch of data to whiten trim: int Number of points to remove at the begining and at the end of the stretch of data to remove edge effects. If `None`, trim is set to be equal to the autoregressive order. Returns ------- white_data: :class:`~numpy:numpy.ndarray` Stretch of (trimmed) whitened data """ if zero_phase is False: white_data = convolve(data, self.a_k, mode = 'same')/np.sqrt(self.P) else: c = np.correlate(self.a_k, self.a_k, 'full') c /= c.max() white_data = np.convolve(c, data, 'full')/np.sqrt(self.P) if trim is None: trim = self.get_p() if trim: white_data = white_data[trim:-trim] return white_data