diff --git a/README.md b/README.md index 5c1941d18..6c809c783 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates. * [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals. * [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist. +* [mutex](mutex) - Lock a particular resource within a namespace between workflows. * [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry. * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. diff --git a/mutex/README.md b/mutex/README.md new file mode 100644 index 000000000..6fa657641 --- /dev/null +++ b/mutex/README.md @@ -0,0 +1,31 @@ +# Mutex + +This mutex workflow demonstrates the ability to lock/unlock a particular resource within a particular Temporal namespace +so that other workflows within the same namespace wait until the resource lock is released. This is useful +when we want to avoid race conditions or parallel mutually exclusive operations on the same resource. + +One way of coordinating parallel processing is to use Temporal signals with [`start_signal`](https://docs.temporal.io/develop/python/message-passing#signal-with-start) and +make sure signals are processed sequentially; however, the logic might become too complex if we +need to lock two or more resources at the same time. The mutex workflow pattern can simplify that. + +This example enqueues two long-running `SampleWorkflowWithMutex` workflows in parallel. 
Each of the workflows has a mutex section (lasting 2 seconds in this example). +When `SampleWorkflowWithMutex` reaches the mutex section, it starts a mutex workflow via a local activity, and blocks until +`acquire-lock-event` is received. Once `acquire-lock-event` is received, it enters the critical section, +and finally releases the lock once processing is over by sending a `release_lock` signal to the `MutexWorkflow`. + +## Run this sample + + +To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the +worker: + + uv run worker.py + +This will start the worker. Then, in another terminal, run the following to execute the workflows: + + uv run starter.py + +This will start two `SampleWorkflowWithMutex` workflows in parallel, both locking on the same resource. + +The starter terminal should complete with the workflow IDs and the worker terminal should show the logs with the locking. 
+
diff --git a/mutex/mutexworkflow.py b/mutex/mutexworkflow.py
new file mode 100644
index 000000000..542cd7e57
--- /dev/null
+++ b/mutex/mutexworkflow.py
@@ -0,0 +1,148 @@
+import asyncio
+from dataclasses import dataclass
+
+from temporalio import activity, workflow
+from temporalio.client import Client
+from temporalio.exceptions import ApplicationError
+
+LOCK_ACQUIRED_SIGNAL_NAME = "acquire-lock-event"
+
+
+@dataclass
+class SignalWithStartMutexWorkflowInput:
+    """Parameters of the `signal_with_start_mutex_workflow` activity."""
+
+    namespace: str
+    resource_id: str
+    unlock_timeout_seconds: float
+    sender_workflow_id: str
+
+
+@dataclass
+class SignalWithStartMutexWorkflowResult:
+    """Result of the `signal_with_start_mutex_workflow` activity."""
+
+    workflow_id: str
+
+
+@dataclass
+class MutexWorkflowInput:
+    """Input of `MutexWorkflow.run`."""
+
+    namespace: str
+    resource_id: str
+    unlock_timeout_seconds: float
+
+
+@activity.defn
+async def signal_with_start_mutex_workflow(
+    params: SignalWithStartMutexWorkflowInput,
+) -> SignalWithStartMutexWorkflowResult:
+    """Request the lock on a resource, starting its mutex workflow if needed.
+
+    Returns the ID of the mutex workflow guarding the resource, which the
+    caller later signals to release the lock.
+    """
+    # Connect to the namespace named in the request (not the client default),
+    # so the mutex workflow lives in the namespace its ID advertises.
+    client = await Client.connect("localhost:7233", namespace=params.namespace)
+
+    workflow_id = f"mutex:{params.namespace}:{params.resource_id}"
+
+    # Sends a signal to the workflow (and starts it if needed)
+    wf_input = MutexWorkflowInput(
+        namespace=params.namespace,
+        resource_id=params.resource_id,
+        unlock_timeout_seconds=params.unlock_timeout_seconds,
+    )
+    await client.start_workflow(
+        workflow=MutexWorkflow.run,
+        arg=wf_input,
+        id=workflow_id,
+        task_queue="mutex-task-queue",
+        start_signal="request_lock",
+        start_signal_args=[params.sender_workflow_id],
+    )
+    return SignalWithStartMutexWorkflowResult(workflow_id=workflow_id)
+
+
+def generate_unlock_token(sender_workflow_id: str) -> str:
+    """Return the token `sender_workflow_id` must send back to release the lock."""
+    return f"unlock-event-{sender_workflow_id}"
+
+
+@workflow.defn
+class MutexWorkflow:
+    """Grant the lock on one resource to one requesting workflow at a time."""
+
+    def __init__(self):
+        # Workflow IDs waiting for the lock, in arrival order.
+        self._lock_requests: asyncio.Queue[str] = asyncio.Queue()
+        # Token the current holder must present to release the lock.
+        self._expected_unlock_token: str | None = None
+        # Set by `release_lock` when the current holder releases the lock.
+        self._lock_released = False
+
+    @workflow.run
+    async def run(self, params: MutexWorkflowInput) -> None:
+        """Serve queued lock requests one by one, then complete.
+
+        Each requester is signaled `LOCK_ACQUIRED_SIGNAL_NAME` with its unlock
+        token and keeps the lock until it releases it with that token or
+        `params.unlock_timeout_seconds` elapse.
+        """
+        workflow.logger.info(f"Starting mutex workflow {workflow.info().workflow_id}")
+        while not self._lock_requests.empty():
+            sender_workflow_id = self._lock_requests.get_nowait()
+
+            # Arm the release check before telling the requester it holds the
+            # lock, so its release cannot arrive before we expect it.
+            unlock_token = generate_unlock_token(sender_workflow_id)
+            self._expected_unlock_token = unlock_token
+            self._lock_released = False
+
+            # send release info to origin
+            handle = workflow.get_external_workflow_handle(sender_workflow_id)
+            try:
+                await handle.signal(LOCK_ACQUIRED_SIGNAL_NAME, unlock_token)
+            except ApplicationError as e:
+                if e.type != "ExternalWorkflowExecutionNotFound":
+                    raise
+                # The requester is gone: skip it and serve the next request.
+                workflow.logger.warning(
+                    f"Could not signal lock acquisition to caller {sender_workflow_id}: {e.message}"
+                )
+                continue
+
+            # wait for release signal or timeout
+            try:
+                await workflow.wait_condition(
+                    lambda: self._lock_released,
+                    timeout=params.unlock_timeout_seconds,
+                )
+            except asyncio.TimeoutError:
+                # If timeout was reached, we release the lock
+                workflow.logger.warning(
+                    f"Workflow {sender_workflow_id} did not release the lock before timeout was reached."
+                )
+        self._expected_unlock_token = None
+        workflow.logger.info(f"Stopping mutex workflow {workflow.info().workflow_id}")
+
+    @workflow.signal
+    async def request_lock(self, sender_workflow_id: str):
+        """Queue a lock request from workflow `sender_workflow_id`."""
+        workflow.logger.info(f"Received lock request from {sender_workflow_id}")
+        await self._lock_requests.put(sender_workflow_id)
+
+    @workflow.signal
+    async def release_lock(self, unlock_token: str):
+        """Release the lock if `unlock_token` belongs to the current holder."""
+        workflow.logger.info(f"Received lock release with token {unlock_token}")
+        if unlock_token != self._expected_unlock_token:
+            # A stale release (e.g. from a holder that already timed out) must
+            # not free the lock now held by another workflow.
+            workflow.logger.warning(
+                f"Ignoring lock release with unexpected token {unlock_token}"
+            )
+            return
+        self._lock_released = True
diff --git a/mutex/starter.py b/mutex/starter.py
new file mode 100644
index 000000000..57d34f7ea
--- /dev/null
+++ b/mutex/starter.py
@@ -0,0 +1,41 @@
+import asyncio
+import logging
+from uuid import uuid4
+
+from temporalio.client import Client
+
+from workflow import SampleWorkflowWithMutex
+
+
+async def main():
+    """Run two `SampleWorkflowWithMutex` workflows concurrently on one resource."""
+    # set up logging facility
+    logging.basicConfig(
+        level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s"
+    )
+    # Start client
+    client = await Client.connect("localhost:7233")
+
+    # Both workflows are given the same resource ID, so they contend for one lock
+    resource_id = str(uuid4())
+
+    print("starting first workflow")
+    workflow_1 = client.execute_workflow(
+        SampleWorkflowWithMutex.run,
+        resource_id,
+        id="sample-workflow-with-mutex-1-workflow-id",
+        task_queue="mutex-task-queue",
+    )
+    print("starting second workflow")
+    workflow_2 = client.execute_workflow(
+        SampleWorkflowWithMutex.run,
+        resource_id,
+        id="sample-workflow-with-mutex-2-workflow-id",
+        task_queue="mutex-task-queue",
+    )
+    results = await asyncio.gather(workflow_1, workflow_2)
+    print("results:", *results)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/mutex/worker.py b/mutex/worker.py
new file mode 100644
index 000000000..e4312c536
--- /dev/null
+++ b/mutex/worker.py
@@ -0,0 +1,37 @@
+import asyncio
+import logging
+
+from temporalio.client import Client
+from temporalio.worker import Worker
+from mutexworkflow import MutexWorkflow, signal_with_start_mutex_workflow
+from workflow import SampleWorkflowWithMutex
+
+# reference: 
https://github.com/temporalio/samples-go/blob/main/mutex/mutex_workflow.go
+
+
+interrupt_event = asyncio.Event()
+
+
+async def main():
+    # set up logging facility
+    logging.basicConfig(
+        level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s"
+    )
+    # Start client
+    client = await Client.connect("localhost:7233")
+
+    # Run a worker for the workflow
+    async with Worker(
+        client,
+        task_queue="mutex-task-queue",
+        workflows=[MutexWorkflow, SampleWorkflowWithMutex],
+        activities=[signal_with_start_mutex_workflow],
+    ):
+        # Wait until interrupted
+        print("Worker started")
+        await interrupt_event.wait()
+        print("Shutting down")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/mutex/workflow.py b/mutex/workflow.py
new file mode 100644
index 000000000..90086584d
--- /dev/null
+++ b/mutex/workflow.py
@@ -0,0 +1,78 @@
+from datetime import timedelta
+from typing import Awaitable, Callable
+
+from temporalio import workflow
+
+from mutexworkflow import (
+    LOCK_ACQUIRED_SIGNAL_NAME,
+    MutexWorkflow,
+    SignalWithStartMutexWorkflowInput,
+    signal_with_start_mutex_workflow,
+)
+
+
+@workflow.defn
+class SampleWorkflowWithMutex:
+    """Sample workflow that runs its critical section under a resource lock."""
+
+    def __init__(self):
+        # Unlock token received with the lock-acquired signal; None until then.
+        self.unlock_token: str | None = None
+
+    @workflow.run
+    async def run(self, resource_id: str) -> str:
+        """Lock `resource_id`, do the critical work, release, and return this workflow's ID."""
+        workflow.logger.info("Starting workflow")
+        # acquire lock
+        unlock_func = await self.lock(resource_id, timedelta(minutes=2.0))
+        try:
+            # do critical work (mutex section)
+            workflow.logger.info("Doing critical work.")
+            await workflow.sleep(2.0)
+        finally:
+            # release lock, even when the critical work fails, so that other
+            # workflows do not have to wait for the unlock timeout
+            await unlock_func()
+        workflow.logger.info("Stopping workflow")
+        return workflow.info().workflow_id
+
+    async def lock(
+        self, resource_id: str, unlock_timeout: timedelta
+    ) -> Callable[[], Awaitable[None]]:
+        """Acquire the lock on `resource_id` and return a coroutine function that releases it.
+
+        `unlock_timeout` is how long the mutex workflow waits for the release
+        before it gives the lock to the next requester.
+        """
+        # Drop any token left over from an earlier acquisition, otherwise the
+        # wait below would return before the lock is granted again.
+        self.unlock_token = None
+        # request a lock
+        params = SignalWithStartMutexWorkflowInput(
+            namespace=workflow.info().namespace,
+            sender_workflow_id=workflow.info().workflow_id,
+            resource_id=resource_id,
+            unlock_timeout_seconds=unlock_timeout.total_seconds(),
+        )
+        result = await workflow.execute_local_activity(
+            signal_with_start_mutex_workflow,
+            params,
+            start_to_close_timeout=timedelta(seconds=5.0),
+        )
+        # wait to acquire lock from mutex workflow
+        await workflow.wait_condition(lambda: self.unlock_token is not None)
+        unlock_token = self.unlock_token
+
+        # return function to unlock
+        async def unlock_function() -> None:
+            wf_handle = workflow.get_external_workflow_handle(
+                workflow_id=result.workflow_id
+            )
+            await wf_handle.signal(MutexWorkflow.release_lock, unlock_token)
+
+        return unlock_function
+
+    @workflow.signal(name=LOCK_ACQUIRED_SIGNAL_NAME)
+    def acquired_lock(self, unlock_token: str):
+        """Record the unlock token sent by the mutex workflow with the lock."""
+        self.unlock_token = unlock_token