diff --git a/dataflow/__init__.py b/dataflow/__init__.py index aced101..f1d1ff4 100644 --- a/dataflow/__init__.py +++ b/dataflow/__init__.py @@ -1,3 +1,21 @@ -from .tsf import preprocess_time_series, load_and_split_time_series, process_and_save_time_series +from .tsf import preprocess_time_series, process_and_save_time_series +from .data_factory import data_provider, data_dict +from .data_loader import ( + Dataset_ETT_hour, + Dataset_ETT_minute, + Dataset_Custom, + Dataset_Solar, + Dataset_Pred +) -__all__ = ['preprocess_time_series', 'load_and_split_time_series', 'process_and_save_time_series'] +__all__ = [ + 'preprocess_time_series', + 'process_and_save_time_series', + 'data_provider', + 'data_dict', + 'Dataset_ETT_hour', + 'Dataset_ETT_minute', + 'Dataset_Custom', + 'Dataset_Solar', + 'Dataset_Pred' +] diff --git a/dataflow/data_factory.py b/dataflow/data_factory.py new file mode 100644 index 0000000..c14be34 --- /dev/null +++ b/dataflow/data_factory.py @@ -0,0 +1,57 @@ +from .data_loader import Dataset_ETT_hour, Dataset_ETT_minute, Dataset_Custom, Dataset_Solar, Dataset_Pred +from torch.utils.data import DataLoader + +data_dict = { + 'ETTh1': Dataset_ETT_hour, + 'ETTh2': Dataset_ETT_hour, + 'ETTm1': Dataset_ETT_minute, + 'ETTm2': Dataset_ETT_minute, + 'Solar': Dataset_Solar, + 'custom': Dataset_Custom, +} + + +def data_provider(args, flag): + Data = data_dict[args.data] + timeenc = 0 if args.embed != 'timeF' else 1 + train_only = args.train_only + + if flag == 'test': + shuffle_flag = False + drop_last = True + # drop_last = False # without the "drop-last" trick + batch_size = args.batch_size + freq = args.freq + elif flag == 'pred': + shuffle_flag = False + drop_last = False + batch_size = 1 + freq = args.freq + Data = Dataset_Pred + else: + shuffle_flag = True + drop_last = True + batch_size = args.batch_size + freq = args.freq + # if flag == 'train': + # drop_last = False + + data_set = Data( + root_path=args.root_path, + data_path=args.data_path, + flag=flag, + size=[args.seq_len, args.label_len, args.pred_len], + features=args.features, + target=args.target, + timeenc=timeenc, + freq=freq, + train_only=train_only + ) + print(flag, len(data_set)) + data_loader = DataLoader( + data_set, + batch_size=batch_size, + shuffle=shuffle_flag, + num_workers=args.num_workers, + drop_last=drop_last) + return data_set, data_loader diff --git a/dataflow/data_loader.py b/dataflow/data_loader.py new file mode 100644 index 0000000..386ad09 --- /dev/null +++ b/dataflow/data_loader.py @@ -0,0 +1,479 @@ +import os +import numpy as np +import pandas as pd +import torch +from torch.utils.data import Dataset +from sklearn.preprocessing import StandardScaler +from utils.timefeatures import time_features +import warnings + +warnings.filterwarnings('ignore') + + +class Dataset_ETT_hour(Dataset): + def __init__(self, root_path, flag='train', size=None, + features='S', data_path='ETTh1.csv', + target='OT', scale=True, timeenc=0, freq='h', train_only=None): + # size [seq_len, label_len, pred_len] + # info + if size == None: + self.seq_len = 24 * 4 * 4 + self.label_len = 24 * 4 + self.pred_len = 24 * 4 + else: + self.seq_len = size[0] + self.label_len = size[1] + self.pred_len = size[2] + # init + assert flag in ['train', 'test', 'val'] + type_map = {'train': 0, 'val': 1, 'test': 2} + self.set_type = type_map[flag] + + self.features = features + self.target = target + self.scale = scale + self.timeenc = timeenc + self.freq = freq + + self.root_path = root_path + self.data_path = data_path + self.__read_data__() 
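+
+    # __read_data__ (below) slices the raw CSV with the fixed ETT-hour splits
+    # (12 months train / 4 months val / 4 months test, 30-day months at hourly
+    # resolution) and fits the StandardScaler on the training slice only.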
+ + def __read_data__(self): + self.scaler = StandardScaler() + df_raw = pd.read_csv(os.path.join(self.root_path, + self.data_path)) + + border1s = [0, 12 * 30 * 24 - self.seq_len, 12 * 30 * 24 + 4 * 30 * 24 - self.seq_len] + border2s = [12 * 30 * 24, 12 * 30 * 24 + 4 * 30 * 24, 12 * 30 * 24 + 8 * 30 * 24] + border1 = border1s[self.set_type] + border2 = border2s[self.set_type] + + if self.features == 'M' or self.features == 'MS': + cols_data = df_raw.columns[1:] + df_data = df_raw[cols_data] + elif self.features == 'S': + df_data = df_raw[[self.target]] + + if self.scale: + train_data = df_data[border1s[0]:border2s[0]] + self.scaler.fit(train_data.values) + data = self.scaler.transform(df_data.values) + else: + data = df_data.values + + df_stamp = df_raw[['date']][border1:border2] + df_stamp['date'] = pd.to_datetime(df_stamp.date) + if self.timeenc == 0: + df_stamp['month'] = df_stamp.date.apply(lambda row: row.month, 1) + df_stamp['day'] = df_stamp.date.apply(lambda row: row.day, 1) + df_stamp['weekday'] = df_stamp.date.apply(lambda row: row.weekday(), 1) + df_stamp['hour'] = df_stamp.date.apply(lambda row: row.hour, 1) + data_stamp = df_stamp.drop(['date'], 1).values + elif self.timeenc == 1: + data_stamp = time_features(pd.to_datetime(df_stamp['date'].values), freq=self.freq) + data_stamp = data_stamp.transpose(1, 0) + + self.data_x = data[border1:border2] + self.data_y = data[border1:border2] + self.data_stamp = data_stamp + + def __getitem__(self, index): + s_begin = index + s_end = s_begin + self.seq_len + r_begin = s_end - self.label_len + r_end = r_begin + self.label_len + self.pred_len + + seq_x = self.data_x[s_begin:s_end] + seq_y = self.data_y[r_begin:r_end] + seq_x_mark = self.data_stamp[s_begin:s_end] + seq_y_mark = self.data_stamp[r_begin:r_end] + + return seq_x, seq_y, seq_x_mark, seq_y_mark + + def __len__(self): + return len(self.data_x) - self.seq_len - self.pred_len + 1 + + def inverse_transform(self, data): + return self.scaler.inverse_transform(data) + + +class Dataset_ETT_minute(Dataset): + def __init__(self, root_path, flag='train', size=None, + features='S', data_path='ETTm1.csv', + target='OT', scale=True, timeenc=0, freq='t', train_only=False): + # size [seq_len, label_len, pred_len] + # info + if size == None: + self.seq_len = 24 * 4 * 4 + self.label_len = 24 * 4 + self.pred_len = 24 * 4 + else: + self.seq_len = size[0] + self.label_len = size[1] + self.pred_len = size[2] + # init + assert flag in ['train', 'test', 'val'] + type_map = {'train': 0, 'val': 1, 'test': 2} + self.set_type = type_map[flag] + + self.features = features + self.target = target + self.scale = scale + self.timeenc = timeenc + self.freq = freq + + self.root_path = root_path + self.data_path = data_path + self.__read_data__() + + def __read_data__(self): + self.scaler = StandardScaler() + df_raw = pd.read_csv(os.path.join(self.root_path, + self.data_path)) + + border1s = [0, 12 * 30 * 24 * 4 - self.seq_len, 12 * 30 * 24 * 4 + 4 * 30 * 24 * 4 - self.seq_len] + border2s = [12 * 30 * 24 * 4, 12 * 30 * 24 * 4 + 4 * 30 * 24 * 4, 12 * 30 * 24 * 4 + 8 * 30 * 24 * 4] + border1 = border1s[self.set_type] + border2 = border2s[self.set_type] + + if self.features == 'M' or self.features == 'MS': + cols_data = df_raw.columns[1:] + df_data = df_raw[cols_data] + elif self.features == 'S': + df_data = df_raw[[self.target]] + + if self.scale: + train_data = df_data[border1s[0]:border2s[0]] + self.scaler.fit(train_data.values) + data = self.scaler.transform(df_data.values) + else: + data = df_data.values + + 
df_stamp = df_raw[['date']][border1:border2] + df_stamp['date'] = pd.to_datetime(df_stamp.date) + if self.timeenc == 0: + df_stamp['month'] = df_stamp.date.apply(lambda row: row.month, 1) + df_stamp['day'] = df_stamp.date.apply(lambda row: row.day, 1) + df_stamp['weekday'] = df_stamp.date.apply(lambda row: row.weekday(), 1) + df_stamp['hour'] = df_stamp.date.apply(lambda row: row.hour, 1) + df_stamp['minute'] = df_stamp.date.apply(lambda row: row.minute, 1) + df_stamp['minute'] = df_stamp.minute.map(lambda x: x // 15) + data_stamp = df_stamp.drop(['date'], 1).values + elif self.timeenc == 1: + data_stamp = time_features(pd.to_datetime(df_stamp['date'].values), freq=self.freq) + data_stamp = data_stamp.transpose(1, 0) + + self.data_x = data[border1:border2] + self.data_y = data[border1:border2] + self.data_stamp = data_stamp + + def __getitem__(self, index): + s_begin = index + s_end = s_begin + self.seq_len + r_begin = s_end - self.label_len + r_end = r_begin + self.label_len + self.pred_len + + seq_x = self.data_x[s_begin:s_end] + seq_y = self.data_y[r_begin:r_end] + seq_x_mark = self.data_stamp[s_begin:s_end] + seq_y_mark = self.data_stamp[r_begin:r_end] + + return seq_x, seq_y, seq_x_mark, seq_y_mark + + def __len__(self): + return len(self.data_x) - self.seq_len - self.pred_len + 1 + + def inverse_transform(self, data): + return self.scaler.inverse_transform(data) + + +class Dataset_Custom(Dataset): + def __init__(self, root_path, flag='train', size=None, + features='S', data_path='ETTh1.csv', + target='OT', scale=True, timeenc=0, freq='h', train_only=False): + # size [seq_len, label_len, pred_len] + # info + if size == None: + self.seq_len = 24 * 4 * 4 + self.label_len = 24 * 4 + self.pred_len = 24 * 4 + else: + self.seq_len = size[0] + self.label_len = size[1] + self.pred_len = size[2] + # init + assert flag in ['train', 'test', 'val'] + type_map = {'train': 0, 'val': 1, 'test': 2} + self.set_type = type_map[flag] + + self.features = features + self.target = target + self.scale = scale + self.timeenc = timeenc + self.freq = freq + self.train_only = train_only + + self.root_path = root_path + self.data_path = data_path + self.__read_data__() + + def __read_data__(self): + self.scaler = StandardScaler() + df_raw = pd.read_csv(os.path.join(self.root_path, + self.data_path)) + + ''' + df_raw.columns: ['date', ...(other features), target feature] + ''' + cols = list(df_raw.columns) + if self.features == 'S': + cols.remove(self.target) + cols.remove('date') + # print(cols) + num_train = int(len(df_raw) * (0.7 if not self.train_only else 1)) + num_test = int(len(df_raw) * 0.2) + num_vali = len(df_raw) - num_train - num_test + border1s = [0, num_train - self.seq_len, len(df_raw) - num_test - self.seq_len] + border2s = [num_train, num_train + num_vali, len(df_raw)] + border1 = border1s[self.set_type] + border2 = border2s[self.set_type] + + if self.features == 'M' or self.features == 'MS': + df_raw = df_raw[['date'] + cols] + cols_data = df_raw.columns[1:] + df_data = df_raw[cols_data] + elif self.features == 'S': + df_raw = df_raw[['date'] + cols + [self.target]] + df_data = df_raw[[self.target]] + + if self.scale: + train_data = df_data[border1s[0]:border2s[0]] + self.scaler.fit(train_data.values) + # print(self.scaler.mean_) + # exit() + data = self.scaler.transform(df_data.values) + else: + data = df_data.values + + df_stamp = df_raw[['date']][border1:border2] + df_stamp['date'] = pd.to_datetime(df_stamp.date) + if self.timeenc == 0: + df_stamp['month'] = df_stamp.date.apply(lambda row: 
row.month, 1) + df_stamp['day'] = df_stamp.date.apply(lambda row: row.day, 1) + df_stamp['weekday'] = df_stamp.date.apply(lambda row: row.weekday(), 1) + df_stamp['hour'] = df_stamp.date.apply(lambda row: row.hour, 1) + data_stamp = df_stamp.drop(['date'], 1).values + elif self.timeenc == 1: + data_stamp = time_features(pd.to_datetime(df_stamp['date'].values), freq=self.freq) + data_stamp = data_stamp.transpose(1, 0) + + self.data_x = data[border1:border2] + self.data_y = data[border1:border2] + self.data_stamp = data_stamp + + def __getitem__(self, index): + s_begin = index + s_end = s_begin + self.seq_len + r_begin = s_end - self.label_len + r_end = r_begin + self.label_len + self.pred_len + + seq_x = self.data_x[s_begin:s_end] + seq_y = self.data_y[r_begin:r_end] + seq_x_mark = self.data_stamp[s_begin:s_end] + seq_y_mark = self.data_stamp[r_begin:r_end] + + return seq_x, seq_y, seq_x_mark, seq_y_mark + + def __len__(self): + return len(self.data_x) - self.seq_len - self.pred_len + 1 + + def inverse_transform(self, data): + return self.scaler.inverse_transform(data) + +class Dataset_Solar(Dataset): + def __init__(self, root_path, flag='train', size=None, + features='S', data_path='ETTh1.csv', + target='OT', scale=True, timeenc=0, freq='h', train_only=False): + # size [seq_len, label_len, pred_len] + # info + self.seq_len = size[0] + self.label_len = size[1] + self.pred_len = size[2] + # init + assert flag in ['train', 'test', 'val'] + type_map = {'train': 0, 'val': 1, 'test': 2} + self.set_type = type_map[flag] + + self.features = features + self.target = target + self.scale = scale + self.timeenc = timeenc + self.freq = freq + + self.root_path = root_path + self.data_path = data_path + self.__read_data__() + + def __read_data__(self): + self.scaler = StandardScaler() + df_raw = [] + with open(os.path.join(self.root_path, self.data_path), "r", encoding='utf-8') as f: + for line in f.readlines(): + line = line.strip('\n').split(',') + data_line = np.stack([float(i) for i in line]) + df_raw.append(data_line) + df_raw = np.stack(df_raw, 0) + df_raw = pd.DataFrame(df_raw) + + num_train = int(len(df_raw) * 0.7) + num_test = int(len(df_raw) * 0.2) + num_valid = int(len(df_raw) * 0.1) + border1s = [0, num_train - self.seq_len, len(df_raw) - num_test - self.seq_len] + border2s = [num_train, num_train + num_valid, len(df_raw)] + border1 = border1s[self.set_type] + border2 = border2s[self.set_type] + + df_data = df_raw.values + + if self.scale: + train_data = df_data[border1s[0]:border2s[0]] + self.scaler.fit(train_data) + data = self.scaler.transform(df_data) + else: + data = df_data + + self.data_x = data[border1:border2] + self.data_y = data[border1:border2] + + def __getitem__(self, index): + # 1. 定义输入序列 seq_x 的起止位置 + s_begin = index + s_end = s_begin + self.seq_len + # 2. 定义目标序列 seq_y 的起止位置 + # seq_y 的开始 (r_begin) 就是 seq_x 的结束 (s_end) + r_begin = s_end + # seq_y 的结束 (r_end) 是其开始位置加上预测长度 (pred_len) + r_end = r_begin + self.pred_len + # 3. 
根据起止位置切片数据 + seq_x = self.data_x[s_begin:s_end] + seq_y = self.data_y[r_begin:r_end] + seq_x_mark = torch.zeros((seq_x.shape[0], 1)) + seq_y_mark = torch.zeros((seq_y.shape[0], 1)) # 长度为 pred_len + seq_x = seq_x.astype('float32') + seq_y = seq_y.astype('float32') + return seq_x, seq_y, seq_x_mark, seq_y_mark + + def __len__(self): + return len(self.data_x) - self.seq_len - self.pred_len + 1 + + def inverse_transform(self, data): + return self.scaler.inverse_transform(data) + +class Dataset_Pred(Dataset): + def __init__(self, root_path, flag='pred', size=None, + features='S', data_path='ETTh1.csv', + target='OT', scale=True, inverse=False, timeenc=0, freq='15min', cols=None, train_only=False): + # size [seq_len, label_len, pred_len] + # info + if size == None: + self.seq_len = 24 * 4 * 4 + self.label_len = 24 * 4 + self.pred_len = 24 * 4 + else: + self.seq_len = size[0] + self.label_len = size[1] + self.pred_len = size[2] + # init + assert flag in ['pred'] + + self.features = features + self.target = target + self.scale = scale + self.inverse = inverse + self.timeenc = timeenc + self.freq = freq + self.cols = cols + self.root_path = root_path + self.data_path = data_path + self.__read_data__() + + def __read_data__(self): + self.scaler = StandardScaler() + df_raw = pd.read_csv(os.path.join(self.root_path, + self.data_path)) + ''' + df_raw.columns: ['date', ...(other features), target feature] + ''' + if self.cols: + cols = self.cols.copy() + else: + cols = list(df_raw.columns) + self.cols = cols.copy() + cols.remove('date') + if self.features == 'S': + cols.remove(self.target) + border1 = len(df_raw) - self.seq_len + border2 = len(df_raw) + + if self.features == 'M' or self.features == 'MS': + df_raw = df_raw[['date'] + cols] + cols_data = df_raw.columns[1:] + df_data = df_raw[cols_data] + elif self.features == 'S': + df_raw = df_raw[['date'] + cols + [self.target]] + df_data = df_raw[[self.target]] + + if self.scale: + self.scaler.fit(df_data.values) + data = self.scaler.transform(df_data.values) + else: + data = df_data.values + + tmp_stamp = df_raw[['date']][border1:border2] + tmp_stamp['date'] = pd.to_datetime(tmp_stamp.date) + pred_dates = pd.date_range(tmp_stamp.date.values[-1], periods=self.pred_len + 1, freq=self.freq) + + df_stamp = pd.DataFrame(columns=['date']) + df_stamp.date = list(tmp_stamp.date.values) + list(pred_dates[1:]) + self.future_dates = list(pred_dates[1:]) + if self.timeenc == 0: + df_stamp['month'] = df_stamp.date.apply(lambda row: row.month, 1) + df_stamp['day'] = df_stamp.date.apply(lambda row: row.day, 1) + df_stamp['weekday'] = df_stamp.date.apply(lambda row: row.weekday(), 1) + df_stamp['hour'] = df_stamp.date.apply(lambda row: row.hour, 1) + df_stamp['minute'] = df_stamp.date.apply(lambda row: row.minute, 1) + df_stamp['minute'] = df_stamp.minute.map(lambda x: x // 15) + data_stamp = df_stamp.drop(['date'], 1).values + elif self.timeenc == 1: + data_stamp = time_features(pd.to_datetime(df_stamp['date'].values), freq=self.freq) + data_stamp = data_stamp.transpose(1, 0) + + self.data_x = data[border1:border2] + if self.inverse: + self.data_y = df_data.values[border1:border2] + else: + self.data_y = data[border1:border2] + self.data_stamp = data_stamp + + def __getitem__(self, index): + s_begin = index + s_end = s_begin + self.seq_len + r_begin = s_end - self.label_len + r_end = r_begin + self.label_len + self.pred_len + + seq_x = self.data_x[s_begin:s_end] + if self.inverse: + seq_y = self.data_x[r_begin:r_begin + self.label_len] + else: + seq_y = 
self.data_y[r_begin:r_begin + self.label_len] + seq_x_mark = self.data_stamp[s_begin:s_end] + seq_y_mark = self.data_stamp[r_begin:r_end] + + return seq_x, seq_y, seq_x_mark, seq_y_mark + + def __len__(self): + return len(self.data_x) - self.seq_len + 1 + + def inverse_transform(self, data): + return self.scaler.inverse_transform(data) diff --git a/dataflow/tsf.py b/dataflow/tsf.py index 0306ad0..0caeb2c 100644 --- a/dataflow/tsf.py +++ b/dataflow/tsf.py @@ -3,53 +3,104 @@ import numpy as np from sklearn.preprocessing import StandardScaler import joblib from utils.timefeatures import time_features +import os +def get_ett_dataset_borders(dataset_name, data_len, input_len): + """ + ETT系列数据集的特定边界处理函数 + + Args: + dataset_name (str): 数据集名称(如 'ETTm1', 'ETTh1') + data_len (int): 数据总长度 + input_len (int): 输入序列长度 + + Returns: + tuple: (border1s, border2s) 边界点列表 + """ + if dataset_name.startswith('ETTm'): + # ETTm1, ETTm2: 15分钟间隔,每天96个点 + border1s = [0, 12 * 30 * 96 - input_len, 12 * 30 * 96 + 4 * 30 * 96 - input_len] + border2s = [12 * 30 * 96, 12 * 30 * 96 + 4 * 30 * 96, 12 * 30 * 96 + 8 * 30 * 96] + elif dataset_name.startswith('ETTh'): + # ETTh1, ETTh2: 小时间隔,每天24个点 + border1s = [0, 12 * 30 * 24 - input_len, 12 * 30 * 24 + 4 * 30 * 24 - input_len] + border2s = [12 * 30 * 24, 12 * 30 * 24 + 4 * 30 * 24, 12 * 30 * 24 + 8 * 30 * 24] + else: + raise ValueError(f"Unknown ETT dataset: {dataset_name}") + + return border1s, border2s + +# 示例:可以添加其他特定数据集的处理函数 +# def get_weather_dataset_borders(dataset_name, data_len, input_len): +# """ +# Weather数据集的特定边界处理函数 +# """ +# # 假设weather数据集使用不同的分割策略 +# # 比如:前80%训练,中间10%验证,后10%测试 +# train_end = int(data_len * 0.8) +# val_end = int(data_len * 0.9) +# +# border1s = [0, train_end - input_len, val_end - input_len] +# border2s = [train_end, val_end, data_len] +# +# return border1s, border2s + +# 数据集处理函数映射表 +DATASET_HANDLERS = { + 'ETTm1': get_ett_dataset_borders, + 'ETTm2': get_ett_dataset_borders, + 'ETTh1': get_ett_dataset_borders, + 'ETTh2': get_ett_dataset_borders, + # 可以在这里添加更多数据集的处理函数 + # 'weather': get_weather_dataset_borders, +} def preprocess_time_series( csv_data, input_len, pred_len, slide_step, - train_ratio=0.6, - test_ratio=0.2, - val_ratio=0.2, + dataset_name=None, # 新增:数据集名称参数 + data_path_name='ETTm1.csv', # 保留向后兼容,但优先使用dataset_name selected_columns=None, date_column='date', - freq='T', + freq='h', # 按照分析,原文 ETTm1/ETTh1 实验均使用 'h' + split_method='auto', # 'auto', 'specific', 'ratio' + train_ratio=0.7, + val_ratio=0.1, + test_ratio=0.2, + has_time_column=True, # 新增:是否包含时间列 ): """ - Preprocess time series data from CSV for model training, testing and validation. - Applies global Z-score normalization using only training data statistics. - + 修改版:根据 TimesNet 原文逻辑预处理时序数据。 + 1. 支持三种分割方法:auto(自动选择)、specific(特定数据集)、ratio(比例分割) + 2. 支持基于数据集名称的特定处理函数调用 + 3. 滑动窗口的目标 y 长度为 pred_len (按用户要求)。 + 4. 
支持无时间列的数据集处理 + Args: csv_data (pd.DataFrame or str): CSV data as DataFrame or path to CSV file - input_len (int): Length of input sequence + input_len (int): Length of input sequence (seq_len in original paper) pred_len (int): Length of prediction sequence slide_step (int): Step size for sliding window - train_ratio (float): Ratio of data to use for training (default: 0.6) - test_ratio (float): Ratio of data to use for testing (default: 0.2) - val_ratio (float): Ratio of data to use for validation (default: 0.2) - selected_columns (list): List of column names to use (default: None, uses all) - date_column (str): Name of the date column (default: 'date') - freq (str): Frequency of the time series data (default: 'T' for minutely) + dataset_name (str): 数据集名称(如 'ETTm1', 'weather'),优先使用此参数 + data_path_name (str): 数据文件名(如 'ETTm1.csv'),向后兼容用 + selected_columns (list): List of column names to use (default: None, uses all). + date_column (str): Name of the date column (default: 'date'). + freq (str): Frequency for time features ('h' for hourly, 't' for minutely). + split_method (str): Data split method - 'auto', 'specific', or 'ratio' + - 'auto': automatically choose based on dataset_name + - 'specific': use dataset-specific split function + - 'ratio': use ratio-based split + train_ratio (float): Training set ratio (only used when split_method='ratio') + val_ratio (float): Validation set ratio (only used when split_method='ratio') + test_ratio (float): Test set ratio (only used when split_method='ratio') + has_time_column (bool): Whether the dataset has a time column Returns: - dict: Dictionary containing: - - train_x: Training input sequences - - train_y: Training target sequences - - train_x_mark: Training input time features - - train_y_mark: Training target time features - - test_x: Testing input sequences - - test_y: Testing target sequences - - test_x_mark: Testing input time features - - test_y_mark: Testing target time features - - val_x: Validation input sequences - - val_y: Validation target sequences - - val_x_mark: Validation input time features - - val_y_mark: Validation target time features - - scaler: Fitted StandardScaler object for inverse transformation + dict: Dictionary containing processed data. """ - # Load data if path to CSV is provided + # 1. 加载数据 if isinstance(csv_data, str): try: data = pd.read_csv(csv_data) @@ -60,97 +111,113 @@ def preprocess_time_series( else: data = csv_data.copy() - # Extract time features from date column - if date_column in data.columns: + # 2. 提取时间特征(仅在有时间列时) + if has_time_column and date_column in data.columns: date_index = pd.to_datetime(data[date_column]) if isinstance(date_index, pd.Series): date_index = pd.DatetimeIndex(date_index) time_stamp = time_features(date_index, freq=freq) time_stamp = time_stamp.transpose(1, 0) # Shape: (n_samples, n_time_features) - else: + elif has_time_column: raise ValueError(f"Date column '{date_column}' not found in data") + else: + # 没有时间列,创建空的时间戳数组 + time_stamp = None - # Select columns if specified (excluding date column) + # 3. 
选择数据列 if selected_columns is not None: data = data[selected_columns] else: - # Use all columns except the date column - feature_columns = [col for col in data.columns if col != date_column] + if has_time_column: + feature_columns = [col for col in data.columns if col != date_column] + else: + feature_columns = list(data.columns) data = data[feature_columns] - # Validate ratios sum to 1 - if abs(train_ratio + test_ratio + val_ratio - 1.0) > 1e-6: - raise ValueError(f"Ratios must sum to 1.0, got {train_ratio + test_ratio + val_ratio}") + # 4. 【核心修改】根据split_method选择数据集分割方式 + # 确定使用的数据集名称 + if dataset_name is None: + # 向后兼容:从文件路径提取数据集名称 + dataset_name = os.path.splitext(data_path_name)[0] - # Calculate split points - total_len = len(data) - train_len = int(total_len * train_ratio) - test_len = int(total_len * test_ratio) + if split_method == 'auto': + # 自动选择:特定数据集用specific,其他用ratio + if dataset_name in DATASET_HANDLERS: + split_method = 'specific' + else: + split_method = 'ratio' - # Split data into train, test and validation sets - train_data = data.iloc[:train_len].values - test_data = data.iloc[train_len:train_len + test_len].values - val_data = data.iloc[train_len + test_len:].values + if split_method == 'specific': + # 使用特定数据集的处理函数 + if dataset_name in DATASET_HANDLERS: + handler_func = DATASET_HANDLERS[dataset_name] + border1s, border2s = handler_func(dataset_name, len(data), input_len) + print(f"Using specific split for dataset '{dataset_name}'") + else: + print(f"Warning: No specific handler for dataset '{dataset_name}'. Falling back to ratio split.") + split_method = 'ratio' - # Split time features correspondingly - train_time_stamp = time_stamp[:train_len] - test_time_stamp = time_stamp[train_len:train_len + test_len] - val_time_stamp = time_stamp[train_len + test_len:] + if split_method == 'ratio': + # 使用比例分割数据集 + # 验证比例和为1 + if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-6: + raise ValueError(f"Ratios must sum to 1.0, got {train_ratio + val_ratio + test_ratio}") + + total_len = len(data) + num_train = int(total_len * train_ratio) + num_val = int(total_len * val_ratio) + num_test = total_len - num_train - num_val # 确保所有数据都被使用 + + border1s = [0, num_train - input_len, num_train + num_val - input_len] + border2s = [num_train, num_train + num_val, total_len] + + print(f"Using ratio split for dataset '{dataset_name}': train={train_ratio:.1%}, val={val_ratio:.1%}, test={test_ratio:.1%}") + print(f"Data points: train={num_train}, val={num_val}, test={num_test}") + + train_data = data.iloc[border1s[0]:border2s[0]].values + val_data = data.iloc[border1s[1]:border2s[1]].values + test_data = data.iloc[border1s[2]:border2s[2]].values + + # 处理时间戳(仅在有时间列时) + if time_stamp is not None: + train_time_stamp = time_stamp[border1s[0]:border2s[0]] + val_time_stamp = time_stamp[border1s[1]:border2s[1]] + test_time_stamp = time_stamp[border1s[2]:border2s[2]] + else: + train_time_stamp = None + val_time_stamp = None + test_time_stamp = None - # Global Z-Score normalization using only training data statistics + # 5. 
归一化 (Fit on training data only) scaler = StandardScaler() - scaler.fit(train_data) # Fit only on training data to avoid data leakage + scaler.fit(train_data) - # Apply normalization to all datasets using the same scaler train_data_scaled = scaler.transform(train_data) - test_data_scaled = scaler.transform(test_data) if len(test_data) > 0 else test_data - val_data_scaled = scaler.transform(val_data) if len(val_data) > 0 else val_data + val_data_scaled = scaler.transform(val_data) + test_data_scaled = scaler.transform(test_data) - # Create sliding windows for training data - train_x, train_y = create_sliding_windows( - train_data_scaled, input_len, pred_len, slide_step - ) - train_x_mark, train_y_mark = create_sliding_windows( - train_time_stamp, input_len, pred_len, slide_step - ) + # 6. 【核心修改】使用您的滑窗逻辑创建样本 + train_x, train_y = create_sliding_windows(train_data_scaled, input_len, pred_len, slide_step) + val_x, val_y = create_sliding_windows(val_data_scaled, input_len, pred_len, slide_step) + test_x, test_y = create_sliding_windows(test_data_scaled, input_len, pred_len, slide_step) - # Create sliding windows for testing data - if len(test_data) > 0: - test_x, test_y = create_sliding_windows( - test_data_scaled, input_len, pred_len, slide_step - ) - test_x_mark, test_y_mark = create_sliding_windows( - test_time_stamp, input_len, pred_len, slide_step - ) + # 处理时间标记(仅在有时间列时) + if train_time_stamp is not None: + train_x_mark, train_y_mark = create_sliding_windows(train_time_stamp, input_len, pred_len, slide_step) + val_x_mark, val_y_mark = create_sliding_windows(val_time_stamp, input_len, pred_len, slide_step) + test_x_mark, test_y_mark = create_sliding_windows(test_time_stamp, input_len, pred_len, slide_step) else: - test_x, test_y = np.array([]), np.array([]) - test_x_mark, test_y_mark = np.array([]), np.array([]) - - # Create sliding windows for validation data - if len(val_data) > 0: - val_x, val_y = create_sliding_windows( - val_data_scaled, input_len, pred_len, slide_step - ) - val_x_mark, val_y_mark = create_sliding_windows( - val_time_stamp, input_len, pred_len, slide_step - ) - else: - val_x, val_y = np.array([]), np.array([]) - val_x_mark, val_y_mark = np.array([]), np.array([]) + train_x_mark, train_y_mark = None, None + val_x_mark, val_y_mark = None, None + test_x_mark, test_y_mark = None, None return { - 'train_x': train_x, - 'train_y': train_y, - 'train_x_mark': train_x_mark, - 'train_y_mark': train_y_mark, - 'test_x': test_x, - 'test_y': test_y, - 'test_x_mark': test_x_mark, - 'test_y_mark': test_y_mark, - 'val_x': val_x, - 'val_y': val_y, - 'val_x_mark': val_x_mark, - 'val_y_mark': val_y_mark, + 'train_x': train_x, 'train_y': train_y, + 'train_x_mark': train_x_mark, 'train_y_mark': train_y_mark, + 'val_x': val_x, 'val_y': val_y, + 'val_x_mark': val_x_mark, 'val_y_mark': val_y_mark, + 'test_x': test_x, 'test_y': test_y, + 'test_x_mark': test_x_mark, 'test_y_mark': test_y_mark, 'scaler': scaler } @@ -158,9 +225,10 @@ def preprocess_time_series( def create_sliding_windows(data, input_len, pred_len, slide_step): """ Create sliding windows from time series data. + Target `y` has length `pred_len`. 
Args: - data (np.ndarray): Time series data + data (np.ndarray): Time series data (features or time marks) input_len (int): Length of input sequence pred_len (int): Length of prediction sequence slide_step (int): Step size for sliding window @@ -168,78 +236,26 @@ def create_sliding_windows(data, input_len, pred_len, slide_step): Returns: tuple: (X, y) where X is input sequences and y is target sequences """ - total_len = input_len + pred_len + total_window_len = input_len + pred_len X, y = [], [] - # Start indices for sliding windows - start_indices = range(0, len(data) - total_len + 1, slide_step) + n_samples = len(data) - for start_idx in start_indices: - end_idx = start_idx + total_len + for start_idx in range(0, n_samples, slide_step): + end_idx = start_idx + total_window_len - # Skip if there's not enough data - if end_idx > len(data): + # Skip if there's not enough data for a full window + if end_idx > n_samples: break - # Get window - window = data[start_idx:end_idx] - # Split window into input and target - x = window[:input_len] - target = window[input_len:end_idx] + input_window = data[start_idx : start_idx + input_len] + target_window = data[start_idx + input_len : end_idx] - X.append(x) - y.append(target) + X.append(input_window) + y.append(target_window) - # Convert to numpy arrays - X = np.array(X) - y = np.array(y) - - return X, y - - -def load_and_split_time_series( - csv_path, - input_len, - pred_len, - slide_step, - train_ratio=0.6, - test_ratio=0.2, - val_ratio=0.2, - selected_columns=None, - date_column='date', - freq='T', -): - """ - Convenience function to load CSV file and preprocess time series data. - - Args: - csv_path (str): Path to CSV file - input_len (int): Length of input sequence - pred_len (int): Length of prediction sequence - slide_step (int): Step size for sliding window - train_ratio (float): Ratio of data to use for training (default: 0.6) - test_ratio (float): Ratio of data to use for testing (default: 0.2) - val_ratio (float): Ratio of data to use for validation (default: 0.2) - selected_columns (list): List of column names to use (default: None, uses all) - date_column (str): Name of the date column (default: 'date') - freq (str): Frequency of the time series data (default: 'T' for minutely) - - Returns: - dict: Dictionary containing processed data including time features - """ - return preprocess_time_series( - csv_path, - input_len, - pred_len, - slide_step, - train_ratio, - test_ratio, - val_ratio, - selected_columns, - date_column, - freq - ) + return np.array(X), np.array(y) def process_and_save_time_series( @@ -248,90 +264,65 @@ def process_and_save_time_series( input_len, pred_len, slide_step, - train_ratio=0.6, - test_ratio=0.2, - val_ratio=0.2, + dataset_name=None, # 新增:数据集名称参数 selected_columns=None, date_column='date', - freq='T', + freq='h', + split_method='auto', + train_ratio=0.7, + val_ratio=0.1, + test_ratio=0.2, + has_time_column=True, # 新增:是否包含时间列 ): """ Process time series data and save it as an NPZ file along with the fitted scaler. + This function now calls the modified preprocess_time_series with flexible split methods. 
Args: - csv_path (str): Path to CSV file - output_file (str): Path to output NPZ file - input_len (int): Length of input sequence - pred_len (int): Length of prediction sequence - slide_step (int): Step size for sliding window - train_ratio (float): Ratio of data to use for training (default: 0.6) - test_ratio (float): Ratio of data to use for testing (default: 0.2) - val_ratio (float): Ratio of data to use for validation (default: 0.2) - selected_columns (list): List of column names to use (default: None, uses all) - date_column (str): Name of the date column (default: 'date') - freq (str): Frequency of the time series data (default: 'T' for minutely) - - Returns: - dict: Dictionary containing processed data including time features + dataset_name (str): 数据集名称(如 'ETTm1', 'weather'),优先使用此参数 + split_method (str): Data split method - 'auto', 'specific', or 'ratio' + train_ratio (float): Training set ratio (only used when split_method='ratio') + val_ratio (float): Validation set ratio (only used when split_method='ratio') + test_ratio (float): Test set ratio (only used when split_method='ratio') + has_time_column (bool): Whether the dataset has a time column """ - import os - import numpy as np - # Create output directory if it doesn't exist output_dir = os.path.dirname(os.path.abspath(output_file)) os.makedirs(output_dir, exist_ok=True) - # Load and preprocess the time series data - result = load_and_split_time_series( - csv_path=csv_path, + # Extract data file name from path + data_path_name = os.path.basename(csv_path) + + # Load and preprocess the time series data using the new logic + result = preprocess_time_series( + csv_data=csv_path, input_len=input_len, pred_len=pred_len, slide_step=slide_step, - train_ratio=train_ratio, - test_ratio=test_ratio, - val_ratio=val_ratio, + dataset_name=dataset_name, + data_path_name=data_path_name, selected_columns=selected_columns, date_column=date_column, - freq=freq + freq=freq, + split_method=split_method, + train_ratio=train_ratio, + val_ratio=val_ratio, + test_ratio=test_ratio, + has_time_column=has_time_column ) # Extract the processed data - train_x = result['train_x'] - train_y = result['train_y'] - train_x_mark = result['train_x_mark'] - train_y_mark = result['train_y_mark'] - test_x = result['test_x'] - test_y = result['test_y'] - test_x_mark = result['test_x_mark'] - test_y_mark = result['test_y_mark'] - val_x = result['val_x'] - val_y = result['val_y'] - val_x_mark = result['val_x_mark'] - val_y_mark = result['val_y_mark'] - scaler = result['scaler'] + scaler = result.pop('scaler') # Pop scaler to not save it in the npz - # Save the scaler object + # Save the scaler object separately scaler_file = output_file.replace('.npz', '_scaler.gz') joblib.dump(scaler, scaler_file) print(f"Saved scaler to {scaler_file}") - # Save the processed data as .npz file - np.savez( - output_file, - train_x=train_x, - train_y=train_y, - train_x_mark=train_x_mark, - train_y_mark=train_y_mark, - test_x=test_x, - test_y=test_y, - test_x_mark=test_x_mark, - test_y_mark=test_y_mark, - val_x=val_x, - val_y=val_y, - val_x_mark=val_x_mark, - val_y_mark=val_y_mark - ) - + # Save the processed data arrays as .npz file + np.savez(output_file, **result) print(f"Saved processed data to {output_file}") return result + +
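Usage note (outside the diff): a minimal sketch of how the new `data_provider` entry point from `dataflow/data_factory.py` might be driven. The `SimpleNamespace` stand-in and the ETTh1 paths/window sizes below are illustrative only; in the real pipeline these fields presumably come from the experiment's argument parser.

from types import SimpleNamespace
from dataflow import data_provider

# Illustrative argument namespace; field names follow data_factory.data_provider.
args = SimpleNamespace(
    data='ETTh1', root_path='./data', data_path='ETTh1.csv',
    features='M', target='OT', embed='timeF', freq='h',
    seq_len=96, label_len=48, pred_len=96,
    batch_size=32, num_workers=0, train_only=False,
)

train_set, train_loader = data_provider(args, flag='train')
for seq_x, seq_y, seq_x_mark, seq_y_mark in train_loader:
    # seq_x: (batch, seq_len, n_features); seq_y: (batch, label_len + pred_len, n_features)
    break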
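A similar hedged sketch for the offline NPZ route through the reworked `tsf.py` helpers, assuming an ETT-style CSV with a `date` column; the paths and window sizes are placeholders. With `split_method='auto'` and a dataset name starting with 'ETT', the fixed borders from `get_ett_dataset_borders` are applied; other names fall back to the ratio split.

from dataflow import process_and_save_time_series

# Writes <output_file> (NPZ of windowed arrays) plus a *_scaler.gz next to it.
result = process_and_save_time_series(
    csv_path='./data/ETTm1.csv',            # placeholder path
    output_file='./processed/ETTm1_96_96.npz',
    input_len=96,
    pred_len=96,
    slide_step=1,
    dataset_name='ETTm1',                   # selects the ETT-specific borders
    split_method='auto',
)

print(result['train_x'].shape)  # (n_windows, input_len, n_features)
print(result['train_y'].shape)  # (n_windows, pred_len, n_features)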