Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/semantic_kernel/agents/bedrock/bedrock_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ async def _handle_function_call_contents(
chat_history=chat_history,
arguments=self.arguments,
function_call_count=len(function_call_contents),
function_behavior=self.function_choice_behavior,
)
for function_call in function_call_contents
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1111,11 +1111,17 @@ def _collect_text_and_annotations(cls: type[_T], content_list: list[Any]) -> lis

@classmethod
async def _invoke_function_calls(
cls: type[_T], kernel: "Kernel", fccs: list["FunctionCallContent"], chat_history: "ChatHistory"
cls: type[_T],
kernel: "Kernel",
fccs: list["FunctionCallContent"],
chat_history: "ChatHistory",
function_behavior: "FunctionChoiceBehavior | None" = None,
) -> list[Any]:
"""Invoke the function calls."""
tasks = [
kernel.invoke_function_call(function_call=function_call, chat_history=chat_history)
kernel.invoke_function_call(
function_call=function_call, chat_history=chat_history, function_behavior=function_behavior
)
for function_call in fccs
]
return await asyncio.gather(*tasks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@ async def _parse_function_call_arguments_done(

# Step 4: Invoke the function call
chat_history = ChatHistory()
await self._kernel.invoke_function_call(item, chat_history)
function_behavior = self._current_settings.function_choice_behavior if self._current_settings else None
await self._kernel.invoke_function_call(item, chat_history, function_behavior=function_behavior)
created_output: FunctionResultContent = chat_history.messages[-1].items[0] # type: ignore
# Step 5: Create the function result event
result = RealtimeFunctionResultEvent(
Expand Down
7 changes: 7 additions & 0 deletions python/semantic_kernel/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ async def invoke_function_call(
raise FunctionExecutionException(
f"Only functions: {allowed_functions} are allowed, {function_call.name} is not allowed."
)
elif function_behavior is None:
logger.debug(
"invoke_function_call called without function_behavior. "
"No allowlist validation will be performed for function '%s'. "
"Pass a FunctionChoiceBehavior with filters to enable validation.",
function_call.name,
)
Comment thread
SergeyMenshykh marked this conversation as resolved.
function_to_call = self.get_function(function_call.plugin_name, function_call.function_name)
except Exception as exc:
logger.exception(f"The function `{function_call.name}` is not part of the provided functions: {exc}.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,86 @@ async def test_parse_function_call_arguments_done_fail(OpenAIWebsocket, kernel):
iter += 1


async def test_parse_function_call_arguments_done_passes_function_behavior(OpenAIWebsocket, kernel):
"""Verify that the realtime path passes function_choice_behavior to invoke_function_call."""
func_result = "result"
event = ResponseFunctionCallArgumentsDoneEvent(
call_id="call_id",
arguments='{"x": "' + func_result + '"}',
event_id="event_id",
output_index=0,
item_id="item_id",
name="plugin_name-function_name",
response_id="response_id",
type="response.function_call_arguments.done",
)
function_behavior = FunctionChoiceBehavior.Auto(filters={"included_plugins": ["plugin_name"]})
OpenAIWebsocket._current_settings = OpenAIRealtimeExecutionSettings(
instructions="instructions", ai_model_id="gpt-realtime"
)
OpenAIWebsocket._current_settings.function_choice_behavior = function_behavior
OpenAIWebsocket._call_id_to_function_map["call_id"] = "plugin_name-function_name"
func = kernel_function(name="function_name", description="function_description")(lambda x: x)
kernel.add_function(plugin_name="plugin_name", function_name="function_name", function=func)
OpenAIWebsocket._kernel = kernel

# Capture the kwargs passed to invoke_function_call
captured_kwargs = {}
original_invoke = Kernel.invoke_function_call

async def spy_invoke(self, *args, **kwargs):
captured_kwargs.update(kwargs)
return await original_invoke(self, *args, **kwargs)

with (
patch.object(Kernel, "invoke_function_call", spy_invoke),
patch.object(OpenAIWebsocket, "_send"),
):
async for _ in OpenAIWebsocket._parse_function_call_arguments_done(event):
pass

assert "function_behavior" in captured_kwargs
assert captured_kwargs["function_behavior"] is function_behavior


Comment thread
SergeyMenshykh marked this conversation as resolved.
async def test_parse_function_call_arguments_done_filters_block_unallowed(OpenAIWebsocket, kernel):
"""Verify that the realtime path blocks a function not in the allowlist."""
event = ResponseFunctionCallArgumentsDoneEvent(
call_id="call_id",
arguments='{"url": "http://169.254.169.254/"}',
event_id="event_id",
output_index=0,
item_id="item_id",
name="HttpPlugin-GetAsync",
response_id="response_id",
type="response.function_call_arguments.done",
)
function_behavior = FunctionChoiceBehavior.Auto(filters={"included_plugins": ["SafePlugin"]})
OpenAIWebsocket._current_settings = OpenAIRealtimeExecutionSettings(
instructions="instructions", ai_model_id="gpt-realtime"
)
OpenAIWebsocket._current_settings.function_choice_behavior = function_behavior
OpenAIWebsocket._call_id_to_function_map["call_id"] = "HttpPlugin-GetAsync"

# Register both plugins on kernel
safe_func = kernel_function(name="safe_function", description="safe")(lambda: "safe")
http_func = kernel_function(name="GetAsync", description="http get")(lambda url: url)
kernel.add_function(plugin_name="SafePlugin", function_name="safe_function", function=safe_func)
kernel.add_function(plugin_name="HttpPlugin", function_name="GetAsync", function=http_func)
OpenAIWebsocket._kernel = kernel

events_received = []
with patch.object(OpenAIWebsocket, "_send"):
async for evt in OpenAIWebsocket._parse_function_call_arguments_done(event):
events_received.append(evt)

# The function call event is yielded, then the result should contain the error
assert len(events_received) >= 2
result_event = events_received[-1]
assert isinstance(result_event, RealtimeFunctionResultEvent)
assert "not part of the provided" in str(result_event.function_result.result)


async def test_send_audio(OpenAIWebsocket):
audio_event = RealtimeAudioEvent(
audio=AudioContent(data=b"audio data", mime_type="audio/wav"),
Expand Down
95 changes: 95 additions & 0 deletions python/tests/unit/kernel/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,101 @@ async def test_invoke_function_call_with_missing_or_unexpected_args(kernel: Kern
), "Expected fallback message not found in chat history."


async def test_invoke_function_call_with_filters_blocks_unallowed_function(kernel: Kernel):
"""Verify that when function_behavior has filters, an unallowed function is blocked."""
tool_call_mock = MagicMock(spec=FunctionCallContent)
tool_call_mock.name = "HttpPlugin-GetAsync"
tool_call_mock.function_name = "GetAsync"
tool_call_mock.plugin_name = "HttpPlugin"
tool_call_mock.arguments = {"url": "http://169.254.169.254/"}
tool_call_mock.ai_model_id = None
tool_call_mock.metadata = {}
tool_call_mock.index = 0
tool_call_mock.id = "test_id"

chat_history = ChatHistory()

safe_func_meta = KernelFunctionMetadata(name="safe_function", is_prompt=False, plugin_name="SafePlugin")
function_behavior = FunctionChoiceBehavior.Auto(filters={"included_plugins": ["SafePlugin"]})

with patch("semantic_kernel.kernel.Kernel.get_list_of_function_metadata", return_value=[safe_func_meta]):
await kernel.invoke_function_call(
function_call=tool_call_mock,
chat_history=chat_history,
function_behavior=function_behavior,
)

# The function should have been blocked — an error message should be in chat history
assert len(chat_history.messages) == 1
assert "not allowed" in str(chat_history.messages[0].items[0].result) or "not part of the provided" in str(
chat_history.messages[0].items[0].result
)


async def test_invoke_function_call_with_filters_allows_matching_function(kernel: Kernel, get_tool_call_mock):
"""Verify that when function_behavior has filters, an allowed function proceeds (not blocked)."""
tool_call_mock = get_tool_call_mock
chat_history_mock = MagicMock(spec=ChatHistory)

func_meta = KernelFunctionMetadata(
name="function", is_prompt=False, plugin_name="test", fully_qualified_name="test-function"
)

func_mock = AsyncMock(spec=KernelFunction)
func_mock.metadata = func_meta
func_mock.name = "function"
func_mock.parameters = []
func_result = FunctionResult(value="ok", function=func_meta)
func_mock.invoke = AsyncMock(return_value=func_result)

function_behavior = FunctionChoiceBehavior.Auto(filters={"included_plugins": ["test"]})

with (
patch("semantic_kernel.kernel.logger", autospec=True) as logger_mock,
patch("semantic_kernel.kernel.Kernel.get_list_of_function_metadata", return_value=[func_meta]),
patch("semantic_kernel.kernel.Kernel.get_function", return_value=func_mock),
):
await kernel.invoke_function_call(
function_call=tool_call_mock,
chat_history=chat_history_mock,
function_behavior=function_behavior,
)

# The debug message for missing function_behavior should NOT have been logged
debug_calls = [call[0][0] for call in logger_mock.debug.call_args_list] if logger_mock.debug.called else []
assert not any("without function_behavior" in msg for msg in debug_calls)
# The exception logger should NOT have been called (function was allowed)
logger_mock.exception.assert_not_called()


async def test_invoke_function_call_without_function_behavior_logs_debug(kernel: Kernel, get_tool_call_mock):
"""Verify that calling invoke_function_call without function_behavior logs a debug message."""
tool_call_mock = get_tool_call_mock
chat_history_mock = MagicMock(spec=ChatHistory)

func_mock = AsyncMock(spec=KernelFunction)
func_meta = KernelFunctionMetadata(name="function", is_prompt=False)
func_mock.metadata = func_meta
func_mock.name = "function"
func_mock.parameters = []
func_result = FunctionResult(value="Function result", function=func_meta)
func_mock.invoke = AsyncMock(return_value=func_result)

with (
patch("semantic_kernel.kernel.logger", autospec=True) as logger_mock,
patch("semantic_kernel.kernel.Kernel.get_function", return_value=func_mock),
):
await kernel.invoke_function_call(
function_call=tool_call_mock,
chat_history=chat_history_mock,
# function_behavior intentionally omitted
)

logger_mock.debug.assert_called()
debug_calls = [call[0][0] for call in logger_mock.debug.call_args_list]
assert any("without function_behavior" in msg for msg in debug_calls)


# endregion
# region Plugins

Expand Down
Loading