import os

import numpy as np
import pandas as pd
import torch


def collate_fn(data, max_len=None):
    """Build mini-batch tensors from a list of (X, y) tuples.

    Args:
        data: len(batch_size) list of tuples (X, y).
            - X: torch tensor of shape (seq_length, feat_dim); variable seq_length.
            - y: torch tensor of shape (num_labels,): class indices or numerical
              targets (for classification or regression, respectively).
              num_labels > 1 for multi-task models.
        max_len: global fixed sequence length. Used for architectures requiring
            fixed length input, where the batch length cannot vary dynamically.
            Longer sequences are clipped, shorter are padded with 0s. If None,
            the length of the longest sequence in the batch is used.

    Returns:
        X: (batch_size, padded_length, feat_dim) torch tensor of zero-padded features.
        targets: (batch_size, num_labels) torch tensor of stacked labels.
        padding_masks: (batch_size, padded_length) boolean tensor,
            1 means keep vector at this position, 0 means padding.
    """
    batch_size = len(data)
    features, labels = zip(*data)

    # Original sequence length of each time series.
    lengths = [X.shape[0] for X in features]
    if max_len is None:
        max_len = max(lengths)

    # Stack and pad features (convert 2D to 3D tensors, i.e. add batch dimension).
    X = torch.zeros(batch_size, max_len, features[0].shape[-1])
    for i, (sample, length) in enumerate(zip(features, lengths)):
        end = min(length, max_len)  # clip sequences longer than max_len
        X[i, :end, :] = sample[:end, :]

    targets = torch.stack(labels, dim=0)  # (batch_size, num_labels)

    # int64 rather than int16, so lengths above 32767 cannot overflow.
    padding_masks = padding_mask(
        torch.tensor(lengths, dtype=torch.long), max_len=max_len
    )  # (batch_size, padded_length) boolean tensor, "1" means keep

    return X, targets, padding_masks


def padding_mask(lengths, max_len=None):
    """Create a boolean mask marking the non-padded positions of each sequence.

    Args:
        lengths: 1D tensor of sequence lengths, one entry per batch element.
        max_len: number of columns (time steps) of the mask. If None, the
            largest value in `lengths` is used.

    Returns:
        (batch_size, max_len) boolean tensor where 1 means keep the element at
        this position (time step) and 0 means padding.
    """
    batch_size = lengths.numel()
    if max_len is None:
        # `Tensor.max()` is the reduction method; `max_val` does not exist.
        max_len = int(lengths.max())
    positions = torch.arange(0, max_len, device=lengths.device).type_as(lengths)
    # Position t of row i is kept iff t < lengths[i].
    return positions.repeat(batch_size, 1).lt(lengths.unsqueeze(1))


class Normalizer(object):
    """Normalizes a dataframe, either across ALL contained rows (time steps)
    or separately for each sample, depending on `norm_type`.
    """

    def __init__(self, norm_type='standardization', mean=None, std=None,
                 min_val=None, max_val=None):
        """
        Args:
            norm_type: choose from:
                "standardization", "minmax": normalizes dataframe across ALL
                    contained rows (time steps)
                "per_sample_std", "per_sample_minmax": normalizes each sample
                    separately (i.e. across only its own rows)
            mean, std, min_val, max_val: optional (num_feat,) Series of
                pre-computed values
        """
        self.norm_type = norm_type
        self.mean = mean
        self.std = std
        self.min_val = min_val
        self.max_val = max_val

    def normalize(self, df):
        """Normalize `df` according to `self.norm_type`.

        For "standardization" and "minmax" the statistics are computed from
        `df` on the first call (when not supplied to the constructor), stored
        on the instance and reused by later calls. The per-sample variants
        group rows by their index value and keep no state.

        Args:
            df: input dataframe

        Returns:
            df: normalized dataframe

        Raises:
            NameError: if `self.norm_type` is not one of the supported methods.
        """
        eps = np.finfo(float).eps  # guards against division by zero

        if self.norm_type == "standardization":
            if self.mean is None:
                self.mean = df.mean()
                self.std = df.std()
            return (df - self.mean) / (self.std + eps)

        if self.norm_type == "minmax":
            if self.max_val is None:
                self.max_val = df.max()
                self.min_val = df.min()
            return (df - self.min_val) / (self.max_val - self.min_val + eps)

        if self.norm_type == "per_sample_std":
            grouped = df.groupby(by=df.index)
            # eps added for consistency with the other branches: a constant
            # sample has std 0 and would otherwise produce NaN/inf.
            return (df - grouped.transform('mean')) / (grouped.transform('std') + eps)

        if self.norm_type == "per_sample_minmax":
            grouped = df.groupby(by=df.index)
            min_vals = grouped.transform('min')
            return (df - min_vals) / (grouped.transform('max') - min_vals + eps)

        raise NameError(f'Normalize method "{self.norm_type}" not implemented')


def interpolate_missing(y):
    """Replace NaN values in pd.Series `y` using linear interpolation.

    Series without NaN values are returned unchanged (same object).
    """
    if y.isna().any():
        y = y.interpolate(method='linear', limit_direction='both')
    return y


def subsample(y, limit=256, factor=2):
    """Subsample a Series that is longer than `limit`.

    If `y` has more than `limit` elements, every `factor`-th element is kept
    and the index is reset; otherwise `y` is returned unchanged.
    """
    if len(y) > limit:
        return y[::factor].reset_index(drop=True)
    return y