from ray.rllib.env.multi_agent_env import MultiAgentEnv


class PettingZooEnv(MultiAgentEnv):
    """An interface to the PettingZoo MARL environment library.

    See: https://github.com/PettingZoo-Team/PettingZoo

    Inherits from MultiAgentEnv and exposes a given AEC
    (actor-environment-cycle) game from the PettingZoo project via the
    MultiAgentEnv public API.

    It reduces the class of AEC games to Partially Observable Markov (POM)
    games by imposing the following important restrictions onto an AEC
    environment:

    1. Each agent steps in the order specified in the agents list (unless
       they are done, in which case they are skipped).
    2. Agents act simultaneously (-> no hard-turn games like chess).
    3. All agents have the same action_spaces and observation_spaces.
       Note: If, within your AEC game, agents do not have homogeneous
       action / observation spaces, apply SuperSuit wrappers to add
       padding functionality: https://github.com/PettingZoo-Team/
       SuperSuit#built-in-multi-agent-only-functions
    4. Environments are positive-sum games (-> agents are expected to
       cooperate to maximize reward). This isn't a hard restriction; it
       just means that standard algorithms aren't expected to work well
       in highly competitive games.

    Examples:
        >>> from pettingzoo.gamma import prison_v0
        >>> env = PettingZooEnv(prison_v0.env())
        >>> obs = env.reset()
        >>> print(obs)
        {
            "0": [110, 119],
            "1": [105, 102],
            "2": [99, 95],
        }
        >>> obs, rewards, dones, infos = env.step(
        ...     action_dict={
        ...         "0": 1, "1": 0, "2": 2,
        ...     })
        >>> print(rewards)
        {
            "0": 0,
            "1": 1,
            "2": 0,
        }
        >>> print(dones)
        {
            "0": False,        # agent 0 is still running
            "1": True,         # agent 1 is done
            "__all__": False,  # the env is not done
        }
        >>> print(infos)
        {
            "0": {},  # info for agent 0
            "1": {},  # info for agent 1
        }
    """

    def __init__(self, env):
        """
        Parameters
        ----------
        env: AECEnv object.
        """
        self.aec_env = env
        # Agent idx list.
        self.agents = self.aec_env.agents
        # Get dictionaries of obs_spaces and act_spaces.
        self.observation_spaces = self.aec_env.observation_spaces
        self.action_spaces = self.aec_env.action_spaces
        # Get first observation space, assuming all agents have equal space.
        self.observation_space = self.observation_spaces[self.agents[0]]
        # Get first action space, assuming all agents have equal space.
        self.action_space = self.action_spaces[self.agents[0]]
        assert all(obs_space == self.observation_space
                   for obs_space
                   in self.aec_env.observation_spaces.values()), \
            "Observation spaces for all agents must be identical. Perhaps " \
            "SuperSuit's pad_observations wrapper can help (usage: " \
            "`supersuit.aec_wrappers.pad_observations(env)`)."
        assert all(act_space == self.action_space
                   for act_space in self.aec_env.action_spaces.values()), \
            "Action spaces for all agents must be identical. Perhaps " \
            "SuperSuit's pad_action_space wrapper can help (usage: " \
            "`supersuit.aec_wrappers.pad_action_space(env)`)."
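
        # If either assert above fires, a sketch of the padding fix using
        # the SuperSuit wrappers named in the messages (apply them to the
        # raw AEC env *before* constructing this wrapper; the module path
        # is the one referenced above, check your installed SuperSuit
        # version):
        #
        #     import supersuit
        #     env = supersuit.aec_wrappers.pad_observations(env)
        #     env = supersuit.aec_wrappers.pad_action_space(env)
        #     wrapped = PettingZooEnv(env)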
        self.rewards = {}
        self.dones = {}
        self.obs = {}
        self.infos = {}

        _ = self.reset()

    def _init_dicts(self):
        # Initialize rewards with zero.
        self.rewards = {agent: 0 for agent in self.agents}
        # Initialize dones with False.
        self.dones = {agent: False for agent in self.agents}
        self.dones["__all__"] = False
        # Initialize with empty info dicts.
        self.infos = {agent: {} for agent in self.agents}
        # Initialize empty observations.
        self.obs = {agent: None for agent in self.agents}

    def reset(self):
        """
        Resets the env and returns observations from ready agents.

        Returns:
            obs (dict): New observations for each ready agent.
        """
        # 1. Reset environment; agent pointer points to first agent.
        self.aec_env.reset()
        # 2. Copy agents from environment.
        self.agents = self.aec_env.agents
        # 3. Reset dictionaries.
        self._init_dicts()
        # 4. Get initial observations for each agent.
        for agent in self.agents:
            self.obs[agent] = self.aec_env.observe(agent)
        return self.obs

    def step(self, action_dict):
        """
        Executes input actions from RL agents and returns observations from
        environment agents.

        The returns are dicts mapping from agent_id strings to values. The
        number of agents in the env can vary over time.

        Returns
        -------
        obs (dict): New observations for each ready agent.
        rewards (dict): Reward values for each ready agent. If the
            episode has just started, the value will be None.
        dones (dict): Done values for each ready agent. The special key
            "__all__" (required) is used to indicate env termination.
        infos (dict): Optional info values for each agent id.
        """
        act_dict_copy = dict(**action_dict)
        # Fill in random actions for any live agents the caller did not
        # supply an action for, so the cycle below can always step.
        for agent in self.aec_env.agents:
            if agent not in action_dict:
                action_dict[agent] = self.action_space.sample()

        # Agents that are already done must be stepped with a None action
        # (PettingZoo's convention for done agents) until a live agent is
        # selected.
        stepped_agents = set()
        while (self.aec_env.agent_selection not in stepped_agents
               and self.aec_env.dones[self.aec_env.agent_selection]):
            agent = self.aec_env.agent_selection
            self.aec_env.step(None)
            stepped_agents.add(agent)
        stepped_agents = set()

        # Step each agent in the cycle exactly once.
        while self.aec_env.agent_selection not in stepped_agents:
            agent = self.aec_env.agent_selection
            assert agent in action_dict or self.aec_env.dones[agent], \
                "Live environment agent is not in actions dictionary"
            # Agents that became done mid-cycle must also receive None
            # instead of a real action.
            self.aec_env.step(
                None if self.aec_env.dones[agent] else action_dict[agent])
            stepped_agents.add(agent)

        assert all(agent in stepped_agents or self.aec_env.dones[agent]
                   for agent in action_dict), \
            "environment has a nontrivial ordering, and cannot be used with" \
            " the POMGameEnv wrapper"

        # Collect returns only for the agents the caller passed actions for.
        self.obs = {}
        self.rewards = {}
        self.dones = {}
        self.infos = {}
        live_agents = list(act_dict_copy.keys())
        for agent in live_agents:
            self.obs[agent] = self.aec_env.observe(agent)
            self.dones[agent] = self.aec_env.dones[agent]
            self.rewards[agent] = self.aec_env.rewards[agent]
            self.infos[agent] = self.aec_env.infos[agent]
        self.dones["__all__"] = all(self.aec_env.dones.values())
        return self.obs, self.rewards, self.dones, self.infos

    def render(self, mode="human"):
        return self.aec_env.render(mode=mode)

    def close(self):
        self.aec_env.close()

    def seed(self, seed=None):
        self.aec_env.seed(seed)

    def with_agent_groups(self, groups, obs_space=None, act_space=None):
        raise NotImplementedError
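

# A sketch of how the wrapper above is typically hooked into RLlib. The
# env name "prison" and the creator lambda are illustrative only, while
# `register_env` is RLlib's standard registration entry point:
#
#     from ray.tune.registry import register_env
#     from pettingzoo.gamma import prison_v0
#
#     register_env("prison", lambda config: PettingZooEnv(prison_v0.env()))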


class ParallelPettingZooEnv(MultiAgentEnv):
    """Same interface as PettingZooEnv, but for PettingZoo's parallel API,
    in which all agents step simultaneously."""

    def __init__(self, env):
        self.par_env = env
        # Agent idx list.
        self.agents = self.par_env.agents
        # Get dictionaries of obs_spaces and act_spaces.
        self.observation_spaces = self.par_env.observation_spaces
        self.action_spaces = self.par_env.action_spaces
        # Get first observation space, assuming all agents have equal space.
        self.observation_space = self.observation_spaces[self.agents[0]]
        # Get first action space, assuming all agents have equal space.
        self.action_space = self.action_spaces[self.agents[0]]
        assert all(obs_space == self.observation_space
                   for obs_space
                   in self.par_env.observation_spaces.values()), \
            "Observation spaces for all agents must be identical. Perhaps " \
            "SuperSuit's pad_observations wrapper can help (usage: " \
            "`supersuit.aec_wrappers.pad_observations(env)`)."
        assert all(act_space == self.action_space
                   for act_space in self.par_env.action_spaces.values()), \
            "Action spaces for all agents must be identical. Perhaps " \
            "SuperSuit's pad_action_space wrapper can help (usage: " \
            "`supersuit.aec_wrappers.pad_action_space(env)`)."
        self.reset()

    def reset(self):
        return self.par_env.reset()

    def step(self, action_dict):
        # Fill in random actions for any agents missing from the dict.
        for agent in self.agents:
            if agent not in action_dict:
                action_dict[agent] = self.action_space.sample()
        aobs, arew, adones, ainfo = self.par_env.step(action_dict)
        # Return values only for the agents that were acted on.
        obss = {}
        rews = {}
        dones = {}
        infos = {}
        for agent in action_dict:
            obss[agent] = aobs[agent]
            rews[agent] = arew[agent]
            dones[agent] = adones[agent]
            infos[agent] = ainfo[agent]
        self.rewards = rews
        self.dones = dones
        self.infos = infos
        dones["__all__"] = all(adones.values())
        return obss, rews, dones, infos

    def close(self):
        self.par_env.close()

    def seed(self, seed=None):
        self.par_env.seed(seed)

    def render(self, mode="human"):
        return self.par_env.render(mode)
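

if __name__ == "__main__":
    # Minimal smoke test: wrap the AEC game from the docstring example and
    # take one random step. `pettingzoo.gamma.prison_v0` is the module path
    # used in the docstring above; adjust it to match your installed
    # PettingZoo version.
    from pettingzoo.gamma import prison_v0

    env = PettingZooEnv(prison_v0.env())
    obs = env.reset()
    print("agents:", env.agents)
    # One random action per live agent (all agents share one action_space).
    actions = {agent: env.action_space.sample() for agent in env.agents}
    obs, rewards, dones, infos = env.step(actions)
    print("rewards:", rewards)
    print("episode done:", dones["__all__"])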