Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions python/semantic_kernel/connectors/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ def create_mcp_server_from_kernel(
functions_to_expose = [
func for func in kernel.get_full_list_of_function_metadata() if func.name not in (excluded_functions or [])
]
exposed_names = frozenset(func.name for func in functions_to_expose)

if len(functions_to_expose) > 0:

Expand Down Expand Up @@ -1058,8 +1059,15 @@ async def _call_tool(
*args: Any,
) -> Sequence[types.TextContent | types.ImageContent | types.AudioContent | types.EmbeddedResource]:
"""Call a tool in the kernel."""
await _log(level="debug", data=f"Calling tool with args: {args}")
function_name, arguments = args[0], args[1]
if function_name not in exposed_names:
raise McpError(
error=types.ErrorData(
code=types.METHOD_NOT_FOUND,
message=f"Unknown tool: {function_name}",
)
)
await _log(level="debug", data=f"Calling tool: {function_name}")
result = await _call_kernel_function(function_name, arguments)
if result:
value = result.value
Expand Down Expand Up @@ -1165,7 +1173,6 @@ async def _set_logging_level(level: types.LoggingLevel) -> None:
async def _call_kernel_function(function_name: str, arguments: Any) -> FunctionResult | None:
function = kernel.get_function(plugin_name=None, function_name=function_name)
arguments["server"] = server
print("arguments", arguments)
return await function.invoke(kernel=kernel, **arguments)

return server
51 changes: 51 additions & 0 deletions python/tests/unit/connectors/mcp/test_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,54 @@ async def test_mcp_normalization_function(mock_session, list_tool_calls_with_sla
assert _normalize_mcp_name("weird\\name with spaces") == "weird-name-with-spaces"
assert _normalize_mcp_name("simple_name") == "simple_name"
assert _normalize_mcp_name("Name-With.Dots_And-Hyphens") == "Name-With.Dots_And-Hyphens"


async def test_excluded_function_cannot_be_called(kernel: "Kernel"):
"""Test that excluded functions are rejected at call time, not just hidden from listing."""
from semantic_kernel.connectors.mcp import create_mcp_server_from_kernel
from semantic_kernel.functions.kernel_function_decorator import kernel_function

side_effect_called = False

@kernel_function(name="public_echo")
def public_echo(message: str) -> str:
return f"echo: {message}"

@kernel_function(name="secret_admin")
def secret_admin(target: str) -> str:
nonlocal side_effect_called
side_effect_called = True
return f"privileged action on {target}"

kernel.add_function(plugin_name="tools", function=public_echo)
kernel.add_function(plugin_name="tools", function=secret_admin)

server = create_mcp_server_from_kernel(kernel, excluded_functions=["secret_admin"])

# Verify the server was created with handlers
assert types.ListToolsRequest in server.request_handlers
assert types.CallToolRequest in server.request_handlers

# Mock _get_cached_tool_definition to bypass SDK request context requirements
# (normally set by a real MCP session transport)
async def _fake_get_cached_tool_definition(tool_name):
return None

server._get_cached_tool_definition = _fake_get_cached_tool_definition

# Build a proper CallToolRequest as the MCP SDK would send
call_tool_request = types.CallToolRequest(
method="tools/call",
params=types.CallToolRequestParams(name="secret_admin", arguments={}),
)

# The internal handler wraps our _call_tool; invoke via the registered handler
handler = server.request_handlers[types.CallToolRequest]
result = await handler(call_tool_request)

# The call must fail (isError=True) with the correct error message
assert result.root.isError is True, "Calling an excluded function should return an error"
assert any("Unknown tool" in c.text for c in result.root.content if hasattr(c, "text")), (
f"Expected 'Unknown tool' error, got: {result.root.content}"
)
assert not side_effect_called, "Excluded function's side effect should not have fired"
Loading