diff --git a/alf/algorithms/data_transformer.py b/alf/algorithms/data_transformer.py
index b7e783abd..05ce47ac3 100644
--- a/alf/algorithms/data_transformer.py
+++ b/alf/algorithms/data_transformer.py
@@ -137,13 +137,18 @@ def __init__(self, data_transformer_ctors, observation_spec):
 
     @staticmethod
     def _validate_order(data_transformers):
+        # Hindsight should probably not be used together with FrameStacker,
+        # unless done really carefully. Hindsight after FrameStacker is
+        # simply wrong, because Hindsight would read ``achieved_goal`` field
+        # of a future step directly from the replay buffer without stacking.
         def _tier_of(data_transformer):
             if isinstance(data_transformer, UntransformedTimeStep):
                 return 1
-            if isinstance(data_transformer,
-                          (HindsightExperienceTransformer, FrameStacker)):
+            if isinstance(data_transformer, HindsightExperienceTransformer):
                 return 2
-            return 3
+            if isinstance(data_transformer, FrameStacker):
+                return 3
+            return 4
 
         prev_tier = 0
         for i in range(len(data_transformers)):
diff --git a/alf/algorithms/rl_algorithm.py b/alf/algorithms/rl_algorithm.py
index c0d8e2a41..760dfdda4 100644
--- a/alf/algorithms/rl_algorithm.py
+++ b/alf/algorithms/rl_algorithm.py
@@ -223,12 +223,13 @@ def __init__(self,
             replay_buffer_length = adjust_replay_buffer_length(
                 config, self._num_earliest_frames_ignored)
 
+            total_replay_size = replay_buffer_length * self._env.batch_size
             if config.whole_replay_buffer_training and config.clear_replay_buffer:
                 # For whole replay buffer training, we would like to be sure
                 # that the replay buffer have enough samples in it to perform
                 # the training, which will most likely happen in the 2nd
                 # iteration. The minimum_initial_collect_steps guarantees that.
-                minimum_initial_collect_steps = replay_buffer_length * self._env.batch_size
+                minimum_initial_collect_steps = total_replay_size
                 if config.initial_collect_steps < minimum_initial_collect_steps:
                     common.info(
                         'Set the initial_collect_steps to minimum required '
@@ -236,6 +237,9 @@ def __init__(self,
                         'whole_replay_buffer_training is on.')
                     config.initial_collect_steps = minimum_initial_collect_steps
 
+            assert config.initial_collect_steps <= total_replay_size, \
+                "Training will not happen - insufficient replay buffer size."
+
             self.set_replay_buffer(self._env.batch_size, replay_buffer_length,
                                    config.priority_replay)
 
diff --git a/alf/config_util.py b/alf/config_util.py
index 6eccaf5fe..2c69a0103 100644
--- a/alf/config_util.py
+++ b/alf/config_util.py
@@ -297,6 +297,8 @@ def config1(config_name, value, mutable=True, raise_if_used=True):
 
     config_node = _get_config_node(config_name)
     if raise_if_used and config_node.is_used():
+        # Log error because pre_config catches and silences the ValueError.
+        logging.error("Config '%s' used before configured." % config_name)
         raise ValueError(
             "Config '%s' has already been used. You should config "
             "its value before using it." % config_name)
diff --git a/alf/data_structures.py b/alf/data_structures.py
index 9cd820990..7f72b6c55 100644
--- a/alf/data_structures.py
+++ b/alf/data_structures.py
@@ -279,7 +279,8 @@ def _generate_time_step(batched,
         if env_id is None:
             env_id = md.arange(batch_size, dtype=md.int32)
         if reward is not None:
-            assert reward.shape[:1] == outer_dims
+            assert reward.shape[:1] == outer_dims, "%s, %s" % (reward.shape,
+                                                               outer_dims)
         if prev_action is not None:
             flat_action = nest.flatten(prev_action)
             assert flat_action[0].shape[:1] == outer_dims
diff --git a/alf/networks/critic_networks.py b/alf/networks/critic_networks.py
index 72a910cc2..6978188a4 100644
--- a/alf/networks/critic_networks.py
+++ b/alf/networks/critic_networks.py
@@ -77,6 +77,7 @@ def __init__(self,
                  joint_fc_layer_params=None,
                  activation=torch.relu_,
                  kernel_initializer=None,
+                 last_bias_init_value=0.0,
                  use_fc_bn=False,
                  use_naive_parallel_network=False,
                  name="CriticNetwork"):
@@ -174,7 +175,8 @@ def __init__(self,
             last_activation=math_ops.identity,
             use_fc_bn=use_fc_bn,
             last_kernel_initializer=last_kernel_initializer,
-            name=name)
+            last_bias_init_value=last_bias_init_value,
+            name=name + ".joint_encoder")
         self._use_naive_parallel_network = use_naive_parallel_network
 
     def make_parallel(self, n):
diff --git a/alf/networks/encoding_networks.py b/alf/networks/encoding_networks.py
index 479401cc0..42329a7b4 100644
--- a/alf/networks/encoding_networks.py
+++ b/alf/networks/encoding_networks.py
@@ -405,6 +405,7 @@ def __init__(self,
                  last_layer_size=None,
                  last_activation=None,
                  last_kernel_initializer=None,
+                 last_bias_init_value=0.0,
                  last_use_fc_bn=False,
                  name="EncodingNetwork"):
         """
@@ -540,7 +541,8 @@ def __init__(self,
                     last_layer_size,
                     activation=last_activation,
                     use_bn=last_use_fc_bn,
-                    kernel_initializer=last_kernel_initializer))
+                    kernel_initializer=last_kernel_initializer,
+                    bias_init_value=last_bias_init_value))
             input_size = last_layer_size
 
         if output_tensor_spec is not None:
diff --git a/alf/trainers/policy_trainer.py b/alf/trainers/policy_trainer.py
index 8a9e078ac..2e9c27dc7 100644
--- a/alf/trainers/policy_trainer.py
+++ b/alf/trainers/policy_trainer.py
@@ -498,6 +498,7 @@ def __init__(self, config: TrainerConfig, ddp_rank: int = -1):
         logging.info(
             "observation_spec=%s" % pprint.pformat(env.observation_spec()))
         logging.info("action_spec=%s" % pprint.pformat(env.action_spec()))
+        logging.info("reward_spec=%s" % pprint.pformat(env.reward_spec()))
 
         # for offline buffer construction
         untransformed_observation_spec = env.observation_spec()
diff --git a/alf/utils/data_buffer_test.py b/alf/utils/data_buffer_test.py
index 28acc99ba..c1567bff2 100644
--- a/alf/utils/data_buffer_test.py
+++ b/alf/utils/data_buffer_test.py
@@ -29,7 +29,7 @@
 DataItem = alf.data_structures.namedtuple(
     "DataItem", [
         "env_id", "x", "o", "reward", "step_type", "batch_info",
-        "replay_buffer", "rollout_info_field"
+        "replay_buffer", "rollout_info_field", "discount"
     ],
     default_value=())
 
@@ -40,12 +40,20 @@ def get_batch(env_ids, dim, t, x):
     batch_size = len(env_ids)
     x = torch.as_tensor(x, dtype=torch.float32, device="cpu")
     t = torch.as_tensor(t, dtype=torch.int32, device="cpu")
-    ox = (x * torch.arange(
-        batch_size, dtype=torch.float32, requires_grad=True,
-        device="cpu").unsqueeze(1) * torch.arange(
-            dim, dtype=torch.float32, requires_grad=True,
-            device="cpu").unsqueeze(0))
-    a = x * torch.ones(batch_size, dtype=torch.float32, device="cpu")
+
+    # We allow x and t inputs to be scalars, which will be expanded to be
+    # consistent with the batch_size.
+
+    def _need_to_expand(x):
+        return not (batch_size > 1 and x.ndim > 0 and batch_size == x.shape[0])
+
+    if _need_to_expand(x):
+        a = x * torch.ones(batch_size, dtype=torch.float32, device="cpu")
+    else:
+        a = x
+    if _need_to_expand(t):
+        t = t * torch.ones(batch_size, dtype=torch.int32, device="cpu")
+    ox = a.unsqueeze(1).clone().requires_grad_(True)
     g = torch.zeros(batch_size, dtype=torch.float32, device="cpu")
     # reward function adapted from ReplayBuffer: default_reward_fn
     r = torch.where(
@@ -60,6 +68,10 @@ def get_batch(env_ids, dim, t, x):
             "a": a,
             "g": g
         }),
+        discount=torch.tensor(
+            t != alf.data_structures.StepType.LAST,
+            dtype=torch.float32,
+            device="cpu"),
         reward=r)
 
 
@@ -79,6 +91,7 @@ def __init__(self, *args):
                 "a": alf.TensorSpec(shape=(), dtype=torch.float32),
                 "g": alf.TensorSpec(shape=(), dtype=torch.float32)
             }),
+            discount=alf.TensorSpec(shape=(), dtype=torch.float32),
             reward=alf.TensorSpec(shape=(), dtype=torch.float32))
 
     @parameterized.named_parameters([
diff --git a/alf/utils/external_configurables.py b/alf/utils/external_configurables.py
index 5a00b7e2e..e9e6cda84 100644
--- a/alf/utils/external_configurables.py
+++ b/alf/utils/external_configurables.py
@@ -46,3 +46,5 @@
 
 gin.external_configurable(torch.nn.init.xavier_normal_,
                           'torch.nn.init.xavier_normal_')
+gin.external_configurable(torch.nn.Embedding, 'torch.nn.Embedding')
+gin.external_configurable(torch.nn.Sequential, 'torch.nn.Sequential')
diff --git a/alf/utils/normalizers.py b/alf/utils/normalizers.py
index 9ef8e2ca0..9f01e7560 100644
--- a/alf/utils/normalizers.py
+++ b/alf/utils/normalizers.py
@@ -138,7 +138,11 @@ def _summary(name, val):
         def _summarize_all(path, t, m2, m):
             if path:
                 path += "."
-            spec = TensorSpec.from_tensor(m if m2 is None else m2)
+            if m2 is not None:
+                spec = TensorSpec.from_tensor(m2)
+            else:
+                assert m is not None
+                spec = TensorSpec.from_tensor(m)
             _summary(path + "tensor.batch_min",
                      _reduce_along_batch_dims(t, spec, torch.min))
             _summary(path + "tensor.batch_max",
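As a quick illustration of the new ordering rule in the first hunk, here is a minimal configuration sketch (the ``alf.config('TrainerConfig', ...)`` plumbing and the list-valued ``data_transformer_ctor`` below are assumptions about the surrounding codebase, not part of this diff): ``UntransformedTimeStep`` (tier 1) must come before ``HindsightExperienceTransformer`` (tier 2), which in turn must come before ``FrameStacker`` (tier 3), since hindsight relabeling reads ``achieved_goal`` of a future step directly from the replay buffer and therefore needs un-stacked observations.

    import alf
    from alf.algorithms.data_transformer import (FrameStacker,
                                                 HindsightExperienceTransformer,
                                                 UntransformedTimeStep)

    # Order accepted by the new _validate_order: tier 1 -> tier 2 -> tier 3.
    # Putting FrameStacker before Hindsight should now be rejected.
    alf.config(
        'TrainerConfig',
        data_transformer_ctor=[
            UntransformedTimeStep, HindsightExperienceTransformer, FrameStacker
        ])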
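Similarly, a minimal usage sketch of the new ``last_bias_init_value`` argument, which this diff threads from ``CriticNetwork`` through ``EncodingNetwork`` to the ``bias_init_value`` of the last FC layer (the spec shapes and layer sizes here are illustrative only):

    import torch
    import alf
    from alf.networks.critic_networks import CriticNetwork

    obs_spec = alf.TensorSpec((10, ), torch.float32)
    action_spec = alf.TensorSpec((2, ), torch.float32)

    # The bias of the last FC layer can now start at a non-zero value,
    # e.g. to bias the initial critic estimates instead of starting at zero.
    critic = CriticNetwork(
        input_tensor_spec=(obs_spec, action_spec),
        joint_fc_layer_params=(64, 64),
        last_bias_init_value=1.0)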