Messages - Python SDK feature guide
A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. The Workflow implementation defines these endpoints via handler methods that can react to incoming messages and return values. Temporal Clients use messages to read Workflow state and control its execution. See Workflow message passing for a general overview of this topic. This page introduces these features for the Temporal Python SDK.
Write message handlers
The code that follows is part of a working message passing sample.
Follow these guidelines when writing your message handlers:
- Message handlers are defined as methods on the Workflow class, using one of the three decorators: @workflow.query, @workflow.signal, and @workflow.update.
- The parameters and return values of handlers and the main Workflow function must be serializable.
- Prefer data classes to multiple input parameters. Data class parameters allow you to add fields without changing the calling signature.
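The last guideline can be sketched in plain Python, without any Temporal imports. The `region` field and the `get_languages` function body below are hypothetical, added only to show that a new field with a default value leaves existing callers untouched:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class GetLanguagesInput:
    include_unsupported: bool
    # Hypothetical field added later; the default keeps existing callers working.
    region: Optional[str] = None


def get_languages(input: GetLanguagesInput) -> str:
    # Stand-in for a handler body: the signature is unchanged by the new field.
    scope = "all" if input.include_unsupported else "supported"
    return f"{scope} languages for {input.region or 'any region'}"


# An older caller that predates the `region` field still works unchanged.
old_style = get_languages(GetLanguagesInput(include_unsupported=False))
# A newer caller can opt in to the extra field.
new_style = get_languages(GetLanguagesInput(include_unsupported=True, region="EU"))
```

Had the handler taken `include_unsupported` and `region` as separate positional parameters, adding the second one would have changed the calling signature for every sender.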
Query handlers
A Query is a synchronous operation that retrieves state from a Workflow Execution:
from dataclasses import dataclass
from enum import IntEnum

from temporalio import workflow

class Language(IntEnum):
    Chinese = 1
    English = 2
    French = 3

@dataclass
class GetLanguagesInput:
    include_unsupported: bool

@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        # Keys must match the enum member names defined above.
        self.greetings = {
            Language.Chinese: "你好,世界",
            Language.English: "Hello, world",
        }

    @workflow.query
    def get_languages(self, input: GetLanguagesInput) -> list[Language]:
        # 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
        if input.include_unsupported:
            return list(Language)
        else:
            return list(self.greetings)
- The Query decorator can accept arguments. Refer to the API docs: @workflow.query.
- A Query handler uses def, not async def. You can't perform async operations like executing an Activity in a Query handler.
Signal handlers
A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:
@dataclass
class ApproveInput:
    name: str

@workflow.defn
class GreetingWorkflow:
    ...

    @workflow.signal
    def approve(self, input: ApproveInput) -> None:
        # 👉 A Signal handler mutates the Workflow state but cannot return a value.
        self.approved_for_release = True
        self.approver_name = input.name
- The Signal decorator can accept arguments. Refer to the API docs: @workflow.signal.
- The handler should not return a value. The response is sent immediately from the server, without waiting for the Workflow to process the Signal.
- Signal (and Update) handlers can be async def. This allows you to use Activities, Child Workflows, durable asyncio.sleep Timers, workflow.wait_condition conditions, and more. See Async handlers and Workflow message passing for guidelines on safely using async Signal and Update handlers.
Update handlers and validators
An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:
class Language(IntEnum):
    Chinese = 1
    English = 2
    French = 3

@workflow.defn
class GreetingWorkflow:
    ...

    @workflow.update
    def set_language(self, language: Language) -> Language:
        # 👉 An Update handler can mutate the Workflow state and return a value.
        previous_language, self.language = self.language, language
        return previous_language

    @set_language.validator
    def validate_language(self, language: Language) -> None:
        if language not in self.greetings:
            # 👉 In an Update validator you raise any exception to reject the Update.
            raise ValueError(f"{language.name} is not supported")
- The Update decorator can take arguments (like name, dynamic, and unfinished_policy) as described in the API reference docs for workflow.update.
- About validators:
  - Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you can skip them.
  - The SDK automatically provides a validator decorator named @<update-handler-name>.validator. The validator must accept the same argument types as the handler and return None.
- Accepting and rejecting Updates with validators:
  - To reject an Update, raise an exception of any type in the validator.
  - Without a validator, Updates are always accepted.
- Validators and Event History:
  - The WorkflowExecutionUpdateAccepted event is written into the History whether the acceptance was automatic or programmatic.
  - When a Validator raises an error, the Update is rejected and WorkflowExecutionUpdateAccepted won't be added to the Event History. The caller receives an "Update failed" error.
- Use workflow.current_update_info to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once.
- Update (and Signal) handlers can be async def, letting them use Activities, Child Workflows, durable asyncio.sleep Timers, workflow.wait_condition conditions, and more. See Async handlers and Workflow message passing for safe usage guidelines.
Send messages
To send Queries, Signals, or Updates, you call methods on a WorkflowHandle object:
- Use start_workflow to start a Workflow and return its handle.
- Use get_workflow_handle_for to retrieve a typed Workflow handle by its Workflow Id.
For example:
client = await Client.connect("localhost:7233")
workflow_handle = await client.start_workflow(
    GreetingWorkflow.run, id="greeting-workflow-1234", task_queue="my-task-queue"
)
To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.
Send a Query
Use WorkflowHandle.query to send a Query to a Workflow Execution:
# GetLanguagesInput has a single field, include_unsupported (see the Query handler above).
supported_languages = await workflow_handle.query(
    GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
)
- Sending a Query doesn't add events to a Workflow's Event History.
- You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not safe and, therefore, not supported.
- A Worker must be online and polling the Task Queue to process a Query.
Send a Signal
You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven’t closed.
Send a Signal from a Client
Use WorkflowHandle.signal to send a Signal:
await workflow_handle.signal(GreetingWorkflow.approve, ApproveInput(name="me"))
- The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.
- The WorkflowExecutionSignaled Event appears in the Workflow's Event History.
Send a Signal from a Workflow
A Workflow can send a Signal to another Workflow, known as an External Signal.
You'll need a Workflow handle for the external Workflow.
Use get_external_workflow_handle_for:
# ...
@workflow.defn
class WorkflowB:
    @workflow.run
    async def run(self) -> None:
        handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
        await handle.signal(WorkflowA.your_signal, "signal argument")
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
Signal-With-Start
Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running.
To use Signal-With-Start, call the start_workflow method and pass the start_signal argument with the name of your Signal:
from temporalio.client import Client
# ...
async def main():
    client = await Client.connect("localhost:7233")
    await client.start_workflow(
        GreetingWorkflow.run,
        id="your-signal-with-start-workflow",
        task_queue="signal-tq",
        start_signal="submit_greeting",
        start_signal_args=["User Signal with Start"],
    )
Send an Update
An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.
A client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start.
WorkflowExecutionUpdateAccepted is added to the Event History when the Worker confirms that the Update passed validation. WorkflowExecutionUpdateCompleted is added to the Event History when the Worker confirms that the Update has finished.
To send an Update to a Workflow Execution, you can:
- Call execute_update and wait for the Update to complete. This code fetches an Update result:

    previous_language = await workflow_handle.execute_update(
        GreetingWorkflow.set_language, Language.Chinese
    )

- Send start_update to receive an UpdateHandle as soon as the Update is accepted or rejected.
  - Use this UpdateHandle later to fetch your results.
  - async def Update handlers normally perform long-running async operations.
  - start_update only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.

  For example:

    # Wait until the update is accepted
    update_handle = await workflow_handle.start_update(
        HelloWorldWorkflow.set_greeting,
        HelloWorldInput("World"),
        # Required keyword argument; WorkflowUpdateStage is imported from temporalio.client.
        wait_for_stage=WorkflowUpdateStage.ACCEPTED,
    )
    # Wait until the update is completed
    update_result = await update_handle.result()

  For more details, see the "Async handlers" section.

To obtain an Update handle, you can:
- Use start_update to start an Update and return the handle, as shown in the preceding example.
- Use get_update_handle_for to fetch a handle for an in-progress Update using the Update ID and Workflow ID.
In real-world development, you may sometimes be unable to import Workflow Definition method signatures. When you don't have access to the Workflow Definition or it isn't written in Python, you can still use non-type-safe APIs and dynamic method invocation. Pass method names instead of method objects to:
- Client.start_workflow
- WorkflowHandle.query
- WorkflowHandle.signal
- WorkflowHandle.execute_update
- WorkflowHandle.start_update
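To obtain handles without importing the Workflow Definition, use the non-type-safe counterparts of the typed handle functions: Client.get_workflow_handle (instead of get_workflow_handle_for) and workflow.get_external_workflow_handle (instead of get_external_workflow_handle_for).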
Message handler patterns
This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.
For additional information, see Inject work into the main Workflow, Ensuring your messages are processed exactly once, and this sample demonstrating safe async message handling.
Add async handlers to use await
Signal and Update handlers can be async def as well as def.
Using async def allows you to use await with Activities, Child Workflows, asyncio.sleep Timers, workflow.wait_condition conditions, etc.
This expands what a handler can do, but it also means that handler executions and your main Workflow method all run concurrently, with switching occurring between them at await calls.
To use async def handlers safely, you need to understand what can go wrong.
See Workflow message passing for guidance on safe usage of async Signal and Update handlers, the Safe message handlers sample, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.
The following code executes an Activity that makes a network call to a remote service.
It modifies the Update handler from earlier on this page, turning it into an async def:
@activity.defn
async def call_greeting_service(to_language: Language) -> Optional[str]:
    await asyncio.sleep(0.2)  # Pretend that we are calling a remote service.
    greetings = {
        Language.Arabic: "مرحبا بالعالم",
        Language.Chinese: "你好,世界",
        Language.English: "Hello, world",
        Language.French: "Bonjour, monde",
        Language.Hindi: "नमस्ते दुनिया",
        Language.Spanish: "Hola mundo",
    }
    return greetings.get(to_language)

@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self.lock = asyncio.Lock()
        ...

    ...

    @workflow.update
    async def set_language(self, language: Language) -> Language:
        if language not in self.greetings:
            # 👉 Use a lock here to ensure that multiple calls to set_language are processed in order.
            async with self.lock:
                greeting = await workflow.execute_activity(
                    call_greeting_service,
                    language,
                    start_to_close_timeout=timedelta(seconds=10),
                )
                if greeting is None:
                    # 👉 An update validator cannot be async, so cannot be used to check that the remote
                    # call_greeting_service supports the requested language. Raising ApplicationError
                    # will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be
                    # added to history.
                    raise ApplicationError(
                        f"Greeting service does not support {language.name}"
                    )
                self.greetings[language] = greeting
        previous_language, self.language = self.language, language
        return previous_language
After updating the code to use an async def, your Update handler can schedule an Activity and await the result.
Although an async def Signal handler can initiate similar network tasks, using an Update handler allows the client to receive a result or error once the Activity completes.
This lets your client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc.
Add wait conditions to block
Sometimes, async def Signal or Update handlers need to meet certain conditions before they should continue.
You can use a wait condition (workflow.wait_condition) to set a function that prevents the code from proceeding until the condition returns True.
This is an important feature that helps you control your handler logic.
Here are two important use cases for workflow.wait_condition:
- Waiting in a handler until it is appropriate to continue.
- Waiting in the main Workflow until all active handlers have finished.
The condition state you're waiting for can be updated by and reflect any part of the Workflow code. This includes the main Workflow method, other handlers, or child coroutines spawned by the main Workflow method, and so forth.
Use wait conditions in handlers
It's common to use a Workflow wait condition to wait until a handler should start.
You can also use wait conditions anywhere else in the handler to wait for a specific condition to become True.
This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become True.
Consider a ready_for_update_to_execute method that runs before your Update handler executes.
The workflow.wait_condition method waits until your condition is met:
@workflow.update
async def my_update(self, update_input: UpdateInput) -> str:
    await workflow.wait_condition(
        lambda: self.ready_for_update_to_execute(update_input)
    )
Remember: Handlers can execute before the main Workflow method starts.
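The ordering issue in that reminder can be illustrated with a plain-asyncio analogy. This is a toy model, not the Temporal runtime: the `wait_condition` helper, `ToyWorkflow`, and its `ready` flag are hypothetical stand-ins for workflow.wait_condition and real Workflow state.

```python
import asyncio
from typing import Callable


async def wait_condition(predicate: Callable[[], bool]) -> None:
    # Toy stand-in for workflow.wait_condition: yield to other coroutines
    # until the predicate becomes True.
    while not predicate():
        await asyncio.sleep(0)


class ToyWorkflow:
    def __init__(self) -> None:
        self.ready = False
        self.log: list[str] = []

    async def my_update(self, value: str) -> str:
        # The handler may start before run() has done its setup, so it waits.
        await wait_condition(lambda: self.ready)
        self.log.append(f"update:{value}")
        return value.upper()

    async def run(self) -> None:
        self.log.append("setup")
        self.ready = True


async def main() -> tuple[str, list[str]]:
    wf = ToyWorkflow()
    # Start the handler first, mirroring a message that arrives early.
    handler = asyncio.create_task(wf.my_update("hello"))
    await asyncio.sleep(0)  # let the handler reach its wait
    await wf.run()
    return await handler, wf.log


result, log = asyncio.run(main())
```

Even though the handler starts first, its work is recorded only after the setup step, because it blocks on the condition instead of assuming initialization has happened.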
Ensure your handlers finish before the Workflow completes
Workflow wait conditions can ensure your handler completes before a Workflow finishes.
When your Workflow uses async def Signal or Update handlers, your main Workflow method can return or continue-as-new while a handler is still waiting on an async task, such as an Activity result.
The Workflow completing may interrupt the handler before it finishes crucial work and cause client errors when trying to retrieve Update results.
Use workflow.wait_condition and all_handlers_finished to address this problem and allow your Workflow to end smoothly:
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        ...
        await workflow.wait_condition(workflow.all_handlers_finished)
        return "workflow-result"
By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions.
You can silence these warnings on a per-handler basis by passing the unfinished_policy argument to the @workflow.signal / @workflow.update decorator:
@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)
async def my_update(self) -> None:
    ...
See Finishing handlers before the Workflow completes for more information.
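The effect of returning early versus waiting can be sketched with plain asyncio. This is only an analogy: `ToyWorkflow`, its `in_flight` counter, and the polling loop are hypothetical stand-ins for the bookkeeping that workflow.all_handlers_finished and workflow.wait_condition perform in the SDK.

```python
import asyncio


class ToyWorkflow:
    def __init__(self) -> None:
        self.in_flight = 0  # handlers that have started but not finished
        self.processed: list[str] = []

    def all_handlers_finished(self) -> bool:
        return self.in_flight == 0

    async def handle(self, item: str) -> None:
        self.in_flight += 1
        try:
            await asyncio.sleep(0.01)  # stand-in for awaiting an Activity
            self.processed.append(item)
        finally:
            self.in_flight -= 1

    async def run(self, wait_for_handlers: bool) -> list[str]:
        if wait_for_handlers:
            while not self.all_handlers_finished():
                await asyncio.sleep(0.001)
        # Snapshot of what had been processed when the "Workflow" returned.
        return list(self.processed)


async def simulate(wait_for_handlers: bool) -> list[str]:
    wf = ToyWorkflow()
    tasks = [asyncio.create_task(wf.handle(i)) for i in ("a", "b")]
    await asyncio.sleep(0)  # let both handlers start
    result = await wf.run(wait_for_handlers)
    await asyncio.gather(*tasks)  # clean up so no task is left pending
    return result


without_wait = asyncio.run(simulate(False))
with_wait = asyncio.run(simulate(True))
```

Without the wait, the main method returns before either handler has recorded its work; with it, both items are present in the result.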
Use asyncio.Lock to prevent concurrent handler execution
Concurrent processes can interact in unpredictable ways. Incorrectly written concurrent message-passing code may not work correctly when multiple handler instances run simultaneously. Here's an example of a pathological case:
@workflow.defn
class MyWorkflow:
    @workflow.signal
    async def bad_async_handler(self):
        data = await workflow.execute_activity(
            fetch_data, start_to_close_timeout=timedelta(seconds=10)
        )
        self.x = data.x
        # 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
        # there may be times when the Workflow has self.x from one Activity execution and self.y from another.
        await asyncio.sleep(1)  # or await anything else
        self.y = data.y
Coordinating access using asyncio.Lock corrects this code.
Locking makes sure that only one handler instance can execute a specific section of code at any given time:
@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        ...
        self.lock = asyncio.Lock()
        ...

    @workflow.signal
    async def safe_async_handler(self):
        async with self.lock:
            data = await workflow.execute_activity(
                fetch_data, start_to_close_timeout=timedelta(seconds=10)
            )
            self.x = data.x
            # ✅ OK: the scheduler may switch now to a different handler execution, or to the main workflow
            # method, but no other execution of this handler can run until this execution finishes.
            await asyncio.sleep(1)  # or await anything else
            self.y = data.y
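The interleaving described above can be reproduced outside Temporal with plain asyncio. The sketch below is a simplified model: `ToyWorkflow`, `_apply`, and the `snapshots` list are hypothetical names, and the integer argument stands in for the Activity result.

```python
import asyncio
from typing import Optional


class ToyWorkflow:
    def __init__(self) -> None:
        self.x: Optional[int] = None
        self.y: Optional[int] = None
        self.lock = asyncio.Lock()
        self.snapshots: list[tuple] = []

    async def _apply(self, value: int) -> None:
        self.x = value
        await asyncio.sleep(0)  # yield point: another handler may run here
        self.y = value
        self.snapshots.append((self.x, self.y))

    async def unsafe_handler(self, value: int) -> None:
        await self._apply(value)

    async def safe_handler(self, value: int) -> None:
        # Only one handler execution at a time may run the guarded section.
        async with self.lock:
            await self._apply(value)


async def simulate(safe: bool) -> list[tuple]:
    wf = ToyWorkflow()
    handler = wf.safe_handler if safe else wf.unsafe_handler
    await asyncio.gather(handler(1), handler(2))
    return wf.snapshots


unsafe_snapshots = asyncio.run(simulate(safe=False))
safe_snapshots = asyncio.run(simulate(safe=True))
```

Each snapshot records the state right after a handler writes y. In the unlocked run, one snapshot pairs an x from one call with a y from the other; in the locked run, every snapshot is internally consistent.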
Message handler troubleshooting
When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:
- The client can't contact the server: You'll receive a temporalio.service.RPCError on which the status attribute is RPCStatusCode.UNAVAILABLE (after some retries; see the retry_config argument to Client.connect).
- The workflow does not exist: You'll receive a temporalio.service.RPCError exception on which the status attribute is RPCStatusCode.NOT_FOUND.
See Exceptions in message handlers for a non–Python-specific discussion of this topic.
Problems when sending a Signal
When using Signal, the only exception that will result from your requests during its execution is RPCError.
All handlers may experience additional exceptions during the initial (pre-Worker) part of a handler request lifecycle.
For Queries and Updates, the client waits for a response from the Worker. If an issue occurs during the handler Execution by the Worker, the client may receive an exception.
Problems when sending an Update
When working with Updates, you may encounter these errors:
- No Workflow Workers are polling the Task Queue: Your request will be retried by the SDK Client indefinitely. You can use asyncio.timeout to impose a timeout. This raises a temporalio.client.WorkflowUpdateRPCTimeoutOrCancelledError exception.
- Update failed: You'll receive a temporalio.client.WorkflowUpdateFailedError exception. There are two ways this can happen:
  - The Update was rejected by an Update validator defined in the Workflow alongside the Update handler.
  - The Update failed after having been accepted.
  Update failures are like Workflow failures. Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler. These might include:
  - A failed Child Workflow
  - A failed Activity (if the Activity retries have been set to a finite number)
  - The Workflow author raising ApplicationError
  - Any error listed in workflow_failure_exception_types (empty by default)
- The handler caused the Workflow Task to fail: A Workflow Task Failure causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:
  - If the request hasn't been accepted by the server, you receive a FAILED_PRECONDITION temporalio.service.RPCError exception.
  - If the request has been accepted, it is durable. Once the Workflow is healthy again after a code deploy, use an UpdateHandle to fetch the Update result.
- The Workflow finished while the Update handler execution was in progress: You'll receive a temporalio.service.RPCError exception with a status attribute of RPCStatusCode.NOT_FOUND. This happens if the Workflow finished while the Update handler execution was in progress, for example because:
  - The Workflow was canceled or failed.
  - The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to be finished.
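For the first case in the list above (no Worker polling the Task Queue), the idea of bounding an otherwise indefinite wait can be sketched with the standard library alone. Everything here is a stand-in: `execute_update_without_worker` imitates a call that never returns, and `UpdateTimedOut` is a hypothetical placeholder for WorkflowUpdateRPCTimeoutOrCancelledError. The sketch uses asyncio.wait_for rather than asyncio.timeout because the latter requires Python 3.11 or later.

```python
import asyncio


class UpdateTimedOut(Exception):
    """Hypothetical stand-in for WorkflowUpdateRPCTimeoutOrCancelledError."""


async def execute_update_without_worker() -> str:
    # Hypothetical stand-in for an Update call that is retried forever
    # because no Worker is polling the Task Queue.
    await asyncio.Event().wait()
    return "never reached"


async def send_with_deadline(seconds: float) -> str:
    try:
        # asyncio.wait_for cancels the inner call when the deadline passes.
        return await asyncio.wait_for(execute_update_without_worker(), timeout=seconds)
    except asyncio.TimeoutError as err:
        raise UpdateTimedOut(f"no response within {seconds}s") from err


async def main() -> str:
    try:
        return await send_with_deadline(0.05)
    except UpdateTimedOut as err:
        return f"gave up: {err}"


outcome = asyncio.run(main())
```

With a real client, the SDK raises its own exception type when the surrounding timeout cancels the call, so the explicit conversion in `send_with_deadline` exists only to make this standalone sketch resemble that behavior.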
Problems when sending a Query
When working with Queries, you may encounter these errors:
- There is no Workflow Worker polling the Task Queue: You'll receive a temporalio.service.RPCError exception on which the status attribute is RPCStatusCode.FAILED_PRECONDITION.
- Query failed: You'll receive a temporalio.client.WorkflowQueryFailedError exception if something goes wrong during a Query. Any exception in a Query handler will trigger this error. This differs from Signal and Update requests, where exceptions can lead to Workflow Task Failure instead.
- The handler caused the Workflow Task to fail. This would happen, for example, if the Query handler blocks the thread for too long without yielding.
Dynamic components
A dynamic Workflow, Activity, Signal, Update, or Query is a kind of unnamed item. Normally, these items are registered by name with the Worker and invoked at runtime. When an unregistered or unrecognized Workflow, Activity, or message request arrives with a recognized method signature, the Worker can use a pre-registered dynamic stand-in.
For example, you might send a request to start a Workflow named "MyUnknownWorkflow". After receiving a Workflow Task, the Worker may find that there are no registered Workflow Definitions of that type. It then checks to see if there's a registered dynamic Workflow. If the dynamic Workflow signature matches the incoming Workflow signature, the Worker invokes that just as it would invoke a non-dynamic statically named version.
By registering dynamic versions of your Temporal components, the Worker can fall back to these alternate implementations for name mismatches.
Use dynamic elements judiciously and as a fallback mechanism, not a primary design. They can introduce long-term maintainability and debugging issues. Reserve dynamic invocation use for cases where a name is not or can't be known at compile time.
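The lookup order described in this section (exact name first, dynamic stand-in second) can be modeled in a few lines of plain Python. `ToyWorker` and its methods are hypothetical and exist only for illustration; they do not reflect how the Temporal Worker is implemented.

```python
from typing import Callable, Dict, Optional, Sequence


class ToyWorker:
    """Toy name-based dispatcher; not how the Temporal Worker is implemented."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[..., str]] = {}
        self.dynamic: Optional[Callable[[str, Sequence[object]], str]] = None

    def register(self, name: str, fn: Callable[..., str]) -> None:
        self.handlers[name] = fn

    def register_dynamic(self, fn: Callable[[str, Sequence[object]], str]) -> None:
        self.dynamic = fn

    def dispatch(self, name: str, *args: object) -> str:
        # 1. Prefer a handler registered under the exact name.
        if name in self.handlers:
            return self.handlers[name](*args)
        # 2. Otherwise fall back to the dynamic stand-in, which also receives the name.
        if self.dynamic is not None:
            return self.dynamic(name, args)
        raise LookupError(f"no handler registered for {name!r}")


worker = ToyWorker()
worker.register("approve", lambda who: f"approved by {who}")
worker.register_dynamic(lambda name, args: f"dynamic handled {name} with {len(args)} arg(s)")

named = worker.dispatch("approve", "me")
fallback = worker.dispatch("some-name", "x", "y")
```

The dynamic handler receives the requested name and the raw argument sequence, which mirrors why the real dynamic handlers described below take a name and a sequence of RawValue objects rather than typed parameters.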
Set a dynamic Signal, Query, or Update handler
A dynamic Signal, Query, or Update refers to a special stand-in handler. It's used when an unregistered handler request arrives.
Consider a Signal, where you might send something like workflow.signal(MyWorkflow.my_signal_method, my_arg).
This is a type-safe compiler-checked approach that guarantees a method exists.
There's also a non-type-safe string-based form: workflow.signal('some-name', my_arg).
When sent to the server, the name is checked only after arriving at the Worker.
This is where "dynamic handlers" come in.
After failing to find a handler with a matching name and type, the Worker checks for a registered dynamic stand-in handler. If found, the Worker uses that instead.
You must opt handlers into dynamic access.
Add dynamic=True to the handler decorator (for example, @workflow.signal(dynamic=True)) to make a handler dynamic.
The handler's signature must accept (self, name: str, args: Sequence[RawValue]).
Use a payload_converter function to convert RawValue objects to your required type.
For example:
from typing import Sequence
from temporalio.common import RawValue
...
    @workflow.signal(dynamic=True)
    async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
        ...
This sample creates a dynamic_signal Signal.
When an unregistered or unrecognized Signal arrives with a matching signature, dynamic assignment uses this handler to manage the Signal.
It is responsible for transforming the sequence contents into usable data in a form that the method's logic can process and act on.
Set a dynamic Workflow
A dynamic Workflow refers to a special stand-in Workflow Definition. It's used when an unknown Workflow Execution request arrives.
Consider the "MyUnknownWorkflow" example described earlier. The Worker may find there are no registered Workflow Definitions of that name or type. After failing to find a Workflow Definition with a matching type, the Worker looks for a dynamic stand-in. If found, it invokes that instead.
To participate, your Workflow must opt into dynamic access.
Adding dynamic=True to the @workflow.defn decorator makes the Workflow Definition eligible to participate in dynamic invocation.
You must register the Workflow with the Worker before it can be invoked.
The Workflow Definition's primary Workflow method must accept a single argument of type Sequence[temporalio.common.RawValue].
Use a payload_converter function to convert RawValue objects to your required type.
For example:
# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
    @workflow.run
    async def run(self, args: Sequence[RawValue]) -> str:
        name = workflow.payload_converter().from_payload(args[0].payload, str)
        return await workflow.execute_activity(
            default_greeting,
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
This Workflow converts the first Sequence element to a string, and uses that to execute an Activity.
Set a dynamic Activity
A dynamic Activity is a stand-in implementation. It's used when an Activity Task with an unknown Activity type is received by the Worker.
To participate, your Activity must opt into dynamic access.
Adding dynamic=True to the @activity.defn decorator makes the Activity Definition eligible to participate in dynamic invocation.
You must register the Activity with the Worker before it can be invoked.
The Activity Definition must then accept a single argument of type Sequence[temporalio.common.RawValue].
Use a payload_converter function to convert RawValue objects to your required types.
For example:
# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
    arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
    return (
        f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
    )

# ...
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            "unregistered_activity",
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
This example invokes an unregistered Activity by name. The Worker resolves it using the registered dynamic Activity instead. When possible, prefer to use compiler-checked type-safe arguments rather than Activity name strings.