Skip to content

Commit e5f1903

Browse files
committed
Error on mixed async/sync
Pull Request resolved: #1626 ghstack-source-id: 317483496 @exported-using-ghexport Differential Revision: [D85090856](https://our.internmc.facebook.com/intern/diff/D85090856/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85090856/)!
1 parent 073b963 commit e5f1903

File tree

2 files changed

+24
-16
lines changed

2 files changed

+24
-16
lines changed

python/monarch/_src/actor/actor_mesh.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,9 @@ def __init__(
11141114
self._inner: "ActorMeshProtocol" = inner
11151115
self._shape = shape
11161116
self._proc_mesh = proc_mesh
1117+
1118+
async_endpoints = []
1119+
sync_endpoints = []
11171120
for attr_name in dir(self._class):
11181121
attr_value = getattr(self._class, attr_name, None)
11191122
if isinstance(attr_value, EndpointProperty):
@@ -1133,6 +1136,17 @@ def __init__(
11331136
attr_value._explicit_response_port,
11341137
),
11351138
)
1139+
if inspect.iscoroutinefunction(attr_value._method):
1140+
async_endpoints.append(attr_name)
1141+
else:
1142+
sync_endpoints.append(attr_name)
1143+
1144+
if sync_endpoints and async_endpoints:
1145+
raise ValueError(
1146+
f"{self._class} mixes both async and sync endpoints."
1147+
"Synchronous endpoints cannot be mixed with async endpoints because they can cause the asyncio loop to deadlock if they wait."
1148+
f"sync: {sync_endpoints} async: {async_endpoints}"
1149+
)
11361150

11371151
def __getattr__(self, attr: str) -> NotAnEndpoint:
11381152
if attr in dir(self._class):

python/tests/test_python_actors.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,14 @@ async def incr(self):
8787
async def value(self) -> int:
8888
return self.v
8989

90+
91+
class SyncCounter(Actor):
92+
def __init__(self, c):
93+
self.c = c
94+
9095
@endpoint
9196
def value_sync_endpoint(self) -> int:
92-
return self.v
97+
return self.c.value.choose().get()
9398

9499

95100
class Indirect(Actor):
@@ -112,7 +117,8 @@ async def test_choose():
112117

113118
assert result == result2
114119

115-
result3 = await v.value_sync_endpoint.choose()
120+
v2 = proc.spawn("sync_counter", SyncCounter, v)
121+
result3 = v2.value_sync_endpoint.choose().get()
116122
assert_type(result, int)
117123
assert result2 == result3
118124

@@ -335,15 +341,15 @@ def __init__(self):
335341
self.local.value = 0
336342

337343
@endpoint
338-
def increment(self):
344+
async def increment(self):
339345
self.local.value += 1
340346

341347
@endpoint
342348
async def increment_async(self):
343349
self.local.value += 1
344350

345351
@endpoint
346-
def get_value(self):
352+
async def get_value(self):
347353
return self.local.value
348354

349355
@endpoint
@@ -408,18 +414,6 @@ async def no_more(self) -> None:
408414
self.should_exit = True
409415

410416

411-
@pytest.mark.timeout(60)
412-
async def test_async_concurrency():
413-
"""Test that async endpoints will be processed concurrently."""
414-
pm = this_host().spawn_procs(per_host={})
415-
am = pm.spawn("async", AsyncActor)
416-
fut = am.sleep.call()
417-
# This call should go through and exit the sleep loop, as long as we are
418-
# actually concurrently processing messages.
419-
await am.no_more.call()
420-
await fut
421-
422-
423417
async def awaitit(f):
424418
return await f
425419

0 commit comments

Comments
 (0)