-
Notifications
You must be signed in to change notification settings - Fork 20
Andystaples/add functions support #75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
7ff1525
552a2dd
af0e3c2
86ee081
3497148
57de878
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| # Changelog | ||
|
|
||
| All notable changes to this project will be documented in this file. | ||
|
|
||
| The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), | ||
| and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). | ||
|
|
||
| ## v0.1.0 | ||
|
|
||
| - Initial implementation |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| import json | ||
|
|
||
| from datetime import timedelta | ||
| from typing import Any, Optional | ||
| import azure.functions as func | ||
|
|
||
| from durabletask.entities import EntityInstanceId | ||
| from durabletask.client import TaskHubGrpcClient | ||
| from durabletask.azurefunctions.internal.azurefunctions_grpc_interceptor import AzureFunctionsDefaultClientInterceptorImpl | ||
|
|
||
|
|
||
| # Client class used for Durable Functions | ||
| class DurableFunctionsClient(TaskHubGrpcClient): | ||
| taskHubName: str | ||
| connectionName: str | ||
| creationUrls: dict[str, str] | ||
| managementUrls: dict[str, str] | ||
| baseUrl: str | ||
| requiredQueryStringParameters: str | ||
| rpcBaseUrl: str | ||
| httpBaseUrl: str | ||
| maxGrpcMessageSizeInBytes: int | ||
| grpcHttpClientTimeout: timedelta | ||
|
|
||
| def __init__(self, client_as_string: str): | ||
| client = json.loads(client_as_string) | ||
|
|
||
| self.taskHubName = client.get("taskHubName", "") | ||
| self.connectionName = client.get("connectionName", "") | ||
| self.creationUrls = client.get("creationUrls", {}) | ||
| self.managementUrls = client.get("managementUrls", {}) | ||
| self.baseUrl = client.get("baseUrl", "") | ||
| self.requiredQueryStringParameters = client.get("requiredQueryStringParameters", "") | ||
| self.rpcBaseUrl = client.get("rpcBaseUrl", "") | ||
| self.httpBaseUrl = client.get("httpBaseUrl", "") | ||
| self.maxGrpcMessageSizeInBytes = client.get("maxGrpcMessageSizeInBytes", 0) | ||
| # TODO: convert the string value back to timedelta - annoying regex? | ||
| self.grpcHttpClientTimeout = client.get("grpcHttpClientTimeout", timedelta(seconds=30)) | ||
andystaples marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| interceptors = [AzureFunctionsDefaultClientInterceptorImpl(self.taskHubName, self.requiredQueryStringParameters)] | ||
|
|
||
| # We pass in None for the metadata so we don't construct an additional interceptor in the parent class | ||
| # Since the parent class doesn't use anything metadata for anything else, we can set it as None | ||
| super().__init__( | ||
| host_address=self.rpcBaseUrl, | ||
| secure_channel=False, | ||
| metadata=None, | ||
| interceptors=interceptors) | ||
|
|
||
| def create_check_status_response(self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse: | ||
| """Creates an HTTP response for checking the status of a Durable Function instance. | ||
|
|
||
| Args: | ||
| request (func.HttpRequest): The incoming HTTP request. | ||
| instance_id (str): The ID of the Durable Function instance. | ||
| """ | ||
| raise NotImplementedError("This method is not implemented yet.") | ||
|
|
||
| def create_http_management_payload(self, instance_id: str) -> dict[str, str]: | ||
|
||
| """Creates an HTTP management payload for a Durable Function instance. | ||
|
|
||
| Args: | ||
| instance_id (str): The ID of the Durable Function instance. | ||
| """ | ||
| raise NotImplementedError("This method is not implemented yet.") | ||
|
|
||
| def read_entity_state( | ||
| self, | ||
| entity_id: EntityInstanceId, | ||
| task_hub_name: Optional[str], | ||
| connection_name: Optional[str] | ||
| ) -> tuple[bool, Any]: | ||
| """Reads the state of a Durable Entity. | ||
|
|
||
| Args: | ||
| entity_id (str): The ID of the Durable Entity. | ||
| task_hub_name (Optional[str]): The name of the task hub. | ||
| connection_name (Optional[str]): The name of the connection. | ||
|
|
||
| Returns: | ||
| (bool, Any): A tuple containing a boolean indicating if the entity exists and its state. | ||
| """ | ||
| raise NotImplementedError("This method is not implemented yet.") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| """Constants used to determine the local running context.""" | ||
andystaples marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # TODO: Remove unused constants after module is complete | ||
| DEFAULT_LOCAL_HOST: str = 'localhost:7071' | ||
| DEFAULT_LOCAL_ORIGIN: str = f'http://{DEFAULT_LOCAL_HOST}' | ||
| DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' | ||
| HTTP_ACTION_NAME = 'BuiltIn::HttpActivity' | ||
| ORCHESTRATION_TRIGGER = "orchestrationTrigger" | ||
| ACTIVITY_TRIGGER = "activityTrigger" | ||
| ENTITY_TRIGGER = "entityTrigger" | ||
| DURABLE_CLIENT = "durableClient" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| """Durable Task SDK for Python entities component""" | ||
|
|
||
| import durabletask.azurefunctions.decorators.durable_app as durable_app | ||
| import durabletask.azurefunctions.decorators.metadata as metadata | ||
|
|
||
| __all__ = ["durable_app", "metadata"] | ||
|
|
||
| PACKAGE_NAME = "durabletask.entities" | ||
andystaples marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,266 @@ | ||
| # Copyright (c) Microsoft Corporation. All rights reserved. | ||
| # Licensed under the MIT License. | ||
| import base64 | ||
| from functools import wraps | ||
|
|
||
| from durabletask.internal.orchestrator_service_pb2 import OrchestratorRequest, OrchestratorResponse | ||
| from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger, \ | ||
| DurableClient | ||
| from typing import Callable, Optional | ||
| from typing import Union | ||
| from azure.functions import FunctionRegister, TriggerApi, BindingApi, AuthLevel | ||
|
|
||
| # TODO: Use __init__.py to optimize imports | ||
| from durabletask.azurefunctions.client import DurableFunctionsClient | ||
| from durabletask.azurefunctions.worker import DurableFunctionsWorker | ||
| from durabletask.azurefunctions.internal.azurefunctions_null_stub import AzureFunctionsNullStub | ||
|
|
||
|
|
||
| class Blueprint(TriggerApi, BindingApi): | ||
| """Durable Functions (DF) Blueprint container. | ||
|
|
||
| It allows functions to be declared via trigger and binding decorators, | ||
| but does not automatically index/register these functions. | ||
|
|
||
| To register these functions, utilize the `register_functions` method from any | ||
| :class:`FunctionRegister` subclass, such as `DFApp`. | ||
| """ | ||
|
|
||
| def __init__(self, | ||
| http_auth_level: Union[AuthLevel, str] = AuthLevel.FUNCTION): | ||
| """Instantiate a Durable Functions app with which to register Functions. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| http_auth_level: Union[AuthLevel, str] | ||
| Authorization level required for Function invocation. | ||
| Defaults to AuthLevel.Function. | ||
|
|
||
| Returns | ||
| ------- | ||
| DFApp | ||
| New instance of a Durable Functions app | ||
| """ | ||
| super().__init__(auth_level=http_auth_level) | ||
|
|
||
| def _configure_orchestrator_callable(self, wrap) -> Callable: | ||
| """Obtain decorator to construct an Orchestrator class from a user-defined Function. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| wrap: Callable | ||
| The next decorator to be applied. | ||
|
|
||
| Returns | ||
| ------- | ||
| Callable | ||
| The function to construct an Orchestrator class from the user-defined Function, | ||
| wrapped by the next decorator in the sequence. | ||
| """ | ||
| def decorator(orchestrator_func): | ||
| # Construct an orchestrator based on the end-user code | ||
|
|
||
| # TODO: Move this logic somewhere better | ||
| def handle(context) -> str: | ||
| context_body = getattr(context, "body", None) | ||
| if context_body is None: | ||
| context_body = context | ||
| orchestration_context = context_body | ||
| request = OrchestratorRequest() | ||
| request.ParseFromString(base64.b64decode(orchestration_context)) | ||
| stub = AzureFunctionsNullStub() | ||
| worker = DurableFunctionsWorker() | ||
| response: Optional[OrchestratorResponse] = None | ||
|
|
||
| def stub_complete(stub_response): | ||
| nonlocal response | ||
| response = stub_response | ||
| stub.CompleteOrchestratorTask = stub_complete | ||
|
Comment on lines
+72
to
+79
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of this is probably optimizable - do we really need to create a new stub and worker for each call? Can they be saved? Will look into this more at some point |
||
| execution_started_events = [e for e in [e1 for e1 in request.newEvents] + [e2 for e2 in request.pastEvents] if e.HasField("executionStarted")] | ||
andystaples marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
andystaples marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| function_name = execution_started_events[-1].executionStarted.name | ||
| worker.add_named_orchestrator(function_name, orchestrator_func) | ||
| worker._execute_orchestrator(request, stub, None) | ||
|
|
||
| if response is None: | ||
| raise Exception("Orchestrator execution did not produce a response.") | ||
| # The Python worker returns the input as type "json", so double-encoding is necessary | ||
| return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"' | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Victoria - Currently, the return value from here is passed on to the host as type "json" so the host attempts to Newtonsoft deserialize it back into an object before handing back to the Durable middleware for final decoding. This breaks, unless I double-encode with quotes as above. Is there a way to communicate to the worker that this is a plain string instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Investigating this - will need a little more time to test on my end |
||
|
|
||
| handle.orchestrator_function = orchestrator_func | ||
|
|
||
| # invoke next decorator, with the Orchestrator as input | ||
| handle.__name__ = orchestrator_func.__name__ | ||
| return wrap(handle) | ||
|
|
||
| return decorator | ||
|
|
||
| def _configure_entity_callable(self, wrap) -> Callable: | ||
| """Obtain decorator to construct an Entity class from a user-defined Function. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| wrap: Callable | ||
| The next decorator to be applied. | ||
|
|
||
| Returns | ||
| ------- | ||
| Callable | ||
| The function to construct an Entity class from the user-defined Function, | ||
| wrapped by the next decorator in the sequence. | ||
| """ | ||
| def decorator(entity_func): | ||
| # TODO: Implement entity support - similar to orchestrators (?) | ||
| raise NotImplementedError() | ||
|
|
||
| return decorator | ||
|
|
||
| def _add_rich_client(self, fb, parameter_name, | ||
| client_constructor): | ||
| # Obtain user-code and force type annotation on the client-binding parameter to be `str`. | ||
| # This ensures a passing type-check of that specific parameter, | ||
| # circumventing a limitation of the worker in type-checking rich DF Client objects. | ||
| # TODO: Once rich-binding type checking is possible, remove the annotation change. | ||
| user_code = fb._function._func | ||
| user_code.__annotations__[parameter_name] = str | ||
|
Comment on lines
+170
to
+175
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Victoria - this is the same approach taken by the existing Durable Python SDK for the DurableClient binding - we force the annotation to be "str" so the worker takes a path that does not attempt to use the DurableClientConverter input parameter converter, which would throw NotImplementedError Do you think it is worth moving the client_constructor logic in this PR into the DurableClientConverter in the -library, so that we don't have to do this type-hacking stuff?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We'd have to figure out how to detect which underlying provider for the durable_client_input binding is being used to know when to simply return the string for the old SDK vs parse it in the new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The main issue would be that we'd have something different to return based on the durable library. Are the types going to be the same? (eg DurableClient for both packages) We could look at creating two separate converters - right now it's using the Generic converter, but it would be better to have our own |
||
|
|
||
| # `wraps` This ensures we re-export the same method-signature as the decorated method | ||
| @wraps(user_code) | ||
| async def df_client_middleware(*args, **kwargs): | ||
|
|
||
| # Obtain JSON-string currently passed as DF Client, | ||
| # construct rich object from it, | ||
| # and assign parameter to that rich object | ||
| starter = kwargs[parameter_name] | ||
| client = client_constructor(starter) | ||
| kwargs[parameter_name] = client | ||
|
|
||
| # Invoke user code with rich DF Client binding | ||
| return await user_code(*args, **kwargs) | ||
|
|
||
| # TODO: Is there a better way to support retrieving the unwrapped user code? | ||
| df_client_middleware.client_function = fb._function._func # type: ignore | ||
|
Comment on lines
+191
to
+192
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Victoria - not sure if you remember this context from a while back, but this is also carryover from the previous SDK - I added this line to make retrieving the "unwrapped" user code possible for the unit testing scenario - see There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I vaguely remember context, but we can sync again over specific requirements. |
||
|
|
||
| user_code_with_rich_client = df_client_middleware | ||
| fb._function._func = user_code_with_rich_client | ||
|
|
||
| def orchestration_trigger(self, context_name: str, | ||
| orchestration: Optional[str] = None): | ||
| """Register an Orchestrator Function. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| context_name: str | ||
| Parameter name of the DurableOrchestrationContext object. | ||
| orchestration: Optional[str] | ||
| Name of Orchestrator Function. | ||
| The value is None by default, in which case the name of the method is used. | ||
| """ | ||
| @self._configure_orchestrator_callable | ||
| @self._configure_function_builder | ||
| def wrap(fb): | ||
|
|
||
| def decorator(): | ||
| fb.add_trigger( | ||
| trigger=OrchestrationTrigger(name=context_name, | ||
| orchestration=orchestration)) | ||
| return fb | ||
|
|
||
| return decorator() | ||
|
|
||
| return wrap | ||
|
|
||
| def activity_trigger(self, input_name: str, | ||
| activity: Optional[str] = None): | ||
| """Register an Activity Function. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| input_name: str | ||
| Parameter name of the Activity input. | ||
| activity: Optional[str] | ||
| Name of Activity Function. | ||
| The value is None by default, in which case the name of the method is used. | ||
| """ | ||
| @self._configure_function_builder | ||
| def wrap(fb): | ||
| def decorator(): | ||
| fb.add_trigger( | ||
| trigger=ActivityTrigger(name=input_name, | ||
| activity=activity)) | ||
| return fb | ||
|
|
||
| return decorator() | ||
|
|
||
| return wrap | ||
|
|
||
| def entity_trigger(self, context_name: str, | ||
| entity_name: Optional[str] = None): | ||
| """Register an Entity Function. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| context_name: str | ||
| Parameter name of the Entity input. | ||
| entity_name: Optional[str] | ||
| Name of Entity Function. | ||
| The value is None by default, in which case the name of the method is used. | ||
| """ | ||
| @self._configure_entity_callable | ||
| @self._configure_function_builder | ||
| def wrap(fb): | ||
| def decorator(): | ||
| fb.add_trigger( | ||
| trigger=EntityTrigger(name=context_name, | ||
| entity_name=entity_name)) | ||
| return fb | ||
|
|
||
| return decorator() | ||
|
|
||
| return wrap | ||
|
|
||
| def durable_client_input(self, | ||
| client_name: str, | ||
| task_hub: Optional[str] = None, | ||
| connection_name: Optional[str] = None | ||
| ): | ||
| """Register a Durable-client Function. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| client_name: str | ||
| Parameter name of durable client. | ||
| task_hub: Optional[str] | ||
| Used in scenarios where multiple function apps share the same storage account | ||
| but need to be isolated from each other. If not specified, the default value | ||
| from host.json is used. | ||
| This value must match the value used by the target orchestrator functions. | ||
| connection_name: Optional[str] | ||
| The name of an app setting that contains a storage account connection string. | ||
| The storage account represented by this connection string must be the same one | ||
| used by the target orchestrator functions. If not specified, the default storage | ||
| account connection string for the function app is used. | ||
| """ | ||
|
|
||
| @self._configure_function_builder | ||
| def wrap(fb): | ||
| def decorator(): | ||
| self._add_rich_client(fb, client_name, DurableFunctionsClient) | ||
|
|
||
| fb.add_binding( | ||
| binding=DurableClient(name=client_name, | ||
| task_hub=task_hub, | ||
| connection_name=connection_name)) | ||
| return fb | ||
|
|
||
| return decorator() | ||
|
|
||
| return wrap | ||
|
|
||
|
|
||
| class DFApp(Blueprint, FunctionRegister): | ||
| """Durable Functions (DF) app. | ||
|
|
||
| Exports the decorators required to declare and index DF Function-types. | ||
| """ | ||
|
|
||
| pass | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Potential compatibility issue with type hint syntax. The use of
dict[str, str](PEP 585 style) requires Python 3.9+. Whilepyproject.tomlspecifiesrequires-python = ">=3.9", consider whether this is the intended minimum version or ifDict[str, str]fromtypingshould be used for broader compatibility.