diff --git a/endaq/batch/core.py b/endaq/batch/core.py index d2a0494..c8cf464 100644 --- a/endaq/batch/core.py +++ b/endaq/batch/core.py @@ -214,7 +214,7 @@ def _make_peak_windows(ch_data_cache: analyzer.CalcCache, margin_len): if sys.version_info < (3, 9): return aligned_peak_data.stack().stack().reorder_levels(levels) - return aligned_peak_data.stack(future_stack=True).stack().reorder_levels(levels) + return aligned_peak_data.stack(future_stack=True).stack(future_stack = False).reorder_levels(levels) def _make_vc_curves(ch_data_cache: analyzer.CalcCache): diff --git a/endaq/calc/filters.py b/endaq/calc/filters.py index 5882976..a593708 100644 --- a/endaq/calc/filters.py +++ b/endaq/calc/filters.py @@ -343,7 +343,6 @@ def ellip( return df - def _fftnoise(f): """ Generate time series noise for a given range of frequencies with random phase using ifft. diff --git a/endaq/calc/utils.py b/endaq/calc/utils.py index 6bde969..c17bdf1 100644 --- a/endaq/calc/utils.py +++ b/endaq/calc/utils.py @@ -2,8 +2,9 @@ from __future__ import annotations +import bisect import typing -from typing import Optional, Union, Literal +from typing import List, Optional, Union, Literal import warnings import numpy as np @@ -129,31 +130,45 @@ def to_dB( } -def resample(df: pd.DataFrame, sample_rate: Optional[float] = None) -> pd.DataFrame: +def resample( + df: pd.DataFrame, + sample_rate: Optional[float] = None, + num_samples: Optional[int] = None + ) -> pd.DataFrame: """ - Resample a dataframe to a desired sample rate (in Hz) - + Resample a dataframe to a desired sample rate (in Hz) or a desired number of points. + Note that ``sample_rate`` and ``num_samples`` are mutually exclusive. If + neither of sample_rate or num_samples is suplied, it will use the same sample_rate + as it currently does, but makes the time stamps uniformly spaced. + :param df: The DataFrame to resample, indexed by time :param sample_rate: The desired sample rate to resample the given data to. 
- If one is not supplied, then it will use the same as it currently does, but - make the time stamps uniformly spaced + :param num_samples: The desired number of samples to resample the given data to. + :return: The resampled data in a DataFrame """ - if sample_rate is None: - num_samples_after_resampling = len(df) - else: + if sample_rate is not None and num_samples is not None: + raise ValueError("Only one of `sample_rate` and `num_samples` can be set.") + + if sample_rate is not None: dt = sample_spacing(df) num_samples_after_resampling = int(dt * len(df) * sample_rate) + elif num_samples is not None: + num_samples_after_resampling = num_samples + else: + num_samples_after_resampling = len(df) resampled_data, resampled_time = scipy.signal.resample( df, num_samples_after_resampling, t=df.index.values.astype(np.float64), ) - resampled_time = pd.date_range( - df.iloc[0].name, df.iloc[-1].name, - periods=num_samples_after_resampling, - ) + + if resampled_time[0] != df.index[0] or resampled_time[-1] != df.index[-1]: + resampled_time = pd.date_range( + df.index[0], df.index[-1], + periods=num_samples_after_resampling, + ) # Check for datetimes, if so localize if 'datetime' in str(df.index.dtype): @@ -462,3 +477,74 @@ def to_altitude(df: pd.DataFrame, # Return DataFrame with New Altitude Column return alt_df + +def align_dataframes(dfs: List[pd.DataFrame]) -> List[pd.DataFrame]: + """ + Resamples the given dataframes to all be equal-sized with resampled uniform timestamps. + Any timestamps outside of the shared range will be dropped. + + :param dfs: a List of dataframes with DateTimeIndex to align. + + :return: a list of dataframes in the same order that they were inputted in. 
+ """ + aligned_start = max([df.index[0] for df in dfs]) + aligned_end = min([df.index[-1] for df in dfs]) + + if aligned_start >= aligned_end: + raise ValueError("No range of time shared between dataframes") + left_idx = [bisect.bisect_right(df.index, aligned_start) - 1 for df in dfs] #the most left point in bound + right_idx = [bisect.bisect_left(df.index, aligned_end) for df in dfs] #the first right point out of bounds + + #removes the start / end points + trimmed_dfs = [dfs[i][left_idx[i] + 1: right_idx[i] - 1] for i in range(len(dfs))] + + for i, (df, l_idx) in enumerate(zip(dfs, left_idx)): + #if the original timestamp is too early + if df.index[l_idx] != aligned_start: + #change in time and acceleration + dt = (df.index[l_idx] - df.index[l_idx - 1]).total_seconds() + da = (df.iloc[l_idx] - df.iloc[l_idx - 1]) / dt + new_dt = (aligned_start - df.index[l_idx]).total_seconds() + #compute the new point + new_point = df.iloc[l_idx - 1] + new_dt * da + #and add it back to the dataframe + trimmed_dfs[i] = pd.concat([ + pd.DataFrame([new_point], index= [aligned_start]), + trimmed_dfs[i] + ]) + #in the case that the data is already in the correct point, add it back in + else: + trimmed_dfs[i] = pd.concat([df.loc[[aligned_start]], trimmed_dfs[i]]) + + #repeating the steps above, with slight indexing differences to accomodate the right index + for i, (df, r_idx) in enumerate(zip(dfs, right_idx)): + if df.index[r_idx] != aligned_end: + dt = (df.index[r_idx] - df.index[r_idx - 1]).total_seconds() + da = (df.iloc[r_idx] - df.iloc[r_idx - 1]) / dt + new_dt = (aligned_end - (df.index[r_idx - 1])).total_seconds() + new_point = df.iloc[r_idx - 1] + new_dt * da + trimmed_dfs[i] = pd.concat([ + trimmed_dfs[i], + pd.DataFrame([new_point], index = [aligned_end]) + ]) + else: + trimmed_dfs[i] = pd.concat([trimmed_dfs[i], df.loc[[aligned_end]]]) + + #resamples the data to the dataframe with the most points available + total_samples = max(tdf.shape[0] for tdf in trimmed_dfs) + 
resampled_dfs = [resample(df, num_samples = total_samples) for df in trimmed_dfs] + + """ + In the current implementation of scipy's resample, there can be some inconsistent rounding point + when creating the datetimes. For this reason, we will find one that meets spec (correct start + and end points) and use that for all. + """ + datepoints = None + for df in resampled_dfs: + if df.index[0] == aligned_start and df.index[-1] == aligned_end: + datepoints = df.index + break + if datepoints is None: + raise ValueError("resampling error, timestamps incosistent with inputs") + for df in resampled_dfs: df.index = datepoints + return resampled_dfs diff --git a/endaq/ide/info.py b/endaq/ide/info.py index 94380d8..f928e1d 100644 --- a/endaq/ide/info.py +++ b/endaq/ide/info.py @@ -4,7 +4,7 @@ from __future__ import annotations import typing -from collections import defaultdict +from collections import defaultdict, namedtuple import datetime import dateutil.tz import warnings @@ -14,15 +14,16 @@ import pandas.io.formats.style import idelib.dataset -from .measurement import MeasurementType, ANY, get_channels +from .measurement import MeasurementType, ANY, get_channels, ACCELERATION from .files import get_doc -from .util import parse_time - +from .util import parse_time, get_accelerometer_info, get_accelerometer_bounds +from endaq.calc import utils, filters __all__ = [ "get_channel_table", "to_pandas", "get_primary_sensor_data", + "get_unified_acceleration" ] @@ -433,3 +434,134 @@ def get_primary_sensor_data( #Return only the subchannels with right units return data[channels.name] + +# ============================================================================ +# +# ============================================================================ +def get_unified_acceleration(doc: idelib.dataset.Dataset) -> pd.DataFrame: + """ + Computes a more accurate acceleration channel by filtering frequencies + out of range and combining frequencies where multiple accelerometers are accurate. 
+ + Note that this method uses :py:func:`calc.utils.resample()`, which can cause artifacts + at the start or end of the data due to a assumption about signal periodicity. If important + data is recorded within those bounds, it is not recommended to use this method. + + :param doc: An open `Dataset` object, see :py:func:`~endaq.ide.get_doc()` + for more. + + :return: a pandas Dataframe with the flattened acceleration data. Output channel names + are [X, Y, Z]. + """ + + acceleration_channels = get_channels(doc, ACCELERATION, False) + + dfs = [to_pandas(ch) for ch in acceleration_channels] + + cleaned_dfs = [] + for idx in range(len(dfs)): + df = dfs[idx].copy() + #drop non-axes channels + df = df[[col for col in df.columns if col[0] in 'XYZ']] + #0 mean all data + for col in df.columns: + df[col] = df[col] - np.mean(df[col]) + cleaned_dfs.append(df) + + aligned_dfs = utils.align_dataframes(cleaned_dfs) + aligned_sr = 1 / (aligned_dfs[0].index[1] - aligned_dfs[0].index[0]).total_seconds() + sensor_info = [get_accelerometer_info(ch) for ch in acceleration_channels] + noises = np.array([si['noise'] for si in sensor_info]) + bounds = [(si['low_cutoff'], min(si['high_cutoff'], int(aligned_sr / 2) - 1)) for si in sensor_info] + #filters out the frequencies that the sensors can not accurately detect + dfs = [filters.butterworth(df, low_cutoff= l_bound, high_cutoff= r_bound) + for (df, (l_bound, r_bound)) in zip(aligned_dfs, bounds)] + for df in dfs: df.columns = ['X', 'Y', 'Z'] + + hz_overlaps = _find_all_overlaps(bounds = bounds) + averaged_dfs = [] + + #for each range, the good frequencies are isolated and averaged with the other datasets + #who share the same freuqency range + for k, v in hz_overlaps.items(): + cur_overlap_dfs = [dfs[v_i] for v_i in v] + cur_noises = noises[v] + hz_overlap_dfs = [filters.butterworth(df, low_cutoff=k[0], high_cutoff=k[1]) + for df in cur_overlap_dfs] + bound_avgd_df = _weighted_avg(hz_overlap_dfs, cur_noises) + 
averaged_dfs.append(bound_avgd_df) + + return sum(averaged_dfs) + +def _find_all_overlaps( + bounds: typing.List[typing.Tuple[int, int]] + ) -> typing.Dict[typing.Tuple[int, int], typing.List[int]]: + """ + finds **all** possible overlaps of a list of start and end bounds. if an end bound and a start + bound share the same value, it is not considered overlapping. + + :param bounds: a list of each start and end bounds, where the end bound is strictly greater + than the starting bounds. + + :return: a dictionary where the keys are the bounds for every overlap, and the values are + the indices that belong in each bound, respective to ``bounds`` + """ + + if len(bounds) == 0: + return {} + #labeling the components + lblHz = namedtuple("labeledTuple", ("idx", "Hz")) + labels = range(len(bounds)) + + lower_bounds = list(map(lblHz, labels, [b[0] for b in bounds])) + upper_bounds = list(map(lblHz, labels, [b[1] for b in bounds])) + bounds_sorted = sorted(lower_bounds + upper_bounds, key= lambda b: b.Hz) + + open_intervals: typing.List[lblHz.idx] = np.array([bounds_sorted[0].idx]) + closed_intervals: dict[tuple[int, int], typing.List[int]] = {} + + left_bound = bounds_sorted[0].Hz + bounds_sorted = bounds_sorted[1:] + + for point in bounds_sorted: + if left_bound != point.Hz: + if open_intervals.size != 0: + closed_intervals[left_bound, point.Hz] = open_intervals + left_bound = point.Hz + if point.idx in open_intervals: + open_intervals = np.setdiff1d(open_intervals, [point.idx]) + else: + open_intervals = np.append(open_intervals, point.idx) + return closed_intervals + + +def _weighted_avg(dfs: typing.List[pd.DataFrame], noise: typing.List[float]): + """ + Combines multiple dataframe into one by applying a weighted average base on noise in a linear-inverse fashion. + For sensors A and B, if the noise ratio is 2:1, the weighing ratio will be 1:2. Note that a value of 0 is + considered in the weighting. + + :param dfs: the dataframes to normalize. 
It is assumed that all dataframes have + the same DateTimeIndex values. + :param noise: The noise value to the index-respective dataframe. Noise will be normalized. + + :return: a single dataframe with the weighted values + """ + if len(dfs) != len(noise) or len(dfs) == 0: + raise ValueError("dataframes and noise need to have equal, non zero number of elements") + #normalize the weightings. If there is a weighting with 0 noise, return that instead + if 0 in noise: + return dfs[noise.index(0)] + noise_normalized = np.array(noise) / sum(noise) + + #formula for the inverse is for array [a0, a1, ..., an], inverse is + # [a1a2...an, a0a2...an, a1a2...an], and then normalized + new_noise = 1 / noise_normalized + for n in noise_normalized: + new_noise *= n + + new_noise = new_noise / sum(new_noise) + #in the case that len(dfs) == 1, the noise is 1 and the original is returned + return pd.DataFrame(sum([n * d for n, d in zip(new_noise, dfs)]), + columns = dfs[0].columns, index = dfs[0].index) + \ No newline at end of file diff --git a/endaq/ide/util.py b/endaq/ide/util.py index 1aa66e9..a44ae6a 100644 --- a/endaq/ide/util.py +++ b/endaq/ide/util.py @@ -4,8 +4,14 @@ import datetime import string from ebmlite import loadSchema +from typing import Union -__all__ = ['parse_time', 'validate'] +from .measurement import ACCELERATION +import idelib.dataset +import numpy as np + + +__all__ = ['parse_time', 'validate', 'get_accelerometer_bounds', 'get_accelerometer_info'] # ============================================================================ @@ -122,4 +128,99 @@ def parse_time(t, datetime_start=None): raise TypeError(f"Unsupported type for parse_time(): {type(t).__name__} ({t!r})") +# ============================================================================ +# +# ============================================================================ + +def get_accelerometer_bounds(ch: Union[idelib.dataset.Channel, idelib.dataset.SubChannel]) -> tuple[int, int]: + """ + Gets the 
g-rating of the sensor used from the given channel. + + :param ch: The channel of the dataset to work on. This channel should have a child, which will + be used to extract the data. + :return: a tuple of g-rating bounds. + """ + ch = ch if isinstance(ch, idelib.dataset.SubChannel) else ch[0] + if not ACCELERATION.match(ch): + raise ValueError("An acceleration channel should be given") + s_name = ch.sensor.name + + if s_name.startswith("ADXL"): + g = {"ADXL345": 16, "ADXL362": 16, + "ADXL357": 40, "ADXL359": 40, + "ADXL355": 8, "ADXL375": 200, + }[s_name.split(" ")[0]] + return (-1 * g, g) + + values = np.asarray([8, 16, 25, 100, 200, 500, 2000, 6000]) + #finds the value closest to ch.transform(0, 65535)[1]. This is because + #we PR have some resistance and won't by default give the value we want, + #and digital sensors do not return perfect values. + g = values[np.abs(np.asarray(values) - ch.transform(0, 65535)[1]).argmin()] + return (-1 * g, g) + +# ============================================================================ +# +# ============================================================================ + +def get_accelerometer_info(ch: idelib.dataset.Channel) -> dict: + """ + creates a dictionary with information relevant to :py:func:`refine_acceleration`, namely + + - sensor_type: Literal["DC", "PE", "PR"]. Indicates if the sensor is digital, piezoelectric, + or piezoresistive respectively + - rating: the g rating of the sensor, or the maximum it can read while accurate + - noise: float. Indicates the noise of the sensor when the response curve is flat + - low_cutoff: the lowest rate at which the sensor is being shaken at where the response curve is flat. + - high_cutoff: the highest rate at which the sensor is being shaken at where the response curve is flat. + + :param ch: The channel of the dataset to work on. This channel should have a child, which will + be used to extract the data. 
This sensor must measure acceleration (a ``ValueError`` is raised otherwise).
and end stamps are correct, and + that all data "exists" (no NaN values) + """ + accel_channels = [to_pandas(ch) for ch in info.get_channels(test_IDE, 'accel', False)] + accel = info.get_unified_acceleration(test_IDE) + assert accel.notnull().all().all() + assert accel.index[0] == max([ac.index[0] for ac in accel_channels]) + assert accel.index[-1] == min([ac.index[-1] for ac in accel_channels]) \ No newline at end of file diff --git a/tests/ide/test_util.py b/tests/ide/test_util.py new file mode 100644 index 0000000..1f196d0 --- /dev/null +++ b/tests/ide/test_util.py @@ -0,0 +1,68 @@ +from idelib.importer import importFile +import pytest +from endaq.ide.info import get_channels +from endaq.ide.util import get_accelerometer_info, get_accelerometer_bounds +import os + +IDE_FILENAME = os.path.join(os.path.dirname(__file__), "test.ide") +PR_IDE_FILENAME = os.path.join(os.path.dirname(__file__), "../batch/DAQ09576_000001.IDE") +PE_IDE_FILENAME = os.path.join(os.path.dirname(__file__), "../batch/test1.IDE'") +@pytest.fixture +def test_IDE(): + with importFile(IDE_FILENAME) as ds: + yield ds + +@pytest.fixture +def test_PR_IDE(): + with importFile(IDE_FILENAME) as ds: + yield ds + +@pytest.fixture +def test_PE_IDE(): + with importFile(IDE_FILENAME) as ds: + yield ds + +def test_get_accelerometer_info(test_IDE): + """ + Tests highlighted by a comment above the assert. + `get_accelerometer_bounds` is also tested in this method. + """ + for accel in get_channels(test_IDE, "accel", False): + + sensor_info = get_accelerometer_info(accel) + l = sensor_info['low_cutoff'] + h = sensor_info['high_cutoff'] + + #lower cutoff is always lower than higher cutoff + assert l < h + #cutoffs are both positive. By transitivity h > 0 if this test passes. 
+ assert l > 0 + #rating is one of the valid existing types + assert sensor_info['rating'] in [8, 16, 25, 100, 200, 500, 2000, 6000] + +def test_incorrect_sensor_get_accelerometer_info(test_IDE): + """ + tests that passing in non accelerometer info results in a fail. + """ + accels = get_channels(test_IDE, "accel", False) + for _, channel in test_IDE.channels.items(): + if channel not in accels: + with pytest.raises(ValueError): + get_accelerometer_info(channel) + +def test_get_accelerometer_bounds(test_IDE, test_PE_IDE, test_PR_IDE): + """ + Tests that the subchannels g rating matches the parents value, + that all g values are symmetric, and it's one of the g-ratings we + currently offer. + """ + for ide in [test_IDE, test_PE_IDE, test_PR_IDE]: + acc_sub_chs = get_channels(ide, "accel", True) + acc_chs = get_channels(ide, "accel", False) + parent_gs = list(map(get_accelerometer_bounds, acc_chs)) + for g in parent_gs: + assert -1 * g[0] == g[1] + assert g[1] in [8, 16, 25, 40, 100, 200, 500, 2000, 6000] + for ch in acc_sub_chs: + expected_g = parent_gs[acc_chs.index(ch.parent)] + assert get_accelerometer_bounds(ch) == expected_g \ No newline at end of file