-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
101 lines (87 loc) · 2.98 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import random
from collections import deque, namedtuple
import numpy as np
import torch
class ReplayBuffer:
    """Fixed-size buffer to store experience tuples."""

    def __init__(self, action_size, buffer_size, batch_size, seed, device):
        """Initialize a ReplayBuffer object.

        Params
        ======
            action_size (int): dimension of each action (stored, not used here)
            buffer_size (int): maximum size of buffer; once full, the oldest
                experiences are discarded as new ones are appended
            batch_size (int): size of each training batch
            seed (int): random seed; seeds Python's global ``random`` module
            device: torch device (or device string) sampled tensors are moved to
        """
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple(
            "Experience",
            field_names=["state", "action", "reward", "next_state", "done"],
        )
        # random.seed() returns None, so keep the seed value itself and seed
        # the global generator (which sample() draws from) as a separate step.
        self.seed = seed
        random.seed(seed)
        self.device = device

    def add(self, state, action, reward, next_state, done):
        """Add a new experience to memory."""
        self.memory.append(
            self.experience(state, action, reward, next_state, done)
        )

    def _to_tensor(self, values, as_dtype=None):
        """Stack ``values`` row-wise and return a float tensor on ``self.device``."""
        stacked = np.vstack(values)
        if as_dtype is not None:
            stacked = stacked.astype(as_dtype)
        return torch.from_numpy(stacked).float().to(self.device)

    def sample(self):
        """Randomly sample a batch of experiences from memory.

        Returns
        =======
            tuple ``(states, actions, rewards, next_states, dones)`` of float
            tensors on ``self.device``, each with one row per sampled experience.

        Raises
        ======
            ValueError: if fewer than ``batch_size`` experiences are stored.
        """
        if len(self.memory) < self.batch_size:
            raise ValueError(
                "Cannot sample a batch of {} from a buffer holding {} "
                "experiences".format(self.batch_size, len(self.memory))
            )
        experiences = random.sample(self.memory, k=self.batch_size)
        # Skip any None placeholders, as the original implementation did.
        batch = [e for e in experiences if e is not None]

        states = self._to_tensor([e.state for e in batch])
        actions = self._to_tensor([e.action for e in batch])
        rewards = self._to_tensor([e.reward for e in batch])
        next_states = self._to_tensor([e.next_state for e in batch])
        # Done flags go through uint8 first so booleans become 0/1.
        dones = self._to_tensor([e.done for e in batch], as_dtype=np.uint8)
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the current size of internal memory."""
        return len(self.memory)
class OrnsteinUhlenbeckProcess:
    """Temporally correlated noise from a discretized Ornstein-Uhlenbeck process.

    Each call to ``sample`` advances the state by
    ``theta * (mu - x) * dt + std * sqrt(dt) * N(0, 1)``.
    """

    def __init__(self, size, seed, mu=0, std=0.2, theta=0.15, dt=1):
        """Initialize the process and set its state to the mean.

        Params
        ======
            size (int or tuple of ints): shape of the noise array
            seed (int): random seed; must be acceptable to
                ``numpy.random.default_rng`` (non-negative int or None)
            mu (float): mean the process reverts to
            std (float): scale of the Gaussian perturbation
            theta (float): mean-reversion rate
            dt (float): time step
        """
        self.theta = theta
        self.mu = mu * np.ones(size)
        self.std = std
        self.dt = dt
        self.size = size
        # random.seed() returns None, so store the seed value itself.
        self.seed = seed
        # Global seeding is kept because other code may rely on this side effect.
        random.seed(seed)
        # The noise is drawn with NumPy, which random.seed() does not affect;
        # a dedicated seeded generator makes the samples reproducible.
        self._rng = np.random.default_rng(seed)
        self.reset()

    def sample(self):
        """Advance the process by one step and return the new state array."""
        drift = self.theta * (self.mu - self.x_prev) * self.dt
        diffusion = (
            self.std * np.sqrt(self.dt) * self._rng.standard_normal(self.size)
        )
        self.x_prev = self.x_prev + drift + diffusion
        return self.x_prev

    def reset(self):
        """Reset the internal state to the mean ``mu``."""
        # Multiplying by ones yields a fresh array, so the state never aliases mu.
        self.x_prev = self.mu * np.ones(self.size)