From 1ffd8185420ee1ba6f2be45babeba2886388ebbe Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Aug 2025 12:10:56 -0500 Subject: [PATCH 1/2] Fix ONVIF camera connection issues by disabling aiohttp auto-decompression --- onvif/zeep_aiohttp.py | 20 ++++----------- tests/test_zeep_transport.py | 49 ++++++++++++++++++------------------ 2 files changed, 29 insertions(+), 40 deletions(-) diff --git a/onvif/zeep_aiohttp.py b/onvif/zeep_aiohttp.py index 4076c2e..12b34fe 100644 --- a/onvif/zeep_aiohttp.py +++ b/onvif/zeep_aiohttp.py @@ -10,9 +10,8 @@ from zeep.transports import Transport from zeep.utils import get_version from zeep.wsdl.utils import etree_to_string -from multidict import CIMultiDict import httpx -from aiohttp import ClientResponse, ClientSession, hdrs +from aiohttp import ClientResponse, ClientSession from requests import Response from requests.structures import CaseInsensitiveDict @@ -66,15 +65,6 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: async def aclose(self) -> None: """Close the transport session.""" - def _filter_headers(self, headers: CIMultiDict[str]) -> list[tuple[str, str]]: - """Filter out Content-Encoding header. - - Since aiohttp has already decompressed the content, we need to - remove the Content-Encoding header to prevent zeep from trying - to decompress it again, which would cause a zlib error. - """ - return [(k, v) for k, v in headers.items() if k != hdrs.CONTENT_ENCODING] - def _aiohttp_to_httpx_response( self, aiohttp_response: ClientResponse, content: bytes ) -> httpx.Response: @@ -82,7 +72,7 @@ def _aiohttp_to_httpx_response( # Create httpx Response with the content httpx_response = httpx.Response( status_code=aiohttp_response.status, - headers=httpx.Headers(self._filter_headers(aiohttp_response.headers)), + headers=httpx.Headers(aiohttp_response.headers), content=content, request=httpx.Request( method=aiohttp_response.method, @@ -115,9 +105,7 @@ def _aiohttp_to_requests_response( new._content = content new.status_code = aiohttp_response.status # Use dict comprehension for requests.Response headers - new.headers = CaseInsensitiveDict( - self._filter_headers(aiohttp_response.headers) - ) + new.headers = CaseInsensitiveDict(aiohttp_response.headers) # Convert aiohttp cookies to requests format if aiohttp_response.cookies: for name, cookie in aiohttp_response.cookies.items(): @@ -154,6 +142,7 @@ async def _post_internal( headers=headers, proxy=self.proxy, timeout=self._client_timeout, + auto_decompress=False, # Let zeep handle decompression ) # Read the content to log it before checking status @@ -240,6 +229,7 @@ async def get( headers=headers, proxy=self.proxy, timeout=self._client_timeout, + auto_decompress=False, # Let zeep handle decompression ) # Read content and log before checking status diff --git a/tests/test_zeep_transport.py b/tests/test_zeep_transport.py index d349d7b..16c2240 100644 --- a/tests/test_zeep_transport.py +++ b/tests/test_zeep_transport.py @@ -1,5 +1,6 @@ """Tests for AIOHTTPTransport to ensure compatibility with zeep's AsyncTransport.""" +import gzip from http.cookies import SimpleCookie from unittest.mock import AsyncMock, Mock, patch @@ -857,19 +858,17 @@ async def test_cookie_jar_type(): @pytest.mark.asyncio -async def test_gzip_content_encoding_header_removed(): - """Test that Content-Encoding: gzip header is removed after aiohttp decompresses. +async def test_gzip_content_encoding_header_preserved(): + """Test that Content-Encoding: gzip header is preserved when auto_decompress=False. - This fixes the issue where aiohttp automatically decompresses gzip content - but the Content-Encoding header was still passed to zeep, causing it to - attempt decompression again on already-decompressed content, resulting in - zlib errors. + With auto_decompress=False, aiohttp won't decompress the content and will + preserve the Content-Encoding header, allowing zeep to handle decompression. """ mock_session = create_mock_session() transport = AIOHTTPTransport(session=mock_session) # Mock response with Content-Encoding: gzip - # aiohttp will have already decompressed the content + # With auto_decompress=False, content remains compressed mock_aiohttp_response = Mock(spec=aiohttp.ClientResponse) mock_aiohttp_response.status = 200 # Simulate headers with Content-Encoding: gzip @@ -883,9 +882,10 @@ async def test_gzip_content_encoding_header_removed(): mock_aiohttp_response.charset = "utf-8" mock_aiohttp_response.cookies = {} - # Content is already decompressed by aiohttp - decompressed_content = b'test' - mock_aiohttp_response.read = AsyncMock(return_value=decompressed_content) + # Content remains compressed (simulate gzipped content) + original_content = b'test' + compressed_content = gzip.compress(original_content) + mock_aiohttp_response.read = AsyncMock(return_value=compressed_content) mock_session = Mock(spec=aiohttp.ClientSession) mock_session.post = AsyncMock(return_value=mock_aiohttp_response) @@ -896,34 +896,32 @@ async def test_gzip_content_encoding_header_removed(): "http://camera.local/onvif/device_service", "", {} ) - # Verify Content-Encoding header was removed - assert "content-encoding" not in httpx_result.headers - assert "Content-Encoding" not in httpx_result.headers + # Verify Content-Encoding header is preserved + assert httpx_result.headers["content-encoding"] == "gzip" # Other headers should still be present assert httpx_result.headers["content-type"] == "application/soap+xml; charset=utf-8" assert httpx_result.headers["server"] == "PelcoOnvifNvt" - # Content should be the decompressed data - assert httpx_result.read() == decompressed_content + # httpx will decompress the content automatically when reading + assert httpx_result.read() == original_content # Test requests response (from get) mock_session.get = AsyncMock(return_value=mock_aiohttp_response) requests_result = await transport.get("http://camera.local/onvif/device_service") - # Verify Content-Encoding header was removed from requests response too - assert "content-encoding" not in requests_result.headers - assert "Content-Encoding" not in requests_result.headers + # Verify Content-Encoding header is preserved in requests response too + assert requests_result.headers["content-encoding"] == "gzip" # Other headers should still be present assert ( requests_result.headers["content-type"] == "application/soap+xml; charset=utf-8" ) assert requests_result.headers["server"] == "PelcoOnvifNvt" - # Content should be the decompressed data - assert requests_result.content == decompressed_content + # Content should be the compressed data (requests doesn't auto-decompress in Response object) + assert requests_result.content == compressed_content @pytest.mark.asyncio async def test_multiple_duplicate_headers_preserved(): - """Test that duplicate headers (except Content-Encoding) are preserved.""" + """Test that duplicate headers are preserved.""" mock_session = create_mock_session() transport = AIOHTTPTransport(session=mock_session) @@ -937,14 +935,15 @@ async def test_multiple_duplicate_headers_preserved(): headers.add("Set-Cookie", "user=john; Path=/api") headers.add("Set-Cookie", "token=xyz789; Secure") headers.add("Content-Type", "text/xml") - headers.add("Content-Encoding", "gzip") # This should be removed + headers.add("Content-Encoding", "gzip") # This is now preserved mock_aiohttp_response.headers = headers mock_aiohttp_response.method = "POST" mock_aiohttp_response.url = "http://example.com" mock_aiohttp_response.charset = "utf-8" mock_aiohttp_response.cookies = {} - mock_aiohttp_response.read = AsyncMock(return_value=b"test") + # Since Content-Encoding: gzip is present, content should be gzipped + mock_aiohttp_response.read = AsyncMock(return_value=gzip.compress(b"test")) mock_session = Mock(spec=aiohttp.ClientSession) mock_session.post = AsyncMock(return_value=mock_aiohttp_response) @@ -953,8 +952,8 @@ async def test_multiple_duplicate_headers_preserved(): # Test httpx response httpx_result = await transport.post("http://example.com", "test", {}) - # Content-Encoding should be removed - assert "content-encoding" not in httpx_result.headers + # Content-Encoding should be preserved now + assert httpx_result.headers["content-encoding"] == "gzip" # All Set-Cookie headers should be preserved set_cookie_values = httpx_result.headers.get_list("set-cookie") From a8ca02e005ad0dc051feb95567bf80b2519bf609 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 18 Aug 2025 13:57:29 -0500 Subject: [PATCH 2/2] adjust code --- onvif/zeep_aiohttp.py | 50 ++++++------- tests/test_zeep_transport.py | 137 ++++++++++++++++++++++++----------- 2 files changed, 118 insertions(+), 69 deletions(-) diff --git a/onvif/zeep_aiohttp.py b/onvif/zeep_aiohttp.py index 12b34fe..04f6e17 100644 --- a/onvif/zeep_aiohttp.py +++ b/onvif/zeep_aiohttp.py @@ -136,25 +136,24 @@ async def _post_internal( data = message try: - response = await self.session.post( + async with self.session.post( address, data=data, headers=headers, proxy=self.proxy, timeout=self._client_timeout, auto_decompress=False, # Let zeep handle decompression - ) - - # Read the content to log it before checking status - content = await response.read() - _LOGGER.debug( - "HTTP Response from %s (status: %d):\n%s", - address, - response.status, - content, - ) + ) as response: + # Read the content to log it before checking status + content = await response.read() + _LOGGER.debug( + "HTTP Response from %s (status: %d):\n%s", + address, + response.status, + content, + ) - return response, content + return response, content except RuntimeError as exc: # Handle RuntimeError which may occur if the session is closed raise RuntimeError(f"Failed to post to {address}: {exc}") from exc @@ -223,27 +222,26 @@ async def get( headers.setdefault("User-Agent", f"Zeep/{get_version()}") try: - response = await self.session.get( + async with self.session.get( address, params=params, headers=headers, proxy=self.proxy, timeout=self._client_timeout, auto_decompress=False, # Let zeep handle decompression - ) - - # Read content and log before checking status - content = await response.read() - - _LOGGER.debug( - "HTTP Response from %s (status: %d):\n%s", - address, - response.status, - content, - ) + ) as response: + # Read content and log before checking status + content = await response.read() + + _LOGGER.debug( + "HTTP Response from %s (status: %d):\n%s", + address, + response.status, + content, + ) - # Convert directly to requests.Response - return self._aiohttp_to_requests_response(response, content) + # Convert directly to requests.Response + return self._aiohttp_to_requests_response(response, content) except RuntimeError as exc: # Handle RuntimeError which may occur if the session is closed raise RuntimeError(f"Failed to get from {address}: {exc}") from exc diff --git a/tests/test_zeep_transport.py b/tests/test_zeep_transport.py index 16c2240..48e3606 100644 --- a/tests/test_zeep_transport.py +++ b/tests/test_zeep_transport.py @@ -16,6 +16,10 @@ def create_mock_session(timeout=None): """Create a mock aiohttp session with optional timeout.""" mock_session = Mock(spec=aiohttp.ClientSession) + # Set up post and get as Mock (not AsyncMock) + # They will return mock responses directly + mock_session.post = Mock() + mock_session.get = Mock() if timeout: mock_session.timeout = timeout else: @@ -25,6 +29,30 @@ def create_mock_session(timeout=None): return mock_session +class AsyncContextManagerMock: + """Mock that implements async context manager protocol.""" + + def __init__(self, return_value): + self.return_value = return_value + + async def __aenter__(self): + return self.return_value + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + +def create_capturing_mock(mock_response, capture_dict): + """Create a mock that captures arguments and returns an async context manager.""" + + def capture_and_return(*args, **kwargs): + capture_dict["args"] = args + capture_dict["kwargs"] = kwargs + return AsyncContextManagerMock(mock_response) + + return Mock(side_effect=capture_and_return) + + @pytest.mark.asyncio async def test_post_returns_httpx_response(): """Test that post() returns an httpx.Response object.""" @@ -43,7 +71,9 @@ async def test_post_returns_httpx_response(): mock_content = b"test" mock_aiohttp_response.read = AsyncMock(return_value=mock_content) - mock_session.post = AsyncMock(return_value=mock_aiohttp_response) + # Make the response work as an async context manager + + mock_session.post.return_value = AsyncContextManagerMock(mock_aiohttp_response) # Call post result = await transport.post( @@ -76,8 +106,7 @@ async def test_post_xml_returns_requests_response(): mock_content = b"test" mock_aiohttp_response.read = AsyncMock(return_value=mock_content) - - mock_session.post = AsyncMock(return_value=mock_aiohttp_response) + mock_session.post.return_value = AsyncContextManagerMock(mock_aiohttp_response) # Create XML envelope envelope = etree.Element("Envelope") @@ -111,8 +140,7 @@ async def test_get_returns_requests_response(): mock_content = b"test" mock_aiohttp_response.read = AsyncMock(return_value=mock_content) - - mock_session.get = AsyncMock(return_value=mock_aiohttp_response) + mock_session.get.return_value = AsyncContextManagerMock(mock_aiohttp_response) # Call get result = await transport.get( @@ -181,7 +209,16 @@ async def test_timeout_handling(): # Mock session that times out mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.post = AsyncMock(side_effect=TimeoutError()) + + # Create a class that raises on __aenter__ + class TimeoutContextManager: + async def __aenter__(self): + raise TimeoutError() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + mock_session.post = Mock(return_value=TimeoutContextManager()) transport.session = mock_session @@ -199,7 +236,16 @@ async def test_connection_error_handling(): # Mock session that fails mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.get = AsyncMock(side_effect=aiohttp.ClientError("Connection failed")) + + # Create a class that raises on __aenter__ + class ErrorContextManager: + async def __aenter__(self): + raise aiohttp.ClientError("Connection failed") + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + mock_session.get = Mock(return_value=ErrorContextManager()) transport.session = mock_session @@ -250,7 +296,9 @@ async def test_post_with_bytes_message(): mock_aiohttp_response.read = AsyncMock(return_value=b"") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.post = AsyncMock(return_value=mock_aiohttp_response) + mock_session.post = Mock( + return_value=AsyncContextManagerMock(mock_aiohttp_response) + ) transport.session = mock_session # Test with bytes message @@ -277,7 +325,7 @@ async def test_get_with_none_params(): mock_aiohttp_response.read = AsyncMock(return_value=b"") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.get = AsyncMock(return_value=mock_aiohttp_response) + mock_session.get = Mock(return_value=AsyncContextManagerMock(mock_aiohttp_response)) transport.session = mock_session # Test without params/headers (should work with None) @@ -303,15 +351,16 @@ async def test_user_agent_header(): mock_aiohttp_response.read = AsyncMock(return_value=b"test") mock_session = Mock(spec=aiohttp.ClientSession) - post_mock = AsyncMock(return_value=mock_aiohttp_response) - mock_session.post = post_mock + + # Capture the arguments while returning an async context manager + captured_args = {} + mock_session.post = create_capturing_mock(mock_aiohttp_response, captured_args) transport.session = mock_session await transport.post("http://example.com", "test", {}) # Check User-Agent was set - call_args = post_mock.call_args - headers = call_args[1]["headers"] + headers = captured_args["kwargs"]["headers"] assert "User-Agent" in headers assert headers["User-Agent"].startswith("Zeep/") @@ -334,15 +383,14 @@ async def test_custom_timeout_used(): mock_aiohttp_response.read = AsyncMock(return_value=b"test") mock_session = Mock(spec=aiohttp.ClientSession) - post_mock = AsyncMock(return_value=mock_aiohttp_response) - mock_session.post = post_mock + captured_args = {} + mock_session.post = create_capturing_mock(mock_aiohttp_response, captured_args) transport.session = mock_session await transport.post("http://example.com", "test", {}) # Check that custom timeout was used - call_args = post_mock.call_args - timeout = call_args[1]["timeout"] + timeout = captured_args["kwargs"]["timeout"] assert timeout is not None assert timeout == custom_timeout @@ -364,15 +412,14 @@ async def test_proxy_parameter(): mock_aiohttp_response.read = AsyncMock(return_value=b"test") mock_session = Mock(spec=aiohttp.ClientSession) - get_mock = AsyncMock(return_value=mock_aiohttp_response) - mock_session.get = get_mock + captured_args = {} + mock_session.get = create_capturing_mock(mock_aiohttp_response, captured_args) transport.session = mock_session await transport.get("http://example.com") # Check proxy was passed - call_args = get_mock.call_args - assert call_args[1]["proxy"] == "http://proxy:8080" + assert captured_args["kwargs"]["proxy"] == "http://proxy:8080" @pytest.mark.asyncio @@ -411,7 +458,7 @@ async def test_response_encoding(): mock_aiohttp_response.read = AsyncMock(return_value=b"test") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.get = AsyncMock(return_value=mock_aiohttp_response) + mock_session.get = Mock(return_value=AsyncContextManagerMock(mock_aiohttp_response)) transport.session = mock_session result = await transport.get("http://example.com") @@ -448,7 +495,9 @@ async def test_cookies_in_httpx_response(): mock_aiohttp_response.read = AsyncMock(return_value=b"test") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.post = AsyncMock(return_value=mock_aiohttp_response) + mock_session.post = Mock( + return_value=AsyncContextManagerMock(mock_aiohttp_response) + ) transport.session = mock_session # Test httpx response (from post) @@ -476,7 +525,7 @@ async def test_cookies_in_requests_response(): mock_aiohttp_response.read = AsyncMock(return_value=b"test") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.get = AsyncMock(return_value=mock_aiohttp_response) + mock_session.get = Mock(return_value=AsyncContextManagerMock(mock_aiohttp_response)) transport.session = mock_session # Test requests response (from get) @@ -517,8 +566,7 @@ async def test_session_reuse(): mock_aiohttp_response.cookies = {} mock_aiohttp_response.raise_for_status = Mock() mock_aiohttp_response.read = AsyncMock(return_value=b"test") - - mock_session.get = AsyncMock(return_value=mock_aiohttp_response) + mock_session.get.return_value = AsyncContextManagerMock(mock_aiohttp_response) # Make multiple requests result1 = await transport.get("http://example.com") @@ -572,15 +620,14 @@ async def test_content_type_header_default(): mock_aiohttp_response.read = AsyncMock(return_value=b"test") mock_session = Mock(spec=aiohttp.ClientSession) - post_mock = AsyncMock(return_value=mock_aiohttp_response) - mock_session.post = post_mock + captured_args = {} + mock_session.post = create_capturing_mock(mock_aiohttp_response, captured_args) transport.session = mock_session await transport.post("http://example.com", "test", {}) # Check Content-Type was set - call_args = post_mock.call_args - headers = call_args[1]["headers"] + headers = captured_args["kwargs"]["headers"] assert headers["Content-Type"] == 'text/xml; charset="utf-8"' @@ -631,7 +678,7 @@ async def test_cookie_conversion_httpx_basic(): mock_response.read = AsyncMock(return_value=b"test") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.post = AsyncMock(return_value=mock_response) + mock_session.post = Mock(return_value=AsyncContextManagerMock(mock_response)) transport.session = mock_session # Make request @@ -667,7 +714,7 @@ async def test_cookie_conversion_requests_basic(): mock_response.read = AsyncMock(return_value=b"test") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.get = AsyncMock(return_value=mock_response) + mock_session.get = Mock(return_value=AsyncContextManagerMock(mock_response)) transport.session = mock_session # Make request @@ -706,7 +753,7 @@ async def test_cookie_attributes_httpx(): mock_response.read = AsyncMock(return_value=b"secure") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.post = AsyncMock(return_value=mock_response) + mock_session.post = Mock(return_value=AsyncContextManagerMock(mock_response)) transport.session = mock_session # Make request @@ -746,7 +793,7 @@ async def test_multiple_cookies(): mock_response.read = AsyncMock(return_value=b"multi") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.get = AsyncMock(return_value=mock_response) + mock_session.get = Mock(return_value=AsyncContextManagerMock(mock_response)) transport.session = mock_session # Make request @@ -777,7 +824,7 @@ async def test_empty_cookies(): mock_response.read = AsyncMock(return_value=b"nocookies") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.get = AsyncMock(return_value=mock_response) + mock_session.get = Mock(return_value=AsyncContextManagerMock(mock_response)) transport.session = mock_session # Make request @@ -808,7 +855,7 @@ async def test_cookie_encoding(): mock_response.read = AsyncMock(return_value=b"encoded") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.get = AsyncMock(return_value=mock_response) + mock_session.get = Mock(return_value=AsyncContextManagerMock(mock_response)) transport.session = mock_session # Make request @@ -842,7 +889,7 @@ async def test_cookie_jar_type(): mock_response.read = AsyncMock(return_value=b"jar") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.post = AsyncMock(return_value=mock_response) + mock_session.post = Mock(return_value=AsyncContextManagerMock(mock_response)) transport.session = mock_session # Test httpx response @@ -850,7 +897,7 @@ async def test_cookie_jar_type(): assert isinstance(httpx_result.cookies, httpx.Cookies) # Test requests response - mock_session.get = AsyncMock(return_value=mock_response) + mock_session.get = Mock(return_value=AsyncContextManagerMock(mock_response)) requests_result = await transport.get("http://example.com") # Verify cookies are accessible in requests response assert hasattr(requests_result.cookies, "__getitem__") @@ -888,7 +935,9 @@ async def test_gzip_content_encoding_header_preserved(): mock_aiohttp_response.read = AsyncMock(return_value=compressed_content) mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.post = AsyncMock(return_value=mock_aiohttp_response) + mock_session.post = Mock( + return_value=AsyncContextManagerMock(mock_aiohttp_response) + ) transport.session = mock_session # Test httpx response (from post) @@ -905,7 +954,7 @@ async def test_gzip_content_encoding_header_preserved(): assert httpx_result.read() == original_content # Test requests response (from get) - mock_session.get = AsyncMock(return_value=mock_aiohttp_response) + mock_session.get.return_value = AsyncContextManagerMock(mock_aiohttp_response) requests_result = await transport.get("http://camera.local/onvif/device_service") # Verify Content-Encoding header is preserved in requests response too @@ -946,7 +995,9 @@ async def test_multiple_duplicate_headers_preserved(): mock_aiohttp_response.read = AsyncMock(return_value=gzip.compress(b"test")) mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.post = AsyncMock(return_value=mock_aiohttp_response) + mock_session.post = Mock( + return_value=AsyncContextManagerMock(mock_aiohttp_response) + ) transport.session = mock_session # Test httpx response @@ -980,7 +1031,7 @@ async def test_http_error_responses_no_exception(): mock_401_response.read = AsyncMock(return_value=b"Unauthorized") mock_session = Mock(spec=aiohttp.ClientSession) - mock_session.post = AsyncMock(return_value=mock_401_response) + mock_session.post = Mock(return_value=AsyncContextManagerMock(mock_401_response)) transport.session = mock_session # Should not raise exception @@ -997,7 +1048,7 @@ async def test_http_error_responses_no_exception(): mock_500_response.cookies = {} mock_500_response.read = AsyncMock(return_value=b"Server Error") - mock_session.get = AsyncMock(return_value=mock_500_response) + mock_session.get = Mock(return_value=AsyncContextManagerMock(mock_500_response)) # Should not raise exception result = await transport.get("http://example.com/wsdl")