import os

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

from utils.timefeatures import time_features


def get_ett_dataset_borders(dataset_name, data_len, input_len):
    """
    Dataset-specific border handling for the ETT family of datasets.

    Args:
        dataset_name (str): Dataset name (e.g. 'ETTm1', 'ETTh1')
        data_len (int): Total number of data points (unused; kept for a
            uniform handler signature)
        input_len (int): Length of the input sequence

    Returns:
        tuple: (border1s, border2s) lists of split boundary indices
    """
    if dataset_name.startswith('ETTm'):
        # ETTm1, ETTm2: 15-minute intervals, 96 points per day
        border1s = [0, 12 * 30 * 96 - input_len, 12 * 30 * 96 + 4 * 30 * 96 - input_len]
        border2s = [12 * 30 * 96, 12 * 30 * 96 + 4 * 30 * 96, 12 * 30 * 96 + 8 * 30 * 96]
    elif dataset_name.startswith('ETTh'):
        # ETTh1, ETTh2: hourly intervals, 24 points per day
        border1s = [0, 12 * 30 * 24 - input_len, 12 * 30 * 24 + 4 * 30 * 24 - input_len]
        border2s = [12 * 30 * 24, 12 * 30 * 24 + 4 * 30 * 24, 12 * 30 * 24 + 8 * 30 * 24]
    else:
        raise ValueError(f"Unknown ETT dataset: {dataset_name}")

    return border1s, border2s


# Example: handler functions for other specific datasets can be added here.
# def get_weather_dataset_borders(dataset_name, data_len, input_len):
#     """
#     Dataset-specific border handling for the Weather dataset.
#     """
#     # Suppose the weather dataset uses a different split strategy,
#     # e.g. first 80% train, middle 10% validation, last 10% test.
#     train_end = int(data_len * 0.8)
#     val_end = int(data_len * 0.9)
#
#     border1s = [0, train_end - input_len, val_end - input_len]
#     border2s = [train_end, val_end, data_len]
#
#     return border1s, border2s


# Mapping from dataset name to its border handler function
DATASET_HANDLERS = {
    'ETTm1': get_ett_dataset_borders,
    'ETTm2': get_ett_dataset_borders,
    'ETTh1': get_ett_dataset_borders,
    'ETTh2': get_ett_dataset_borders,
    # More dataset handlers can be registered here, e.g.:
    # 'weather': get_weather_dataset_borders,
}
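
# Worked example (illustrative; this helper is not part of the original
# pipeline): for ETTh1 with input_len=96, the 12/4/4-month split above yields
# train=[0, 8640), val=[8544, 11520), test=[11424, 14400). Note that the val
# and test borders start `input_len` points early so their first window can
# see enough history.
def _demo_ett_borders(input_len=96):
    """Print the ETTh1 split borders for a given input length (demo only)."""
    border1s, border2s = get_ett_dataset_borders('ETTh1', data_len=14400, input_len=input_len)
    for split, b1, b2 in zip(('train', 'val', 'test'), border1s, border2s):
        print(f"{split}: [{b1}, {b2})")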
def preprocess_time_series(
    csv_data,
    input_len,
    pred_len,
    slide_step,
    dataset_name=None,           # New: dataset name parameter
    data_path_name='ETTm1.csv',  # Kept for backward compatibility; dataset_name takes precedence
    selected_columns=None,
    date_column='date',
    freq='h',                    # Per our analysis, the original paper uses 'h' for both ETTm1 and ETTh1
    split_method='auto',         # 'auto', 'specific', 'ratio'
    train_ratio=0.7,
    val_ratio=0.1,
    test_ratio=0.2,
    has_time_column=True,        # New: whether the dataset has a time column
):
    """
    Modified version: preprocess time series data following the logic of the
    original TimesNet paper.

    1. Supports three split methods: auto (choose automatically), specific
       (dataset-specific borders), and ratio (proportional split).
    2. Supports dataset-specific handler functions looked up by dataset name.
    3. The sliding-window target y has length pred_len (as requested by the user).
    4. Supports datasets without a time column.

    Args:
        csv_data (pd.DataFrame or str): CSV data as DataFrame or path to CSV file
        input_len (int): Length of input sequence (seq_len in original paper)
        pred_len (int): Length of prediction sequence
        slide_step (int): Step size for sliding window
        dataset_name (str): Dataset name (e.g. 'ETTm1', 'weather'); takes precedence when given
        data_path_name (str): Data file name (e.g. 'ETTm1.csv'), kept for backward compatibility
        selected_columns (list): List of column names to use (default: None, uses all)
        date_column (str): Name of the date column (default: 'date')
        freq (str): Frequency for time features ('h' for hourly, 't' for minutely)
        split_method (str): Data split method - 'auto', 'specific', or 'ratio'
            - 'auto': automatically choose based on dataset_name
            - 'specific': use dataset-specific split function
            - 'ratio': use ratio-based split
        train_ratio (float): Training set ratio (only used when split_method='ratio')
        val_ratio (float): Validation set ratio (only used when split_method='ratio')
        test_ratio (float): Test set ratio (only used when split_method='ratio')
        has_time_column (bool): Whether the dataset has a time column

    Returns:
        dict: Dictionary containing the processed data.
    """
    # 1. Load the data
    if isinstance(csv_data, str):
        try:
            data = pd.read_csv(csv_data)
        except FileNotFoundError:
            raise FileNotFoundError(f"CSV file not found: {csv_data}")
        except Exception as e:
            raise RuntimeError(f"Error loading CSV file: {e}") from e
    else:
        data = csv_data.copy()

    # 2. Extract time features (only when a time column exists)
    if has_time_column and date_column in data.columns:
        date_index = pd.to_datetime(data[date_column])
        if isinstance(date_index, pd.Series):
            date_index = pd.DatetimeIndex(date_index)
        time_stamp = time_features(date_index, freq=freq)
        time_stamp = time_stamp.transpose(1, 0)  # Shape: (n_samples, n_time_features)
    elif has_time_column:
        raise ValueError(f"Date column '{date_column}' not found in data")
    else:
        # No time column: no time-stamp features
        time_stamp = None

    # 3. Select the data columns
    if selected_columns is not None:
        data = data[selected_columns]
    else:
        if has_time_column:
            feature_columns = [col for col in data.columns if col != date_column]
        else:
            feature_columns = list(data.columns)
        data = data[feature_columns]

    # 4. [Core change] Choose the dataset split strategy according to split_method
    # Determine the dataset name to use
    if dataset_name is None:
        # Backward compatibility: derive the dataset name from the file name
        dataset_name = os.path.splitext(data_path_name)[0]

    if split_method == 'auto':
        # Auto-select: use 'specific' for known datasets, 'ratio' otherwise
        if dataset_name in DATASET_HANDLERS:
            split_method = 'specific'
        else:
            split_method = 'ratio'

    if split_method == 'specific':
        # Use the dataset-specific handler function
        if dataset_name in DATASET_HANDLERS:
            handler_func = DATASET_HANDLERS[dataset_name]
            border1s, border2s = handler_func(dataset_name, len(data), input_len)
            print(f"Using specific split for dataset '{dataset_name}'")
        else:
            print(f"Warning: No specific handler for dataset '{dataset_name}'. Falling back to ratio split.")
            split_method = 'ratio'

    if split_method == 'ratio':
        # Split the dataset proportionally
        # Validate that the ratios sum to 1
        if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-6:
            raise ValueError(f"Ratios must sum to 1.0, got {train_ratio + val_ratio + test_ratio}")

        total_len = len(data)
        num_train = int(total_len * train_ratio)
        num_val = int(total_len * val_ratio)
        num_test = total_len - num_train - num_val  # Ensure every data point is used

        border1s = [0, num_train - input_len, num_train + num_val - input_len]
        border2s = [num_train, num_train + num_val, total_len]

        print(f"Using ratio split for dataset '{dataset_name}': train={train_ratio:.1%}, val={val_ratio:.1%}, test={test_ratio:.1%}")
        print(f"Data points: train={num_train}, val={num_val}, test={num_test}")

    train_data = data.iloc[border1s[0]:border2s[0]].values
    val_data = data.iloc[border1s[1]:border2s[1]].values
    test_data = data.iloc[border1s[2]:border2s[2]].values

    # Slice the time stamps (only when a time column exists)
    if time_stamp is not None:
        train_time_stamp = time_stamp[border1s[0]:border2s[0]]
        val_time_stamp = time_stamp[border1s[1]:border2s[1]]
        test_time_stamp = time_stamp[border1s[2]:border2s[2]]
    else:
        train_time_stamp = None
        val_time_stamp = None
        test_time_stamp = None

    # 5. Normalize (fit on the training data only)
    scaler = StandardScaler()
    scaler.fit(train_data)
    train_data_scaled = scaler.transform(train_data)
    val_data_scaled = scaler.transform(val_data)
    test_data_scaled = scaler.transform(test_data)

    # 6. [Core change] Create samples with the sliding-window logic
    train_x, train_y = create_sliding_windows(train_data_scaled, input_len, pred_len, slide_step)
    val_x, val_y = create_sliding_windows(val_data_scaled, input_len, pred_len, slide_step)
    test_x, test_y = create_sliding_windows(test_data_scaled, input_len, pred_len, slide_step)

    # Window the time marks (only when a time column exists)
    if train_time_stamp is not None:
        train_x_mark, train_y_mark = create_sliding_windows(train_time_stamp, input_len, pred_len, slide_step)
        val_x_mark, val_y_mark = create_sliding_windows(val_time_stamp, input_len, pred_len, slide_step)
        test_x_mark, test_y_mark = create_sliding_windows(test_time_stamp, input_len, pred_len, slide_step)
    else:
        train_x_mark, train_y_mark = None, None
        val_x_mark, val_y_mark = None, None
        test_x_mark, test_y_mark = None, None

    return {
        'train_x': train_x, 'train_y': train_y,
        'train_x_mark': train_x_mark, 'train_y_mark': train_y_mark,
        'val_x': val_x, 'val_y': val_y,
        'val_x_mark': val_x_mark, 'val_y_mark': val_y_mark,
        'test_x': test_x, 'test_y': test_y,
        'test_x_mark': test_x_mark, 'test_y_mark': test_y_mark,
        'scaler': scaler
    }
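
# Illustrative sanity check (assumption: demo only, not part of the original
# module). Runs the full preprocessing on synthetic data, so no CSV file is
# needed, and prints the resulting window shapes.
def _demo_preprocess_shapes(input_len=96, pred_len=24):
    rng = np.random.default_rng(0)
    n = 1000
    df = pd.DataFrame(rng.standard_normal((n, 3)), columns=['a', 'b', 'c'])
    df.insert(0, 'date', pd.date_range('2020-01-01', periods=n, freq='h'))
    result = preprocess_time_series(
        csv_data=df, input_len=input_len, pred_len=pred_len, slide_step=1,
        dataset_name='synthetic', split_method='ratio', freq='h',
    )
    print('train_x:', result['train_x'].shape)            # (n_windows, input_len, 3)
    print('train_y:', result['train_y'].shape)            # (n_windows, pred_len, 3)
    print('train_x_mark:', result['train_x_mark'].shape)  # (n_windows, input_len, n_time_features)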
def create_sliding_windows(data, input_len, pred_len, slide_step):
    """
    Create sliding windows from time series data. Target `y` has length `pred_len`.

    Args:
        data (np.ndarray): Time series data (features or time marks)
        input_len (int): Length of input sequence
        pred_len (int): Length of prediction sequence
        slide_step (int): Step size for sliding window

    Returns:
        tuple: (X, y) where X is input sequences and y is target sequences
    """
    total_window_len = input_len + pred_len
    X, y = [], []
    n_samples = len(data)

    for start_idx in range(0, n_samples, slide_step):
        end_idx = start_idx + total_window_len

        # Stop once there is not enough data left for a full window
        if end_idx > n_samples:
            break

        # Split the window into input and target segments
        input_window = data[start_idx : start_idx + input_len]
        target_window = data[start_idx + input_len : end_idx]

        X.append(input_window)
        y.append(target_window)

    return np.array(X), np.array(y)
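
# Worked example (illustrative values): with 100 samples, input_len=10,
# pred_len=5 and slide_step=5, full windows start at 0, 5, ..., 85,
# yielding 18 (x, y) pairs.
def _demo_sliding_windows():
    X, y = create_sliding_windows(np.arange(100).reshape(-1, 1),
                                  input_len=10, pred_len=5, slide_step=5)
    assert X.shape == (18, 10, 1) and y.shape == (18, 5, 1)
    print('X:', X.shape, 'y:', y.shape)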
def process_and_save_time_series(
    csv_path,
    output_file,
    input_len,
    pred_len,
    slide_step,
    dataset_name=None,     # New: dataset name parameter
    selected_columns=None,
    date_column='date',
    freq='h',
    split_method='auto',
    train_ratio=0.7,
    val_ratio=0.1,
    test_ratio=0.2,
    has_time_column=True,  # New: whether the dataset has a time column
):
    """
    Process time series data and save it as an NPZ file along with the fitted scaler.

    This function now calls the modified preprocess_time_series with flexible split methods.

    Args:
        dataset_name (str): Dataset name (e.g. 'ETTm1', 'weather'); takes precedence when given
        split_method (str): Data split method - 'auto', 'specific', or 'ratio'
        train_ratio (float): Training set ratio (only used when split_method='ratio')
        val_ratio (float): Validation set ratio (only used when split_method='ratio')
        test_ratio (float): Test set ratio (only used when split_method='ratio')
        has_time_column (bool): Whether the dataset has a time column
    """
    # Create the output directory if it doesn't exist
    output_dir = os.path.dirname(os.path.abspath(output_file))
    os.makedirs(output_dir, exist_ok=True)

    # Extract the data file name from the path
    data_path_name = os.path.basename(csv_path)

    # Load and preprocess the time series data using the new logic
    result = preprocess_time_series(
        csv_data=csv_path,
        input_len=input_len,
        pred_len=pred_len,
        slide_step=slide_step,
        dataset_name=dataset_name,
        data_path_name=data_path_name,
        selected_columns=selected_columns,
        date_column=date_column,
        freq=freq,
        split_method=split_method,
        train_ratio=train_ratio,
        val_ratio=val_ratio,
        test_ratio=test_ratio,
        has_time_column=has_time_column,
    )

    # Pop the scaler so it is not written into the .npz file
    scaler = result.pop('scaler')

    # Save the scaler object separately
    scaler_file = output_file.replace('.npz', '_scaler.gz')
    joblib.dump(scaler, scaler_file)
    print(f"Saved scaler to {scaler_file}")

    # Save the processed arrays as a .npz file, skipping None entries
    # (the *_mark arrays are None when has_time_column=False; np.savez would
    # otherwise store them as object arrays that need allow_pickle=True to load)
    arrays_to_save = {k: v for k, v in result.items() if v is not None}
    np.savez(output_file, **arrays_to_save)
    print(f"Saved processed data to {output_file}")

    return result
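
# Illustrative entry point (assumption: the CSV path, output path, and window
# lengths below are placeholders; the original repo may drive this module
# differently).
if __name__ == "__main__":
    process_and_save_time_series(
        csv_path='data/ETTh1.csv',
        output_file='output/ETTh1_in96_pred96.npz',
        input_len=96,
        pred_len=96,
        slide_step=1,
        dataset_name='ETTh1',
        freq='h',
    )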