1 change: 1 addition & 0 deletions README.md
@@ -69,6 +69,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.
* [mutex](mutex) - Lock a particular resource within a namespace so that other workflows wait until it is released.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource, waiting for its successful completion.
31 changes: 31 additions & 0 deletions mutex/README.md
@@ -0,0 +1,31 @@
# Mutex

This mutex sample demonstrates how to lock and unlock a particular resource within a particular Temporal namespace
so that other workflows in the same namespace wait until the resource lock is released. This is useful
when you want to avoid race conditions or mutually exclusive operations running in parallel on the same resource.

One way of coordinating parallel processing is to use Temporal signals with [`start_signal`](https://docs.temporal.io/develop/python/message-passing#signal-with-start) and
make sure signals are processed sequentially. However, the logic can become too complex if two or more
resources need to be locked at the same time. The mutex workflow pattern simplifies this.

This example starts two long-running `SampleWorkflowWithMutex` workflows in parallel, each containing a mutex section (lasting 2 seconds in this example).
When `SampleWorkflowWithMutex` reaches the mutex section, it starts a mutex workflow via a local activity and blocks until
an `acquire-lock-event` signal is received. It then enters the critical section,
and finally releases the lock once processing is over by sending a `release_lock` signal to the `MutexWorkflow`.
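
The calling side looks roughly like this simplified excerpt from `workflow.py` in this sample (not a standalone snippet): the `lock` helper signals-with-start the `MutexWorkflow` and waits for the `acquire-lock-event` signal, and the callable it returns sends the `release_lock` signal.

```python
# Inside SampleWorkflowWithMutex.run (simplified excerpt)
unlock_func = await self.lock(resource_id, timedelta(minutes=2))  # blocks until acquire-lock-event
workflow.logger.info("Doing critical work.")
await workflow.sleep(2)  # the mutex section
await unlock_func()  # sends release_lock to the MutexWorkflow
```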

## Run this sample


To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:

uv run worker.py

This will start the worker. Then, in another terminal, run the following to execute the workflows:

uv run starter.py

This will start two `SampleWorkflowWithMutex` workflows in parallel, both locking on the same resource.

The starter terminal should complete with the workflow IDs, and the worker terminal should show logs of the locking.

117 changes: 117 additions & 0 deletions mutex/mutexworkflow.py
@@ -0,0 +1,117 @@
import asyncio
from dataclasses import dataclass

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.exceptions import ApplicationError

LOCK_ACQUIRED_SIGNAL_NAME = "acquire-lock-event"


@dataclass
class SignalWithStartMutexWorkflowInput:
namespace: str
resource_id: str
unlock_timeout_seconds: float
sender_workflow_id: str


@dataclass
class SignalWithStartMutexWorkflowResult:
workflow_id: str


@dataclass
class MutexWorkflowInput:
namespace: str
resource_id: str
unlock_timeout_seconds: float


@activity.defn
async def signal_with_start_mutex_workflow(
params: SignalWithStartMutexWorkflowInput,
) -> SignalWithStartMutexWorkflowResult:
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")

workflow_id = f"mutex:{params.namespace}:{params.resource_id}"

# Sends a signal to the workflow (and starts it if needed)
wf_input = MutexWorkflowInput(
namespace=params.namespace,
resource_id=params.resource_id,
unlock_timeout_seconds=params.unlock_timeout_seconds,
)
await client.start_workflow(
workflow=MutexWorkflow.run,
arg=wf_input,
id=workflow_id,
task_queue="mutex-task-queue",
start_signal="request_lock",
start_signal_args=[params.sender_workflow_id],
)
return SignalWithStartMutexWorkflowResult(workflow_id=workflow_id)


def generate_unlock_token(sender_workflow_id: str) -> str:
return f"unlock-event-{sender_workflow_id}"


@workflow.defn
Reviewer comment (Member):

> I think we may not want to blind-port this from Go but have a more Pythonic way to do a mutex and we want to make sure workflows are in their own files separate from activities/models. For instance, the .NET version of this sample at https://github.com/temporalio/samples-dotnet/tree/main/src/Mutex made an idiomatic WorkflowMutex class for general purpose use. I think this sample should consider a workflow-backed Lock class that looks almost exactly like https://docs.python.org/3/library/asyncio-sync.html#asyncio.Lock and is copyable/usable as if it is a library.

(A sketch of this suggestion follows after this file's diff.)

class MutexWorkflow:
def __init__(self):
self._lock_requests: asyncio.Queue[str] = asyncio.Queue()
self._lock_releases: asyncio.Queue[str] = asyncio.Queue()

@workflow.run
    async def run(self, params: MutexWorkflowInput) -> None:
workflow.logger.info(f"Starting mutex workflow {workflow.info().workflow_id}")
while True:
# read lock signal
if self._lock_requests.empty():
break
sender_workflow_id = self._lock_requests.get_nowait()

# send release info to origin
# TODO manage case when origin is closed
unlock_token = generate_unlock_token(sender_workflow_id)
handle = workflow.get_external_workflow_handle(sender_workflow_id)
try:
await handle.signal(LOCK_ACQUIRED_SIGNAL_NAME, unlock_token)
except ApplicationError as e:
if e.type == "ExternalWorkflowExecutionNotFound":
workflow.logger.warning(
f"Could not signal lock acquisition to caller {sender_workflow_id}: {e.message}"
)
continue
else:
raise e

# wait for release signal or timeout
try:
await workflow.wait_condition(
lambda: not self._lock_releases.empty(),
timeout=params.unlock_timeout_seconds,
)
# pop the release
# TODO check it’s the right one
if not self._lock_releases.empty():
self._lock_releases.get_nowait()

# If timeout was reached, we release the lock
except asyncio.TimeoutError:
workflow.logger.warning(
f"Workflow {sender_workflow_id} did not release the lock before timeout was reached."
)
continue
workflow.logger.info(f"Stopping mutex workflow {workflow.info().workflow_id}")

@workflow.signal
async def request_lock(self, sender_workflow_id: str):
workflow.logger.info(f"Received lock request from {sender_workflow_id}")
await self._lock_requests.put(sender_workflow_id)

@workflow.signal
async def release_lock(self, unlock_token: str):
workflow.logger.info(f"Received lock release with token {unlock_token}")
await self._lock_releases.put(unlock_token)
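
As a follow-up to the review comment above, here is a minimal sketch (not part of this PR) of the more Pythonic, `asyncio.Lock`-style wrapper the reviewer suggests, usable as an async context manager inside a workflow. The class name `WorkflowBackedLock` and its wiring are assumptions; it reuses `signal_with_start_mutex_workflow`, `SignalWithStartMutexWorkflowInput`, and `MutexWorkflow` from `mutexworkflow.py`, and relies on the owning workflow's `acquire-lock-event` signal handler forwarding the token via `on_lock_acquired`.

```python
# Hypothetical sketch (not part of this PR): an asyncio.Lock-style wrapper
# around the mutex workflow. Assumes the definitions from mutexworkflow.py.
from datetime import timedelta
from typing import Optional

from temporalio import workflow

from mutexworkflow import (
    MutexWorkflow,
    SignalWithStartMutexWorkflowInput,
    signal_with_start_mutex_workflow,
)


class WorkflowBackedLock:
    def __init__(self, resource_id: str, unlock_timeout: timedelta) -> None:
        self._resource_id = resource_id
        self._unlock_timeout = unlock_timeout
        self._unlock_token: Optional[str] = None
        self._mutex_workflow_id: Optional[str] = None

    def on_lock_acquired(self, unlock_token: str) -> None:
        # The owning workflow's "acquire-lock-event" signal handler calls this.
        self._unlock_token = unlock_token

    async def __aenter__(self) -> "WorkflowBackedLock":
        # Signal-with-start the MutexWorkflow via a local activity, then wait
        # until it signals back with an unlock token.
        result = await workflow.execute_local_activity(
            signal_with_start_mutex_workflow,
            SignalWithStartMutexWorkflowInput(
                namespace=workflow.info().namespace,
                resource_id=self._resource_id,
                unlock_timeout_seconds=self._unlock_timeout.total_seconds(),
                sender_workflow_id=workflow.info().workflow_id,
            ),
            start_to_close_timeout=timedelta(seconds=5),
        )
        self._mutex_workflow_id = result.workflow_id
        await workflow.wait_condition(lambda: self._unlock_token is not None)
        return self

    async def __aexit__(self, *exc_info) -> None:
        # Release the lock by signaling release_lock with the received token.
        handle = workflow.get_external_workflow_handle(self._mutex_workflow_id)
        await handle.signal(MutexWorkflow.release_lock, self._unlock_token)
        self._unlock_token = None
```

Usage inside a workflow would look like `async with WorkflowBackedLock(resource_id, timedelta(minutes=2)): ...`, with a `@workflow.signal(name=LOCK_ACQUIRED_SIGNAL_NAME)` handler calling `lock.on_lock_acquired(token)`.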
37 changes: 37 additions & 0 deletions mutex/starter.py
@@ -0,0 +1,37 @@
import asyncio
import logging
from uuid import uuid4

from temporalio.client import Client

from workflow import SampleWorkflowWithMutex


async def main():
# set up logging facility
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s"
)
# Start client
client = await Client.connect("localhost:7233")

resource_id = uuid4()

print("starting first workflow")
workflow_1 = client.execute_workflow(
SampleWorkflowWithMutex.run,
str(resource_id),
id="sample-workflow-with-mutex-1-workflow-id",
task_queue="mutex-task-queue",
)
print("starting second workflow")
workflow_2 = client.execute_workflow(
SampleWorkflowWithMutex.run,
str(resource_id),
id="sample-workflow-with-mutex-2-workflow-id",
task_queue="mutex-task-queue",
)
results = await asyncio.gather(workflow_1, workflow_2)
print("results:", *results)


if __name__ == "__main__":
asyncio.run(main())
37 changes: 37 additions & 0 deletions mutex/worker.py
@@ -0,0 +1,37 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker
from mutexworkflow import MutexWorkflow, signal_with_start_mutex_workflow
from workflow import SampleWorkflowWithMutex

# reference: https://github.com/temporalio/samples-go/blob/main/mutex/mutex_workflow.go


interrupt_event = asyncio.Event()


async def main():
# set up logging facility
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s"
)
# Start client
client = await Client.connect("localhost:7233")

# Run a worker for the workflow
async with Worker(
client,
task_queue="mutex-task-queue",
workflows=[MutexWorkflow, SampleWorkflowWithMutex],
activities=[signal_with_start_mutex_workflow],
):
# Wait until interrupted
print("Worker started")
await interrupt_event.wait()
print("Shutting down")


if __name__ == "__main__":
    # Match the worker pattern used by other samples in this repository:
    # Ctrl+C sets the interrupt event so the worker context in main() can exit.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        interrupt_event.set()
        loop.run_until_complete(loop.shutdown_asyncgens())
62 changes: 62 additions & 0 deletions mutex/workflow.py
@@ -0,0 +1,62 @@
from datetime import timedelta
from typing import Awaitable, Callable, Optional

from temporalio import workflow

from mutexworkflow import (
SignalWithStartMutexWorkflowInput,
MutexWorkflow,
signal_with_start_mutex_workflow,
LOCK_ACQUIRED_SIGNAL_NAME,
)


@workflow.defn
class SampleWorkflowWithMutex:
def __init__(self):
        self.unlock_token: Optional[str] = None

@workflow.run
async def run(self, resource_id: str) -> str:
workflow.logger.info(f"Starting workflow")
# acquire lock
unlock_func = await self.lock(resource_id, timedelta(minutes=2.0))
# do critical work (mutex section)
workflow.logger.info("Doing critical work.")
await workflow.sleep(2.0)
# release lock
await unlock_func()
workflow.logger.info(f"Stopping workflow")
return workflow.info().workflow_id

    async def lock(
        self, resource_id: str, unlock_timeout: timedelta
    ) -> Callable[[], Awaitable[None]]:
        """Acquire a lock on the resource; returns an async callable that releases it."""
# request a lock
params = SignalWithStartMutexWorkflowInput(
namespace=workflow.info().namespace,
sender_workflow_id=workflow.info().workflow_id,
resource_id=resource_id,
unlock_timeout_seconds=unlock_timeout.total_seconds(),
)
result = await workflow.execute_local_activity(
signal_with_start_mutex_workflow,
params,
start_to_close_timeout=timedelta(seconds=5.0),
)
# wait to acquire lock from mutex workflow
await workflow.wait_condition(lambda: self.unlock_token is not None)
unlock_token = self.unlock_token

# return function to unlock
async def unlock_function():
wf_handle = workflow.get_external_workflow_handle(
workflow_id=result.workflow_id
)
await wf_handle.signal(MutexWorkflow.release_lock, unlock_token)

return unlock_function

@workflow.signal(name=LOCK_ACQUIRED_SIGNAL_NAME)
def acquired_lock(self, unlock_token: str):
self.unlock_token = unlock_token