diff --git a/python/tests/unit/connectors/ai/ollama/services/test_ollama_chat_completion.py b/python/tests/unit/connectors/ai/ollama/services/test_ollama_chat_completion.py index 77daee4f8108..0d8d065588f3 100644 --- a/python/tests/unit/connectors/ai/ollama/services/test_ollama_chat_completion.py +++ b/python/tests/unit/connectors/ai/ollama/services/test_ollama_chat_completion.py @@ -1,15 +1,22 @@ # Copyright (c) Microsoft. All rights reserved. +from collections.abc import AsyncGenerator from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest from ollama import AsyncClient +import semantic_kernel.connectors.ai.ollama.services.ollama_chat_completion as occ_module +from semantic_kernel.connectors.ai.completion_usage import CompletionUsage from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior from semantic_kernel.connectors.ai.ollama.ollama_prompt_execution_settings import OllamaChatPromptExecutionSettings from semantic_kernel.connectors.ai.ollama.services.ollama_chat_completion import OllamaChatCompletion from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.contents.chat_message_content import ChatMessageContent +from semantic_kernel.contents.function_call_content import FunctionCallContent +from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent +from semantic_kernel.contents.streaming_text_content import StreamingTextContent from semantic_kernel.exceptions.service_exceptions import ( ServiceInitializationError, ServiceInvalidExecutionSettingsError, @@ -260,3 +267,187 @@ async def test_prepare_chat_history_for_request(setup_ollama_chat_completion): prepared_history = ollama_chat_completion._prepare_chat_history_for_request(chat_history) assert prepared_history == [] + + +async def test_service_url_with_httpx_client(model_id: str) -> None: + """ + Test that service_url returns the base_url of the underlying httpx.AsyncClient. + """ + # Initialize an AsyncClient and manually set its _client attribute to an httpx.AsyncClient + client = AsyncClient(host="unused") + base = httpx.AsyncClient(base_url="http://example.com:8000") + client._client = base # simulate underlying httpx client + + ollama = OllamaChatCompletion(ai_model_id=model_id, client=client) + # service_url should reflect the base_url of the httpx client + assert ollama.service_url() == "http://example.com:8000" + + +@patch("ollama.AsyncClient.chat", new_callable=AsyncMock) +async def test_chat_response_branch( + mock_chat: AsyncMock, + model_id: str, + service_id: str, + default_options: dict, + chat_history, + monkeypatch, +) -> None: + """ + Test get_chat_message_contents when AsyncClient.chat returns a ChatResponse instance. + """ + + class DummyFunction: + def __init__(self, name, arguments): + self.name = name + self.arguments = arguments + + class DummyToolCall: + def __init__(self, function): + self.function = function + + class DummyMessage: + def __init__(self, content: str, tool_calls=None) -> None: + self.content = content + self.tool_calls = tool_calls or [] + + class DummyChatResponse: + def __init__( + self, + content: str, + model: str, + prompt_eval_count: int, + eval_count: int, + tool_calls=None, + ) -> None: + function_calls = [ + DummyToolCall(DummyFunction(tc["function"]["name"], tc["function"]["arguments"])) for tc in tool_calls + ] + self.message = DummyMessage(content, function_calls) + self.model = model + self.prompt_eval_count = prompt_eval_count + self.eval_count = eval_count + + # Monkeypatch the ChatResponse type in the module so isinstance works + monkeypatch.setattr(occ_module, "ChatResponse", DummyChatResponse) + + # Prepare a dummy ChatResponse return value + dummy_resp = DummyChatResponse( + content="resp_text", + model="mdl", + prompt_eval_count=2, + eval_count=3, + tool_calls=[{"function": {"name": "fn", "arguments": {"x": 1}}}], + ) + mock_chat.return_value = dummy_resp + + ollama = OllamaChatCompletion(ai_model_id=model_id) + settings = OllamaChatPromptExecutionSettings(service_id=service_id, options=default_options) + + results = await ollama.get_chat_message_contents(chat_history, settings) + # Only one response expected + assert len(results) == 1 + msg = results[0] + # Assert it's a ChatMessageContent + assert isinstance(msg, ChatMessageContent) + # The content property should return the response text + assert msg.content == "resp_text" + + # The second item should be a FunctionCallContent + func_item = msg.items[1] + assert isinstance(func_item, FunctionCallContent) + # Validate function call details + assert func_item.name == "fn" + assert func_item.arguments == {"x": 1} + + # Check metadata + assert "model" in msg.metadata and msg.metadata["model"] == "mdl" + # Access usage directly, key should exist + usage = msg.metadata["usage"] + assert isinstance(usage, CompletionUsage) + assert usage.prompt_tokens == 2 and usage.completion_tokens == 3 + + +@patch("ollama.AsyncClient.chat", new_callable=AsyncMock) +async def test_streaming_chat_response_branch( + mock_chat: AsyncMock, + model_id: str, + service_id: str, + default_options: dict, + chat_history, + monkeypatch, +) -> None: + """ + Test get_streaming_chat_message_contents when AsyncClient.chat yields ChatResponse instances. + """ + + class DummyFunction: + def __init__(self, name, arguments): + self.name = name + self.arguments = arguments + + class DummyToolCall: + def __init__(self, function): + self.function = function + + class DummyMessage: + def __init__(self, content: str, tool_calls=None) -> None: + self.content = content + self.tool_calls = tool_calls or [] + + class DummyChatResponse: + def __init__( + self, + content: str, + model: str, + prompt_eval_count: int, + eval_count: int, + tool_calls=None, + ) -> None: + function_calls = [ + DummyToolCall(DummyFunction(tc["function"]["name"], tc["function"]["arguments"])) for tc in tool_calls + ] + self.message = DummyMessage(content, function_calls) + self.model = model + self.prompt_eval_count = prompt_eval_count + self.eval_count = eval_count + + # Monkeypatch ChatResponse type + monkeypatch.setattr(occ_module, "ChatResponse", DummyChatResponse) + + # Prepare an async generator yielding DummyChatResponse + async def fake_stream() -> AsyncGenerator[DummyChatResponse, None]: + yield DummyChatResponse( + content="stream_text", + model="m2", + prompt_eval_count=1, + eval_count=1, + tool_calls=[{"function": {"name": "f2", "arguments": {}}}], + ) + + mock_chat.return_value = fake_stream() + + ollama = OllamaChatCompletion(ai_model_id=model_id) + settings = OllamaChatPromptExecutionSettings(service_id=service_id, options=default_options) + + collected = [] + # Iterate over streamed batches + async for batch in ollama.get_streaming_chat_message_contents(chat_history, settings): + # We expect a list with a single StreamingChatMessageContent + assert len(batch) == 1 + sc = batch[0] + assert isinstance(sc, StreamingChatMessageContent) + + # First item should be text content + text_item = sc.items[0] + assert isinstance(text_item, StreamingTextContent) + assert text_item.text == "stream_text" + + # Next item should be a FunctionCallContent + func_item = sc.items[1] + assert isinstance(func_item, FunctionCallContent) + assert func_item.name == "f2" + + collected.append(sc) + + # Only one batch should be collected + assert len(collected) == 1 diff --git a/python/tests/unit/processes/dapr_runtime/test_process_actor.py b/python/tests/unit/processes/dapr_runtime/test_process_actor.py index ab38b37063d2..733450155645 100644 --- a/python/tests/unit/processes/dapr_runtime/test_process_actor.py +++ b/python/tests/unit/processes/dapr_runtime/test_process_actor.py @@ -2,6 +2,7 @@ import asyncio import json +from queue import Queue from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -9,10 +10,13 @@ from dapr.actor.runtime._type_information import ActorTypeInformation from dapr.actor.runtime.context import ActorRuntimeContext +from semantic_kernel.exceptions.kernel_exceptions import KernelException +from semantic_kernel.exceptions.process_exceptions import ProcessEventUndefinedException from semantic_kernel.processes.dapr_runtime.actors.actor_state_key import ActorStateKeys from semantic_kernel.processes.dapr_runtime.actors.process_actor import ProcessActor from semantic_kernel.processes.dapr_runtime.dapr_process_info import DaprProcessInfo from semantic_kernel.processes.dapr_runtime.dapr_step_info import DaprStepInfo +from semantic_kernel.processes.kernel_process.kernel_process_event import KernelProcessEvent from semantic_kernel.processes.kernel_process.kernel_process_state import KernelProcessState @@ -158,3 +162,122 @@ def test_handle_message(actor_context): asyncio.run(actor_context.handle_message(message_mock)) mock_run_once.assert_called_once() + + +@pytest.fixture +def actor() -> ProcessActor: + """Create a fresh ProcessActor with mocked dependencies.""" + # Arrange: Create a dummy runtime context and ProcessActor instance + actor_id = ActorId("test_actor") + runtime_context = MagicMock() + kernel = MagicMock() + actor_obj = ProcessActor(runtime_context, actor_id, kernel=kernel, factories={}) + # Mock internal state manager + actor_obj._state_manager = AsyncMock() + actor_obj._state_manager.try_get_state = AsyncMock(return_value=(False, None)) + actor_obj._state_manager.try_add_state = AsyncMock() + actor_obj._state_manager.save_state = AsyncMock() + return actor_obj + + +def test_name_uninitialized(actor: ProcessActor): + """Test that accessing name before initialization raises KernelException.""" + with pytest.raises(KernelException) as exc_info: + _ = actor.name + assert "must be initialized before accessing the name" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_start_not_initialized(actor: ProcessActor): + """Test that start() without initialization raises ValueError.""" + actor.initialize_task = False + with pytest.raises(ValueError): + await actor.start() + + +@pytest.mark.asyncio +async def test_run_once_none_event(actor: ProcessActor): + """Test that run_once(None) raises ProcessEventUndefinedException.""" + actor.initialize_task = True + with pytest.raises(ProcessEventUndefinedException): + await actor.run_once(None) # type: ignore + + +@pytest.mark.asyncio +async def test_send_message_none_event(actor: ProcessActor): + """Test that send_message(None) raises ProcessEventUndefinedException.""" + with pytest.raises(ProcessEventUndefinedException): + await actor.send_message(None) # type: ignore + + +def test_send_message_success(actor: ProcessActor): + """Test that send_message enqueues the event into external_event_queue.""" + event = KernelProcessEvent(id="e1", data="d1") + asyncio.run(actor.send_message(event)) + assert isinstance(actor.external_event_queue, Queue) + assert not actor.external_event_queue.empty() + queued = actor.external_event_queue.get() + assert queued is event + + +@pytest.mark.asyncio +async def test_to_dapr_process_info_uninitialized(actor: ProcessActor): + """Test to_dapr_process_info raises ValueError if process is None.""" + actor.process = None + with pytest.raises(ValueError) as exc: + await actor.to_dapr_process_info() + assert "must be initialized before converting" in str(exc.value) + + +@pytest.mark.asyncio +async def test_to_dapr_process_info_inner_step_type_none(actor: ProcessActor): + """Test to_dapr_process_info raises ValueError if inner_step_python_type is None.""" + actor.process = MagicMock() + # Simulate a process with missing inner_step_python_type + actor.process.inner_step_python_type = None + actor.process.state = MagicMock(name="Proc", id="pid") + actor.steps = [] + with pytest.raises(ValueError) as exc: + await actor.to_dapr_process_info() + assert "inner step type must be defined" in str(exc.value) + + +@pytest.mark.asyncio +async def test_to_dapr_process_info_success(actor: ProcessActor): + """Test to_dapr_process_info returns correct dict for initialized process with no steps.""" + proc_state = KernelProcessState(name="Proc", version="1.0", id="test_actor") + dapr_proc = DaprProcessInfo( + inner_step_python_type="Type1", + state=proc_state, + edges={}, + steps=[], + ) + actor.process = dapr_proc + actor.steps = [] + result = await actor.to_dapr_process_info() + assert result == dapr_proc.model_dump() + + +@pytest.mark.asyncio +async def test_stop_no_task(actor: ProcessActor): + """Test stop() returns normally when no process_task is running.""" + actor.process_task = None + await actor.stop() + + +def test_name_after_manual_set(actor: ProcessActor): + """Test that name property returns the correct name after manual initialization.""" + actor.process = MagicMock() + actor.process.state = MagicMock() + actor.process.state.name = "MyProcess" + actor.process.state.id = "id123" + assert actor.name == "MyProcess" + + +@pytest.mark.asyncio +async def test_send_outgoing_public_events_no_parent(actor: ProcessActor): + """Test send_outgoing_public_events does nothing if parent_process_id is None.""" + actor.parent_process_id = None + with patch("semantic_kernel.processes.dapr_runtime.actors.process_actor.ActorProxy.create") as mock_proxy: + await actor.send_outgoing_public_events() + mock_proxy.assert_not_called() diff --git a/python/tests/unit/processes/dapr_runtime/test_step_actor.py b/python/tests/unit/processes/dapr_runtime/test_step_actor.py index b677df054859..a89f67065af8 100644 --- a/python/tests/unit/processes/dapr_runtime/test_step_actor.py +++ b/python/tests/unit/processes/dapr_runtime/test_step_actor.py @@ -1,15 +1,28 @@ # Copyright (c) Microsoft. All rights reserved. import json +from typing import cast from unittest.mock import AsyncMock, MagicMock, patch import pytest -from dapr.actor import ActorId +from dapr.actor import ActorId, ActorProxy +from semantic_kernel.exceptions.kernel_exceptions import KernelException +from semantic_kernel.exceptions.process_exceptions import ( + ProcessFunctionNotFoundException, +) from semantic_kernel.processes.dapr_runtime.actors.actor_state_key import ActorStateKeys +from semantic_kernel.processes.dapr_runtime.actors.event_buffer_actor import EventBufferActor from semantic_kernel.processes.dapr_runtime.actors.step_actor import StepActor from semantic_kernel.processes.dapr_runtime.dapr_step_info import DaprStepInfo +from semantic_kernel.processes.kernel_process.kernel_process_edge import KernelProcessEdge +from semantic_kernel.processes.kernel_process.kernel_process_event import ( + KernelProcessEvent, + KernelProcessEventVisibility, +) +from semantic_kernel.processes.kernel_process.kernel_process_function_target import KernelProcessFunctionTarget from semantic_kernel.processes.kernel_process.kernel_process_step_state import KernelProcessStepState +from semantic_kernel.processes.process_event import ProcessEvent from semantic_kernel.processes.process_message import ProcessMessage @@ -205,3 +218,248 @@ async def test_activate_step_with_factory_uses_existing_state(actor_context): actor_context._state_manager.save_state.assert_awaited_once() assert isinstance(actor_context.step_state.state, FakeState) fake_step_instance.activate.assert_awaited_once_with(actor_context.step_state) + + +@pytest.fixture +def mock_actor_context(): + """Provides a fresh StepActor instance with mocked dependencies.""" + ctx = MagicMock() + actor_id = ActorId("test_actor") + kernel = MagicMock() + actor = StepActor(ctx, actor_id, kernel, factories={}) + actor._state_manager = MagicMock() + return actor + + +def test_name_property_before_init_raises(mock_actor_context): + """Accessing .name before initialization raises KernelException""" + with pytest.raises(KernelException): + _ = mock_actor_context.name + + +async def test_name_property_after_init_returns_name(mock_actor_context): + """After setting step_info, name returns the state's name""" + step_state = KernelProcessStepState(name="MyStep", version="v1", id="id1") + mock_actor_context.step_info = DaprStepInfo( + state=step_state, + inner_step_python_type="Any", + edges={}, + ) + # should return MyStep + assert mock_actor_context.name == "MyStep" + + +def test_get_edge_for_event_empty(mock_actor_context): + """get_edge_for_event returns empty when no edges set""" + mock_actor_context.output_edges = {} + assert mock_actor_context.get_edge_for_event("event") == [] + + +def test_get_edge_for_event_present(mock_actor_context): + """get_edge_for_event returns correct list""" + edge = KernelProcessEdge( + source_step_id="s1", + output_target=KernelProcessFunctionTarget( + step_id="t1", function_name="f", parameter_name=None, target_event_id="e1" + ), + ) + + mock_actor_context.output_edges = {"e1": [edge]} + assert mock_actor_context.get_edge_for_event("e1") == [edge] + + +def test_scoped_event_none_raises(mock_actor_context): + """scoped_event with None raises ValueError""" + # Use cast to satisfy type checker + with pytest.raises(ValueError): + mock_actor_context.scoped_event(cast(ProcessEvent, None)) + + +def test_scoped_event_sets_namespace(mock_actor_context): + """scoped_event updates event namespace correctly""" + # prepare name and id + mock_actor_context.step_info = DaprStepInfo( + state=KernelProcessStepState(name="StepName", version="v1", id="id"), + inner_step_python_type="Any", + edges={}, + ) + # create event + evt = ProcessEvent(inner_event=KernelProcessEvent(id="e", data="d"), namespace=None) + result = mock_actor_context.scoped_event(evt) + assert result.namespace == f"StepName_{mock_actor_context.id.id}" + + +async def test_invoke_function_calls_kernel_invoke(mock_actor_context): + """invoke_function delegates to kernel.invoke with correct args""" + fake_fn = MagicMock(name="f") + fake_kernel = MagicMock() + fake_kernel.invoke = AsyncMock(return_value="result") + res = await mock_actor_context.invoke_function(fake_fn, fake_kernel, {"a": 1}) + fake_kernel.invoke.assert_awaited_once_with(fake_fn, a=1) + assert res == "result" + + +async def test_emit_event_without_namespace_raises(mock_actor_context): + """emit_event without setting event_namespace raises ValueError""" + mock_actor_context.event_namespace = None + with pytest.raises(ValueError): + await mock_actor_context.emit_event(KernelProcessEvent(id="e", data="d")) + + +async def test_emit_process_event_public_and_edge(mock_actor_context): + """emit_process_event enqueues event to parent buffer and sends messages to targets""" + # setup event as public and parent process id + mock_actor_context.parent_process_id = "parent1" + # fake ActorProxy.create to return AsyncMock for enqueue + mock_parent = AsyncMock(spec=EventBufferActor) + mock_child = AsyncMock(spec=EventBufferActor) + + def fake_create(actor_type, actor_id, actor_interface): + # first call is for EventBufferInterface, second for MessageBufferInterface + if actor_interface.__name__ == "EventBufferInterface": + return mock_parent + return mock_child + + with patch.object(ActorProxy, "create", side_effect=fake_create): + # create an edge for event + target = KernelProcessFunctionTarget( + step_id="child1", function_name="func", parameter_name="p", target_event_id="evt" + ) + edge = KernelProcessEdge(source_step_id="src", output_target=target) + + mock_actor_context.output_edges = {"evt": [edge]} + mock_actor_context.get_edge_for_event = MagicMock(return_value=[edge]) + + # call + evt = ProcessEvent( + inner_event=KernelProcessEvent(id="evt", data="data", visibility=KernelProcessEventVisibility.Public), + namespace="ns", + ) + await mock_actor_context.emit_process_event(evt) + # verify enqueue on parent and child + mock_parent.enqueue.assert_awaited_once() + mock_child.enqueue.assert_awaited_once() + + +@patch.object(StepActor, "activate_step", AsyncMock()) +async def test_to_dapr_step_info_errors(mock_actor_context): + """to_dapr_step_info raises if uninitialized""" + # case: step_info not set + mock_actor_context.step_activated = False + mock_actor_context.step_info = None + with pytest.raises(ValueError): + await mock_actor_context.to_dapr_step_info() + + # case: inner_step_type not set + mock_actor_context.step_info = DaprStepInfo( + state=KernelProcessStepState(name="S", version="v", id="id"), + inner_step_python_type="Type", + edges={}, + ) + mock_actor_context.inner_step_type = None + mock_actor_context.step_activated = True + with pytest.raises(ValueError): + await mock_actor_context.to_dapr_step_info() + + +async def test_to_dapr_step_info_success(mock_actor_context): + """to_dapr_step_info returns correct model_dump""" + # setup valid step_info and inner_step_type + mock_actor_context.step_info = DaprStepInfo( + state=KernelProcessStepState(name="N", version="v", id="id"), + inner_step_python_type="Type", + edges={"e": []}, + ) + mock_actor_context.inner_step_type = "Type" + mock_actor_context.step_activated = True + result = await mock_actor_context.to_dapr_step_info() + # check returned dict structure + assert "inner_step_python_type" in result + assert result["inner_step_python_type"] == "Type" + assert result["edges"] == {"e": []} + + +async def test_handle_message_none_raises(mock_actor_context): + """handle_message with None raises ValueError""" + with pytest.raises(ValueError): + await mock_actor_context.handle_message(None) + + +async def test_handle_message_no_function_found_raises(mock_actor_context): + """handle_message raises when function exists in inputs but not in functions dict""" + # prepare actor state + mock_actor_context.step_activated = True + mock_actor_context.functions = {} + mock_actor_context.inputs = {"f1": {"a": "v"}} + mock_actor_context.initial_inputs = {"f1": {"a": "v"}} + mock_actor_context.step_info = DaprStepInfo( + state=KernelProcessStepState(name="S", version="v", id="id"), + inner_step_python_type="Type", + edges={}, + ) + from semantic_kernel.processes.process_message import ProcessMessage + + msg = ProcessMessage(source_id="s", destination_id="d", function_name="f1", values={"a": "v"}) + with pytest.raises(ProcessFunctionNotFoundException): + await mock_actor_context.handle_message(msg) + + +async def test_handle_message_function_returns_none_emits_error(mock_actor_context): + """handle_message when invoke_function returns None emits an error event""" + mock_actor_context.step_activated = True + # ensure name property works by setting step_info + mock_actor_context.step_info = DaprStepInfo( + state=KernelProcessStepState(name="S", version="v", id="id"), + inner_step_python_type="Type", + edges={}, + ) + # prepare inputs and functions + fake_kernel_fn = MagicMock(plugin_name="plug", name="f1") + mock_actor_context.functions = {"f1": fake_kernel_fn} + mock_actor_context.inputs = {"f1": {"x": 1}} + mock_actor_context.initial_inputs = {"f1": {"x": None}} + # patch invoke_function to return None + mock_actor_context.invoke_function = AsyncMock(return_value=None) + # capture emit_event + mock_actor_context.emit_event = AsyncMock() + from semantic_kernel.processes.process_message import ProcessMessage + + msg = ProcessMessage(source_id="s", destination_id="d", function_name="f1", values={"x": 1}) + await mock_actor_context.handle_message(msg) + # should emit error event + emitted = mock_actor_context.emit_event.call_args.args[0] + assert isinstance(emitted, KernelProcessEvent) + assert emitted.id == "f1.OnError" + + +async def test_handle_message_success_emits_result_and_resets_inputs(mock_actor_context): + """handle_message successfully invokes function and emits result event""" + # setup valid step_info and activation + mock_actor_context.step_info = DaprStepInfo( + state=KernelProcessStepState(name="S", version="v", id="id"), + inner_step_python_type="Type", + edges={}, + ) + mock_actor_context.step_activated = True + fake_kernel_fn = MagicMock(plugin_name="plug", name="f1") + mock_actor_context.functions = {"f1": fake_kernel_fn} + mock_actor_context.inputs = {"f1": {"x": 1}} + mock_actor_context.initial_inputs = {"f1": {"x": None}} + + # define a dummy result class with a value attribute + class DummyResult: + def __init__(self, value): + self.value = value + + r = DummyResult("out") + mock_actor_context.invoke_function = AsyncMock(return_value=r) + mock_actor_context.emit_event = AsyncMock() + from semantic_kernel.processes.process_message import ProcessMessage + + msg = ProcessMessage(source_id="s", destination_id="d", function_name="f1", values={"x": 1}) + await mock_actor_context.handle_message(msg) + # verify emitted result event and inputs reset + emitted = mock_actor_context.emit_event.call_args.args[0] + assert emitted.id == "f1.OnResult" + assert emitted.data == "out" + assert mock_actor_context.inputs["f1"] == {"x": None} diff --git a/python/tests/unit/processes/test_process_builder.py b/python/tests/unit/processes/test_process_builder.py new file mode 100644 index 000000000000..cab4c74ba3af --- /dev/null +++ b/python/tests/unit/processes/test_process_builder.py @@ -0,0 +1,211 @@ +# Copyright (c) Microsoft. All rights reserved. +from enum import Enum + +import pytest + +from semantic_kernel.exceptions.process_exceptions import ProcessInvalidConfigurationException +from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess +from semantic_kernel.processes.kernel_process.kernel_process_function_target import KernelProcessFunctionTarget +from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep +from semantic_kernel.processes.process_builder import ProcessBuilder +from semantic_kernel.processes.process_function_target_builder import ProcessFunctionTargetBuilder +from semantic_kernel.processes.process_step_builder import ProcessStepBuilder +from semantic_kernel.processes.process_step_edge_builder import ProcessStepEdgeBuilder +from semantic_kernel.processes.step_utils import get_fully_qualified_name + + +def _create_builder(name: str) -> ProcessBuilder: + """Helper to construct a ProcessBuilder instance bypassing BaseModel required fields.""" + return ProcessBuilder.model_construct( + name=name, + event_namespace=f"{name}_ns", + id="id", + functions_dict={}, + edges={}, + entry_steps=[], + external_event_target_map={}, + has_parent_process=False, + steps=[], + factories={}, + ) + + +class DummyTarget: + function_name = "some_function" + parameter_name = "some_param" + + +class DummyStep(KernelProcessStep): + def resolve_function_target(self, function_name, parameter_name): + return DummyTarget() + + +class SampleEnum(Enum): + """Sample enum for event IDs.""" + + EVENT_A = "event_a" + + +def test_add_step_with_non_class_raises() -> None: + """Test that add_step raises when a non-class is passed as step_type.""" + builder = _create_builder("testProc") + with pytest.raises(ProcessInvalidConfigurationException) as exc_info: + builder.add_step(123) # type: ignore + assert "Expected a class type" in str(exc_info.value) + + +def test_add_step_appends_step_and_returns_builder() -> None: + """Test that a valid class step is appended and returned correctly.""" + builder = _create_builder("proc1") + initial_state = {"key": "value"} + step_builder = builder.add_step(DummyStep, name="CustomStep", initial_state=initial_state) + assert step_builder in builder.steps + assert step_builder.name == "CustomStep" + assert step_builder.initial_state == initial_state + + +def test_add_step_with_factory_function_stores_factory() -> None: + """Test that providing a factory function stores it in the factories dict.""" + builder = _create_builder("proc2") + + def factory_func() -> DummyStep: + return DummyStep() + + builder.add_step(DummyStep, factory_function=factory_func) + fqn = get_fully_qualified_name(DummyStep) + assert fqn in builder.factories + assert builder.factories[fqn] is factory_func + + +def test_where_input_event_missing_raises() -> None: + """Test that where_input_event raises when the event ID is not registered.""" + builder = _create_builder("proc") + with pytest.raises(ValueError) as exc_info: + builder.where_input_event_is("nonexistent") + assert "does not expose an event with Id" in str(exc_info.value) + + +def test_where_input_event_success_returns_copy() -> None: + """Test that where_input_event returns a copy with updated step and event id.""" + builder = _create_builder("proc") + builder.entry_steps.append(DummyStep()) + orig = ProcessFunctionTargetBuilder(builder) + builder.external_event_target_map["evt1"] = orig + + class Evt(Enum): + EVT1 = "evt1" + + result = builder.where_input_event_is(Evt.EVT1) + assert result is not orig + assert result.step is builder + assert result.target_event_id == "evt1" + + +def test_on_input_event_returns_edge_builder_with_string_and_enum() -> None: + """Test that on_input_event returns a ProcessEdgeBuilder with correct source and event_id.""" + + from semantic_kernel.processes.process_builder import ProcessEdgeBuilder # noqa: F811, I001 + from semantic_kernel.functions.kernel_function_metadata import KernelFunctionMetadata # noqa: F401 + from semantic_kernel.processes.process_builder import ProcessBuilder # noqa: F401 + + ProcessEdgeBuilder.model_rebuild() + + builder = _create_builder("proc") + eb_str = builder.on_input_event("myEvent") + assert isinstance(eb_str, ProcessEdgeBuilder) + assert eb_str.source is builder + assert eb_str.event_id == "myEvent" + eb_enum = builder.on_input_event(SampleEnum.EVENT_A) + assert eb_enum.event_id == SampleEnum.EVENT_A.value + + +def test_link_to_raises_when_target_none() -> None: + """Test that link_to raises when edge_builder.target is not set.""" + builder = _create_builder("proc") + edge = ProcessStepEdgeBuilder(source=builder, event_id="evt") + with pytest.raises(ValueError) as exc_info: + builder.link_to("evt", edge) + assert "Target must be set before linking" in str(exc_info.value) + + +def test_link_to_success_updates_internal_maps_and_edges() -> None: + """Test that link_to properly updates entry_steps, external_event_target_map, and edges.""" + builder = _create_builder("proc") + edge = ProcessStepEdgeBuilder(source=builder, event_id="evt") + builder.entry_steps.append(DummyStep()) + target_builder = ProcessFunctionTargetBuilder(builder) + edge.target = target_builder + builder.link_to("evt", edge) + assert edge.source in builder.entry_steps + assert builder.external_event_target_map["evt"] is target_builder + assert "evt" in builder.edges + assert edge in builder.edges["evt"] + + +def test_build_with_simple_step_yields_kernel_process() -> None: + """Test that build returns a KernelProcess with correct properties for simple steps.""" + builder = _create_builder("simpleProc") + builder.add_step(DummyStep) + process = builder.build() + assert isinstance(process, KernelProcess) + assert process.state.name == "simpleProc" + assert process.state.id is None + assert len(process.steps) == 1 + step_info = process.steps[0] + assert step_info.inner_step_type is DummyStep + assert step_info.output_edges == {} + assert process.factories == {} + + +def test_resolve_function_target_no_steps_raises() -> None: + """Test that resolve_function_target raises when there are no entry_steps.""" + builder = _create_builder("proc") + with pytest.raises(ValueError) as exc_info: + builder.resolve_function_target("fn", "param") + assert "No targets found for function" in str(exc_info.value) + + +def test_resolve_function_target_multiple_targets_raises() -> None: + """Test that resolve_function_target raises when multiple entry_steps return targets.""" + builder = _create_builder("proc") + + from semantic_kernel.functions.kernel_function_metadata import KernelFunctionMetadata # noqa: F401 + from semantic_kernel.processes.process_builder import ProcessBuilder # noqa: F401 + + ProcessBuilder.model_rebuild() + + class FakeStep(ProcessStepBuilder): + def __init__(self, name: str = "default_step_name"): + super().__init__(name=name) + + def resolve_function_target(self, fn: str, pn: str) -> KernelProcessFunctionTarget: + return KernelProcessFunctionTarget(step_id="1", function_name=fn, parameter_name=pn) + + builder.entry_steps = [FakeStep(), FakeStep()] # type: ignore + with pytest.raises(ValueError) as exc_info: + builder.resolve_function_target("fn", "param") + assert "Multiple targets found for function" in str(exc_info.value) + + +def test_resolve_function_target_success() -> None: + """Test that resolve_function_target returns the single target when exactly one entry_step matches.""" + builder = _create_builder("proc") + + from semantic_kernel.functions.kernel_function_metadata import KernelFunctionMetadata # noqa: F401 + from semantic_kernel.processes.process_builder import ProcessBuilder # noqa: F401 + + ProcessBuilder.model_rebuild() + + class FakeStep(ProcessStepBuilder): + def __init__(self, name: str = "default_step_name"): + super().__init__(name=name) + + def resolve_function_target(self, fn: str, pn: str) -> KernelProcessFunctionTarget: + return KernelProcessFunctionTarget(step_id="1", function_name=fn, parameter_name=pn) + + builder.entry_steps = [FakeStep()] # type: ignore + result = builder.resolve_function_target("myFn", "myParam") + assert isinstance(result, KernelProcessFunctionTarget) + assert result.step_id == "1" + assert result.function_name == "myFn" + assert result.parameter_name == "myParam"