Skip to content

Commit 126825d

Browse files
StarJourneyMingsingmingsingacroca
authored
Wait for the Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking, ensuring the asyncio gRPC stream can close properly. (#839)
* wait for Dapr health check asynchronously Switch Dapr health check from blocking call to async call to avoid blocking the event loop in async environments Signed-off-by: mingsing <[email protected]> * add StatusCode.UNKNOWN branch Signed-off-by: mingsing <[email protected]> * aio dapr health Signed-off-by: mingsing <[email protected]> * add healthcheck test Signed-off-by: mingsing <[email protected]> * ruff pass Signed-off-by: mingsing <[email protected]> * fix async health check Signed-off-by: mingsing <[email protected]> * use aiohttp Signed-off-by: mingsing <[email protected]> * use aiohttp for asynchronous health check Signed-off-by: mingsing <[email protected]> * remove deprecated wait_until_ready in async DaprHealth Signed-off-by: mingsing <[email protected]> * rm DaprHealth.get_ssl_context in test_dapr_grpc_client_async_secure Signed-off-by: mingsing <[email protected]> * format Signed-off-by: mingsing <[email protected]> * Revert "rm DaprHealth.get_ssl_context in test_dapr_grpc_client_async_secure" Signed-off-by: mingsing <[email protected]> * ruff check Signed-off-by: mingsing <[email protected]> --------- Signed-off-by: mingsing <[email protected]> Co-authored-by: mingsing <[email protected]> Co-authored-by: Albert Callarisa <[email protected]>
1 parent 7657688 commit 126825d

File tree

6 files changed

+262
-4
lines changed

6 files changed

+262
-4
lines changed

dapr/aio/clients/grpc/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1906,7 +1906,7 @@ async def wait(self, timeout_s: float):
19061906
remaining = (start + timeout_s) - time.time()
19071907
if remaining < 0:
19081908
raise e
1909-
asyncio.sleep(min(1, remaining))
1909+
await asyncio.sleep(min(1, remaining))
19101910

19111911
async def get_metadata(self) -> GetMetadataResponse:
19121912
"""Returns information about the sidecar allowing for runtime

dapr/aio/clients/grpc/subscription.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
from grpc import StatusCode
44
from grpc.aio import AioRpcError
55

6+
from dapr.aio.clients.health import DaprHealth
67
from dapr.clients.grpc._response import TopicEventResponse
7-
from dapr.clients.health import DaprHealth
88
from dapr.common.pubsub.subscription import (
99
StreamCancelledError,
1010
StreamInactiveError,
@@ -52,7 +52,7 @@ async def outgoing_request_iterator():
5252

5353
async def reconnect_stream(self):
5454
await self.close()
55-
DaprHealth.wait_for_sidecar()
55+
await DaprHealth.wait_for_sidecar()
5656
print('Attempting to reconnect...')
5757
await self.start()
5858

@@ -67,7 +67,7 @@ async def next_message(self):
6767
return None
6868
return SubscriptionMessage(message.event_message)
6969
except AioRpcError as e:
70-
if e.code() == StatusCode.UNAVAILABLE:
70+
if e.code() == StatusCode.UNAVAILABLE or e.code() == StatusCode.UNKNOWN:
7171
print(
7272
f'gRPC error while reading from stream: {e.details()}, '
7373
f'Status Code: {e.code()}. '

dapr/aio/clients/health.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2024 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
import asyncio
17+
import time
18+
19+
import aiohttp
20+
21+
from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, DAPR_USER_AGENT, USER_AGENT_HEADER
22+
from dapr.clients.http.helpers import get_api_url
23+
from dapr.conf import settings
24+
25+
26+
class DaprHealth:
27+
@staticmethod
28+
async def wait_for_sidecar():
29+
health_url = f'{get_api_url()}/healthz/outbound'
30+
headers = {USER_AGENT_HEADER: DAPR_USER_AGENT}
31+
if settings.DAPR_API_TOKEN is not None:
32+
headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN
33+
timeout = float(settings.DAPR_HEALTH_TIMEOUT)
34+
35+
start = time.time()
36+
ssl_context = DaprHealth.get_ssl_context()
37+
38+
connector = aiohttp.TCPConnector(ssl=ssl_context)
39+
async with aiohttp.ClientSession(connector=connector) as session:
40+
while True:
41+
try:
42+
async with session.get(health_url, headers=headers) as response:
43+
if 200 <= response.status < 300:
44+
break
45+
except aiohttp.ClientError as e:
46+
print(f'Health check on {health_url} failed: {e}')
47+
except Exception as e:
48+
print(f'Unexpected error during health check: {e}')
49+
50+
remaining = (start + timeout) - time.time()
51+
if remaining <= 0:
52+
raise TimeoutError(f'Dapr health check timed out, after {timeout}.')
53+
await asyncio.sleep(min(1, remaining))
54+
55+
@staticmethod
56+
def get_ssl_context():
57+
# This method is used (overwritten) from tests
58+
# to return context for self-signed certificates
59+
return None

tests/clients/test_dapr_grpc_client_async_secure.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from unittest.mock import patch
1818

1919
from dapr.aio.clients.grpc.client import DaprGrpcClientAsync
20+
from dapr.aio.clients.health import DaprHealth as DaprHealthAsync
2021
from dapr.clients.health import DaprHealth
2122
from dapr.conf import settings
2223
from tests.clients.certs import replacement_get_credentials_func, replacement_get_health_context
@@ -25,6 +26,7 @@
2526
from .fake_dapr_server import FakeDaprSidecar
2627

2728
DaprGrpcClientAsync.get_credentials = replacement_get_credentials_func
29+
DaprHealthAsync.get_ssl_context = replacement_get_health_context
2830
DaprHealth.get_ssl_context = replacement_get_health_context
2931

3032

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2025 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
import asyncio
17+
import time
18+
import unittest
19+
from unittest.mock import AsyncMock, MagicMock, patch
20+
21+
from dapr.aio.clients.health import DaprHealth
22+
from dapr.conf import settings
23+
from dapr.version import __version__
24+
25+
26+
class DaprHealthCheckAsyncTests(unittest.IsolatedAsyncioTestCase):
27+
@patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500')
28+
@patch('aiohttp.ClientSession.get')
29+
async def test_wait_for_sidecar_success(self, mock_get):
30+
# Create mock response
31+
mock_response = MagicMock()
32+
mock_response.status = 200
33+
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
34+
mock_response.__aexit__ = AsyncMock(return_value=None)
35+
mock_get.return_value = mock_response
36+
37+
try:
38+
await DaprHealth.wait_for_sidecar()
39+
except Exception as e:
40+
self.fail(f'wait_for_sidecar() raised an exception unexpectedly: {e}')
41+
42+
mock_get.assert_called_once()
43+
44+
# Check URL
45+
called_url = mock_get.call_args[0][0]
46+
self.assertEqual(called_url, 'http://domain.com:3500/v1.0/healthz/outbound')
47+
48+
# Check headers are properly set
49+
headers = mock_get.call_args[1]['headers']
50+
self.assertIn('User-Agent', headers)
51+
self.assertEqual(headers['User-Agent'], f'dapr-sdk-python/{__version__}')
52+
53+
@patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500')
54+
@patch.object(settings, 'DAPR_API_TOKEN', 'mytoken')
55+
@patch('aiohttp.ClientSession.get')
56+
async def test_wait_for_sidecar_success_with_api_token(self, mock_get):
57+
# Create mock response
58+
mock_response = MagicMock()
59+
mock_response.status = 200
60+
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
61+
mock_response.__aexit__ = AsyncMock(return_value=None)
62+
mock_get.return_value = mock_response
63+
64+
try:
65+
await DaprHealth.wait_for_sidecar()
66+
except Exception as e:
67+
self.fail(f'wait_for_sidecar() raised an exception unexpectedly: {e}')
68+
69+
mock_get.assert_called_once()
70+
71+
# Check headers are properly set
72+
headers = mock_get.call_args[1]['headers']
73+
self.assertIn('User-Agent', headers)
74+
self.assertEqual(headers['User-Agent'], f'dapr-sdk-python/{__version__}')
75+
self.assertIn('dapr-api-token', headers)
76+
self.assertEqual(headers['dapr-api-token'], 'mytoken')
77+
78+
@patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '2.5')
79+
@patch('aiohttp.ClientSession.get')
80+
async def test_wait_for_sidecar_timeout(self, mock_get):
81+
# Create mock response that always returns 500
82+
mock_response = MagicMock()
83+
mock_response.status = 500
84+
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
85+
mock_response.__aexit__ = AsyncMock(return_value=None)
86+
mock_get.return_value = mock_response
87+
88+
start = time.time()
89+
90+
with self.assertRaises(TimeoutError):
91+
await DaprHealth.wait_for_sidecar()
92+
93+
self.assertGreaterEqual(time.time() - start, 2.5)
94+
self.assertGreater(mock_get.call_count, 1)
95+
96+
@patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500')
97+
@patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '5.0')
98+
@patch('aiohttp.ClientSession.get')
99+
async def test_health_check_does_not_block(self, mock_get):
100+
"""Test that health check doesn't block other async tasks from running"""
101+
# Mock health check to retry several times before succeeding
102+
call_count = [0] # Use list to allow modification in nested function
103+
104+
def side_effect(*args, **kwargs):
105+
call_count[0] += 1
106+
# First 2 calls fail with ClientError, then succeed
107+
# This will cause ~2 seconds of retries (1 second sleep after each failure)
108+
if call_count[0] <= 2:
109+
import aiohttp
110+
111+
raise aiohttp.ClientError('Connection refused')
112+
else:
113+
mock_response = MagicMock()
114+
mock_response.status = 200
115+
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
116+
mock_response.__aexit__ = AsyncMock(return_value=None)
117+
return mock_response
118+
119+
mock_get.side_effect = side_effect
120+
121+
# Counter that will be incremented by background task
122+
counter = [0] # Use list to allow modification in nested function
123+
is_running = [True]
124+
125+
async def increment_counter():
126+
"""Background task that increments counter every 0.5 seconds"""
127+
while is_running[0]:
128+
await asyncio.sleep(0.5)
129+
counter[0] += 1
130+
131+
# Start the background task
132+
counter_task = asyncio.create_task(increment_counter())
133+
134+
try:
135+
# Run health check (will take ~2 seconds with retries)
136+
await DaprHealth.wait_for_sidecar()
137+
138+
# Stop the background task
139+
is_running[0] = False
140+
await asyncio.sleep(0.1) # Give it time to finish current iteration
141+
142+
# Verify the counter was incremented during health check
143+
# In 2 seconds with 0.5s intervals, we expect at least 3 increments
144+
self.assertGreaterEqual(
145+
counter[0],
146+
3,
147+
f'Expected counter to increment at least 3 times during health check, '
148+
f'but got {counter[0]}. This indicates health check may be blocking.',
149+
)
150+
151+
# Verify health check made multiple attempts
152+
self.assertGreaterEqual(call_count[0], 2)
153+
154+
finally:
155+
# Clean up
156+
is_running[0] = False
157+
counter_task.cancel()
158+
try:
159+
await counter_task
160+
except asyncio.CancelledError:
161+
pass
162+
163+
@patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500')
164+
@patch('aiohttp.ClientSession.get')
165+
async def test_multiple_health_checks_concurrent(self, mock_get):
166+
"""Test that multiple health check calls can run concurrently"""
167+
# Create mock response
168+
mock_response = MagicMock()
169+
mock_response.status = 200
170+
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
171+
mock_response.__aexit__ = AsyncMock(return_value=None)
172+
mock_get.return_value = mock_response
173+
174+
# Run multiple health checks concurrently
175+
start_time = time.time()
176+
results = await asyncio.gather(
177+
DaprHealth.wait_for_sidecar(),
178+
DaprHealth.wait_for_sidecar(),
179+
DaprHealth.wait_for_sidecar(),
180+
)
181+
elapsed = time.time() - start_time
182+
183+
# All should complete successfully
184+
self.assertEqual(len(results), 3)
185+
self.assertIsNone(results[0])
186+
self.assertIsNone(results[1])
187+
self.assertIsNone(results[2])
188+
189+
# Should complete quickly since they run concurrently
190+
self.assertLess(elapsed, 1.0)
191+
192+
# Verify multiple calls were made
193+
self.assertGreaterEqual(mock_get.call_count, 3)
194+
195+
196+
if __name__ == '__main__':
197+
unittest.main()

0 commit comments

Comments
 (0)