From 7ff1525d9c14001176b26a4e56af584b9391d920 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Thu, 6 Nov 2025 10:01:56 -0700 Subject: [PATCH 1/4] Storing changes commit --- durabletask-azurefunctions/CHANGELOG.md | 10 + durabletask-azurefunctions/__init__.py | 0 .../durabletask/azurefunctions/__init__.py | 0 .../durabletask/azurefunctions/constants.py | 10 + .../azurefunctions/decorators/__init__.py | 11 + .../azurefunctions/decorators/durable_app.py | 193 ++++++++++++++++++ .../azurefunctions/decorators/metadata.py | 109 ++++++++++ .../internal/DurableClientConverter.py | 46 +++++ .../azurefunctions/internal/__init__.py | 3 + .../durabletask/azurefunctions/worker.py | 2 + durabletask-azurefunctions/pyproject.toml | 43 ++++ 11 files changed, 427 insertions(+) create mode 100644 durabletask-azurefunctions/CHANGELOG.md create mode 100644 durabletask-azurefunctions/__init__.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/__init__.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/constants.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/decorators/__init__.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/internal/DurableClientConverter.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/worker.py create mode 100644 durabletask-azurefunctions/pyproject.toml diff --git a/durabletask-azurefunctions/CHANGELOG.md b/durabletask-azurefunctions/CHANGELOG.md new file mode 100644 index 0000000..b9be159 --- /dev/null +++ b/durabletask-azurefunctions/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## v0.1.0 + +- Initial implementation diff --git a/durabletask-azurefunctions/__init__.py b/durabletask-azurefunctions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/__init__.py b/durabletask-azurefunctions/durabletask/azurefunctions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/constants.py b/durabletask-azurefunctions/durabletask/azurefunctions/constants.py new file mode 100644 index 0000000..78c9792 --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/constants.py @@ -0,0 +1,10 @@ +"""Constants used to determine the local running context.""" +# Todo: Remove unused constants after module is complete +DEFAULT_LOCAL_HOST: str = 'localhost:7071' +DEFAULT_LOCAL_ORIGIN: str = f'http://{DEFAULT_LOCAL_HOST}' +DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' +HTTP_ACTION_NAME = 'BuiltIn::HttpActivity' +ORCHESTRATION_TRIGGER = "orchestrationTrigger" +ACTIVITY_TRIGGER = "activityTrigger" +ENTITY_TRIGGER = "entityTrigger" +DURABLE_CLIENT = "durableClient" diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/__init__.py b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/__init__.py new file mode 100644 index 0000000..f3cfb91 --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/__init__.py @@ -0,0 +1,11 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Durable Task SDK for Python entities component""" + +import durabletask.azurefunctions.decorators.durable_app as durable_app +import durabletask.azurefunctions.decorators.metadata as metadata + +__all__ = ["durable_app", "metadata"] + +PACKAGE_NAME = "durabletask.entities" diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py new file mode 100644 index 0000000..152f6d1 --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py @@ -0,0 +1,193 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger, \ + DurableClient +from typing import Callable, Optional +from typing import Union +from azure.functions import FunctionRegister, TriggerApi, BindingApi, AuthLevel, OrchestrationContext + + +class Blueprint(TriggerApi, BindingApi): + """Durable Functions (DF) Blueprint container. + + It allows functions to be declared via trigger and binding decorators, + but does not automatically index/register these functions. + + To register these functions, utilize the `register_functions` method from any + :class:`FunctionRegister` subclass, such as `DFApp`. + """ + + def __init__(self, + http_auth_level: Union[AuthLevel, str] = AuthLevel.FUNCTION): + """Instantiate a Durable Functions app with which to register Functions. + + Parameters + ---------- + http_auth_level: Union[AuthLevel, str] + Authorization level required for Function invocation. + Defaults to AuthLevel.Function. + + Returns + ------- + DFApp + New instance of a Durable Functions app + """ + super().__init__(auth_level=http_auth_level) + + def _configure_orchestrator_callable(self, wrap) -> Callable: + """Obtain decorator to construct an Orchestrator class from a user-defined Function. + + In the old programming model, this decorator's logic was unavoidable boilerplate + in user-code. Now, this is handled internally by the framework. + + Parameters + ---------- + wrap: Callable + The next decorator to be applied. + + Returns + ------- + Callable + The function to construct an Orchestrator class from the user-defined Function, + wrapped by the next decorator in the sequence. + """ + def decorator(orchestrator_func): + # Construct an orchestrator based on the end-user code + + # TODO: Extract this logic (?) + def handle(context: OrchestrationContext) -> str: + context_body = getattr(context, "body", None) + if context_body is None: + context_body = context + orchestration_context = context_body + # TODO: Run the orchestration using the context + return "" + + handle.orchestrator_function = orchestrator_func + + # invoke next decorator, with the Orchestrator as input + handle.__name__ = orchestrator_func.__name__ + return wrap(handle) + + return decorator + + def orchestration_trigger(self, context_name: str, + orchestration: Optional[str] = None): + """Register an Orchestrator Function. + + Parameters + ---------- + context_name: str + Parameter name of the DurableOrchestrationContext object. + orchestration: Optional[str] + Name of Orchestrator Function. + The value is None by default, in which case the name of the method is used. + """ + @self._configure_orchestrator_callable + @self._configure_function_builder + def wrap(fb): + + def decorator(): + fb.add_trigger( + trigger=OrchestrationTrigger(name=context_name, + orchestration=orchestration)) + return fb + + return decorator() + + return wrap + + def activity_trigger(self, input_name: str, + activity: Optional[str] = None): + """Register an Activity Function. + + Parameters + ---------- + input_name: str + Parameter name of the Activity input. + activity: Optional[str] + Name of Activity Function. + The value is None by default, in which case the name of the method is used. + """ + @self._configure_function_builder + def wrap(fb): + def decorator(): + fb.add_trigger( + trigger=ActivityTrigger(name=input_name, + activity=activity)) + return fb + + return decorator() + + return wrap + + def entity_trigger(self, context_name: str, + entity_name: Optional[str] = None): + """Register an Entity Function. + + Parameters + ---------- + context_name: str + Parameter name of the Entity input. + entity_name: Optional[str] + Name of Entity Function. + The value is None by default, in which case the name of the method is used. + """ + @self._configure_function_builder + def wrap(fb): + def decorator(): + fb.add_trigger( + trigger=EntityTrigger(name=context_name, + entity_name=entity_name)) + return fb + + return decorator() + + return wrap + + def durable_client_input(self, + client_name: str, + task_hub: Optional[str] = None, + connection_name: Optional[str] = None + ): + """Register a Durable-client Function. + + Parameters + ---------- + client_name: str + Parameter name of durable client. + task_hub: Optional[str] + Used in scenarios where multiple function apps share the same storage account + but need to be isolated from each other. If not specified, the default value + from host.json is used. + This value must match the value used by the target orchestrator functions. + connection_name: Optional[str] + The name of an app setting that contains a storage account connection string. + The storage account represented by this connection string must be the same one + used by the target orchestrator functions. If not specified, the default storage + account connection string for the function app is used. + """ + + @self._configure_function_builder + def wrap(fb): + def decorator(): + # self._add_rich_client(fb, client_name, DurableOrchestrationClient) + + fb.add_binding( + binding=DurableClient(name=client_name, + task_hub=task_hub, + connection_name=connection_name)) + return fb + + return decorator() + + return wrap + + +class DFApp(Blueprint, FunctionRegister): + """Durable Functions (DF) app. + + Exports the decorators required to declare and index DF Function-types. + """ + + pass diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py new file mode 100644 index 0000000..4bf1d6c --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py @@ -0,0 +1,109 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from typing import Optional + +from durabletask.azurefunctions.constants import ORCHESTRATION_TRIGGER, \ + ACTIVITY_TRIGGER, ENTITY_TRIGGER, DURABLE_CLIENT +from azure.functions.decorators.core import Trigger, InputBinding + + +class OrchestrationTrigger(Trigger): + """OrchestrationTrigger. + + Trigger representing an Orchestration Function. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ORCHESTRATION_TRIGGER + + def __init__(self, + name: str, + orchestration: Optional[str] = None, + ) -> None: + self.orchestration = orchestration + super().__init__(name=name) + + +class ActivityTrigger(Trigger): + """ActivityTrigger. + + Trigger representing a Durable Functions Activity. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ACTIVITY_TRIGGER + + def __init__(self, + name: str, + activity: Optional[str] = None, + ) -> None: + self.activity = activity + super().__init__(name=name) + + +class EntityTrigger(Trigger): + """EntityTrigger. + + Trigger representing an Entity Function. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ENTITY_TRIGGER + + def __init__(self, + name: str, + entity_name: Optional[str] = None, + ) -> None: + self.entity_name = entity_name + super().__init__(name=name) + + +class DurableClient(InputBinding): + """DurableClient. + + Binding representing a Durable-client object. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this Binding, as a string. + + Returns + ------- + str + The string representation of this binding. + """ + return DURABLE_CLIENT + + def __init__(self, + name: str, + task_hub: Optional[str] = None, + connection_name: Optional[str] = None + ) -> None: + self.task_hub = task_hub + self.connection_name = connection_name + super().__init__(name=name) diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/internal/DurableClientConverter.py b/durabletask-azurefunctions/durabletask/azurefunctions/internal/DurableClientConverter.py new file mode 100644 index 0000000..4286967 --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/internal/DurableClientConverter.py @@ -0,0 +1,46 @@ +import abc +from typing import Any, Optional + +from azure.functions import meta + + +class DurableInConverter(meta._BaseConverter, binding=None): + + @classmethod + @abc.abstractmethod + def check_input_type_annotation(cls, pytype: type) -> bool: + pass + + @classmethod + @abc.abstractmethod + def decode(cls, data: meta.Datum, *, trigger_metadata) -> Any: + raise NotImplementedError + + @classmethod + @abc.abstractmethod + def has_implicit_output(cls) -> bool: + return False + + +class DurableOutConverter(meta._BaseConverter, binding=None): + + @classmethod + @abc.abstractmethod + def check_output_type_annotation(cls, pytype: type) -> bool: + pass + + @classmethod + @abc.abstractmethod + def encode(cls, obj: Any, *, + expected_type: Optional[type]) -> Optional[meta.Datum]: + raise NotImplementedError + +# Durable Functions Durable Client Bindings + + +class DurableClientConverter(DurableInConverter, + DurableOutConverter, + binding='durableClient'): + @classmethod + def has_implicit_output(cls) -> bool: + return False diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py b/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py new file mode 100644 index 0000000..d5823cf --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py @@ -0,0 +1,3 @@ +from .DurableClientConverter import DurableClientConverter + +__all__ = ["DurableClientConverter"] diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/worker.py b/durabletask-azurefunctions/durabletask/azurefunctions/worker.py new file mode 100644 index 0000000..a176672 --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/worker.py @@ -0,0 +1,2 @@ +class TempClass: + pass diff --git a/durabletask-azurefunctions/pyproject.toml b/durabletask-azurefunctions/pyproject.toml new file mode 100644 index 0000000..dfb02eb --- /dev/null +++ b/durabletask-azurefunctions/pyproject.toml @@ -0,0 +1,43 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +# For more information on pyproject.toml, see https://peps.python.org/pep-0621/ + +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "durabletask.azurefunctions" +version = "0.1.0" +description = "Durable Task Python SDK provider implementation for Durable Azure Functions" +keywords = [ + "durable", + "task", + "workflow", + "azure", + "azure functions" +] +classifiers = [ + "Development Status :: 3 - Alpha", + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", +] +requires-python = ">=3.9" +license = {file = "LICENSE"} +readme = "README.md" +dependencies = [ + "durabletask>=0.5.0", + "azure-identity>=1.19.0", + "azure-functions>=1.11.0" +] + +[project.urls] +repository = "https://github.com/microsoft/durabletask-python" +changelog = "https://github.com/microsoft/durabletask-python/blob/main/CHANGELOG.md" + +[tool.setuptools.packages.find] +include = ["durabletask.azurefunctions", "durabletask.azurefunctions.*"] + +[tool.pytest.ini_options] +minversion = "6.0" From 552a2ddbe1d02730dea17c2442e10caac9d7788f Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 21 Nov 2025 13:46:16 -0700 Subject: [PATCH 2/4] Working orchestrators + activities --- .../durabletask/azurefunctions/client.py | 85 +++++++++++++++++ .../durabletask/azurefunctions/constants.py | 2 +- .../azurefunctions/decorators/durable_app.py | 91 +++++++++++++++++-- .../internal/DurableClientConverter.py | 46 ---------- .../azurefunctions/internal/__init__.py | 3 - .../azurefunctions_grpc_interceptor.py | 27 ++++++ .../internal/azurefunctions_null_stub.py | 39 ++++++++ .../durabletask/azurefunctions/worker.py | 34 ++++++- .../ProtoTaskHubSidecarServiceStub.py | 35 +++++++ durabletask/worker.py | 6 +- 10 files changed, 306 insertions(+), 62 deletions(-) create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/client.py delete mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/internal/DurableClientConverter.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_grpc_interceptor.py create mode 100644 durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_null_stub.py create mode 100644 durabletask/internal/ProtoTaskHubSidecarServiceStub.py diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/client.py b/durabletask-azurefunctions/durabletask/azurefunctions/client.py new file mode 100644 index 0000000..63a267b --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/client.py @@ -0,0 +1,85 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json + +from datetime import timedelta +from typing import Any, Optional +import azure.functions as func + +from durabletask.entities import EntityInstanceId +from durabletask.client import TaskHubGrpcClient +from durabletask.azurefunctions.internal.azurefunctions_grpc_interceptor import AzureFunctionsDefaultClientInterceptorImpl + + +# Client class used for Durable Functions +class DurableFunctionsClient(TaskHubGrpcClient): + taskHubName: str + connectionName: str + creationUrls: dict[str, str] + managementUrls: dict[str, str] + baseUrl: str + requiredQueryStringParameters: str + rpcBaseUrl: str + httpBaseUrl: str + maxGrpcMessageSizeInBytes: int + grpcHttpClientTimeout: timedelta + + def __init__(self, client_as_string: str): + client = json.loads(client_as_string) + + self.taskHubName = client.get("taskHubName", "") + self.connectionName = client.get("connectionName", "") + self.creationUrls = client.get("creationUrls", {}) + self.managementUrls = client.get("managementUrls", {}) + self.baseUrl = client.get("baseUrl", "") + self.requiredQueryStringParameters = client.get("requiredQueryStringParameters", "") + self.rpcBaseUrl = client.get("rpcBaseUrl", "") + self.httpBaseUrl = client.get("httpBaseUrl", "") + self.maxGrpcMessageSizeInBytes = client.get("maxGrpcMessageSizeInBytes", 0) + # TODO: convert the string value back to timedelta - annoying regex? + self.grpcHttpClientTimeout = client.get("grpcHttpClientTimeout", timedelta(seconds=30)) + interceptors = [AzureFunctionsDefaultClientInterceptorImpl(self.taskHubName, self.requiredQueryStringParameters)] + + # We pass in None for the metadata so we don't construct an additional interceptor in the parent class + # Since the parent class doesn't use anything metadata for anything else, we can set it as None + super().__init__( + host_address=self.rpcBaseUrl, + secure_channel=False, + metadata=None, + interceptors=interceptors) + + def create_check_status_response(self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse: + """Creates an HTTP response for checking the status of a Durable Function instance. + + Args: + request (func.HttpRequest): The incoming HTTP request. + instance_id (str): The ID of the Durable Function instance. + """ + raise NotImplementedError("This method is not implemented yet.") + + def create_http_management_payload(self, instance_id: str) -> dict[str, str]: + """Creates an HTTP management payload for a Durable Function instance. + + Args: + instance_id (str): The ID of the Durable Function instance. + """ + raise NotImplementedError("This method is not implemented yet.") + + def read_entity_state( + self, + entity_id: EntityInstanceId, + task_hub_name: Optional[str], + connection_name: Optional[str] + ) -> tuple[bool, Any]: + """Reads the state of a Durable Entity. + + Args: + entity_id (str): The ID of the Durable Entity. + task_hub_name (Optional[str]): The name of the task hub. + connection_name (Optional[str]): The name of the connection. + + Returns: + (bool, Any): A tuple containing a boolean indicating if the entity exists and its state. + """ + raise NotImplementedError("This method is not implemented yet.") diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/constants.py b/durabletask-azurefunctions/durabletask/azurefunctions/constants.py index 78c9792..652afca 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/constants.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/constants.py @@ -1,5 +1,5 @@ """Constants used to determine the local running context.""" -# Todo: Remove unused constants after module is complete +# TODO: Remove unused constants after module is complete DEFAULT_LOCAL_HOST: str = 'localhost:7071' DEFAULT_LOCAL_ORIGIN: str = f'http://{DEFAULT_LOCAL_HOST}' DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py index 152f6d1..59ccc01 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py @@ -1,10 +1,19 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import base64 +from functools import wraps + +from durabletask.internal.orchestrator_service_pb2 import OrchestratorRequest, OrchestratorResponse from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger, \ DurableClient from typing import Callable, Optional from typing import Union -from azure.functions import FunctionRegister, TriggerApi, BindingApi, AuthLevel, OrchestrationContext +from azure.functions import FunctionRegister, TriggerApi, BindingApi, AuthLevel + +# TODO: Use __init__.py to optimize imports +from durabletask.azurefunctions.client import DurableFunctionsClient +from durabletask.azurefunctions.worker import DurableFunctionsWorker +from durabletask.azurefunctions.internal.azurefunctions_null_stub import AzureFunctionsNullStub class Blueprint(TriggerApi, BindingApi): @@ -37,9 +46,6 @@ def __init__(self, def _configure_orchestrator_callable(self, wrap) -> Callable: """Obtain decorator to construct an Orchestrator class from a user-defined Function. - In the old programming model, this decorator's logic was unavoidable boilerplate - in user-code. Now, this is handled internally by the framework. - Parameters ---------- wrap: Callable @@ -54,14 +60,31 @@ def _configure_orchestrator_callable(self, wrap) -> Callable: def decorator(orchestrator_func): # Construct an orchestrator based on the end-user code - # TODO: Extract this logic (?) - def handle(context: OrchestrationContext) -> str: + # TODO: Move this logic somewhere better + def handle(context) -> str: context_body = getattr(context, "body", None) if context_body is None: context_body = context orchestration_context = context_body - # TODO: Run the orchestration using the context - return "" + request = OrchestratorRequest() + request.ParseFromString(base64.b64decode(orchestration_context)) + stub = AzureFunctionsNullStub() + worker = DurableFunctionsWorker() + response: Optional[OrchestratorResponse] = None + + def stub_complete(stub_response): + nonlocal response + response = stub_response + stub.CompleteOrchestratorTask = stub_complete + execution_started_events = [e for e in [e1 for e1 in request.newEvents] + [e2 for e2 in request.pastEvents] if e.HasField("executionStarted")] + function_name = execution_started_events[-1].executionStarted.name + worker.add_named_orchestrator(function_name, orchestrator_func) + worker._execute_orchestrator(request, stub, None) + + if response is None: + raise Exception("Orchestrator execution did not produce a response.") + # The Python worker returns the input as type "json", so double-encoding is necessary + return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"' handle.orchestrator_function = orchestrator_func @@ -71,6 +94,55 @@ def handle(context: OrchestrationContext) -> str: return decorator + def _configure_entity_callable(self, wrap) -> Callable: + """Obtain decorator to construct an Entity class from a user-defined Function. + + Parameters + ---------- + wrap: Callable + The next decorator to be applied. + + Returns + ------- + Callable + The function to construct an Entity class from the user-defined Function, + wrapped by the next decorator in the sequence. + """ + def decorator(entity_func): + # TODO: Implement entity support - similar to orchestrators (?) + raise NotImplementedError() + + return decorator + + def _add_rich_client(self, fb, parameter_name, + client_constructor): + # Obtain user-code and force type annotation on the client-binding parameter to be `str`. + # This ensures a passing type-check of that specific parameter, + # circumventing a limitation of the worker in type-checking rich DF Client objects. + # TODO: Once rich-binding type checking is possible, remove the annotation change. + user_code = fb._function._func + user_code.__annotations__[parameter_name] = str + + # `wraps` This ensures we re-export the same method-signature as the decorated method + @wraps(user_code) + async def df_client_middleware(*args, **kwargs): + + # Obtain JSON-string currently passed as DF Client, + # construct rich object from it, + # and assign parameter to that rich object + starter = kwargs[parameter_name] + client = client_constructor(starter) + kwargs[parameter_name] = client + + # Invoke user code with rich DF Client binding + return await user_code(*args, **kwargs) + + # TODO: Is there a better way to support retrieving the unwrapped user code? + df_client_middleware.client_function = fb._function._func # type: ignore + + user_code_with_rich_client = df_client_middleware + fb._function._func = user_code_with_rich_client + def orchestration_trigger(self, context_name: str, orchestration: Optional[str] = None): """Register an Orchestrator Function. @@ -133,6 +205,7 @@ def entity_trigger(self, context_name: str, Name of Entity Function. The value is None by default, in which case the name of the method is used. """ + @self._configure_entity_callable @self._configure_function_builder def wrap(fb): def decorator(): @@ -171,7 +244,7 @@ def durable_client_input(self, @self._configure_function_builder def wrap(fb): def decorator(): - # self._add_rich_client(fb, client_name, DurableOrchestrationClient) + self._add_rich_client(fb, client_name, DurableFunctionsClient) fb.add_binding( binding=DurableClient(name=client_name, diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/internal/DurableClientConverter.py b/durabletask-azurefunctions/durabletask/azurefunctions/internal/DurableClientConverter.py deleted file mode 100644 index 4286967..0000000 --- a/durabletask-azurefunctions/durabletask/azurefunctions/internal/DurableClientConverter.py +++ /dev/null @@ -1,46 +0,0 @@ -import abc -from typing import Any, Optional - -from azure.functions import meta - - -class DurableInConverter(meta._BaseConverter, binding=None): - - @classmethod - @abc.abstractmethod - def check_input_type_annotation(cls, pytype: type) -> bool: - pass - - @classmethod - @abc.abstractmethod - def decode(cls, data: meta.Datum, *, trigger_metadata) -> Any: - raise NotImplementedError - - @classmethod - @abc.abstractmethod - def has_implicit_output(cls) -> bool: - return False - - -class DurableOutConverter(meta._BaseConverter, binding=None): - - @classmethod - @abc.abstractmethod - def check_output_type_annotation(cls, pytype: type) -> bool: - pass - - @classmethod - @abc.abstractmethod - def encode(cls, obj: Any, *, - expected_type: Optional[type]) -> Optional[meta.Datum]: - raise NotImplementedError - -# Durable Functions Durable Client Bindings - - -class DurableClientConverter(DurableInConverter, - DurableOutConverter, - binding='durableClient'): - @classmethod - def has_implicit_output(cls) -> bool: - return False diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py b/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py index d5823cf..e69de29 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py @@ -1,3 +0,0 @@ -from .DurableClientConverter import DurableClientConverter - -__all__ = ["DurableClientConverter"] diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_grpc_interceptor.py b/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_grpc_interceptor.py new file mode 100644 index 0000000..a457a5e --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_grpc_interceptor.py @@ -0,0 +1,27 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from importlib.metadata import version + +from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl + + +class AzureFunctionsDefaultClientInterceptorImpl (DefaultClientInterceptorImpl): + """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, + StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an + interceptor to add additional headers to all calls as needed.""" + required_query_string_parameters: str + + def __init__(self, taskhub_name: str, required_query_string_parameters: str): + self.required_query_string_parameters = required_query_string_parameters + try: + # Get the version of the azurefunctions package + sdk_version = version('durabletask-azurefunctions') + except Exception: + # Fallback if version cannot be determined + sdk_version = "unknown" + user_agent = f"durabletask-python/{sdk_version}" + self._metadata = [ + ("taskhub", taskhub_name), + ("x-user-agent", user_agent)] # 'user-agent' is a reserved header in grpc, so we use 'x-user-agent' instead + super().__init__(self._metadata) diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_null_stub.py b/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_null_stub.py new file mode 100644 index 0000000..18b0116 --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_null_stub.py @@ -0,0 +1,39 @@ + +from durabletask.internal.ProtoTaskHubSidecarServiceStub import ProtoTaskHubSidecarServiceStub + + +class AzureFunctionsNullStub(ProtoTaskHubSidecarServiceStub): + """Missing associated documentation comment in .proto file.""" + + def __init__(self): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Hello = lambda *args, **kwargs: None + self.StartInstance = lambda *args, **kwargs: None + self.GetInstance = lambda *args, **kwargs: None + self.RewindInstance = lambda *args, **kwargs: None + self.WaitForInstanceStart = lambda *args, **kwargs: None + self.WaitForInstanceCompletion = lambda *args, **kwargs: None + self.RaiseEvent = lambda *args, **kwargs: None + self.TerminateInstance = lambda *args, **kwargs: None + self.SuspendInstance = lambda *args, **kwargs: None + self.ResumeInstance = lambda *args, **kwargs: None + self.QueryInstances = lambda *args, **kwargs: None + self.PurgeInstances = lambda *args, **kwargs: None + self.GetWorkItems = lambda *args, **kwargs: None + self.CompleteActivityTask = lambda *args, **kwargs: None + self.CompleteOrchestratorTask = lambda *args, **kwargs: None + self.CompleteEntityTask = lambda *args, **kwargs: None + self.StreamInstanceHistory = lambda *args, **kwargs: None + self.CreateTaskHub = lambda *args, **kwargs: None + self.DeleteTaskHub = lambda *args, **kwargs: None + self.SignalEntity = lambda *args, **kwargs: None + self.GetEntity = lambda *args, **kwargs: None + self.QueryEntities = lambda *args, **kwargs: None + self.CleanEntityStorage = lambda *args, **kwargs: None + self.AbandonTaskActivityWorkItem = lambda *args, **kwargs: None + self.AbandonTaskOrchestratorWorkItem = lambda *args, **kwargs: None + self.AbandonTaskEntityWorkItem = lambda *args, **kwargs: None diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/worker.py b/durabletask-azurefunctions/durabletask/azurefunctions/worker.py index a176672..a3e8223 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/worker.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/worker.py @@ -1,2 +1,32 @@ -class TempClass: - pass +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from threading import Event +from durabletask.worker import _Registry, ConcurrencyOptions +from durabletask.internal import shared +from durabletask.worker import TaskHubGrpcWorker + + +# Worker class used for Durable Task Scheduler (DTS) +class DurableFunctionsWorker(TaskHubGrpcWorker): + """TOOD: Docs + """ + + def __init__(self): + # Don't call the parent constructor - we don't actually want to start an AsyncWorkerLoop + # or recieve work items from anywhere but the method that is creating this worker + self._registry = _Registry() + self._host_address = "" + self._logger = shared.get_logger("worker") + self._shutdown = Event() + self._is_running = False + self._secure_channel = False + + self._concurrency_options = ConcurrencyOptions() + + self._interceptors = None + + def add_named_orchestrator(self, name: str, func): + """TOOD: Docs + """ + self._registry.add_named_orchestrator(name, func) diff --git a/durabletask/internal/ProtoTaskHubSidecarServiceStub.py b/durabletask/internal/ProtoTaskHubSidecarServiceStub.py new file mode 100644 index 0000000..9500b96 --- /dev/null +++ b/durabletask/internal/ProtoTaskHubSidecarServiceStub.py @@ -0,0 +1,35 @@ +from typing import Any, Callable + + +class ProtoTaskHubSidecarServiceStub(object): + """TODO: Docs""" + + def __init__(self): + """Constructor. + """ + self.Hello: Callable[..., None] + self.StartInstance: Callable[..., None] + self.GetInstance: Callable[..., None] + self.RewindInstance: Callable[..., None] + self.WaitForInstanceStart: Callable[..., None] + self.WaitForInstanceCompletion: Callable[..., None] + self.RaiseEvent: Callable[..., None] + self.TerminateInstance: Callable[..., None] + self.SuspendInstance: Callable[..., None] + self.ResumeInstance: Callable[..., None] + self.QueryInstances: Callable[..., None] + self.PurgeInstances: Callable[..., None] + self.GetWorkItems: Callable[..., None] + self.CompleteActivityTask: Callable[..., None] + self.CompleteOrchestratorTask: Callable[..., None] + self.CompleteEntityTask: Callable[..., None] + self.StreamInstanceHistory: Callable[..., None] + self.CreateTaskHub: Callable[..., None] + self.DeleteTaskHub: Callable[..., None] + self.SignalEntity: Callable[..., None] + self.GetEntity: Callable[..., None] + self.QueryEntities: Callable[..., None] + self.CleanEntityStorage: Callable[..., None] + self.AbandonTaskActivityWorkItem: Callable[..., None] + self.AbandonTaskOrchestratorWorkItem: Callable[..., None] + self.AbandonTaskEntityWorkItem: Callable[..., None] diff --git a/durabletask/worker.py b/durabletask/worker.py index 09f6559..f9f5f8d 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -19,6 +19,7 @@ from google.protobuf import empty_pb2 from durabletask.internal import helpers +from durabletask.internal.ProtoTaskHubSidecarServiceStub import ProtoTaskHubSidecarServiceStub from durabletask.internal.entity_state_shim import StateShim from durabletask.internal.helpers import new_timestamp from durabletask.entities import DurableEntity, EntityLock, EntityInstanceId, EntityContext @@ -625,7 +626,7 @@ def stop(self): def _execute_orchestrator( self, req: pb.OrchestratorRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): try: @@ -1689,6 +1690,9 @@ def process_event( self._logger.info(f"{ctx.instance_id}: Entity operation failed.") self._logger.info(f"Data: {json.dumps(event.entityOperationFailed)}") pass + elif event.HasField("orchestratorCompleted"): + # Added in Functions only (for some reason) and does not affect orchestrator flow + pass else: eventType = event.WhichOneof("eventType") raise task.OrchestrationStateError( From af0e3c2bc2580799dbcb1cdb9fd74bd398975a72 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 21 Nov 2025 14:36:16 -0700 Subject: [PATCH 3/4] Nitpicks and cleanup --- .../durabletask/azurefunctions/__init__.py | 2 ++ .../azurefunctions/decorators/durable_app.py | 12 +++++++++++- .../internal/azurefunctions_null_stub.py | 7 +++---- .../durabletask/azurefunctions/worker.py | 4 ++-- .../internal/ProtoTaskHubSidecarServiceStub.py | 7 +++++-- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/__init__.py b/durabletask-azurefunctions/durabletask/azurefunctions/__init__.py index e69de29..59e481e 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/__init__.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py index 59ccc01..d4ae41c 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. + import base64 from functools import wraps @@ -76,7 +77,16 @@ def stub_complete(stub_response): nonlocal response response = stub_response stub.CompleteOrchestratorTask = stub_complete - execution_started_events = [e for e in [e1 for e1 in request.newEvents] + [e2 for e2 in request.pastEvents] if e.HasField("executionStarted")] + execution_started_events = [] + for e in request.pastEvents: + if e.HasField("executionStarted"): + execution_started_events.append(e) + for e in request.newEvents: + if e.HasField("executionStarted"): + execution_started_events.append(e) + if len(execution_started_events) == 0: + raise Exception("No ExecutionStarted event found in orchestration request.") + function_name = execution_started_events[-1].executionStarted.name worker.add_named_orchestrator(function_name, orchestrator_func) worker._execute_orchestrator(request, stub, None) diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_null_stub.py b/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_null_stub.py index 18b0116..47a0ce7 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_null_stub.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_null_stub.py @@ -1,15 +1,14 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. from durabletask.internal.ProtoTaskHubSidecarServiceStub import ProtoTaskHubSidecarServiceStub class AzureFunctionsNullStub(ProtoTaskHubSidecarServiceStub): - """Missing associated documentation comment in .proto file.""" + """A task hub sidecar stub class that implements all methods as no-ops.""" def __init__(self): """Constructor. - - Args: - channel: A grpc.Channel. """ self.Hello = lambda *args, **kwargs: None self.StartInstance = lambda *args, **kwargs: None diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/worker.py b/durabletask-azurefunctions/durabletask/azurefunctions/worker.py index a3e8223..8b4aca3 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/worker.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/worker.py @@ -9,7 +9,7 @@ # Worker class used for Durable Task Scheduler (DTS) class DurableFunctionsWorker(TaskHubGrpcWorker): - """TOOD: Docs + """TODO: Docs """ def __init__(self): @@ -27,6 +27,6 @@ def __init__(self): self._interceptors = None def add_named_orchestrator(self, name: str, func): - """TOOD: Docs + """TODO: Docs """ self._registry.add_named_orchestrator(name, func) diff --git a/durabletask/internal/ProtoTaskHubSidecarServiceStub.py b/durabletask/internal/ProtoTaskHubSidecarServiceStub.py index 9500b96..7ccfd58 100644 --- a/durabletask/internal/ProtoTaskHubSidecarServiceStub.py +++ b/durabletask/internal/ProtoTaskHubSidecarServiceStub.py @@ -1,8 +1,11 @@ -from typing import Any, Callable +from typing import Callable class ProtoTaskHubSidecarServiceStub(object): - """TODO: Docs""" + """A stub class roughly matching the TaskHubSidecarServiceStub generated from the .proto file. + Used by Azure Functions during orchestration and entity executions to inject custom behavior, + as no real sidecar stub is available. + """ def __init__(self): """Constructor. From 349714882fecdb6c5468309bb6ba62a383c835f7 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 21 Nov 2025 15:11:54 -0700 Subject: [PATCH 4/4] Save-all nits --- .../durabletask/azurefunctions/constants.py | 3 +++ .../durabletask/azurefunctions/decorators/__init__.py | 2 +- .../durabletask/azurefunctions/decorators/metadata.py | 1 + .../durabletask/azurefunctions/internal/__init__.py | 2 ++ .../azurefunctions/internal/azurefunctions_grpc_interceptor.py | 2 +- 5 files changed, 8 insertions(+), 2 deletions(-) diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/constants.py b/durabletask-azurefunctions/durabletask/azurefunctions/constants.py index 652afca..f647e31 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/constants.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/constants.py @@ -1,3 +1,6 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + """Constants used to determine the local running context.""" # TODO: Remove unused constants after module is complete DEFAULT_LOCAL_HOST: str = 'localhost:7071' diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/__init__.py b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/__init__.py index f3cfb91..59283ba 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/__init__.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/__init__.py @@ -8,4 +8,4 @@ __all__ = ["durable_app", "metadata"] -PACKAGE_NAME = "durabletask.entities" +PACKAGE_NAME = "durabletask.azurefunctions.decorators" diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py index 4bf1d6c..30dc6ff 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. + from typing import Optional from durabletask.azurefunctions.constants import ORCHESTRATION_TRIGGER, \ diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py b/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py index e69de29..59e481e 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/internal/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_grpc_interceptor.py b/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_grpc_interceptor.py index a457a5e..8736bf6 100644 --- a/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_grpc_interceptor.py +++ b/durabletask-azurefunctions/durabletask/azurefunctions/internal/azurefunctions_grpc_interceptor.py @@ -6,7 +6,7 @@ from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl -class AzureFunctionsDefaultClientInterceptorImpl (DefaultClientInterceptorImpl): +class AzureFunctionsDefaultClientInterceptorImpl(DefaultClientInterceptorImpl): """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an interceptor to add additional headers to all calls as needed."""