def refine_acceleration(
        dfs: List[pd.DataFrame],
        product_name: Optional[str],
        *,
        deg: Optional[int] = None,
        window_length: Optional[int] = 10,
        step: int = 3) -> pd.DataFrame:
    """
    Combines multiple acceleration channels to create one with a flat frequency response
    curve using a modified Savitzky-Golay filter. Note that this will remove
    2 * :py:param:`window_length` datapoints, :py:param:`window_length` from each side.

    :param dfs: a List of dataframes to combine, all of which have 'X', 'Y', and 'Z' channels.
    :param product_name: the name of the enDAQ device used to record the dataframes, which can
        be found through ``doc.recorderInfo['ProductName']``. If None, the error will come from
        the worst case of error between the sensor types of the rating.
    :param deg: keyword-only; the degree of the polynomial used for fitting. If left as None,
        a degree will be calculated.
    :param window_length: keyword-only; the size of the window used for polynomial fitting.
    :param step: keyword-only; how many points are generated at each step of the filter.
        This parameter cannot be larger than :py:param:`window_length`.

    :return: a DataFrame with columns ['X (adjusted)', 'Y (adjusted)', 'Z (adjusted)'].
    """
    # Fixed: the skeleton silently fell through and returned None despite declaring a
    # DataFrame return; fail loudly until the implementation lands.
    raise NotImplementedError("refine_acceleration is a skeleton; implementation TODO")


def weighted_savgol_filter(x: List[float],
                           y: List[float],
                           deg: Optional[int],
                           window_length: int = 1,
                           w: Optional[List[float]] = None,
                           step: int = 1) -> pd.Series:
    """
    A variation on the Savitzky-Golay filter, which adjusts points by fitting them to a
    windowed best-fit curve of a given degree. This implementation adds weight and step
    parameters, which let the filter bias towards lower-noise variables and perform less
    calculation for higher step values.

    :param x: the x values, with n datapoints.
    :param y: the y values, with n datapoints.
    :param deg: the degree of the polynomial to fit to. If None, ``ceil(window_length / 2)``
        is used.
    :param window_length: the size of the window, dictating how many datapoints are fit to
        the polynomial.
    :param w: a List of weights respective to the y point at the same index, with n
        datapoints. If None, all points are weighted equally, like a standard
        Savitzky-Golay implementation.
    :param step: how far the window advances each iteration, between 1 and n.

    :return: a pandas Series whose index is drawn from x and whose values are the
        adjusted y-points.
    :raises ValueError: if the input lengths disagree, the data is not longer than the
        window, or step is less than 1.
    """
    # validity checks
    # (fixed: len(w) was evaluated unconditionally, so the documented default w=None
    # raised TypeError)
    if len(x) != len(y):
        raise ValueError("x and y need to share an equal number of points")
    if w is not None and len(w) != len(x):
        raise ValueError("x, y, and weights all need to share equal number of points")
    if len(x) <= window_length:
        raise ValueError("the number of datapoints has to be greater than the window size")
    if step < 1:
        raise ValueError("step must be at least 1")

    # fixed: np.ceil returns a float, but np.polyfit requires an integer degree;
    # also hoisted out of the loop
    fit_deg = int(np.ceil(window_length / 2)) if deg is None else deg

    new_x = []
    new_y = []

    start_idx = window_length
    end_idx = start_idx + window_length

    while end_idx < len(x):
        poly_fit = np.polyfit(
            x[start_idx:end_idx],
            y[start_idx:end_idx],
            fit_deg,
            w=None if w is None else w[start_idx:end_idx],
        )
        mid_point = x[int(np.ceil((start_idx + end_idx) / 2))]
        new_x.append(mid_point)
        # np.polyval evaluates highest-degree-first coefficients, equivalent to the
        # manual Horner sum this replaces
        new_y.append(np.polyval(poly_fit, mid_point))

        start_idx += step
        end_idx += step
    return pd.Series(new_y, index=new_x)
def align_datasets(dfs: List[pd.DataFrame]) -> List[pd.DataFrame]:
    """
    Resamples the given dataframes to all be equal-sized with resampled uniform timestamps.
    Any timestamps outside of the shared range will be dropped; the shared-range endpoints
    are linearly interpolated when a dataframe has no sample exactly on them.

    :param dfs: a List of dataframes with a DatetimeIndex to align.
    :return: a list of dataframes in the same order that they were inputted in.
    :raises ValueError: if the dataframes share no range of time.
    """
    aligned_start = max(df.index[0] for df in dfs)
    aligned_end = min(df.index[-1] for df in dfs)

    if aligned_start >= aligned_end:
        raise ValueError("No range of time shared between dataframes")

    # the last sample at or before the shared start / the first sample at or after the end
    left_idx = [bisect.bisect_right(df.index, aligned_start) - 1 for df in dfs]
    right_idx = [bisect.bisect_left(df.index, aligned_end) for df in dfs]

    # keep only the samples strictly inside the shared range
    # (fixed: `df[i]` indexed the DataFrame by loop counter instead of `dfs[i]`, and the
    # `right_idx[i] - 1` bound dropped the last interior sample)
    trimmed_dfs = [dfs[i][left_idx[i] + 1: right_idx[i]] for i in range(len(dfs))]

    for i, (df, l_idx) in enumerate(zip(dfs, left_idx)):
        if df.index[l_idx] != aligned_start:
            # interpolate on the segment [l_idx, l_idx + 1] that straddles aligned_start
            # (fixed: `df.index[l_idx] - df.index[l_idx] - 1` subtracted the scalar 1, and
            # the slope/base came from the segment before the shared start)
            dt = (df.index[l_idx + 1] - df.index[l_idx]).total_seconds()
            da = (df.iloc[l_idx + 1] - df.iloc[l_idx]) / dt
            new_dt = (aligned_start - df.index[l_idx]).total_seconds()
            new_point = df.iloc[l_idx] + new_dt * da
            trimmed_dfs[i] = pd.concat([
                pd.DataFrame([new_point], index=[aligned_start]),
                trimmed_dfs[i],
            ])
        else:
            # the dataframe already has a sample exactly on the shared start
            trimmed_dfs[i] = pd.concat([df.loc[[aligned_start]], trimmed_dfs[i]])

    for i, (df, r_idx) in enumerate(zip(dfs, right_idx)):
        if df.index[r_idx] != aligned_end:
            # interpolate on the segment [r_idx - 1, r_idx] that straddles aligned_end
            dt = (df.index[r_idx] - df.index[r_idx - 1]).total_seconds()
            da = (df.iloc[r_idx] - df.iloc[r_idx - 1]) / dt
            new_dt = (aligned_end - df.index[r_idx - 1]).total_seconds()
            new_point = df.iloc[r_idx - 1] + new_dt * da
            # fixed: `trimmed_dfs[i] - pd.concat(...)` discarded the result (`-` for `=`)
            trimmed_dfs[i] = pd.concat([
                trimmed_dfs[i],
                pd.DataFrame([new_point], index=[aligned_end]),
            ])
        else:
            # fixed: the second frame was passed outside the concat list
            trimmed_dfs[i] = pd.concat([trimmed_dfs[i], df.loc[[aligned_end]]])

    # resample everything up to the densest dataframe's sample count
    # (fixed: `resample(df, None)` was a TODO placeholder and iterated the untrimmed dfs)
    total_samples = max(tdf.shape[0] for tdf in trimmed_dfs)
    return [resample(tdf, total_samples / (sample_spacing(tdf) * len(tdf)))
            for tdf in trimmed_dfs]
