From 28e65a6055f0d166f467aa7723bd55f7243818b9 Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Wed, 3 Jul 2024 01:43:13 +0800 Subject: [PATCH 01/10] =?UTF-8?q?feat:=20=E6=8F=92=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../data/dataset/traffic_impute_dataset.py | 211 ++++++++++++++++++ libcity/evaluator/ImputeEvaluator.py | 124 ++++++++++ libcity/executor/traffic_impute_executor.py | 50 +++++ 3 files changed, 385 insertions(+) create mode 100644 libcity/data/dataset/traffic_impute_dataset.py create mode 100644 libcity/evaluator/ImputeEvaluator.py create mode 100644 libcity/executor/traffic_impute_executor.py diff --git a/libcity/data/dataset/traffic_impute_dataset.py b/libcity/data/dataset/traffic_impute_dataset.py new file mode 100644 index 00000000..fc749c1e --- /dev/null +++ b/libcity/data/dataset/traffic_impute_dataset.py @@ -0,0 +1,211 @@ +import os +import numpy as np + +from libcity.data.utils import generate_dataloader +from libcity.utils import ensure_dir +from libcity.data.dataset import TrafficStateDataset + + +class TrafficImputeDataset(TrafficStateDataset): + + def __init__(self, config): + self.missing_pattern = config.get("missing_pattern", "point") + self.feature_name = {'X': 'float', 'y': 'float', 'mask': 'int'} + self.missing_ratio = config.get("missing_ratio", None) + super().__init__(config) + + def sample_mask(self, shape, p=0.0015, p_noise=0.05, max_seq=1, min_seq=1, rng=None): + if rng is None: + rand = np.random.random + randint = np.random.randint + else: + rand = rng.random + randint = rng.integers + mask = rand(shape) < p + # [samples, len_time, num_nodes, dim] + if self.missing_pattern != "point": + for sample in range(mask.shape[0]): + for col in range(mask.shape[2]): + # 不为0的mask索引 + idxs = np.flatnonzero(mask[sample, :, col, :]) + if not len(idxs): + continue + fault_len = min_seq + if max_seq > min_seq: + fault_len = fault_len + int(randint(max_seq - min_seq)) + # len(idxs) * fault_len + idxs_ext = np.concatenate([np.arange(i, i + fault_len) for i in idxs]) + # 去除其中重复的元素 ,并按元素 由小到大 返回一个新的无元素重复的元组或者列表。 + idxs = np.unique(idxs_ext) + # 截取数组中小于或者大于某值的部分,并使得被截取部分等于固定值。 + idxs = np.clip(idxs, 0, shape[1] - 1) + mask[sample, idxs, col, :] = True + mask = mask | (rand(mask.shape) < p_noise) + return mask.astype('uint8') + + def _generate_input_data(self, df): + """ + 根据全局参数`input_window`和`output_window`切分输入,产生模型需要的张量输入, + 即使用过去`input_window`长度的时间序列去预测未来`output_window`长度的时间序列 + + Args: + df(np.ndarray): 数据数组,shape: (len_time, ..., feature_dim) + + Returns: + tuple: tuple contains: + x(np.ndarray): 模型输入数据,(epoch_size, input_length, ..., feature_dim) \n + y(np.ndarray): 模型输出数据,(epoch_size, output_length, ..., feature_dim) + """ + num_samples = df.shape[0] + # 预测用的过去时间窗口长度 取决于self.input_window + x_offsets = np.sort(np.concatenate((np.arange(-self.input_window + 1, 1, 1),))) + + x, y = [], [] + min_t = abs(min(x_offsets)) + max_t = abs(num_samples) + for t in range(min_t, max_t): + x_t = df[t + x_offsets, ...] + x.append(x_t) + x = np.stack(x, axis=0) + y = np.copy(x) + + return x, y + + def _split_train_val_test(self, x, y): + """ + 划分训练集、测试集、验证集,并缓存数据集 + + Args: + x(np.ndarray): 输入数据 (num_samples, input_length, ..., feature_dim) + y(np.ndarray): 输出数据 (num_samples, input_length, ..., feature_dim) + + Returns: + tuple: tuple contains: + x_train: (num_samples, input_length, ..., feature_dim) \n + y_train: (num_samples, input_length, ..., feature_dim) \n + x_val: (num_samples, input_length, ..., feature_dim) \n + y_val: (num_samples, input_length, ..., feature_dim) \n + x_test: (num_samples, input_length, ..., feature_dim) \n + y_test: (num_samples, input_length, ..., feature_dim) + """ + test_rate = 1 - self.train_rate - self.eval_rate + num_samples = x.shape[0] + num_test = round(num_samples * test_rate) + num_train = round(num_samples * self.train_rate) + num_val = num_samples - num_test - num_train + + SEED = 9101112 + rng = np.random.default_rng(SEED) + + if self.missing_pattern == 'block': + eval_mask = self.sample_mask(shape=x.shape, p=0.0015, p_noise=0.05, min_seq=self.input_window // 2, + max_seq=self.input_window * 4, + rng=rng) + if self.missing_ratio is not None: + eval_mask = self.sample_mask(shape=x.shape, p=self.missing_ratio, + p_noise=0.05, min_seq=self.input_window // 2, + max_seq=self.input_window * 4, + rng=rng) + elif self.missing_pattern == 'point': + eval_mask = self.sample_mask(shape=x.shape, p=0., p_noise=0.25, max_seq=self.input_window // 2, + min_seq=self.input_window * 4, + rng=rng) + # train + x_train, y_train, mask_train = x[:num_train], y[:num_train], eval_mask[:num_train] + # val + x_val, y_val, mask_val = x[num_train: num_train + num_val], y[num_train: num_train + num_val], \ + eval_mask[num_train: num_train + num_val] + # test + x_test, y_test, mask_test = x[-num_test:], y[-num_test:], eval_mask[-num_test:] + self._logger.info("train\t" + "x: " + str(x_train.shape) + ", y: " + str(y_train.shape)) + self._logger.info("eval\t" + "x: " + str(x_val.shape) + ", y: " + str(y_val.shape)) + self._logger.info("test\t" + "x: " + str(x_test.shape) + ", y: " + str(y_test.shape)) + + if self.cache_dataset: + ensure_dir(self.cache_file_folder) + np.savez_compressed( + self.cache_file_name, + x_train=x_train, + y_train=y_train, + mask_train=mask_train, + x_test=x_test, + y_test=y_test, + mask_test=mask_test, + x_val=x_val, + y_val=y_val, + mask_val=mask_val, + ) + self._logger.info('Saved at ' + self.cache_file_name) + return x_train, y_train, mask_train, x_val, y_val, mask_val, x_test, y_test, mask_test + + def get_data(self): + """ + 返回数据的DataLoader,包括训练数据、测试数据、验证数据 + + Returns: + tuple: tuple contains: + train_dataloader: Dataloader composed of Batch (class) \n + eval_dataloader: Dataloader composed of Batch (class) \n + test_dataloader: Dataloader composed of Batch (class) + """ + # 加载数据集 + x_train, y_train, mask_train, x_val, y_val, mask_val, x_test, y_test, mask_test = [], [], [], [], [], [], [], [], [] + if self.data is None: + self.data = {} + if self.cache_dataset and os.path.exists(self.cache_file_name): + x_train, y_train, mask_train, x_val, y_val, mask_val, x_test, y_test, mask_test = self._load_cache_train_val_test() + else: + x_train, y_train, mask_train, x_val, y_val, mask_val, x_test, y_test, mask_test = self._generate_train_val_test() + + # 数据归一化 + self.feature_dim = x_train.shape[-1] + self.ext_dim = self.feature_dim - self.output_dim + self.scaler = self._get_scalar(self.scaler_type, + x_train[..., :self.output_dim], y_train[..., :self.output_dim]) + self.ext_scaler = self._get_scalar(self.ext_scaler_type, + x_train[..., self.output_dim:], y_train[..., self.output_dim:]) + x_train[..., :self.output_dim] = self.scaler.transform(x_train[..., :self.output_dim]) + y_train[..., :self.output_dim] = self.scaler.transform(y_train[..., :self.output_dim]) + x_val[..., :self.output_dim] = self.scaler.transform(x_val[..., :self.output_dim]) + y_val[..., :self.output_dim] = self.scaler.transform(y_val[..., :self.output_dim]) + x_test[..., :self.output_dim] = self.scaler.transform(x_test[..., :self.output_dim]) + y_test[..., :self.output_dim] = self.scaler.transform(y_test[..., :self.output_dim]) + if self.normal_external: + x_train[..., self.output_dim:] = self.ext_scaler.transform(x_train[..., self.output_dim:]) + y_train[..., self.output_dim:] = self.ext_scaler.transform(y_train[..., self.output_dim:]) + x_val[..., self.output_dim:] = self.ext_scaler.transform(x_val[..., self.output_dim:]) + y_val[..., self.output_dim:] = self.ext_scaler.transform(y_val[..., self.output_dim:]) + x_test[..., self.output_dim:] = self.ext_scaler.transform(x_test[..., self.output_dim:]) + y_test[..., self.output_dim:] = self.ext_scaler.transform(y_test[..., self.output_dim:]) + # 把训练集的X和y聚合在一起成为list,测试集验证集同理 + # x_train/y_train: (num_samples, input_length, ..., feature_dim) + # train_data(list): train_data[i]是一个元组,由x_train[i]和y_train[i]组成 + x_train = x_train * (1 - mask_train) + x_val = x_val * (1 - mask_val) + x_test = x_test * (1 - mask_test) + train_data = list(zip(x_train, y_train, mask_train)) + eval_data = list(zip(x_val, y_val, mask_val)) + test_data = list(zip(x_test, y_test, mask_test)) + # 转Dataloader + self.train_dataloader, self.eval_dataloader, self.test_dataloader = \ + generate_dataloader(train_data, eval_data, test_data, self.feature_name, + self.batch_size, self.num_workers, pad_with_last_sample=self.pad_with_last_sample) + self.num_batches = len(self.train_dataloader) + return self.train_dataloader, self.eval_dataloader, self.test_dataloader + + def get_data_feature(self): + """ + 返回数据集特征,子类必须实现这个函数,返回必要的特征 + + Returns: + dict: 包含数据集的相关特征的字典 + """ + raise NotImplementedError('Please implement the function `get_data_feature()`.') + + +if __name__ == '__main__': + SEED = 9101112 + rng = np.random.default_rng(SEED) + # sample_mask(shape=(14324, 12, 200, 1), p=0., p_noise=0.25, max_seq=12 // 2, + # min_seq=12 * 4, + # rng=rng) diff --git a/libcity/evaluator/ImputeEvaluator.py b/libcity/evaluator/ImputeEvaluator.py new file mode 100644 index 00000000..dc837bed --- /dev/null +++ b/libcity/evaluator/ImputeEvaluator.py @@ -0,0 +1,124 @@ +import os +import json +import datetime +import pandas as pd +from libcity.utils import ensure_dir +from libcity.model import loss +from logging import getLogger +from libcity.evaluator.abstract_evaluator import AbstractEvaluator + + +class ImputeEvaluator(AbstractEvaluator): + + def __init__(self, config): + self.metrics = config.get('metrics', ['MAE']) # 评估指标, 是一个 list + self.allowed_metrics = ['MAE', 'MSE', 'RMSE', 'MAPE', 'masked_MAE', + 'masked_MSE', 'masked_RMSE', 'masked_MAPE'] + self.save_modes = config.get('save_modes', ['csv', 'json']) + self.config = config + self.result = {} # 每一种指标的结果 + self.intermediate_result = {} # 每一种指标每一个batch的结果 + self._check_config() + self._logger = getLogger() + + def _check_config(self): + if not isinstance(self.metrics, list): + raise TypeError('Evaluator type is not list') + for metric in self.metrics: + if metric not in self.allowed_metrics: + raise ValueError('the metric {} is not allowed in ETAEvaluator'.format(str(metric))) + + def collect(self, batch): + """ + 收集一 batch 的评估输入 + + Args: + batch(dict): 输入数据,字典类型,包含两个Key:(y_true, y_pred): + batch['y_true']: (batch_size, 1) + batch['y_pred']: (batch_size, 1) + """ + if not isinstance(batch, dict): + raise TypeError('evaluator.collect input is not a dict of user') + y_true = batch['y_true'] # tensor + y_pred = batch['y_pred'] # tensor + eval_mask = batch['eval_mask'] + if y_true.shape != y_pred.shape: + raise ValueError("batch['y_true'].shape is not equal to batch['y_pred'].shape") + for metric in self.metrics: + if metric not in self.intermediate_result: + self.intermediate_result[metric] = [] + for metric in self.metrics: + if metric == 'masked_MAE': + self.intermediate_result[metric].append( + loss.masked_mae_torch(y_pred, y_true, 0, eval_mask=eval_mask).item()) + elif metric == 'masked_MSE': + self.intermediate_result[metric].append( + loss.masked_mse_torch(y_pred, y_true, 0, eval_mask=eval_mask).item()) + elif metric == 'masked_RMSE': + self.intermediate_result[metric].append( + loss.masked_rmse_torch(y_pred, y_true, 0, eval_mask=eval_mask).item()) + elif metric == 'masked_MAPE': + self.intermediate_result[metric].append( + loss.masked_mape_torch(y_pred, y_true, 0, eval_mask=eval_mask).item()) + elif metric == 'MAE': + self.intermediate_result[metric].append( + loss.masked_mae_torch(y_pred, y_true, eval_mask=eval_mask).item()) + elif metric == 'MSE': + self.intermediate_result[metric].append( + loss.masked_mse_torch(y_pred, y_true, eval_mask=eval_mask).item()) + elif metric == 'RMSE': + self.intermediate_result[metric].append( + loss.masked_rmse_torch(y_pred, y_true, eval_mask=eval_mask).item()) + elif metric == 'MAPE': + self.intermediate_result[metric].append( + loss.masked_mape_torch(y_pred, y_true, eval_mask=eval_mask).item()) + + def evaluate(self): + """ + 返回之前收集到的所有 batch 的评估结果 + """ + for metric in self.metrics: + self.result[metric] = sum(self.intermediate_result[metric]) / \ + len(self.intermediate_result[metric]) + return self.result + + def save_result(self, save_path, filename=None): + """ + 将评估结果保存到 save_path 文件夹下的 filename 文件中 + + Args: + save_path: 保存路径 + filename: 保存文件名 + """ + self.evaluate() + ensure_dir(save_path) + if filename is None: # 使用时间戳 + filename = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + '_' + \ + self.config['model'] + '_' + self.config['dataset'] + + if 'json' in self.save_modes: + self._logger.info('Evaluate result is ' + json.dumps(self.result)) + with open(os.path.join(save_path, '{}.json'.format(filename)), 'w') as f: + json.dump(self.result, f) + self._logger.info('Evaluate result is saved at ' + + os.path.join(save_path, '{}.json'.format(filename))) + + dataframe = {} + if 'csv' in self.save_modes: + for metric in self.metrics: + dataframe[metric] = [] + for metric in self.metrics: + dataframe[metric].append(self.result[metric]) + dataframe = pd.DataFrame(dataframe, index=range(1, 2)) + dataframe.to_csv(os.path.join(save_path, '{}.csv'.format(filename)), index=False) + self._logger.info('Evaluate result is saved at ' + + os.path.join(save_path, '{}.csv'.format(filename))) + self._logger.info("\n" + str(dataframe)) + return dataframe + + def clear(self): + """ + 清除之前收集到的 batch 的评估信息,适用于每次评估开始时进行一次清空,排除之前的评估输入的影响。 + """ + self.result = {} + self.intermediate_result = {} diff --git a/libcity/executor/traffic_impute_executor.py b/libcity/executor/traffic_impute_executor.py new file mode 100644 index 00000000..47cf6c14 --- /dev/null +++ b/libcity/executor/traffic_impute_executor.py @@ -0,0 +1,50 @@ +import os +import time + +import numpy as np +import torch + +from libcity.executor import TrafficStateExecutor + +class TrafficImputeExecutor(TrafficStateExecutor): + def __init__(self, config, model, data_feature): + super().__init__(config, model, data_feature) + + def evaluate(self, test_dataloader): + """ + use model to test data + + Args: + test_dataloader(torch.Dataloader): Dataloader + """ + self._logger.info('Start evaluating ...') + with torch.no_grad(): + self.model.eval() + # self.evaluator.clear() + y_truths = [] + y_preds = [] + eval_masks = [] + for batch in test_dataloader: + batch.to_tensor(self.device) + output = self.model.predict(batch) + y_true = self._scaler.inverse_transform(batch['y'][..., :self.output_dim]) + eval_mask = batch['mask'] + y_pred = self._scaler.inverse_transform(output[..., :self.output_dim]) + y_truths.append(y_true.cpu().numpy()) + y_preds.append(y_pred.cpu().numpy()) + eval_masks.append(eval_mask.cpu().numpy()) + # evaluate_input = {'y_true': y_true, 'y_pred': y_pred} + # self.evaluator.collect(evaluate_input) + # self.evaluator.save_result(self.evaluate_res_dir) + y_preds = np.concatenate(y_preds, axis=0) + y_truths = np.concatenate(y_truths, axis=0) # concatenate on batch + eval_masks = np.concatenate(eval_masks, axis=0) + outputs = {'prediction': y_preds, 'truth': y_truths} + filename = \ + time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime(time.time())) + '_' \ + + self.config['model'] + '_' + self.config['dataset'] + '_predictions.npz' + np.savez_compressed(os.path.join(self.evaluate_res_dir, filename), **outputs) + self.evaluator.clear() + self.evaluator.collect({'y_true': torch.tensor(y_truths), 'y_pred': torch.tensor(y_preds), 'eval_mask': torch.tensor(eval_masks)}) + test_result = self.evaluator.save_result(self.evaluate_res_dir) + return test_result \ No newline at end of file From 5e1e83a0666ebc56ad5dcd431f0ab988deaac880 Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Wed, 3 Jul 2024 02:33:57 +0800 Subject: [PATCH 02/10] feat: config --- libcity/config/data/TrafficImputeDataset.json | 15 ++++++++ libcity/config/evaluator/ImputeEvaluator.json | 5 +++ .../executor/TrafficImputeExecutor.json | 33 +++++++++++++++++ libcity/data/dataset/__init__.py | 4 ++- .../data/dataset/traffic_impute_dataset.py | 35 +++++++++++++++++-- libcity/evaluator/__init__.py | 2 ++ ...ImputeEvaluator.py => impute_evaluator.py} | 0 libcity/executor/__init__.py | 3 +- libcity/model/loss.py | 16 ++++++--- 9 files changed, 103 insertions(+), 10 deletions(-) create mode 100644 libcity/config/data/TrafficImputeDataset.json create mode 100644 libcity/config/evaluator/ImputeEvaluator.json create mode 100644 libcity/config/executor/TrafficImputeExecutor.json rename libcity/evaluator/{ImputeEvaluator.py => impute_evaluator.py} (100%) diff --git a/libcity/config/data/TrafficImputeDataset.json b/libcity/config/data/TrafficImputeDataset.json new file mode 100644 index 00000000..17454a95 --- /dev/null +++ b/libcity/config/data/TrafficImputeDataset.json @@ -0,0 +1,15 @@ +{ + "batch_size": 64, + "cache_dataset": true, + "num_workers": 0, + "pad_with_last_sample": true, + "train_rate": 0.7, + "eval_rate": 0.1, + "scaler": "none", + "load_external": false, + "normal_external": false, + "ext_scaler": "none", + "input_window": 12, + "add_time_in_day": false, + "add_day_in_week": false +} diff --git a/libcity/config/evaluator/ImputeEvaluator.json b/libcity/config/evaluator/ImputeEvaluator.json new file mode 100644 index 00000000..91d477b1 --- /dev/null +++ b/libcity/config/evaluator/ImputeEvaluator.json @@ -0,0 +1,5 @@ +{ + "metrics": ["MAE", "MAPE", "MSE", "RMSE", "masked_MAE", "masked_MAPE", "masked_MSE", "masked_RMSE"], + "mode": "single", + "save_modes": ["csv"] +} \ No newline at end of file diff --git a/libcity/config/executor/TrafficImputeExecutor.json b/libcity/config/executor/TrafficImputeExecutor.json new file mode 100644 index 00000000..f167506c --- /dev/null +++ b/libcity/config/executor/TrafficImputeExecutor.json @@ -0,0 +1,33 @@ +{ + "gpu": true, + "gpu_id": 0, + "max_epoch": 100, + "train_loss": "none", + "epoch": 0, + "learner": "adam", + "learning_rate": 0.01, + "weight_decay": 0, + "lr_epsilon": 1e-8, + "lr_beta1": 0.9, + "lr_beta2": 0.999, + "lr_alpha": 0.99, + "lr_momentum": 0, + "lr_decay": false, + "lr_scheduler": "multisteplr", + "lr_decay_ratio": 0.1, + "steps": [5, 20, 40, 70], + "step_size": 10, + "lr_T_max": 30, + "lr_eta_min": 0, + "lr_patience": 10, + "lr_threshold": 1e-4, + "clip_grad_norm": false, + "max_grad_norm": 1.0, + "use_early_stop": false, + "patience": 50, + "log_level": "INFO", + "log_every": 1, + "saved_model": true, + "load_best_epoch": true, + "hyper_tune": false +} diff --git a/libcity/data/dataset/__init__.py b/libcity/data/dataset/__init__.py index 04d95781..fc24d4a4 100644 --- a/libcity/data/dataset/__init__.py +++ b/libcity/data/dataset/__init__.py @@ -9,6 +9,7 @@ from libcity.data.dataset.eta_dataset import ETADataset from libcity.data.dataset.map_matching_dataset import MapMatchingDataset from libcity.data.dataset.roadnetwork_dataset import RoadNetWorkDataset +from libcity.data.dataset.traffic_impute_dataset import TrafficImputeDataset __all__ = [ "AbstractDataset", @@ -21,5 +22,6 @@ "TrafficStateGridOdDataset", "ETADataset", "MapMatchingDataset", - "RoadNetWorkDataset" + "RoadNetWorkDataset", + "TrafficImputeDataset", ] diff --git a/libcity/data/dataset/traffic_impute_dataset.py b/libcity/data/dataset/traffic_impute_dataset.py index fc749c1e..07dd9c5e 100644 --- a/libcity/data/dataset/traffic_impute_dataset.py +++ b/libcity/data/dataset/traffic_impute_dataset.py @@ -10,9 +10,35 @@ class TrafficImputeDataset(TrafficStateDataset): def __init__(self, config): self.missing_pattern = config.get("missing_pattern", "point") - self.feature_name = {'X': 'float', 'y': 'float', 'mask': 'int'} self.missing_ratio = config.get("missing_ratio", None) super().__init__(config) + self.feature_name = {'X': 'float', 'y': 'float', 'mask': 'int'} + + def _load_dyna(self, filename): + """ + 加载.dyna文件,格式[dyna_id, type, time, entity_id, properties(若干列)] + 其中全局参数`data_col`用于指定需要加载的数据的列,不设置则默认全部加载 + + Args: + filename(str): 数据文件名,不包含后缀 + + Returns: + np.ndarray: 数据数组, 3d-array (len_time, num_nodes, feature_dim) + """ + return super()._load_dyna_3d(filename) + + def _add_external_information(self, df, ext_data=None): + """ + 增加外部信息(一周中的星期几/day of week,一天中的某个时刻/time of day,外部数据) + + Args: + df(np.ndarray): 交通状态数据多维数组, (len_time, num_nodes, feature_dim) + ext_data(np.ndarray): 外部数据 + + Returns: + np.ndarray: 融合后的外部数据和交通状态数据, (len_time, num_nodes, feature_dim_plus) + """ + return super()._add_external_information_3d(df, ext_data) def sample_mask(self, shape, p=0.0015, p_noise=0.05, max_seq=1, min_seq=1, rng=None): if rng is None: @@ -195,12 +221,15 @@ def get_data(self): def get_data_feature(self): """ - 返回数据集特征,子类必须实现这个函数,返回必要的特征 + 返回数据集特征,scaler是归一化方法,adj_mx是邻接矩阵,num_nodes是点的个数, + feature_dim是输入数据的维度,output_dim是模型输出的维度 Returns: dict: 包含数据集的相关特征的字典 """ - raise NotImplementedError('Please implement the function `get_data_feature()`.') + return {"scaler": self.scaler, "adj_mx": self.adj_mx, "ext_dim": self.ext_dim, + "num_nodes": self.num_nodes, "feature_dim": self.feature_dim, + "output_dim": self.output_dim, "num_batches": self.num_batches} if __name__ == '__main__': diff --git a/libcity/evaluator/__init__.py b/libcity/evaluator/__init__.py index ae448162..0565e714 100644 --- a/libcity/evaluator/__init__.py +++ b/libcity/evaluator/__init__.py @@ -6,6 +6,7 @@ from libcity.evaluator.road_representation_evaluator import RoadRepresentationEvaluator from libcity.evaluator.eta_evaluator import ETAEvaluator from libcity.evaluator.traffic_accident_evaluator import TrafficAccidentEvaluator +from libcity.evaluator.impute_evaluator import ImputeEvaluator __all__ = [ "TrajLocPredEvaluator", @@ -16,4 +17,5 @@ "RoadRepresentationEvaluator", "ETAEvaluator", "TrafficAccidentEvaluator", + "ImputeEvaluator", ] diff --git a/libcity/evaluator/ImputeEvaluator.py b/libcity/evaluator/impute_evaluator.py similarity index 100% rename from libcity/evaluator/ImputeEvaluator.py rename to libcity/evaluator/impute_evaluator.py diff --git a/libcity/executor/__init__.py b/libcity/executor/__init__.py index fe5f0a01..789ac4b1 100644 --- a/libcity/executor/__init__.py +++ b/libcity/executor/__init__.py @@ -16,7 +16,7 @@ from libcity.executor.sstban_executor import SSTBANExecutor from libcity.executor.testam_executor import TESTAMExecutor from libcity.executor.timemixer_executor import TimeMixerExecutor - +from libcity.executor.traffic_impute_executor import TrafficImputeExecutor __all__ = [ "TrajLocPredExecutor", @@ -37,4 +37,5 @@ "FOGSExecutor", "TESTAMExecutor", "TimeMixerExecutor", + "TrafficImputeExecutor", ] diff --git a/libcity/model/loss.py b/libcity/model/loss.py index 37d7560b..5aac361c 100644 --- a/libcity/model/loss.py +++ b/libcity/model/loss.py @@ -14,7 +14,7 @@ def masked_mae_loss(y_pred, y_true): return loss.mean() -def masked_mae_torch(preds, labels, null_val=np.nan, reduce=True): +def masked_mae_torch(preds, labels, null_val=np.nan, reduce=True, eval_mask=None): labels[torch.abs(labels) < 1e-4] = 0 if np.isnan(null_val): mask = ~torch.isnan(labels) @@ -25,6 +25,8 @@ def masked_mae_torch(preds, labels, null_val=np.nan, reduce=True): mask = torch.where(torch.isnan(mask), torch.zeros_like(mask), mask) loss = torch.abs(torch.sub(preds, labels)) loss = loss * mask + if eval_mask is not None: + loss = loss * eval_mask loss = torch.where(torch.isnan(loss), torch.zeros_like(loss), loss) if reduce: return torch.mean(loss) @@ -54,7 +56,7 @@ def quantile_loss(preds, labels, delta=0.25): return torch.mean(torch.where(condition, large_res, small_res)) -def masked_mape_torch(preds, labels, null_val=np.nan, eps=0): +def masked_mape_torch(preds, labels, null_val=np.nan, eps=0, eval_mask=None): labels[torch.abs(labels) < 1e-4] = 0 if np.isnan(null_val) and eps != 0: loss = torch.abs((preds - labels) / (labels + eps)) @@ -68,11 +70,13 @@ def masked_mape_torch(preds, labels, null_val=np.nan, eps=0): mask = torch.where(torch.isnan(mask), torch.zeros_like(mask), mask) loss = torch.abs((preds - labels) / labels) loss = loss * mask + if eval_mask is not None: + loss = loss * eval_mask loss = torch.where(torch.isnan(loss), torch.zeros_like(loss), loss) return torch.mean(loss) -def masked_mse_torch(preds, labels, null_val=np.nan): +def masked_mse_torch(preds, labels, null_val=np.nan, eval_mask=None): labels[torch.abs(labels) < 1e-4] = 0 if np.isnan(null_val): mask = ~torch.isnan(labels) @@ -83,14 +87,16 @@ def masked_mse_torch(preds, labels, null_val=np.nan): mask = torch.where(torch.isnan(mask), torch.zeros_like(mask), mask) loss = torch.square(torch.sub(preds, labels)) loss = loss * mask + if eval_mask is not None: + loss = loss * eval_mask loss = torch.where(torch.isnan(loss), torch.zeros_like(loss), loss) return torch.mean(loss) -def masked_rmse_torch(preds, labels, null_val=np.nan): +def masked_rmse_torch(preds, labels, null_val=np.nan, eval_mask=None): labels[torch.abs(labels) < 1e-4] = 0 return torch.sqrt(masked_mse_torch(preds=preds, labels=labels, - null_val=null_val)) + null_val=null_val, eval_mask=eval_mask)) def r2_score_torch(preds, labels): From 6cd00585aef89c4a2f4161574fc52f9b2faaf200 Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Sun, 7 Jul 2024 22:50:28 +0800 Subject: [PATCH 03/10] =?UTF-8?q?fix:=20=E5=90=88=E5=B9=B6mask?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libcity/model/loss.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/libcity/model/loss.py b/libcity/model/loss.py index 5aac361c..0e20b556 100644 --- a/libcity/model/loss.py +++ b/libcity/model/loss.py @@ -20,13 +20,13 @@ def masked_mae_torch(preds, labels, null_val=np.nan, reduce=True, eval_mask=None mask = ~torch.isnan(labels) else: mask = labels.ne(null_val) + if eval_mask is not None: + mask &= eval_mask mask = mask.float() mask /= torch.mean(mask) mask = torch.where(torch.isnan(mask), torch.zeros_like(mask), mask) loss = torch.abs(torch.sub(preds, labels)) loss = loss * mask - if eval_mask is not None: - loss = loss * eval_mask loss = torch.where(torch.isnan(loss), torch.zeros_like(loss), loss) if reduce: return torch.mean(loss) @@ -65,13 +65,13 @@ def masked_mape_torch(preds, labels, null_val=np.nan, eps=0, eval_mask=None): mask = ~torch.isnan(labels) else: mask = labels.ne(null_val) + if eval_mask is not None: + mask &= eval_mask mask = mask.float() mask /= torch.mean(mask) mask = torch.where(torch.isnan(mask), torch.zeros_like(mask), mask) loss = torch.abs((preds - labels) / labels) loss = loss * mask - if eval_mask is not None: - loss = loss * eval_mask loss = torch.where(torch.isnan(loss), torch.zeros_like(loss), loss) return torch.mean(loss) @@ -82,13 +82,13 @@ def masked_mse_torch(preds, labels, null_val=np.nan, eval_mask=None): mask = ~torch.isnan(labels) else: mask = labels.ne(null_val) + if eval_mask is not None: + mask &= eval_mask mask = mask.float() mask /= torch.mean(mask) mask = torch.where(torch.isnan(mask), torch.zeros_like(mask), mask) loss = torch.square(torch.sub(preds, labels)) loss = loss * mask - if eval_mask is not None: - loss = loss * eval_mask loss = torch.where(torch.isnan(loss), torch.zeros_like(loss), loss) return torch.mean(loss) From 222848a068d09e54c591de4adfee7bc2d23b4b02 Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Mon, 8 Jul 2024 00:02:37 +0800 Subject: [PATCH 04/10] =?UTF-8?q?fix:=20=E5=90=88=E5=B9=B6mask?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libcity/data/dataset/traffic_impute_dataset.py | 9 +++++---- libcity/model/loss.py | 3 +++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/libcity/data/dataset/traffic_impute_dataset.py b/libcity/data/dataset/traffic_impute_dataset.py index 07dd9c5e..b4c125ef 100644 --- a/libcity/data/dataset/traffic_impute_dataset.py +++ b/libcity/data/dataset/traffic_impute_dataset.py @@ -12,7 +12,7 @@ def __init__(self, config): self.missing_pattern = config.get("missing_pattern", "point") self.missing_ratio = config.get("missing_ratio", None) super().__init__(config) - self.feature_name = {'X': 'float', 'y': 'float', 'mask': 'int'} + self.feature_name = {'X': 'float', 'y': 'float', 'mask': 'bool'} def _load_dyna(self, filename): """ @@ -186,6 +186,9 @@ def get_data(self): # 数据归一化 self.feature_dim = x_train.shape[-1] self.ext_dim = self.feature_dim - self.output_dim + x_train = x_train * (1 - mask_train) + x_val = x_val * (1 - mask_val) + x_test = x_test * (1 - mask_test) self.scaler = self._get_scalar(self.scaler_type, x_train[..., :self.output_dim], y_train[..., :self.output_dim]) self.ext_scaler = self._get_scalar(self.ext_scaler_type, @@ -206,9 +209,7 @@ def get_data(self): # 把训练集的X和y聚合在一起成为list,测试集验证集同理 # x_train/y_train: (num_samples, input_length, ..., feature_dim) # train_data(list): train_data[i]是一个元组,由x_train[i]和y_train[i]组成 - x_train = x_train * (1 - mask_train) - x_val = x_val * (1 - mask_val) - x_test = x_test * (1 - mask_test) + train_data = list(zip(x_train, y_train, mask_train)) eval_data = list(zip(x_val, y_val, mask_val)) test_data = list(zip(x_test, y_test, mask_test)) diff --git a/libcity/model/loss.py b/libcity/model/loss.py index 0e20b556..abbedd1a 100644 --- a/libcity/model/loss.py +++ b/libcity/model/loss.py @@ -21,6 +21,7 @@ def masked_mae_torch(preds, labels, null_val=np.nan, reduce=True, eval_mask=None else: mask = labels.ne(null_val) if eval_mask is not None: + eval_mask = eval_mask.bool() mask &= eval_mask mask = mask.float() mask /= torch.mean(mask) @@ -66,6 +67,7 @@ def masked_mape_torch(preds, labels, null_val=np.nan, eps=0, eval_mask=None): else: mask = labels.ne(null_val) if eval_mask is not None: + eval_mask = eval_mask.bool() mask &= eval_mask mask = mask.float() mask /= torch.mean(mask) @@ -83,6 +85,7 @@ def masked_mse_torch(preds, labels, null_val=np.nan, eval_mask=None): else: mask = labels.ne(null_val) if eval_mask is not None: + eval_mask = eval_mask.bool() mask &= eval_mask mask = mask.float() mask /= torch.mean(mask) From a85973dacaab757e39cca9a8b3ab38c598c3740f Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Mon, 8 Jul 2024 00:34:19 +0800 Subject: [PATCH 05/10] =?UTF-8?q?fix:=20mean=E5=92=8Cstd=E8=AE=A1=E7=AE=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libcity/data/dataset/traffic_state_datatset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libcity/data/dataset/traffic_state_datatset.py b/libcity/data/dataset/traffic_state_datatset.py index d52bdcc5..d22bedd1 100644 --- a/libcity/data/dataset/traffic_state_datatset.py +++ b/libcity/data/dataset/traffic_state_datatset.py @@ -906,7 +906,8 @@ def _get_scalar(self, scaler_type, x_train, y_train): scaler = NormalScaler(maxx=max(x_train.max(), y_train.max())) self._logger.info('NormalScaler max: ' + str(scaler.max)) elif scaler_type == "standard": - scaler = StandardScaler(mean=x_train.mean(), std=x_train.std()) + non_zero_data = x_train[x_train != 0.0] + scaler = StandardScaler(mean=non_zero_data.mean(), std=non_zero_data.std()) self._logger.info('StandardScaler mean: ' + str(scaler.mean) + ', std: ' + str(scaler.std)) elif scaler_type == "minmax01": scaler = MinMax01Scaler( From 1e2b58b184105f7fcdf8f25383e4108db5ed335c Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Mon, 8 Jul 2024 00:47:44 +0800 Subject: [PATCH 06/10] =?UTF-8?q?fix:=20mean=E5=92=8Cstd=E8=AE=A1=E7=AE=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libcity/data/dataset/traffic_state_datatset.py | 2 +- libcity/executor/traffic_impute_executor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libcity/data/dataset/traffic_state_datatset.py b/libcity/data/dataset/traffic_state_datatset.py index d22bedd1..89137cd5 100644 --- a/libcity/data/dataset/traffic_state_datatset.py +++ b/libcity/data/dataset/traffic_state_datatset.py @@ -906,7 +906,7 @@ def _get_scalar(self, scaler_type, x_train, y_train): scaler = NormalScaler(maxx=max(x_train.max(), y_train.max())) self._logger.info('NormalScaler max: ' + str(scaler.max)) elif scaler_type == "standard": - non_zero_data = x_train[x_train != 0.0] + non_zero_data = x_train[np.abs(x_train) > 1e-4] scaler = StandardScaler(mean=non_zero_data.mean(), std=non_zero_data.std()) self._logger.info('StandardScaler mean: ' + str(scaler.mean) + ', std: ' + str(scaler.std)) elif scaler_type == "minmax01": diff --git a/libcity/executor/traffic_impute_executor.py b/libcity/executor/traffic_impute_executor.py index 47cf6c14..5729aa67 100644 --- a/libcity/executor/traffic_impute_executor.py +++ b/libcity/executor/traffic_impute_executor.py @@ -39,7 +39,7 @@ def evaluate(self, test_dataloader): y_preds = np.concatenate(y_preds, axis=0) y_truths = np.concatenate(y_truths, axis=0) # concatenate on batch eval_masks = np.concatenate(eval_masks, axis=0) - outputs = {'prediction': y_preds, 'truth': y_truths} + outputs = {'prediction': y_preds, 'truth': y_truths, 'eval_mask': eval_masks} filename = \ time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime(time.time())) + '_' \ + self.config['model'] + '_' + self.config['dataset'] + '_predictions.npz' From 66c2de27fef206c921c9122118904e5a32918ef0 Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Mon, 8 Jul 2024 01:14:42 +0800 Subject: [PATCH 07/10] fix: cache --- .../data/dataset/traffic_impute_dataset.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/libcity/data/dataset/traffic_impute_dataset.py b/libcity/data/dataset/traffic_impute_dataset.py index b4c125ef..fc56e9fe 100644 --- a/libcity/data/dataset/traffic_impute_dataset.py +++ b/libcity/data/dataset/traffic_impute_dataset.py @@ -97,6 +97,35 @@ def _generate_input_data(self, df): return x, y + def _load_cache_train_val_test(self): + """ + 加载之前缓存好的训练集、测试集、验证集 + + Returns: + tuple: tuple contains: + x_train: (num_samples, input_length, ..., feature_dim) \n + y_train: (num_samples, input_length, ..., feature_dim) \n + x_val: (num_samples, input_length, ..., feature_dim) \n + y_val: (num_samples, input_length, ..., feature_dim) \n + x_test: (num_samples, input_length, ..., feature_dim) \n + y_test: (num_samples, input_length, ..., feature_dim) + """ + self._logger.info('Loading ' + self.cache_file_name) + cat_data = np.load(self.cache_file_name) + x_train = cat_data['x_train'] + y_train = cat_data['y_train'] + mask_train = cat_data['mask_train'] + x_test = cat_data['x_test'] + y_test = cat_data['y_test'] + mask_test = cat_data['mask_test'] + x_val = cat_data['x_val'] + y_val = cat_data['y_val'] + mask_val = cat_data['mask_val'] + self._logger.info("train\t" + "x: " + str(x_train.shape) + ", y: " + str(y_train.shape)) + self._logger.info("eval\t" + "x: " + str(x_val.shape) + ", y: " + str(y_val.shape)) + self._logger.info("test\t" + "x: " + str(x_test.shape) + ", y: " + str(y_test.shape)) + return x_train, y_train, mask_train, x_val, y_val, mask_val, x_test, y_test, mask_test + def _split_train_val_test(self, x, y): """ 划分训练集、测试集、验证集,并缓存数据集 From f6070bd61ac9173d1efa0949a529bf42f443c59a Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Mon, 8 Jul 2024 01:21:13 +0800 Subject: [PATCH 08/10] fix: cache --- libcity/data/dataset/traffic_impute_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libcity/data/dataset/traffic_impute_dataset.py b/libcity/data/dataset/traffic_impute_dataset.py index fc56e9fe..0dbebf10 100644 --- a/libcity/data/dataset/traffic_impute_dataset.py +++ b/libcity/data/dataset/traffic_impute_dataset.py @@ -12,7 +12,7 @@ def __init__(self, config): self.missing_pattern = config.get("missing_pattern", "point") self.missing_ratio = config.get("missing_ratio", None) super().__init__(config) - self.feature_name = {'X': 'float', 'y': 'float', 'mask': 'bool'} + self.feature_name = {'X': 'float', 'y': 'float', 'mask': 'int'} def _load_dyna(self, filename): """ From f71c82dd26b55a6afb8a5e2a7e147c8f12d69ab7 Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Mon, 8 Jul 2024 15:05:13 +0800 Subject: [PATCH 09/10] =?UTF-8?q?fix:=20scalar=E9=A1=BA=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libcity/data/dataset/traffic_impute_dataset.py | 7 +++---- libcity/data/dataset/traffic_state_datatset.py | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/libcity/data/dataset/traffic_impute_dataset.py b/libcity/data/dataset/traffic_impute_dataset.py index 0dbebf10..6173142c 100644 --- a/libcity/data/dataset/traffic_impute_dataset.py +++ b/libcity/data/dataset/traffic_impute_dataset.py @@ -215,13 +215,13 @@ def get_data(self): # 数据归一化 self.feature_dim = x_train.shape[-1] self.ext_dim = self.feature_dim - self.output_dim - x_train = x_train * (1 - mask_train) - x_val = x_val * (1 - mask_val) - x_test = x_test * (1 - mask_test) self.scaler = self._get_scalar(self.scaler_type, x_train[..., :self.output_dim], y_train[..., :self.output_dim]) self.ext_scaler = self._get_scalar(self.ext_scaler_type, x_train[..., self.output_dim:], y_train[..., self.output_dim:]) + x_train[..., :self.output_dim] = x_train[..., :self.output_dim] * (1 - mask_train)[..., :self.output_dim] + x_val[..., :self.output_dim] = x_val[..., :self.output_dim] * (1 - mask_val)[..., :self.output_dim] + x_test[..., :self.output_dim] = x_test[..., :self.output_dim] * (1 - mask_test)[..., :self.output_dim] x_train[..., :self.output_dim] = self.scaler.transform(x_train[..., :self.output_dim]) y_train[..., :self.output_dim] = self.scaler.transform(y_train[..., :self.output_dim]) x_val[..., :self.output_dim] = self.scaler.transform(x_val[..., :self.output_dim]) @@ -238,7 +238,6 @@ def get_data(self): # 把训练集的X和y聚合在一起成为list,测试集验证集同理 # x_train/y_train: (num_samples, input_length, ..., feature_dim) # train_data(list): train_data[i]是一个元组,由x_train[i]和y_train[i]组成 - train_data = list(zip(x_train, y_train, mask_train)) eval_data = list(zip(x_val, y_val, mask_val)) test_data = list(zip(x_test, y_test, mask_test)) diff --git a/libcity/data/dataset/traffic_state_datatset.py b/libcity/data/dataset/traffic_state_datatset.py index 89137cd5..d52bdcc5 100644 --- a/libcity/data/dataset/traffic_state_datatset.py +++ b/libcity/data/dataset/traffic_state_datatset.py @@ -906,8 +906,7 @@ def _get_scalar(self, scaler_type, x_train, y_train): scaler = NormalScaler(maxx=max(x_train.max(), y_train.max())) self._logger.info('NormalScaler max: ' + str(scaler.max)) elif scaler_type == "standard": - non_zero_data = x_train[np.abs(x_train) > 1e-4] - scaler = StandardScaler(mean=non_zero_data.mean(), std=non_zero_data.std()) + scaler = StandardScaler(mean=x_train.mean(), std=x_train.std()) self._logger.info('StandardScaler mean: ' + str(scaler.mean) + ', std: ' + str(scaler.std)) elif scaler_type == "minmax01": scaler = MinMax01Scaler( From 6aad2cdadac13813218e0beb726e1efc8b83710d Mon Sep 17 00:00:00 2001 From: wangyongyao <837826068@qq.com> Date: Mon, 8 Jul 2024 21:46:23 +0800 Subject: [PATCH 10/10] fix: mask --- libcity/executor/traffic_impute_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libcity/executor/traffic_impute_executor.py b/libcity/executor/traffic_impute_executor.py index 5729aa67..71201677 100644 --- a/libcity/executor/traffic_impute_executor.py +++ b/libcity/executor/traffic_impute_executor.py @@ -28,7 +28,7 @@ def evaluate(self, test_dataloader): batch.to_tensor(self.device) output = self.model.predict(batch) y_true = self._scaler.inverse_transform(batch['y'][..., :self.output_dim]) - eval_mask = batch['mask'] + eval_mask = batch['mask'][..., :self.output_dim] y_pred = self._scaler.inverse_transform(output[..., :self.output_dim]) y_truths.append(y_true.cpu().numpy()) y_preds.append(y_pred.cpu().numpy())