|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +"""Location: ./tests/unit/mcpgateway/services/test_catalog_service.py |
| 3 | +Copyright 2025 |
| 4 | +SPDX-License-Identifier: Apache-2.0 |
| 5 | +Authors: Mihai Criveti |
| 6 | +
|
| 7 | +Unit Tests for Catalog Service . |
| 8 | +""" |
| 9 | + |
| 10 | +import pytest |
| 11 | +import asyncio |
| 12 | +from unittest.mock import AsyncMock, MagicMock, patch |
| 13 | +from datetime import datetime, timezone |
| 14 | + |
| 15 | +from mcpgateway.services.catalog_service import CatalogService |
| 16 | +from mcpgateway.schemas import ( |
| 17 | + CatalogListRequest, |
| 18 | + CatalogBulkRegisterRequest, |
| 19 | + CatalogServerRegisterRequest, |
| 20 | +) |
| 21 | + |
| 22 | +@pytest.fixture |
| 23 | +def service(): |
| 24 | + return CatalogService() |
| 25 | + |
| 26 | +@pytest.mark.asyncio |
| 27 | +async def test_load_catalog_cached(service): |
| 28 | + service._catalog_cache = {"cached": True} |
| 29 | + service._cache_timestamp = 1000.0 |
| 30 | + with patch("mcpgateway.services.catalog_service.settings", MagicMock(mcpgateway_catalog_cache_ttl=9999)), \ |
| 31 | + patch("mcpgateway.services.catalog_service.time.time", return_value=1001.0): |
| 32 | + result = await service.load_catalog() |
| 33 | + assert result == {"cached": True} |
| 34 | + |
| 35 | +@pytest.mark.asyncio |
| 36 | +async def test_load_catalog_missing_file(service): |
| 37 | + with patch("mcpgateway.services.catalog_service.settings", MagicMock(mcpgateway_catalog_file="missing.yml", mcpgateway_catalog_cache_ttl=0)): |
| 38 | + with patch("mcpgateway.services.catalog_service.Path.exists", return_value=False): |
| 39 | + result = await service.load_catalog(force_reload=True) |
| 40 | + assert "catalog_servers" in result |
| 41 | + |
| 42 | +@pytest.mark.asyncio |
| 43 | +async def test_load_catalog_valid_yaml(service): |
| 44 | + fake_yaml = {"catalog_servers": [{"id": "1", "name": "srv"}]} |
| 45 | + with patch("mcpgateway.services.catalog_service.settings", MagicMock(mcpgateway_catalog_file="catalog.yml", mcpgateway_catalog_cache_ttl=0)): |
| 46 | + with patch("mcpgateway.services.catalog_service.Path.exists", return_value=True): |
| 47 | + with patch("builtins.open", new_callable=MagicMock) as mock_open, patch("mcpgateway.services.catalog_service.yaml.safe_load", return_value=fake_yaml): |
| 48 | + mock_open.return_value.__enter__.return_value.read.return_value = "data" |
| 49 | + result = await service.load_catalog(force_reload=True) |
| 50 | + assert "catalog_servers" in result |
| 51 | + |
| 52 | +@pytest.mark.asyncio |
| 53 | +async def test_load_catalog_exception(service): |
| 54 | + with patch("mcpgateway.services.catalog_service.settings", MagicMock(mcpgateway_catalog_file="catalog.yml", mcpgateway_catalog_cache_ttl=0)): |
| 55 | + with patch("mcpgateway.services.catalog_service.open", side_effect=Exception("fail")): |
| 56 | + result = await service.load_catalog(force_reload=True) |
| 57 | + assert result["catalog_servers"] == [] |
| 58 | + |
| 59 | +@pytest.mark.asyncio |
| 60 | +async def test_get_catalog_servers_filters(service): |
| 61 | + fake_catalog = { |
| 62 | + "catalog_servers": [ |
| 63 | + {"id": "1", "name": "srv1", "url": "http://a", "category": "cat", "auth_type": "Open", "provider": "prov", "tags": ["t1"], "description": "desc"}, |
| 64 | + {"id": "2", "name": "srv2", "url": "http://b", "category": "other", "auth_type": "API", "provider": "prov2", "tags": ["t2"], "description": "desc2"}, |
| 65 | + ] |
| 66 | + } |
| 67 | + with patch.object(service, "load_catalog", AsyncMock(return_value=fake_catalog)): |
| 68 | + db = MagicMock() |
| 69 | + db.execute.return_value = [("http://a",)] |
| 70 | + req = CatalogListRequest(category="cat", auth_type="Open", provider="prov", search="srv", tags=["t1"], show_registered_only=True, show_available_only=True, offset=0, limit=10) |
| 71 | + result = await service.get_catalog_servers(req, db) |
| 72 | + assert result.total >= 1 |
| 73 | + assert all(s.category == "cat" for s in result.servers) |
| 74 | + |
| 75 | +@pytest.mark.asyncio |
| 76 | +async def test_register_catalog_server_not_found(service): |
| 77 | + with patch.object(service, "load_catalog", AsyncMock(return_value={"catalog_servers": []})): |
| 78 | + db = MagicMock() |
| 79 | + result = await service.register_catalog_server("missing", None, db) |
| 80 | + assert not result.success |
| 81 | + assert "not found" in result.message |
| 82 | + |
| 83 | +@pytest.mark.asyncio |
| 84 | +async def test_register_catalog_server_already_registered(service): |
| 85 | + fake_catalog = {"catalog_servers": [{"id": "1", "name": "srv", "url": "http://a", "description": "desc"}]} |
| 86 | + with patch.object(service, "load_catalog", AsyncMock(return_value=fake_catalog)): |
| 87 | + db = MagicMock() |
| 88 | + db.execute.return_value.scalar_one_or_none.return_value = MagicMock(id=123) |
| 89 | + with patch("mcpgateway.services.catalog_service.select"): |
| 90 | + result = await service.register_catalog_server("1", None, db) |
| 91 | + assert not result.success |
| 92 | + assert "already registered" in result.message |
| 93 | + |
| 94 | +@pytest.mark.asyncio |
| 95 | +async def test_register_catalog_server_success(service): |
| 96 | + fake_catalog = {"catalog_servers": [{"id": "1", "name": "srv", "url": "http://a", "description": "desc"}]} |
| 97 | + with patch.object(service, "load_catalog", AsyncMock(return_value=fake_catalog)): |
| 98 | + db = MagicMock() |
| 99 | + db.execute.return_value.scalar_one_or_none.return_value = None |
| 100 | + with patch("mcpgateway.services.catalog_service.select"), patch.object(service._gateway_service, "register_gateway", AsyncMock(return_value=MagicMock(id=1, name="srv"))): |
| 101 | + result = await service.register_catalog_server("1", None, db) |
| 102 | + assert result.success |
| 103 | + assert "Successfully" in result.message |
| 104 | + |
| 105 | +@pytest.mark.asyncio |
| 106 | +async def test_register_catalog_server_ipv6(service): |
| 107 | + fake_catalog = {"catalog_servers": [{"id": "1", "name": "srv", "url": "[::1]", "description": "desc"}]} |
| 108 | + with patch.object(service, "load_catalog", AsyncMock(return_value=fake_catalog)): |
| 109 | + db = MagicMock() |
| 110 | + db.execute.return_value.scalar_one_or_none.return_value = None |
| 111 | + with patch("mcpgateway.services.catalog_service.select"): |
| 112 | + result = await service.register_catalog_server("1", None, db) |
| 113 | + assert not result.success |
| 114 | + assert "IPv6" in result.error |
| 115 | + |
| 116 | +@pytest.mark.asyncio |
| 117 | +async def test_register_catalog_server_exception_mapping(service): |
| 118 | + fake_catalog = {"catalog_servers": [{"id": "1", "name": "srv", "url": "http://a", "description": "desc"}]} |
| 119 | + with patch.object(service, "load_catalog", AsyncMock(return_value=fake_catalog)): |
| 120 | + db = MagicMock() |
| 121 | + db.execute.return_value.scalar_one_or_none.return_value = None |
| 122 | + with patch("mcpgateway.services.catalog_service.select"), \ |
| 123 | + patch.object(service._gateway_service, "register_gateway", AsyncMock(side_effect=Exception("Connection refused"))): |
| 124 | + result = await service.register_catalog_server("1", None, db) |
| 125 | + assert "offline" in result.message |
| 126 | + |
| 127 | +@pytest.mark.asyncio |
| 128 | +async def test_check_server_availability_success(service): |
| 129 | + fake_catalog = {"catalog_servers": [{"id": "1", "url": "http://a"}]} |
| 130 | + with patch.object(service, "load_catalog", AsyncMock(return_value=fake_catalog)): |
| 131 | + with patch("mcpgateway.services.catalog_service.httpx.AsyncClient") as mock_client: |
| 132 | + mock_instance = AsyncMock() |
| 133 | + mock_instance.get.return_value.status_code = 200 |
| 134 | + mock_client.return_value.__aenter__.return_value = mock_instance |
| 135 | + result = await service.check_server_availability("1") |
| 136 | + assert result.is_available |
| 137 | + |
| 138 | +@pytest.mark.asyncio |
| 139 | +async def test_check_server_availability_not_found(service): |
| 140 | + with patch.object(service, "load_catalog", AsyncMock(return_value={"catalog_servers": []})): |
| 141 | + result = await service.check_server_availability("missing") |
| 142 | + assert not result.is_available |
| 143 | + assert "not found" in result.error |
| 144 | + |
| 145 | +@pytest.mark.asyncio |
| 146 | +async def test_check_server_availability_exception(service): |
| 147 | + fake_catalog = {"catalog_servers": [{"id": "1", "url": "http://a"}]} |
| 148 | + with patch.object(service, "load_catalog", AsyncMock(return_value=fake_catalog)): |
| 149 | + with patch("mcpgateway.services.catalog_service.httpx.AsyncClient", side_effect=Exception("fail")): |
| 150 | + result = await service.check_server_availability("1") |
| 151 | + assert not result.is_available |
| 152 | + |
| 153 | +@pytest.mark.asyncio |
| 154 | +async def test_bulk_register_servers_success_and_failure(service): |
| 155 | + fake_request = CatalogBulkRegisterRequest(server_ids=["1", "2"], skip_errors=False) |
| 156 | + with patch.object(service, "register_catalog_server", AsyncMock(side_effect=[MagicMock(success=True), MagicMock(success=False, error="fail")])): |
| 157 | + db = MagicMock() |
| 158 | + result = await service.bulk_register_servers(fake_request, db) |
| 159 | + assert result.total_attempted == 2 |
| 160 | + assert len(result.failed) == 1 |
| 161 | + |
| 162 | + |
| 163 | +@pytest.mark.asyncio |
| 164 | +async def test_auth_type_api_key_and_oauth(service): |
| 165 | + fake_catalog = {"catalog_servers": [{"id": "1", "name": "srv", "url": "http://a", "description": "desc", "auth_type": "API Key"}]} |
| 166 | + req = CatalogServerRegisterRequest(server_id="1", name="srv", api_key="secret", oauth_credentials=None) |
| 167 | + with patch.object(service, "load_catalog", AsyncMock(return_value=fake_catalog)): |
| 168 | + db = MagicMock() |
| 169 | + db.execute.return_value.scalar_one_or_none.return_value = None |
| 170 | + with patch("mcpgateway.services.catalog_service.select"), patch.object(service._gateway_service, "register_gateway", AsyncMock(return_value=MagicMock(id=1, name="srv"))): |
| 171 | + result = await service.register_catalog_server("1", req, db) |
| 172 | + assert result.success |
| 173 | + |
| 174 | + fake_catalog["catalog_servers"][0]["auth_type"] = "OAuth2.1 & API Key" |
| 175 | + with patch.object(service, "load_catalog", AsyncMock(return_value=fake_catalog)): |
| 176 | + db = MagicMock() |
| 177 | + db.execute.return_value.scalar_one_or_none.return_value = None |
| 178 | + with patch("mcpgateway.services.catalog_service.select"), patch.object(service._gateway_service, "register_gateway", AsyncMock(return_value=MagicMock(id=1, name="srv"))): |
| 179 | + result = await service.register_catalog_server("1", req, db) |
| 180 | + assert result.success |
| 181 | + |
| 182 | + |
| 183 | +@pytest.mark.asyncio |
| 184 | +async def test_bulk_register_servers_skip_errors(service): |
| 185 | + fake_request = CatalogBulkRegisterRequest(server_ids=["1", "2"], skip_errors=True) |
| 186 | + with patch.object(service, "register_catalog_server", AsyncMock(side_effect=[MagicMock(success=False, error="fail"), MagicMock(success=True)])): |
| 187 | + db = MagicMock() |
| 188 | + result = await service.bulk_register_servers(fake_request, db) |
| 189 | + assert result.total_attempted == 2 |
| 190 | + assert len(result.failed) == 1 |
0 commit comments