b/endaq/calc/filters.py index 13864e77..80ba8649 100644 --- a/endaq/calc/filters.py +++ b/endaq/calc/filters.py @@ -6,6 +6,7 @@ import pandas as pd import numpy as np import scipy.signal +import re from endaq.calc import utils @@ -344,21 +345,124 @@ def ellip( return df -def refine_acceleration(dfs: List[pd.DataFrame], model_name: str) -> pd.DataFrame: +def refine_acceleration(dfs: List[pd.DataFrame], part_number: str) -> pd.DataFrame: """ Generates more accurate acceleration data by combining the two acceleration channels on - an enDAQ device. This is done by filtering out inaccurate frequencies based off of known + an enDAQ device. This is done by filtering out inaccurate frequencies based off of known frequency curves, and averaging out data based on noise values. + + :param dfs: 2 panda dataframes with DateTimeIndex, converted by `endaq.ide.to_pandas`. + Any non X / Y / Z acceleration channels will be dropped. The first dataframe should be + the main accelerometer (channel 8), and the second should be the secondary accelerometer, + (channel 32 or channel 80). + :param part_number: a string of the product name, that (for a doc `ds`) can be found by + `ds.recoderInfo['PartNumber']`. + + :return: a pandas Dataframe with the flattened acceleration data. + """ + #removing non X/Y/Z acceleration channels + for idx in range(len(dfs)): + df = dfs[idx] + df = df[df.columns[ + list(map(lambda x: x[0] in ['X', 'Y', 'Z'], + df.columns))]].copy() + for col in df.columns: + df[col] = df[col] - np.mean(df[col]) + dfs[idx] = df + + + aligned_dfs = utils.align_dataframes(dfs) + sensors = re.findall(r"[E|D|R]\d+", part_number) + if len(dfs) != 2 or len(dfs) != len(sensors): + raise Exception("there needs to be exactly two dataframes and the part " \ + "number needs to contain exactly two sensor ids") + + #TODO: 0 mean everything ... easy way or the easy way. 
+ sensor_info = [_sensor_info(sensor) for sensor in sensors] + noises = [si['noise'] for si in sensor_info] + bounds = [(si['low_cutoff'], si['high_cutoff']) for si in sensor_info] + dfs = [butterworth(df, low_cutoff= l_bound, high_cutoff= r_bound) + for (df, (l_bound, r_bound)) in zip(aligned_dfs, bounds)] + for df in dfs: df.columns = ['X', 'Y', 'Z'] + + overlap = (max([bound[0] for bound in bounds]), min([bound[1] for bound in bounds])) + + hz_overlap_dfs = [butterworth(df, low_cutoff=overlap[0], high_cutoff=overlap[1]) + for df in dfs] + + averaged_df = _weighted_avg(hz_overlap_dfs, noises) + hz_isolated_dfs = [] + for df, (l_bound, r_bound) in zip(dfs, bounds): + isolated_df = df + if l_bound == overlap[0]: butterworth(isolated_df, high_cutoff= l_bound) + if r_bound == overlap[1]: butterworth(isolated_df, low_cutoff= r_bound) + hz_isolated_dfs.append(isolated_df) + + return sum(hz_isolated_dfs + [averaged_df]) #+ is joining lists, not adding numbers - :param dfs: Two panda dataframes of acceleration channels. If there exists channels - other than X,Y,Z, they will be droppedd. - :param model_name: a string +def _weighted_avg(dfs: List[pd.DataFrame], noise: List[float]): + """ + Combines multiple dataframe into one by applying a weighted average base on noise in a linear-inverse fashion. + For sensors A and B, if the noise ratio is 2:1, the weighing ratio will be 1:2. Note that a value of 0 is + considered in the weighting. + + :param dfs: the dataframes to normalize. It is assumed that all dataframes have the same indecies. + :param noise: The noise value to the index-respective dataframe. Noise will be normalized. - :return: a pandas Dataframe with the flattened acceleration data + :return: a single dataframe with the weighted values """ - #NOTE: if wanted, we can extract it from the device info (eg: SX-EXXDXX), it would only work - #under the assumption that there can not exist two sensors with the same g-rating. 
+ if len(dfs) != len(noise) or len(dfs) == 0: + raise Exception("dataframes and noise need to have equal, non zero number of elements") + #normalize the weightings. If there is a weighting with 0 noise, return that instead + if 0 in noise: + return dfs[noise.index(0)] + noise = (np.array(noise) / sum(noise))[::-1] + + return pd.DataFrame(sum([n * d for n, d in zip(noise, dfs)]), + columns = dfs[0].columns, index = dfs[0].index) + +def _sensor_info(sensor: str) -> dict: + """ + creates a dictionary with information relevant to :py:func:`refine_acceleration`, namely + + - sensor_type: Literal["D", "E", "R"]. Indicates if the sensor is digital, piezoelectric, + or piezoresistive + - rating: the g rating of the sensor, or the maximum it can read while accurate + - noise: float. Indicates the noise of the sensor when the response curve is flat + - low_cutoff: the lowest Hz where the response curve is flat. + - high_cutoff: the highest Hz where the response curve is flat. + + :return: a dictionary with the information stated above + """ + sensor_type = sensor[0] + sensor_rating = int(sensor[1:]) + + match sensor_type: + case "E": + low = 10 + high = sensor_rating / 5 + noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06}[sensor_rating] + case "D": + try: + low, high = {16: (1,300), 40: (1, 100)}[sensor_rating] + noise = {16: 4E-3, 40: 8E-5}[sensor_rating] + except KeyError: + raise Exception(f"rating {sensor_rating} not supported for digital IMUs") + case "R": + low = 1 + high = sensor_rating / 5 + noise = {100: 3E-3, 500: 1.5E-2, 2000: 0.06} + case _: + raise Exception(f"Sensor type {sensor_type} not recognized, should be one of E, D, R.") + + return { + "sensor_type": sensor_type, + "noise": noise, + "rating": sensor_rating, + "low_cutoff": low, + "high_cutoff": high, + } def _fftnoise(f): """ diff --git a/endaq/calc/utils.py b/endaq/calc/utils.py index 89212b7f..05825497 100644 --- a/endaq/calc/utils.py +++ b/endaq/calc/utils.py @@ -464,10 +464,16 @@ def 
def align_dataframes(dfs: List[pd.DataFrame]) -> List[pd.DataFrame]:
    """
    Resamples the given dataframes to all be equal-sized with resampled uniform timestamps.
    Any timestamps outside of the shared range will be dropped; the shared-range endpoints
    are linearly interpolated when a dataframe has no sample exactly on them.

    **NOTE**: at the time of writing this method, there is an issue with
    :py:func:`scipy.signal.resample()`, where some dataframes may be returned squished. If at
    least one of the two dataframes contains the correct timestamps, those timestamps will
    be used. Otherwise, the timestamps will be manually computed.

    :param dfs: a List of dataframes with a DatetimeIndex to align.
    :return: a list of dataframes in the same order that they were inputted in.
    :raises ValueError: if the dataframes share no range of time.
    """
    aligned_start = max(df.index[0] for df in dfs)
    aligned_end = min(df.index[-1] for df in dfs)

    if aligned_start >= aligned_end:
        raise ValueError("No range of time shared between dataframes")

    # the last sample at or before the shared start / the first sample at or after the end
    left_idx = [bisect.bisect_right(df.index, aligned_start) - 1 for df in dfs]
    right_idx = [bisect.bisect_left(df.index, aligned_end) for df in dfs]

    # keep only the samples strictly inside the shared range
    # (fixed off-by-one: the old `right_idx[i] - 1` bound dropped the last interior sample,
    # which was never re-added below)
    trimmed_dfs = [dfs[i][left_idx[i] + 1: right_idx[i]] for i in range(len(dfs))]

    for i, (df, l_idx) in enumerate(zip(dfs, left_idx)):
        # if the dataframe has no sample exactly on the shared start
        if df.index[l_idx] != aligned_start:
            # interpolate on the segment [l_idx, l_idx + 1] that straddles aligned_start
            # (fixed: the slope and the base point previously came from the segment
            # *before* l_idx, extrapolating from the wrong interval)
            dt = (df.index[l_idx + 1] - df.index[l_idx]).total_seconds()
            da = (df.iloc[l_idx + 1] - df.iloc[l_idx]) / dt
            new_dt = (aligned_start - df.index[l_idx]).total_seconds()
            new_point = df.iloc[l_idx] + new_dt * da
            trimmed_dfs[i] = pd.concat([
                pd.DataFrame([new_point], index=[aligned_start]),
                trimmed_dfs[i],
            ])
        # in the case that the data already has a point there, add it back in
        else:
            trimmed_dfs[i] = pd.concat([df.loc[[aligned_start]], trimmed_dfs[i]])

    # repeating the steps above, with slight indexing differences for the right index
    for i, (df, r_idx) in enumerate(zip(dfs, right_idx)):
        if df.index[r_idx] != aligned_end:
            # the segment [r_idx - 1, r_idx] straddles aligned_end
            dt = (df.index[r_idx] - df.index[r_idx - 1]).total_seconds()
            da = (df.iloc[r_idx] - df.iloc[r_idx - 1]) / dt
            new_dt = (aligned_end - df.index[r_idx - 1]).total_seconds()
            new_point = df.iloc[r_idx - 1] + new_dt * da
            trimmed_dfs[i] = pd.concat([
                trimmed_dfs[i],
                pd.DataFrame([new_point], index=[aligned_end]),
            ])
        else:
            trimmed_dfs[i] = pd.concat([trimmed_dfs[i], df.loc[[aligned_end]]])

    # resample the data to the dataframe with the most points available
    total_samples = max(tdf.shape[0] for tdf in trimmed_dfs)
    return [resample(tdf, total_samples / (sample_spacing(tdf) * len(tdf)))
            for tdf in trimmed_dfs]
_get_filter_frequencies_type(low_cutoff, high_cutoff): @@ -344,126 +347,6 @@ def ellip( return df - -def refine_acceleration(dfs: List[pd.DataFrame], part_number: str) -> pd.DataFrame: - """ - Generates more accurate acceleration data by combining the two acceleration channels on - an enDAQ device. This is done by filtering out inaccurate frequencies based off of known - frequency curves, and averaging out data based on noise values. - - :param dfs: 2 panda dataframes with DateTimeIndex, converted by `endaq.ide.to_pandas`. - Any non X / Y / Z acceleration channels will be dropped. The first dataframe should be - the main accelerometer (channel 8), and the second should be the secondary accelerometer, - (channel 32 or channel 80). - :param part_number: a string of the product name, that (for a doc `ds`) can be found by - `ds.recoderInfo['PartNumber']`. - - :return: a pandas Dataframe with the flattened acceleration data. - """ - #removing non X/Y/Z acceleration channels - for idx in range(len(dfs)): - df = dfs[idx] - df = df[df.columns[ - list(map(lambda x: x[0] in ['X', 'Y', 'Z'], - df.columns))]].copy() - for col in df.columns: - df[col] = df[col] - np.mean(df[col]) - dfs[idx] = df - - - aligned_dfs = utils.align_dataframes(dfs) - sensors = re.findall(r"[E|D|R]\d+", part_number) - if len(dfs) != 2 or len(dfs) != len(sensors): - raise Exception("there needs to be exactly two dataframes and the part " \ - "number needs to contain exactly two sensor ids") - - #TODO: 0 mean everything ... easy way or the easy way. 
- sensor_info = [_sensor_info(sensor) for sensor in sensors] - noises = [si['noise'] for si in sensor_info] - bounds = [(si['low_cutoff'], si['high_cutoff']) for si in sensor_info] - dfs = [butterworth(df, low_cutoff= l_bound, high_cutoff= r_bound) - for (df, (l_bound, r_bound)) in zip(aligned_dfs, bounds)] - for df in dfs: df.columns = ['X', 'Y', 'Z'] - - overlap = (max([bound[0] for bound in bounds]), min([bound[1] for bound in bounds])) - - hz_overlap_dfs = [butterworth(df, low_cutoff=overlap[0], high_cutoff=overlap[1]) - for df in dfs] - - averaged_df = _weighted_avg(hz_overlap_dfs, noises) - hz_isolated_dfs = [] - for df, (l_bound, r_bound) in zip(dfs, bounds): - isolated_df = df - if l_bound == overlap[0]: butterworth(isolated_df, high_cutoff= l_bound) - if r_bound == overlap[1]: butterworth(isolated_df, low_cutoff= r_bound) - hz_isolated_dfs.append(isolated_df) - - return sum(hz_isolated_dfs + [averaged_df]) #+ is joining lists, not adding numbers - -def _weighted_avg(dfs: List[pd.DataFrame], noise: List[float]): - """ - Combines multiple dataframe into one by applying a weighted average base on noise in a linear-inverse fashion. - For sensors A and B, if the noise ratio is 2:1, the weighing ratio will be 1:2. Note that a value of 0 is - considered in the weighting. - - :param dfs: the dataframes to normalize. It is assumed that all dataframes have the same indecies. - :param noise: The noise value to the index-respective dataframe. Noise will be normalized. - - :return: a single dataframe with the weighted values - """ - if len(dfs) != len(noise) or len(dfs) == 0: - raise Exception("dataframes and noise need to have equal, non zero number of elements") - #normalize the weightings. 
If there is a weighting with 0 noise, return that instead - if 0 in noise: - return dfs[noise.index(0)] - noise = (np.array(noise) / sum(noise))[::-1] - - return pd.DataFrame(sum([n * d for n, d in zip(noise, dfs)]), - columns = dfs[0].columns, index = dfs[0].index) - - -def _sensor_info(sensor: str) -> dict: - """ - creates a dictionary with information relevant to :py:func:`refine_acceleration`, namely - - - sensor_type: Literal["D", "E", "R"]. Indicates if the sensor is digital, piezoelectric, - or piezoresistive - - rating: the g rating of the sensor, or the maximum it can read while accurate - - noise: float. Indicates the noise of the sensor when the response curve is flat - - low_cutoff: the lowest Hz where the response curve is flat. - - high_cutoff: the highest Hz where the response curve is flat. - - :return: a dictionary with the information stated above - """ - sensor_type = sensor[0] - sensor_rating = int(sensor[1:]) - - match sensor_type: - case "E": - low = 10 - high = sensor_rating / 5 - noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06}[sensor_rating] - case "D": - try: - low, high = {16: (1,300), 40: (1, 100)}[sensor_rating] - noise = {16: 4E-3, 40: 8E-5}[sensor_rating] - except KeyError: - raise Exception(f"rating {sensor_rating} not supported for digital IMUs") - case "R": - low = 1 - high = sensor_rating / 5 - noise = {100: 3E-3, 500: 1.5E-2, 2000: 0.06} - case _: - raise Exception(f"Sensor type {sensor_type} not recognized, should be one of E, D, R.") - - return { - "sensor_type": sensor_type, - "noise": noise, - "rating": sensor_rating, - "low_cutoff": low, - "high_cutoff": high, - } - def _fftnoise(f): """ Generate time series noise for a given range of frequencies with random phase using ifft. 
diff --git a/endaq/ide/info.py b/endaq/ide/info.py index 94380d80..b046ee18 100644 --- a/endaq/ide/info.py +++ b/endaq/ide/info.py @@ -4,7 +4,7 @@ from __future__ import annotations import typing -from collections import defaultdict +from collections import defaultdict, namedtuple import datetime import dateutil.tz import warnings @@ -13,16 +13,19 @@ import pandas as pd import pandas.io.formats.style import idelib.dataset +import re -from .measurement import MeasurementType, ANY, get_channels +from .measurement import MeasurementType, ANY, get_channels, ACCELERATION from .files import get_doc from .util import parse_time +from endaq.calc import utils, filters __all__ = [ "get_channel_table", "to_pandas", "get_primary_sensor_data", + "get_unified_acceleration" ] @@ -433,3 +436,187 @@ def get_primary_sensor_data( #Return only the subchannels with right units return data[channels.name] + +# ============================================================================ +# +# ============================================================================ +def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: + """ + Retrieves a more accurate acceleration channel by combining the + frequencies where each of the accelerometers are the more accurate, and filtering frequencies + out of range. + + Note that this method uses :py:func:`calc.utils.resample()`, which can cause artifacts + at the start or end of the data due to a periodic assumption. If important data is recorded + within those bounds, it is not recommended to use this method. + + :param doc: An open `Dataset` object, see :py:func:`~endaq.ide.get_doc()` + for more. + + :return: a pandas Dataframe with the flattened acceleration data. Output channel names + are [X, Y, Z]. 
+ """ + + acceleration_channels = get_channels(doc, ACCELERATION, False) + dfs = [] + s_ratings = [] + for ch in acceleration_channels: + dfs.append(to_pandas(ch)) + sensor = ch[0].sensor.name + kwords = sensor.split(" ") + if kwords[0].startswith("ADXL"): + r = {"ADXL355": 8, + "ADXL357": 40, + "ADXL345": 16, + "ADXL375": 200 + }[kwords[0]] + s_ratings.append(("DC", r)) + else: + s_ratings.append((kwords[1], ch.transform(0, 65535)[1])) + original_sample_rate = np.array([utils.sample_spacing(chan) for chan in dfs]) + cleaned_dfs = [] + for idx in range(len(dfs)): + df = dfs[idx].copy() + #drop non-axes channels + df = df[[col for col in df.columns if col[0] in 'XYZ']] + #0 mean all data + for col in df.columns: + df[col] = df[col] - np.mean(df[col]) + cleaned_dfs.append(df) + + aligned_dfs = utils.align_dataframes(cleaned_dfs) + + sensor_info = [_sensor_info(sensor, 1 / osr) for sensor, osr in zip(s_ratings, original_sample_rate)] + noises = np.array([si['noise'] for si in sensor_info]) + bounds = [(si['low_cutoff'], si['high_cutoff']) for si in sensor_info] + dfs = [filters.butterworth(df, low_cutoff= l_bound, high_cutoff= r_bound) + for (df, (l_bound, r_bound)) in zip(aligned_dfs, bounds)] + for df in dfs: df.columns = ['X', 'Y', 'Z'] + + hz_overlaps = _find_all_overlaps(bounds = bounds) + averaged_dfs = [] + for k, v in hz_overlaps.items(): + cur_overlap_dfs = [dfs[v_i] for v_i in v] + cur_noises = noises[v] + hz_overlap_dfs = [filters.butterworth(df, low_cutoff=k[0], high_cutoff=k[1]) + for df in cur_overlap_dfs] + bound_avgd_df = _weighted_avg(hz_overlap_dfs, cur_noises) + averaged_dfs.append(bound_avgd_df) + + + return sum(averaged_dfs) + +def _find_all_overlaps(bounds: typing.List[tuple[int, int]]) -> dict[tuple[int, int], typing.List[int]]: + """ + finds **all** possible overlaps of a list of start and end bounds. if an end bound and a start + bound share the same value, it is not considered overlapping. 
+ + :param bounds: a list of each start and end bounds, where the end bound is strictly greater + than the starting bounds. + + :return: a dictionary where the keys are the bounds for every overlap, and the values are + the indices that belong in each bound, respective to :py:param:`bounds` + """ + + if len(bounds) == 0: + return {} + #labeling the components + lblHz = namedtuple("labeledTuple", ("idx", "Hz")) + labels = range(len(bounds)) + + lower_bounds = list(map(lblHz, labels, [b[0] for b in bounds])) + upper_bounds = list(map(lblHz, labels, [b[1] for b in bounds])) + bounds_sorted = sorted(lower_bounds + upper_bounds, key= lambda b: b.Hz) + + #we are using np arrays here as a pseudo-way of preventing mutability + open_intervals: typing.List[lblHz.idx] = np.array([bounds_sorted[0].idx]) #more specifically, lblHz.idx + closed_intervals: dict[tuple[int, int], typing.List[int]] = {} + + left_bound = bounds_sorted[0].Hz + bounds_sorted = bounds_sorted[1:] + + for point in bounds_sorted: + if left_bound != point.Hz: + closed_intervals[left_bound, point.Hz] = open_intervals + left_bound = point.Hz + if point.idx in open_intervals: + open_intervals = np.setdiff1d(open_intervals, [point.idx]) #there must be a better way + else: + open_intervals = np.append(open_intervals, point.idx) + return closed_intervals + + +def _weighted_avg(dfs: typing.List[pd.DataFrame], noise: typing.List[float]): + """ + Combines multiple dataframe into one by applying a weighted average base on noise in a linear-inverse fashion. + For sensors A and B, if the noise ratio is 2:1, the weighing ratio will be 1:2. Note that a value of 0 is + considered in the weighting. + + :param dfs: the dataframes to normalize. It is assumed that all dataframes have + the same DateTimeIndex values. + :param noise: The noise value to the index-respective dataframe. Noise will be normalized. 
+ + :return: a single dataframe with the weighted values + """ + if len(dfs) != len(noise) or len(dfs) == 0: + raise ValueError("dataframes and noise need to have equal, non zero number of elements") + #normalize the weightings. If there is a weighting with 0 noise, return that instead + if 0 in noise: + return dfs[noise.index(0)] + noise = (np.array(noise) / sum(noise))[::-1] + + return pd.DataFrame(sum([n * d for n, d in zip(noise, dfs)]), + columns = dfs[0].columns, index = dfs[0].index) + + +def _sensor_info(s_rating: tuple[str, int], sample_rate: float) -> dict: + """ + creates a dictionary with information relevant to :py:func:`refine_acceleration`, namely + + - sensor_type: Literal["D", "E", "R"]. Indicates if the sensor is digital, piezoelectric, + or piezoresistive + - rating: the g rating of the sensor, or the maximum it can read while accurate + - noise: float. Indicates the noise of the sensor when the response curve is flat + - low_cutoff: the lowest Hz where the response curve is flat. + - high_cutoff: the highest Hz where the response curve is flat. + + :param: s_rating: a tuple consisting of (sensor type, rating) + :param sample_rate: the sample rate which the sensor is sensing at, used to compute + upper bounds in piezoresistive / piezoelectic. 
+ + :return: a dictionary with the information stated above + """ + sensor_type = s_rating[0] + sensor_rating = s_rating[1] + + match sensor_type: + case "PE": + low = 10 + high = int(sample_rate / 5) + try: + noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06}[sensor_rating] + except KeyError: + raise Exception(f"rating {sensor_rating} not supported for Piezoelectric sensors") + case "DC": + try: + low, high = {16: (1,300), 40: (1, 100)}[sensor_rating] + noise = {16: 4E-3, 40: 8E-5}[sensor_rating] + except KeyError: + raise Exception(f"rating {sensor_rating} not supported for digital IMUs") + case "PR": + low = 1 + high = int(sample_rate / 5) + try: + noise = {100: 3E-3, 500: 1.5E-2, 2000: 0.06}[sensor_rating] + except: + raise Exception(f"rating {sensor_rating} not supported for Piezoresistve sensors") + case _: + raise Exception(f"Sensor type {sensor_type} not recognized, should be one of PE, DC, PR.") + + return { + "sensor_type": sensor_type, + "noise": noise, + "rating": sensor_rating, + "low_cutoff": low, + "high_cutoff": high, + } \ No newline at end of file diff --git a/tests/ide/test_info.py b/tests/ide/test_info.py index f68758e8..fb2c658a 100644 --- a/tests/ide/test_info.py +++ b/tests/ide/test_info.py @@ -204,3 +204,14 @@ def test_to_pandas_tz(test_IDE): if __name__ == '__main__': unittest.main() + +def test_get_unified_acceleration(test_IDE): + """ + Tests that the start and end stamps are correct, and + that all data "exists" (no NaN values) + """ + accel_channels = info.get_channels(test_IDE, 'accel', False) + accel = info.get_unified_acceleration(test_IDE) + assert accel.notnull().all().all() + assert accel.index[0] == max([ac.index[0] for ac in accel_channels]) + assert accel.index[-1] == min([ac.index[-1] for ac in accel_channels]) \ No newline at end of file From 4a9dae403253aac3339572c3581543a525a1aa42 Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Wed, 25 Mar 2026 11:30:06 -0400 Subject: [PATCH 06/19] working impl, few hacks to remove 
before PR --- endaq/calc/utils.py | 38 +++++++++++----- endaq/ide/info.py | 106 +++++++++++++++++++++++++------------------- 2 files changed, 86 insertions(+), 58 deletions(-) diff --git a/endaq/calc/utils.py b/endaq/calc/utils.py index 604dd25b..567fe81d 100644 --- a/endaq/calc/utils.py +++ b/endaq/calc/utils.py @@ -130,7 +130,7 @@ def to_dB( } -def resample(df: pd.DataFrame, sample_rate: Optional[float] = None) -> pd.DataFrame: +def resample(df: pd.DataFrame, sample_rate: Optional[float] = None, num_samples = None) -> pd.DataFrame: """ Resample a dataframe to a desired sample rate (in Hz) @@ -145,16 +145,20 @@ def resample(df: pd.DataFrame, sample_rate: Optional[float] = None) -> pd.DataFr else: dt = sample_spacing(df) num_samples_after_resampling = int(dt * len(df) * sample_rate) - + #HACK : extra parameter was added. If this comment is present, it shouldn't be + #pushed to dev + if num_samples is not None: + num_samples_after_resampling = num_samples resampled_data, resampled_time = scipy.signal.resample( df, num_samples_after_resampling, t=df.index.values.astype(np.float64), ) - resampled_time = pd.date_range( - df.iloc[0].name, df.iloc[-1].name, - periods=num_samples_after_resampling, - ) + if resampled_time[0] != df.index[0] or resampled_time[-1] != df.index[-1]: + resampled_time = pd.date_range( + df.index[0], df.index[-1], + periods=num_samples_after_resampling, + ) # Check for datetimes, if so localize if 'datetime' in str(df.index.dtype): @@ -467,11 +471,7 @@ def to_altitude(df: pd.DataFrame, def align_dataframes(dfs: List[pd.DataFrame]) -> List[pd.DataFrame]: """ Resamples the given dataframes to all be equal-sized with resampled uniform timestamps. - Any timestamps outside of the shared range will be dropped.
- **NOTE**: at the time of writing this method, there is an issue with :py:func:`scipy.signal.resample()`, - where some dataframes may be returned squished. If at least one of the two dataframes contain the - correct timestamps, those timestamps will be used. Otherwise, the timestamps will be manually - computed. + Any timestamps outside of the shared range will be dropped. :param dfs: a List of dataframes with DateTimeIndex to align. @@ -522,5 +522,19 @@ def align_dataframes(dfs: List[pd.DataFrame]) -> List[pd.DataFrame]: #resamples the data to the dataframe with the most points available total_samples = max(tdf.shape[0] for tdf in trimmed_dfs) - resampled_dfs = [resample(df, total_samples / (sample_spacing(df) * len(df))) for df in trimmed_dfs] + resampled_dfs = [resample(df, total_samples / (sample_spacing(df) * len(df)), total_samples) for df in trimmed_dfs] + + """ + In the current implementation of scipy's resample, there can be some inconsistent rounding point + when creating the datetimes. For this reason, we will find one that meets spec (correct start + and end points) and use that for all. + """ + datepoints = None + for df in resampled_dfs: + if df.index[0] == aligned_start and df.index[-1] == aligned_end: + datepoints = df.index + break + if datepoints is None: + raise Exception("resampling error, timestamps incosistent with inputs") + for df in resampled_dfs: df.index = datepoints return resampled_dfs diff --git a/endaq/ide/info.py b/endaq/ide/info.py index b046ee18..df47266a 100644 --- a/endaq/ide/info.py +++ b/endaq/ide/info.py @@ -444,7 +444,7 @@ def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: """ Retrieves a more accurate acceleration channel by combining the frequencies where each of the accelerometers are the more accurate, and filtering frequencies - out of range. + out of range. 
Note that this method uses :py:func:`calc.utils.resample()`, which can cause artifacts at the start or end of the data due to a periodic assumption. If important data is recorded @@ -457,23 +457,14 @@ def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: are [X, Y, Z]. """ - acceleration_channels = get_channels(doc, ACCELERATION, False) - dfs = [] - s_ratings = [] - for ch in acceleration_channels: - dfs.append(to_pandas(ch)) - sensor = ch[0].sensor.name - kwords = sensor.split(" ") - if kwords[0].startswith("ADXL"): - r = {"ADXL355": 8, - "ADXL357": 40, - "ADXL345": 16, - "ADXL375": 200 - }[kwords[0]] - s_ratings.append(("DC", r)) - else: - s_ratings.append((kwords[1], ch.transform(0, 65535)[1])) - original_sample_rate = np.array([utils.sample_spacing(chan) for chan in dfs]) + #HACK : remove this bypass for impl + if isinstance(doc, list): + acceleration_channels = sum([get_channels(d, ACCELERATION, False) for d in doc], []) + else: + acceleration_channels = get_channels(doc, ACCELERATION, False) + + dfs = [to_pandas(ch) for ch in acceleration_channels] + cleaned_dfs = [] for idx in range(len(dfs)): df = dfs[idx].copy() @@ -486,15 +477,19 @@ def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: aligned_dfs = utils.align_dataframes(cleaned_dfs) - sensor_info = [_sensor_info(sensor, 1 / osr) for sensor, osr in zip(s_ratings, original_sample_rate)] + sensor_info = [_sensor_info(ch) for ch in acceleration_channels] noises = np.array([si['noise'] for si in sensor_info]) bounds = [(si['low_cutoff'], si['high_cutoff']) for si in sensor_info] + #filters out the frequencies that the sensors can not accurately detect dfs = [filters.butterworth(df, low_cutoff= l_bound, high_cutoff= r_bound) for (df, (l_bound, r_bound)) in zip(aligned_dfs, bounds)] for df in dfs: df.columns = ['X', 'Y', 'Z'] hz_overlaps = _find_all_overlaps(bounds = bounds) averaged_dfs = [] + + #for each range, the good frequencies are isolated and averaged with the 
other datasets + #who share the same freuqency range for k, v in hz_overlaps.items(): cur_overlap_dfs = [dfs[v_i] for v_i in v] cur_noises = noises[v] @@ -502,8 +497,7 @@ def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: for df in cur_overlap_dfs] bound_avgd_df = _weighted_avg(hz_overlap_dfs, cur_noises) averaged_dfs.append(bound_avgd_df) - - + return sum(averaged_dfs) def _find_all_overlaps(bounds: typing.List[tuple[int, int]]) -> dict[tuple[int, int], typing.List[int]]: @@ -528,8 +522,7 @@ def _find_all_overlaps(bounds: typing.List[tuple[int, int]]) -> dict[tuple[int, upper_bounds = list(map(lblHz, labels, [b[1] for b in bounds])) bounds_sorted = sorted(lower_bounds + upper_bounds, key= lambda b: b.Hz) - #we are using np arrays here as a pseudo-way of preventing mutability - open_intervals: typing.List[lblHz.idx] = np.array([bounds_sorted[0].idx]) #more specifically, lblHz.idx + open_intervals: typing.List[lblHz.idx] = np.array([bounds_sorted[0].idx]) closed_intervals: dict[tuple[int, int], typing.List[int]] = {} left_bound = bounds_sorted[0].Hz @@ -540,7 +533,7 @@ def _find_all_overlaps(bounds: typing.List[tuple[int, int]]) -> dict[tuple[int, closed_intervals[left_bound, point.Hz] = open_intervals left_bound = point.Hz if point.idx in open_intervals: - open_intervals = np.setdiff1d(open_intervals, [point.idx]) #there must be a better way + open_intervals = np.setdiff1d(open_intervals, [point.idx]) else: open_intervals = np.append(open_intervals, point.idx) return closed_intervals @@ -563,18 +556,26 @@ def _weighted_avg(dfs: typing.List[pd.DataFrame], noise: typing.List[float]): #normalize the weightings. 
If there is a weighting with 0 noise, return that instead if 0 in noise: return dfs[noise.index(0)] - noise = (np.array(noise) / sum(noise))[::-1] + noise_normalized = np.array(noise) / sum(noise) + + #formula for the inverse is for array [a0, a1, ..., an], inverse is + # [a1a2...an, a0a2...an, a1a2...an] + new_noise = 1 / noise_normalized + for n in noise_normalized: + new_noise *= n - return pd.DataFrame(sum([n * d for n, d in zip(noise, dfs)]), + new_noise = new_noise / sum(new_noise) + #in the case that len(dfs) == 1, the noise is 1 and the original is returned + return pd.DataFrame(sum([n * d for n, d in zip(new_noise, dfs)]), columns = dfs[0].columns, index = dfs[0].index) - -def _sensor_info(s_rating: tuple[str, int], sample_rate: float) -> dict: +#TODOL figure out signature for ch +def _sensor_info(ch: any) -> dict: """ creates a dictionary with information relevant to :py:func:`refine_acceleration`, namely - - sensor_type: Literal["D", "E", "R"]. Indicates if the sensor is digital, piezoelectric, - or piezoresistive + - sensor_type: Literal["DC", "PE", "PR"]. Indicates if the sensor is digital, piezoelectric, + or piezoresistive respectively - rating: the g rating of the sensor, or the maximum it can read while accurate - noise: float. Indicates the noise of the sensor when the response curve is flat - low_cutoff: the lowest Hz where the response curve is flat. @@ -582,41 +583,54 @@ def _sensor_info(s_rating: tuple[str, int], sample_rate: float) -> dict: :param: s_rating: a tuple consisting of (sensor type, rating) :param sample_rate: the sample rate which the sensor is sensing at, used to compute - upper bounds in piezoresistive / piezoelectic. 
+ upper bounds in analog sensors :return: a dictionary with the information stated above """ - sensor_type = s_rating[0] - sensor_rating = s_rating[1] + sample_rate = 1 / utils.sample_spacing(to_pandas(ch)) + sensor = ch[0].sensor.name + kwords = sensor.split(" ") + if kwords[0].startswith("ADXL"): + rating = int(kwords[1][:-1]) + s_type = "DC" + else: + rating = ch.transform(0, 65535)[1] + s_type = kwords[1] - match sensor_type: + match s_type: case "PE": low = 10 high = int(sample_rate / 5) try: - noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06}[sensor_rating] + noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06}[rating] except KeyError: - raise Exception(f"rating {sensor_rating} not supported for Piezoelectric sensors") + raise Exception(f"rating {rating} not supported for Piezoelectric sensors") case "DC": try: - low, high = {16: (1,300), 40: (1, 100)}[sensor_rating] - noise = {16: 4E-3, 40: 8E-5}[sensor_rating] + low, high = {16: (1,300), 40: (1, 100)}[rating] + noise = {16: 4E-3, 40: 8E-5}[rating] except KeyError: - raise Exception(f"rating {sensor_rating} not supported for digital IMUs") + raise Exception(f"rating {rating} not supported for digital IMUs") case "PR": low = 1 high = int(sample_rate / 5) - try: - noise = {100: 3E-3, 500: 1.5E-2, 2000: 0.06}[sensor_rating] - except: - raise Exception(f"rating {sensor_rating} not supported for Piezoresistve sensors") + #PR has built in resistance, and needs to be accounted for + #equation within 33% tolerance, and extended to the nearest 50 just in case + if 50 <= rating or rating <= 150: + noise = 3E-3 + elif 350 <= rating or rating <= 700: + noise = 1.5E-2 + elif 1450 <= rating <= 2750: + noise = 6E-2 + else: + raise Exception(f"rating {rating} not supported for Piezoresistve sensors") case _: - raise Exception(f"Sensor type {sensor_type} not recognized, should be one of PE, DC, PR.") + raise Exception(f"Sensor type {s_type} not recognized, should be one of PE, DC, PR.") return { - "sensor_type": 
sensor_type, + "sensor_type": s_type, "noise": noise, - "rating": sensor_rating, + "rating": rating, "low_cutoff": low, "high_cutoff": high, } \ No newline at end of file From c01d1801e99435b3767c065eb35d41c9a83cc765 Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Thu, 26 Mar 2026 09:43:27 -0400 Subject: [PATCH 07/19] updated signature of resample and adjusted docs for PR --- endaq/calc/utils.py | 36 +++++++++++++++++++++++------------- endaq/ide/info.py | 28 +++++++++++----------------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/endaq/calc/utils.py b/endaq/calc/utils.py index 567fe81d..f30e836a 100644 --- a/endaq/calc/utils.py +++ b/endaq/calc/utils.py @@ -130,30 +130,40 @@ def to_dB( } -def resample(df: pd.DataFrame, sample_rate: Optional[float] = None, num_samples = None) -> pd.DataFrame: +def resample( + df: pd.DataFrame, + sample_rate: Optional[float] = None, + num_samples: Optional[int] = None + ) -> pd.DataFrame: """ - Resample a dataframe to a desired sample rate (in Hz) - + Resample a dataframe to a desired sample rate (in Hz) or a desired number of points. + Note that :param:`sample_rate` and :param:`num_samples` are mutually exclusive. If + neither of sample_rate or num_samples is suplied, it will use the same sample_rate + as it currently does, but makes the time stamps uniformly spaced. + :param df: The DataFrame to resample, indexed by time :param sample_rate: The desired sample rate to resample the given data to. - If one is not supplied, then it will use the same as it currently does, but - make the time stamps uniformly spaced + :param num_samples: The desired number of samples to resample the given data to. 
+ :return: The resampled data in a DataFrame """ - if sample_rate is None: - num_samples_after_resampling = len(df) - else: + if sample_rate is not None and num_samples is not None: + raise ValueError("Only one of `sample_rate` and `num_samples` can be set.") + + if sample_rate is not None: dt = sample_spacing(df) num_samples_after_resampling = int(dt * len(df) * sample_rate) - #HACK : extra parameter was added. If this comment is present, it shouldn't be - #pushed to dev - if num_samples is not None: + elif num_samples is not None: num_samples_after_resampling = num_samples + else: + num_samples_after_resampling = len(df) + resampled_data, resampled_time = scipy.signal.resample( df, num_samples_after_resampling, t=df.index.values.astype(np.float64), ) + if resampled_time[0] != df.index[0] or resampled_time[-1] != df.index[-1]: resampled_time = pd.date_range( df.index[0], df.index[-1], @@ -522,7 +532,7 @@ def align_dataframes(dfs: List[pd.DataFrame]) -> List[pd.DataFrame]: #resamples the data to the dataframe with the most points available total_samples = max(tdf.shape[0] for tdf in trimmed_dfs) - resampled_dfs = [resample(df, total_samples / (sample_spacing(df) * len(df)), total_samples) for df in trimmed_dfs] + resampled_dfs = [resample(df, num_samples = total_samples) for df in trimmed_dfs] """ In the current implementation of scipy's resample, there can be some inconsistent rounding point @@ -531,7 +541,7 @@ def align_dataframes(dfs: List[pd.DataFrame]) -> List[pd.DataFrame]: """ datepoints = None for df in resampled_dfs: - if df.index[0] == aligned_start and df.index[-1] == aligned_end: + if df.index[0] == aligned_start and df.index[-1] == aligned_end: datepoints = df.index break if datepoints is None: diff --git a/endaq/ide/info.py b/endaq/ide/info.py index df47266a..fd25e9ef 100644 --- a/endaq/ide/info.py +++ b/endaq/ide/info.py @@ -442,13 +442,12 @@ def get_primary_sensor_data( # ============================================================================ 
def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: """ - Retrieves a more accurate acceleration channel by combining the - frequencies where each of the accelerometers are the more accurate, and filtering frequencies - out of range. + Computes a more accurate acceleration channel by filtering frequencies + out of range and combining frequencies where multiple accelerometers are accurate. Note that this method uses :py:func:`calc.utils.resample()`, which can cause artifacts - at the start or end of the data due to a periodic assumption. If important data is recorded - within those bounds, it is not recommended to use this method. + at the start or end of the data due to a assumption about signal periodicity. If important + data is recorded within those bounds, it is not recommended to use this method. :param doc: An open `Dataset` object, see :py:func:`~endaq.ide.get_doc()` for more. @@ -457,11 +456,7 @@ def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: are [X, Y, Z]. 
""" - #HACK : remove this bypass for impl - if isinstance(doc, list): - acceleration_channels = sum([get_channels(d, ACCELERATION, False) for d in doc], []) - else: - acceleration_channels = get_channels(doc, ACCELERATION, False) + acceleration_channels = get_channels(doc, ACCELERATION, False) dfs = [to_pandas(ch) for ch in acceleration_channels] @@ -530,7 +525,8 @@ def _find_all_overlaps(bounds: typing.List[tuple[int, int]]) -> dict[tuple[int, for point in bounds_sorted: if left_bound != point.Hz: - closed_intervals[left_bound, point.Hz] = open_intervals + if open_intervals.size != 0: + closed_intervals[left_bound, point.Hz] = open_intervals left_bound = point.Hz if point.idx in open_intervals: open_intervals = np.setdiff1d(open_intervals, [point.idx]) @@ -559,7 +555,7 @@ def _weighted_avg(dfs: typing.List[pd.DataFrame], noise: typing.List[float]): noise_normalized = np.array(noise) / sum(noise) #formula for the inverse is for array [a0, a1, ..., an], inverse is - # [a1a2...an, a0a2...an, a1a2...an] + # [a1a2...an, a0a2...an, a1a2...an], and then normalized new_noise = 1 / noise_normalized for n in noise_normalized: new_noise *= n @@ -569,8 +565,7 @@ def _weighted_avg(dfs: typing.List[pd.DataFrame], noise: typing.List[float]): return pd.DataFrame(sum([n * d for n, d in zip(new_noise, dfs)]), columns = dfs[0].columns, index = dfs[0].index) -#TODOL figure out signature for ch -def _sensor_info(ch: any) -> dict: +def _sensor_info(ch: idelib.dataset.Channel) -> dict: """ creates a dictionary with information relevant to :py:func:`refine_acceleration`, namely @@ -581,9 +576,8 @@ def _sensor_info(ch: any) -> dict: - low_cutoff: the lowest Hz where the response curve is flat. - high_cutoff: the highest Hz where the response curve is flat. - :param: s_rating: a tuple consisting of (sensor type, rating) - :param sample_rate: the sample rate which the sensor is sensing at, used to compute - upper bounds in analog sensors + :param ch: The channel of the dataset to work on. 
This channel should have a child, which will + be used to extract the data. :return: a dictionary with the information stated above """ From 6e92979f57724f4f600e72916cbd9815510f85a2 Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Thu, 26 Mar 2026 10:05:34 -0400 Subject: [PATCH 08/19] updated some sphinx docs, removed unnecessary import --- endaq/calc/utils.py | 2 +- endaq/ide/info.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/endaq/calc/utils.py b/endaq/calc/utils.py index f30e836a..42ece8e5 100644 --- a/endaq/calc/utils.py +++ b/endaq/calc/utils.py @@ -137,7 +137,7 @@ def resample( ) -> pd.DataFrame: """ Resample a dataframe to a desired sample rate (in Hz) or a desired number of points. - Note that :param:`sample_rate` and :param:`num_samples` are mutually exclusive. If + Note that ``sample_rate`` and ``num_samples`` are mutually exclusive. If neither of sample_rate or num_samples is suplied, it will use the same sample_rate as it currently does, but makes the time stamps uniformly spaced. diff --git a/endaq/ide/info.py b/endaq/ide/info.py index fd25e9ef..6693d6d7 100644 --- a/endaq/ide/info.py +++ b/endaq/ide/info.py @@ -13,7 +13,6 @@ import pandas as pd import pandas.io.formats.style import idelib.dataset -import re from .measurement import MeasurementType, ANY, get_channels, ACCELERATION from .files import get_doc @@ -504,7 +503,7 @@ def _find_all_overlaps(bounds: typing.List[tuple[int, int]]) -> dict[tuple[int, than the starting bounds. 
:return: a dictionary where the keys are the bounds for every overlap, and the values are - the indices that belong in each bound, respective to :py:param:`bounds` + the indices that belong in each bound, respective to ``bounds`` """ if len(bounds) == 0: From 3bcb8e86c4e0fd154119517bf009e2b2d99b8738 Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Thu, 26 Mar 2026 10:11:31 -0400 Subject: [PATCH 09/19] extra uneeded imports removed --- endaq/calc/filters.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/endaq/calc/filters.py b/endaq/calc/filters.py index 9c1cac3e..a5937085 100644 --- a/endaq/calc/filters.py +++ b/endaq/calc/filters.py @@ -1,17 +1,13 @@ from __future__ import annotations -from typing import Optional, Union, Tuple, List +from typing import Optional, Union, Tuple import functools -from itertools import batched -import re import pandas as pd import numpy as np import scipy.signal -from collections import namedtuple from endaq.calc import utils -from endaq.ide import get_channels, ACCELERATION def _get_filter_frequencies_type(low_cutoff, high_cutoff): From 5d4e6dce61c03c73e3a66b35b08f7f75f69380bd Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Thu, 26 Mar 2026 11:10:19 -0400 Subject: [PATCH 10/19] fixed unit tests issues --- endaq/ide/info.py | 55 +++++++++++++++++++++--------------------- tests/ide/test_info.py | 4 +-- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/endaq/ide/info.py b/endaq/ide/info.py index 6693d6d7..58b10fa5 100644 --- a/endaq/ide/info.py +++ b/endaq/ide/info.py @@ -470,10 +470,10 @@ def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: cleaned_dfs.append(df) aligned_dfs = utils.align_dataframes(cleaned_dfs) - + aligned_sr = 1 / (aligned_dfs[0].index[1] - aligned_dfs[0].index[0]).total_seconds() sensor_info = [_sensor_info(ch) for ch in acceleration_channels] noises = np.array([si['noise'] for si in sensor_info]) - bounds = [(si['low_cutoff'], si['high_cutoff']) for si in 
sensor_info] + bounds = [(si['low_cutoff'], min(si['high_cutoff'], int(aligned_sr / 2) - 1)) for si in sensor_info] #filters out the frequencies that the sensors can not accurately detect dfs = [filters.butterworth(df, low_cutoff= l_bound, high_cutoff= r_bound) for (df, (l_bound, r_bound)) in zip(aligned_dfs, bounds)] @@ -590,35 +590,34 @@ def _sensor_info(ch: idelib.dataset.Channel) -> dict: rating = ch.transform(0, 65535)[1] s_type = kwords[1] - match s_type: - case "PE": - low = 10 - high = int(sample_rate / 5) - try: - noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06}[rating] - except KeyError: - raise Exception(f"rating {rating} not supported for Piezoelectric sensors") - case "DC": + if (s_type == "PE"): + low = 10 + high = int(sample_rate / 5) + try: + noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06}[rating] + except KeyError: + raise Exception(f"rating {rating} not supported for Piezoelectric sensors") + elif (s_type == "DC"): try: - low, high = {16: (1,300), 40: (1, 100)}[rating] - noise = {16: 4E-3, 40: 8E-5}[rating] + low, high = {8: (1, 150), 16: (1,300), 40: (1, 100)}[rating] + noise = {8: 2E-5, 16: 4E-3, 40: 8E-5}[rating] except KeyError: raise Exception(f"rating {rating} not supported for digital IMUs") - case "PR": - low = 1 - high = int(sample_rate / 5) - #PR has built in resistance, and needs to be accounted for - #equation within 33% tolerance, and extended to the nearest 50 just in case - if 50 <= rating or rating <= 150: - noise = 3E-3 - elif 350 <= rating or rating <= 700: - noise = 1.5E-2 - elif 1450 <= rating <= 2750: - noise = 6E-2 - else: - raise Exception(f"rating {rating} not supported for Piezoresistve sensors") - case _: - raise Exception(f"Sensor type {s_type} not recognized, should be one of PE, DC, PR.") + elif (s_type == "PR"): + low = 1 + high = int(sample_rate / 5) + #PR has built in resistance, and needs to be accounted for + #equation within 33% tolerance, and extended to the nearest 50 just in case + if 50 <= rating 
or rating <= 150: + noise = 3E-3 + elif 350 <= rating or rating <= 700: + noise = 1.5E-2 + elif 1450 <= rating <= 2750: + noise = 6E-2 + else: + raise Exception(f"rating {rating} not supported for Piezoresistve sensors") + else: + raise Exception(f"Sensor type {s_type} not recognized, should be one of PE, DC, PR.") return { "sensor_type": s_type, diff --git a/tests/ide/test_info.py b/tests/ide/test_info.py index fb2c658a..d9fd4f3c 100644 --- a/tests/ide/test_info.py +++ b/tests/ide/test_info.py @@ -7,7 +7,7 @@ from idelib.importer import importFile import pandas as pd -from endaq.ide import files, info +from endaq.ide import files, info, to_pandas IDE_FILENAME = os.path.join(os.path.dirname(__file__), "test.ide") @@ -210,7 +210,7 @@ def test_get_unified_acceleration(test_IDE): Tests that the start and end stamps are correct, and that all data "exists" (no NaN values) """ - accel_channels = info.get_channels(test_IDE, 'accel', False) + accel_channels = [to_pandas(ch) for ch in info.get_channels(test_IDE, 'accel', False)] accel = info.get_unified_acceleration(test_IDE) assert accel.notnull().all().all() assert accel.index[0] == max([ac.index[0] for ac in accel_channels]) From d98c01df3869a8c2747b20422efeaa73e302cf9c Mon Sep 17 00:00:00 2001 From: stokesMIDE Date: Thu, 26 Mar 2026 11:42:46 -0400 Subject: [PATCH 11/19] Disabled fail-on-warning pytest arguments --- .github/workflows/unit-tests.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 1eae022f..33151445 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -72,9 +72,9 @@ jobs: --cov-report=xml --hypothesis-profile=ci --numprocesses auto - -W error::RuntimeWarning - -W error::UserWarning - -W error::DeprecationWarning + # -W error::RuntimeWarning + # -W error::UserWarning + # -W error::DeprecationWarning - name: Generate Codecov Report uses: codecov/codecov-action@v3 From 
2a2972af8e98e500aacb9fd3925738e93f69fd20 Mon Sep 17 00:00:00 2001 From: stokesMIDE Date: Thu, 26 Mar 2026 11:48:54 -0400 Subject: [PATCH 12/19] Removed warning-as-error arguments --- .github/workflows/unit-tests.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 33151445..7069cabf 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -72,9 +72,6 @@ jobs: --cov-report=xml --hypothesis-profile=ci --numprocesses auto - # -W error::RuntimeWarning - # -W error::UserWarning - # -W error::DeprecationWarning - name: Generate Codecov Report uses: codecov/codecov-action@v3 From ee63c9960e0dc2556e07573f9684a397a6b8d2cc Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Thu, 26 Mar 2026 14:43:13 -0400 Subject: [PATCH 13/19] added future_stack = False to accomodate for pandas 3.0.0 --- endaq/batch/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/endaq/batch/core.py b/endaq/batch/core.py index d2a04944..c8cf464e 100644 --- a/endaq/batch/core.py +++ b/endaq/batch/core.py @@ -214,7 +214,7 @@ def _make_peak_windows(ch_data_cache: analyzer.CalcCache, margin_len): if sys.version_info < (3, 9): return aligned_peak_data.stack().stack().reorder_levels(levels) - return aligned_peak_data.stack(future_stack=True).stack().reorder_levels(levels) + return aligned_peak_data.stack(future_stack=True).stack(future_stack = False).reorder_levels(levels) def _make_vc_curves(ch_data_cache: analyzer.CalcCache): From 2f34f59fd19e83aedb7ce29a6e282147b7b3ddcc Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Thu, 26 Mar 2026 17:12:47 -0400 Subject: [PATCH 14/19] added way of getting sensor range, added pin to pandas dependency --- endaq/ide/info.py | 28 ++++++++++++++++++++++++++-- requirements.txt | 2 +- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/endaq/ide/info.py b/endaq/ide/info.py index 58b10fa5..2fb9d7e9 100644 --- a/endaq/ide/info.py +++ 
b/endaq/ide/info.py @@ -564,6 +564,31 @@ def _weighted_avg(dfs: typing.List[pd.DataFrame], noise: typing.List[float]): return pd.DataFrame(sum([n * d for n, d in zip(new_noise, dfs)]), columns = dfs[0].columns, index = dfs[0].index) +def get_max_sensor_range(ch: idelib.dataset.Channel) -> int: + """ + Gets the g-rating of the sensor used from the given channel. + + :param ch: The channel of the dataset to work on. This channel should have a child, which will + be used to extract the data. + + :return: g-rating. + """ + + sensor = ch[0].sensor.name + kwords = sensor.split(" ") + #DC + if kwords[0].startswith("ADXL"): + return int(kwords[1][:-1]) + elif kwords[1] == "PE": + return ch.transform(0, 65535)[1] + else: + values = np.asarray([100, 500, 2000]) + #finds the value closest to ch.transform(0, 65535)[1]. This is because + #we PR have some resistance and won't by default give the value we want + return values[np.abs(np.asarray(values) - ch.transform(0, 65535)[1]).argmin()] + + + def _sensor_info(ch: idelib.dataset.Channel) -> dict: """ creates a dictionary with information relevant to :py:func:`refine_acceleration`, namely @@ -584,12 +609,11 @@ def _sensor_info(ch: idelib.dataset.Channel) -> dict: sensor = ch[0].sensor.name kwords = sensor.split(" ") if kwords[0].startswith("ADXL"): - rating = int(kwords[1][:-1]) s_type = "DC" else: - rating = ch.transform(0, 65535)[1] s_type = kwords[1] + rating = get_max_sensor_range(ch) if (s_type == "PE"): low = 10 high = int(sample_rate / 5) diff --git a/requirements.txt b/requirements.txt index c6f9a4a8..ce99d810 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ ebmlite>=3.2.0 idelib>=3.2.8 jinja2 numpy>=1.19.5 -pandas>=1.3 +pandas>=1.3, <3.0.0 plotly>=5.3.1 pynmeagps python-dotenv>=0.18.0 From 9dcc735c70f578a613a81d4b9f0901746fa225ff Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Tue, 31 Mar 2026 10:23:18 -0400 Subject: [PATCH 15/19] modifying version specifications for requirements.txt --- 
requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index ce99d810..31cb1eb3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ ebmlite>=3.2.0 idelib>=3.2.8 jinja2 numpy>=1.19.5 -pandas>=1.3, <3.0.0 +pandas>=1.3, <3.0 plotly>=5.3.1 pynmeagps python-dotenv>=0.18.0 From c4fe01bde7b102cda2e2a7eda7266c5642d255d9 Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Wed, 1 Apr 2026 10:17:21 -0400 Subject: [PATCH 16/19] moved methods into from ide/info.py to ide/util.py, added test methods --- endaq/ide/info.py | 93 ++----------------------------------- endaq/ide/util.py | 102 ++++++++++++++++++++++++++++++++++++++++- tests/ide/test_util.py | 42 +++++++++++++++++ 3 files changed, 146 insertions(+), 91 deletions(-) create mode 100644 tests/ide/test_util.py diff --git a/endaq/ide/info.py b/endaq/ide/info.py index 2fb9d7e9..98a94ea0 100644 --- a/endaq/ide/info.py +++ b/endaq/ide/info.py @@ -16,8 +16,7 @@ from .measurement import MeasurementType, ANY, get_channels, ACCELERATION from .files import get_doc -from .util import parse_time - +from .util import parse_time, get_accelerometer_info, get_accelerometer_bounds from endaq.calc import utils, filters __all__ = [ @@ -471,7 +470,7 @@ def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: aligned_dfs = utils.align_dataframes(cleaned_dfs) aligned_sr = 1 / (aligned_dfs[0].index[1] - aligned_dfs[0].index[0]).total_seconds() - sensor_info = [_sensor_info(ch) for ch in acceleration_channels] + sensor_info = [get_accelerometer_info(ch) for ch in acceleration_channels] noises = np.array([si['noise'] for si in sensor_info]) bounds = [(si['low_cutoff'], min(si['high_cutoff'], int(aligned_sr / 2) - 1)) for si in sensor_info] #filters out the frequencies that the sensors can not accurately detect @@ -563,90 +562,4 @@ def _weighted_avg(dfs: typing.List[pd.DataFrame], noise: typing.List[float]): #in the case that len(dfs) == 1, the noise is 1 
and the original is returned return pd.DataFrame(sum([n * d for n, d in zip(new_noise, dfs)]), columns = dfs[0].columns, index = dfs[0].index) - -def get_max_sensor_range(ch: idelib.dataset.Channel) -> int: - """ - Gets the g-rating of the sensor used from the given channel. - - :param ch: The channel of the dataset to work on. This channel should have a child, which will - be used to extract the data. - - :return: g-rating. - """ - - sensor = ch[0].sensor.name - kwords = sensor.split(" ") - #DC - if kwords[0].startswith("ADXL"): - return int(kwords[1][:-1]) - elif kwords[1] == "PE": - return ch.transform(0, 65535)[1] - else: - values = np.asarray([100, 500, 2000]) - #finds the value closest to ch.transform(0, 65535)[1]. This is because - #we PR have some resistance and won't by default give the value we want - return values[np.abs(np.asarray(values) - ch.transform(0, 65535)[1]).argmin()] - - - -def _sensor_info(ch: idelib.dataset.Channel) -> dict: - """ - creates a dictionary with information relevant to :py:func:`refine_acceleration`, namely - - - sensor_type: Literal["DC", "PE", "PR"]. Indicates if the sensor is digital, piezoelectric, - or piezoresistive respectively - - rating: the g rating of the sensor, or the maximum it can read while accurate - - noise: float. Indicates the noise of the sensor when the response curve is flat - - low_cutoff: the lowest Hz where the response curve is flat. - - high_cutoff: the highest Hz where the response curve is flat. - - :param ch: The channel of the dataset to work on. This channel should have a child, which will - be used to extract the data. 
- - :return: a dictionary with the information stated above - """ - sample_rate = 1 / utils.sample_spacing(to_pandas(ch)) - sensor = ch[0].sensor.name - kwords = sensor.split(" ") - if kwords[0].startswith("ADXL"): - s_type = "DC" - else: - s_type = kwords[1] - - rating = get_max_sensor_range(ch) - if (s_type == "PE"): - low = 10 - high = int(sample_rate / 5) - try: - noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06}[rating] - except KeyError: - raise Exception(f"rating {rating} not supported for Piezoelectric sensors") - elif (s_type == "DC"): - try: - low, high = {8: (1, 150), 16: (1,300), 40: (1, 100)}[rating] - noise = {8: 2E-5, 16: 4E-3, 40: 8E-5}[rating] - except KeyError: - raise Exception(f"rating {rating} not supported for digital IMUs") - elif (s_type == "PR"): - low = 1 - high = int(sample_rate / 5) - #PR has built in resistance, and needs to be accounted for - #equation within 33% tolerance, and extended to the nearest 50 just in case - if 50 <= rating or rating <= 150: - noise = 3E-3 - elif 350 <= rating or rating <= 700: - noise = 1.5E-2 - elif 1450 <= rating <= 2750: - noise = 6E-2 - else: - raise Exception(f"rating {rating} not supported for Piezoresistve sensors") - else: - raise Exception(f"Sensor type {s_type} not recognized, should be one of PE, DC, PR.") - - return { - "sensor_type": s_type, - "noise": noise, - "rating": rating, - "low_cutoff": low, - "high_cutoff": high, - } \ No newline at end of file + \ No newline at end of file diff --git a/endaq/ide/util.py b/endaq/ide/util.py index 1aa66e96..044320b9 100644 --- a/endaq/ide/util.py +++ b/endaq/ide/util.py @@ -5,7 +5,12 @@ import string from ebmlite import loadSchema -__all__ = ['parse_time', 'validate'] +from .measurement import ACCELERATION +import idelib.dataset +import numpy as np + + +__all__ = ['parse_time', 'validate', 'get_accelerometer_bounds', 'get_accelerometer_info'] # ============================================================================ @@ -122,4 +127,99 @@ def 
parse_time(t, datetime_start=None): raise TypeError(f"Unsupported type for parse_time(): {type(t).__name__} ({t!r})") +# ============================================================================ +# +# ============================================================================ + +def get_accelerometer_bounds(ch: idelib.dataset.Channel | idelib.dataset.SubChannel) -> tuple[int, int]: + """ + Gets the g-rating of the sensor used from the given channel. + + :param ch: The channel of the dataset to work on. This channel should have a child, which will + be used to extract the data. + :return: a tuple of g-rating bounds. + """ + ch = ch if isinstance(ch, idelib.dataset.SubChannel) else ch[0] + if not ACCELERATION.match(ch): + raise ValueError("An acceleration channel should be given") + s_name = ch.sensor.name + + if s_name.startswith("ADXL"): + g = {"ADXL345": 16, "ADXL362": 16, + "ADXL357": 40, "ADXL359": 40, + "ADXL355": 8, "ADXL375": 200, + }[s_name.split(" ")[0]] + return (-1 * g, g) + + values = np.asarray([8, 16, 25, 100, 200, 500, 2000, 6000]) + #finds the value closest to ch.transform(0, 65535)[1]. This is because + #we PR have some resistance and won't by default give the value we want, + #and digital sensors do not return perfect values. + g = values[np.abs(np.asarray(values) - ch.transform(0, 65535)[1]).argmin()] + return (-1 * g, g) + +# ============================================================================ +# +# ============================================================================ + +def get_accelerometer_info(ch: idelib.dataset.Channel) -> dict: + """ + creates a dictionary with information relevant to :py:func:`refine_acceleration`, namely + + - sensor_type: Literal["DC", "PE", "PR"]. Indicates if the sensor is digital, piezoelectric, + or piezoresistive respectively + - rating: the g rating of the sensor, or the maximum it can read while accurate + - noise: float. 
Indicates the noise of the sensor when the response curve is flat + - low_cutoff: the lowest rate at which the sensor is being shaken at where the response curve is flat. + - high_cutoff: the highest rate at which the sensor is being shaken at where the response curve is flat. + + :param ch: The channel of the dataset to work on. This channel should have a child, which will + be used to extract the data. This sensor must + + :return: a dictionary with the information stated above + """ + if not ACCELERATION.match(ch): + raise ValueError("An acceleration channel should be given") + + times = ch.getSession().arraySlice()[0, :] + #same thing as 1/ ((times[1] - times[0]) / 1000000) + sample_rate = 1000000 / (times[1] - times[0]) + sensor = ch[0].sensor.name + kwords = sensor.split(" ") + if kwords[0].startswith("ADXL"): + s_type = "DC" + else: + s_type = kwords[1] + rating = get_accelerometer_bounds(ch)[1] + + if (s_type == "PE"): + low = 10 + high = int(sample_rate / 5) + try: + noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06, 6000: 0.08}[rating] + except KeyError: + raise Exception(f"rating {rating} not supported for Piezoelectric sensors") + elif (s_type == "DC"): + try: + low, high = {8: (1, 150), 16: (1,300), 40: (1, 100)}[rating] + noise = {8: 2E-5, 16: 4E-3, 40: 8E-5}[rating] + except KeyError: + raise Exception(f"rating {rating} not supported for digital IMUs") + elif (s_type == "PR"): + low = 1 + high = int(sample_rate / 5) + try: + noise = {50: 3E-3, 500: 1.5E-2, 2000:6E-2}[rating] + except: + raise Exception(f"rating {rating} not supported for Piezoresistve sensors") + else: + raise Exception(f"Sensor type {s_type} not recognized, should be one of PE, DC, PR.") + + return { + "sensor_type": s_type, + "noise": noise, + "rating": rating, + "low_cutoff": low, + "high_cutoff": high, + } \ No newline at end of file diff --git a/tests/ide/test_util.py b/tests/ide/test_util.py new file mode 100644 index 00000000..ff5777e3 --- /dev/null +++ 
b/tests/ide/test_util.py @@ -0,0 +1,42 @@ +from idelib.importer import importFile +import pytest +from endaq.ide.info import get_channels +from endaq.ide.util import get_accelerometer_info +import os + +IDE_FILENAME = os.path.join(os.path.dirname(__file__), "test.ide") + + +@pytest.fixture +def test_IDE(): + with importFile(IDE_FILENAME) as ds: + yield ds + + +def test_get_accelerometer_info(test_IDE): + """ + Tests highlighted by a comment above the assert. + `get_accelerometer_bounds` is also tested in this method. + """ + for accel in get_channels(test_IDE, "accel", False): + + sensor_info = get_accelerometer_info(accel) + l = sensor_info['low_cutoff'] + h = sensor_info['high_cutoff'] + + #lower cutoff is always lower than higher cutoff + assert l < h + #cutoffs are both positive. By transitivity h > 0 if this test passes. + assert l > 0 + #rating is one of the valid existing types + assert sensor_info['rating'] in [8, 16, 25, 100, 200, 500, 2000, 6000] + +def test_incorrect_sensor_get_accelerometer_info(test_IDE): + """ + tests that passing in non accelerometer info results in a fail. 
+ """ + accels = get_channels(test_IDE, "accel", False) + for _, channel in test_IDE.channels.items(): + if channel not in accels: + with pytest.raises(ValueError): + get_accelerometer_info(channel) \ No newline at end of file From 8d5e916bc713e89cc854f0226e88a9f73809636f Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Fri, 3 Apr 2026 10:31:57 -0400 Subject: [PATCH 17/19] updated Exceptions to ValueErrors, updated some typing errors --- endaq/calc/utils.py | 2 +- endaq/ide/info.py | 4 +++- endaq/ide/util.py | 11 ++++++----- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/endaq/calc/utils.py b/endaq/calc/utils.py index 42ece8e5..c17bdf1d 100644 --- a/endaq/calc/utils.py +++ b/endaq/calc/utils.py @@ -545,6 +545,6 @@ def align_dataframes(dfs: List[pd.DataFrame]) -> List[pd.DataFrame]: datepoints = df.index break if datepoints is None: - raise Exception("resampling error, timestamps incosistent with inputs") + raise ValueError("resampling error, timestamps incosistent with inputs") for df in resampled_dfs: df.index = datepoints return resampled_dfs diff --git a/endaq/ide/info.py b/endaq/ide/info.py index 98a94ea0..f928e1d0 100644 --- a/endaq/ide/info.py +++ b/endaq/ide/info.py @@ -493,7 +493,9 @@ def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: return sum(averaged_dfs) -def _find_all_overlaps(bounds: typing.List[tuple[int, int]]) -> dict[tuple[int, int], typing.List[int]]: +def _find_all_overlaps( + bounds: typing.List[typing.Tuple[int, int]] + ) -> typing.Dict[typing.Tuple[int, int], typing.List[int]]: """ finds **all** possible overlaps of a list of start and end bounds. if an end bound and a start bound share the same value, it is not considered overlapping. 
diff --git a/endaq/ide/util.py b/endaq/ide/util.py index 044320b9..a44ae6af 100644 --- a/endaq/ide/util.py +++ b/endaq/ide/util.py @@ -4,6 +4,7 @@ import datetime import string from ebmlite import loadSchema +from typing import Union from .measurement import ACCELERATION import idelib.dataset @@ -131,7 +132,7 @@ def parse_time(t, datetime_start=None): # # ============================================================================ -def get_accelerometer_bounds(ch: idelib.dataset.Channel | idelib.dataset.SubChannel) -> tuple[int, int]: +def get_accelerometer_bounds(ch: Union[idelib.dataset.Channel, idelib.dataset.SubChannel]) -> tuple[int, int]: """ Gets the g-rating of the sensor used from the given channel. @@ -199,22 +200,22 @@ def get_accelerometer_info(ch: idelib.dataset.Channel) -> dict: try: noise = {25: 8E-4, 100: 3E-3, 500: 1.5E-2, 2000: 0.06, 6000: 0.08}[rating] except KeyError: - raise Exception(f"rating {rating} not supported for Piezoelectric sensors") + raise ValueError(f"rating {rating} not supported for Piezoelectric sensors") elif (s_type == "DC"): try: low, high = {8: (1, 150), 16: (1,300), 40: (1, 100)}[rating] noise = {8: 2E-5, 16: 4E-3, 40: 8E-5}[rating] except KeyError: - raise Exception(f"rating {rating} not supported for digital IMUs") + raise ValueError(f"rating {rating} not supported for digital IMUs") elif (s_type == "PR"): low = 1 high = int(sample_rate / 5) try: noise = {50: 3E-3, 500: 1.5E-2, 2000:6E-2}[rating] except: - raise Exception(f"rating {rating} not supported for Piezoresistve sensors") + raise ValueError(f"rating {rating} not supported for Piezoresistve sensors") else: - raise Exception(f"Sensor type {s_type} not recognized, should be one of PE, DC, PR.") + raise ValueError(f"Sensor type {s_type} not recognized, should be one of PE, DC, PR.") return { "sensor_type": s_type, From 571a5d3d34ca66c3665bb7938e090b147022d887 Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Fri, 3 Apr 2026 14:34:50 -0400 Subject: [PATCH 18/19] added 
test methods for get_acelerometer_bounds --- tests/ide/test_util.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/tests/ide/test_util.py b/tests/ide/test_util.py index ff5777e3..f65cf19e 100644 --- a/tests/ide/test_util.py +++ b/tests/ide/test_util.py @@ -1,7 +1,7 @@ from idelib.importer import importFile import pytest from endaq.ide.info import get_channels -from endaq.ide.util import get_accelerometer_info +from endaq.ide.util import get_accelerometer_info, get_accelerometer_bounds import os IDE_FILENAME = os.path.join(os.path.dirname(__file__), "test.ide") @@ -39,4 +39,20 @@ def test_incorrect_sensor_get_accelerometer_info(test_IDE): for _, channel in test_IDE.channels.items(): if channel not in accels: with pytest.raises(ValueError): - get_accelerometer_info(channel) \ No newline at end of file + get_accelerometer_info(channel) + +def test_get_accelerometer_bounds(test_IDE): + """ + Tests that the subchannels g rating matches the parents value, + that all g values are symmetric, and it's one of the g-ratings we + currently offer. 
+ """ + acc_sub_chs = get_channels(test_IDE, "accel", True) + acc_chs = get_channels(test_IDE, "accel", False) + parent_gs = list(map(get_accelerometer_bounds, acc_chs)) + for g in parent_gs: + assert -1 * g[0] == g[1] + assert g[1] in [8, 16, 25, 40, 100, 200, 500, 2000, 6000] + for ch in acc_sub_chs: + expected_g = parent_gs[acc_chs.index(ch.parent)] + assert get_accelerometer_bounds(ch) == expected_g \ No newline at end of file From 1f3afa5234a2d32608a8350cf9b7a0fabea1350c Mon Sep 17 00:00:00 2001 From: Jeffrey S Date: Fri, 3 Apr 2026 14:57:41 -0400 Subject: [PATCH 19/19] added accelerometer bound tests for PE and PR sensors --- tests/ide/test_util.py | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/tests/ide/test_util.py b/tests/ide/test_util.py index f65cf19e..1f196d09 100644 --- a/tests/ide/test_util.py +++ b/tests/ide/test_util.py @@ -5,13 +5,22 @@ import os IDE_FILENAME = os.path.join(os.path.dirname(__file__), "test.ide") - - +PR_IDE_FILENAME = os.path.join(os.path.dirname(__file__), "../batch/DAQ09576_000001.IDE") +PE_IDE_FILENAME = os.path.join(os.path.dirname(__file__), "../batch/test1.IDE'") @pytest.fixture def test_IDE(): with importFile(IDE_FILENAME) as ds: yield ds +@pytest.fixture +def test_PR_IDE(): + with importFile(IDE_FILENAME) as ds: + yield ds + +@pytest.fixture +def test_PE_IDE(): + with importFile(IDE_FILENAME) as ds: + yield ds def test_get_accelerometer_info(test_IDE): """ @@ -41,18 +50,19 @@ def test_incorrect_sensor_get_accelerometer_info(test_IDE): with pytest.raises(ValueError): get_accelerometer_info(channel) -def test_get_accelerometer_bounds(test_IDE): +def test_get_accelerometer_bounds(test_IDE, test_PE_IDE, test_PR_IDE): """ Tests that the subchannels g rating matches the parents value, that all g values are symmetric, and it's one of the g-ratings we currently offer. 
""" - acc_sub_chs = get_channels(test_IDE, "accel", True) - acc_chs = get_channels(test_IDE, "accel", False) - parent_gs = list(map(get_accelerometer_bounds, acc_chs)) - for g in parent_gs: - assert -1 * g[0] == g[1] - assert g[1] in [8, 16, 25, 40, 100, 200, 500, 2000, 6000] - for ch in acc_sub_chs: - expected_g = parent_gs[acc_chs.index(ch.parent)] - assert get_accelerometer_bounds(ch) == expected_g \ No newline at end of file + for ide in [test_IDE, test_PE_IDE, test_PR_IDE]: + acc_sub_chs = get_channels(ide, "accel", True) + acc_chs = get_channels(ide, "accel", False) + parent_gs = list(map(get_accelerometer_bounds, acc_chs)) + for g in parent_gs: + assert -1 * g[0] == g[1] + assert g[1] in [8, 16, 25, 40, 100, 200, 500, 2000, 6000] + for ch in acc_sub_chs: + expected_g = parent_gs[acc_chs.index(ch.parent)] + assert get_accelerometer_bounds(ch) == expected_g \ No newline at end of file