1 - Getting started with the Dapr client Python SDK
How to get up and running with the Dapr Python SDK
The Dapr client package allows you to interact with other Dapr applications from a Python application.
Prerequisites
Install the Dapr Python package before getting started.
Import the client package
The dapr package contains the DaprClient, which is used to create and use a client.
from dapr.clients import DaprClient
Initialising the client
You can initialise a Dapr client in multiple ways:
Default values:
When you initialise the client without any parameters, it uses the default values for a Dapr sidecar instance (127.0.0.1:50001).
from dapr.clients import DaprClient
with DaprClient() as d:
    # use the client
    ...
Specifying an endpoint on initialisation:
When passed as an argument in the constructor, the gRPC endpoint takes precedence over any
configuration or environment variable.
from dapr.clients import DaprClient
with DaprClient("mydomain:50051?tls=true") as d:
    # use the client
    ...
Configuration options:
Dapr Sidecar Endpoints
You can use the standardised DAPR_GRPC_ENDPOINT environment variable to specify the gRPC endpoint. When this variable is set, the client can be initialised without any arguments:
export DAPR_GRPC_ENDPOINT="mydomain:50051?tls=true"
from dapr.clients import DaprClient
with DaprClient() as d:
    # the client will use the endpoint specified in the environment variables
    ...
The legacy environment variables DAPR_RUNTIME_HOST, DAPR_HTTP_PORT and DAPR_GRPC_PORT are also supported, but DAPR_GRPC_ENDPOINT takes precedence.
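The precedence rules described in this section can be summarised in a few lines of plain Python. This is only an illustrative sketch, not the SDK's actual resolution code: the function name resolve_grpc_endpoint and its parameters are hypothetical, and the defaults are the ones stated above (127.0.0.1:50001).

```python
from typing import Mapping, Optional

DEFAULT_HOST = "127.0.0.1"
DEFAULT_GRPC_PORT = "50001"


def resolve_grpc_endpoint(explicit: Optional[str], env: Mapping[str, str]) -> str:
    """Hypothetical helper that mirrors the documented precedence order."""
    # 1. An endpoint passed to the constructor wins over everything else.
    if explicit:
        return explicit
    # 2. The standardised environment variable comes next.
    if env.get("DAPR_GRPC_ENDPOINT"):
        return env["DAPR_GRPC_ENDPOINT"]
    # 3. Legacy variables, falling back to the sidecar defaults.
    host = env.get("DAPR_RUNTIME_HOST", DEFAULT_HOST)
    port = env.get("DAPR_GRPC_PORT", DEFAULT_GRPC_PORT)
    return f"{host}:{port}"
```

In real code you would pass os.environ as env; taking the mapping as a parameter simply makes the rules easy to check in isolation.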
Dapr API Token
If your Dapr instance is configured to require the DAPR_API_TOKEN environment variable, you can set it in the environment and the client will use it automatically.
You can read more about Dapr API token authentication here.
Health timeout
On client initialisation, a health check is performed against the Dapr sidecar (/healthz/outbound). The client will wait for the sidecar to be up and running before proceeding.
The default health check timeout is 60 seconds, but it can be overridden by setting the DAPR_HEALTH_TIMEOUT environment variable.
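The waiting behaviour can be pictured as a polling loop with a deadline. The following is a minimal sketch under stated assumptions, not the SDK's implementation: wait_until_healthy, the probe callable, and the one-second polling interval are all hypothetical. In practice the probe would issue a request to the sidecar's /healthz/outbound endpoint.

```python
import time
from typing import Callable


def wait_until_healthy(
    probe: Callable[[], bool],
    timeout_s: float,
    interval_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll `probe` until it returns True or `timeout_s` seconds have passed."""
    deadline = time.monotonic() + timeout_s
    while True:
        if probe():
            return  # the sidecar reported healthy
        if time.monotonic() >= deadline:
            raise TimeoutError(f"sidecar not healthy after {timeout_s} seconds")
        sleep(interval_s)  # wait before probing again
```

The sleep parameter exists only so the loop can be exercised in a unit test without real delays.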
Retries and timeout
The Dapr client can retry a request if a specific error code is received from the sidecar. This is configurable through the DAPR_API_MAX_RETRIES environment variable, which is picked up automatically and requires no code changes.
The default value for DAPR_API_MAX_RETRIES is 0, which means no retries will be made.
You can fine-tune more retry parameters by creating a dapr.clients.retry.RetryPolicy object and passing it to the DaprClient constructor:
from grpc import StatusCode
from dapr.clients import DaprClient
from dapr.clients.retry import RetryPolicy
retry = RetryPolicy(
    max_attempts=5,
    initial_backoff=1,
    max_backoff=20,
    backoff_multiplier=1.5,
    retryable_http_status_codes=[408, 429, 500, 502, 503, 504],
    retryable_grpc_status_codes=[StatusCode.UNAVAILABLE, StatusCode.DEADLINE_EXCEEDED],
)
with DaprClient(retry_policy=retry) as d:
    ...
or for actors:
from dapr.actor import ActorId, ActorProxy, ActorProxyFactory
factory = ActorProxyFactory(retry_policy=RetryPolicy(max_attempts=3))
proxy = ActorProxy.create('DemoActor', ActorId('1'), DemoActorInterface, factory)
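To get a feel for how initial_backoff, max_backoff and backoff_multiplier interact, the sketch below computes a capped exponential delay schedule. It is an assumption-laden illustration: backoff_delays is a hypothetical helper, and the SDK's actual schedule (for example, whether it adds jitter or counts the first attempt towards max_attempts) may differ.

```python
from typing import List


def backoff_delays(
    retries: int,
    initial_backoff: float,
    max_backoff: float,
    backoff_multiplier: float,
) -> List[float]:
    """Return the wait (in seconds) before each retry, capped at max_backoff."""
    delays = []
    delay = initial_backoff
    for _ in range(retries):
        delays.append(min(delay, max_backoff))
        delay *= backoff_multiplier  # grow the delay for the next retry
    return delays
```

With the values from the policy above, backoff_delays(4, 1.0, 20.0, 1.5) yields [1.0, 1.5, 2.25, 3.375], so the cap of 20 seconds only matters for longer retry sequences or larger multipliers.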
A timeout can be set for all calls through the DAPR_API_TIMEOUT_SECONDS environment variable. The default value is 60 seconds.
Note: You can control timeouts on service invocation separately, by passing a timeout parameter to the invoke_method method.
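As a quick reference, the defaults stated in this section can be captured in a small settings reader. This is a hypothetical sketch (ClientSettings and read_settings are not SDK names) that only restates the documented defaults: 0 retries and a 60-second timeout.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ClientSettings:
    max_retries: int
    timeout_seconds: float


def read_settings(env: Mapping[str, str]) -> ClientSettings:
    """Read retry and timeout settings, applying the documented defaults."""
    return ClientSettings(
        max_retries=int(env.get("DAPR_API_MAX_RETRIES", "0")),
        timeout_seconds=float(env.get("DAPR_API_TIMEOUT_SECONDS", "60")),
    )
```

Calling read_settings(os.environ) would reflect whatever is exported in the current shell.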
Error handling
Initially, errors in Dapr followed the standard gRPC error model. However, to provide more detailed and informative error messages, version 1.13 introduced an enhanced error model that aligns with the gRPC richer error model. In response, the Python SDK implemented DaprGrpcError, a custom exception class designed to improve the developer experience.
It’s important to note that the transition to using DaprGrpcError for all gRPC status exceptions is a work in progress. As of now, not every API call in the SDK has been updated to leverage this custom exception. We are actively working on this enhancement and welcome contributions from the community.
Example of handling DaprGrpcError exceptions when using the Dapr Python SDK:
from dapr.clients.exceptions import DaprGrpcError
try:
    d.save_state(store_name=storeName, key=key, value=value)
except DaprGrpcError as err:
    print(f'Status code: {err.code()}')
    print(f"Message: {err.message()}")
    print(f"Error code: {err.error_code()}")
    print(f"Error info(reason): {err.error_info.reason}")
    print(f"Resource info (resource type): {err.resource_info.resource_type}")
    print(f"Resource info (resource name): {err.resource_info.resource_name}")
    print(f"Bad request (field): {err.bad_request.field_violations[0].field}")
    print(f"Bad request (description): {err.bad_request.field_violations[0].description}")
Building blocks
The Python SDK allows you to interface with all of the Dapr building blocks.
Invoke a service
The Dapr Python SDK provides a simple API for invoking services via either HTTP or gRPC (deprecated). The protocol can be selected by setting the DAPR_API_METHOD_INVOCATION_PROTOCOL environment variable, defaulting to HTTP when unset. gRPC service invocation in Dapr is deprecated, and gRPC proxying is recommended as an alternative.
from dapr.clients import DaprClient
with DaprClient() as d:
    # invoke a method (gRPC or HTTP GET)
    resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"message":"Hello World"}')
    # for other HTTP verbs the verb must be specified
    # invoke a 'POST' method (HTTP only)
    resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"id":"100", "FirstName":"Value", "LastName":"Value"}', http_verb='post')
The base endpoint for HTTP API calls is specified in the DAPR_HTTP_ENDPOINT environment variable. If this variable is not set, the endpoint value is derived from the DAPR_RUNTIME_HOST and DAPR_HTTP_PORT variables, whose default values are 127.0.0.1 and 3500 respectively.
The base endpoint for gRPC calls is the one used for the client initialisation (explained above).
Save & get application state
from dapr.clients import DaprClient
with DaprClient() as d:
    # Save state
    d.save_state(store_name="statestore", key="key1", value="value1")
    # Get state
    data = d.get_state(store_name="statestore", key="key1").data
    # Delete state
    d.delete_state(store_name="statestore", key="key1")
Query application state (Alpha)
from dapr.clients import DaprClient
query = '''
{
    "filter": {
        "EQ": { "state": "CA" }
    },
    "sort": [
        {
            "key": "person.id",
            "order": "DESC"
        }
    ]
}
'''
with DaprClient() as d:
    resp = d.query_state(
        store_name='state_store',
        query=query,
        states_metadata={"metakey": "metavalue"},  # optional
    )
Publish & subscribe
Publish messages
from dapr.clients import DaprClient
with DaprClient() as d:
    resp = d.publish_event(pubsub_name='pubsub', topic_name='TOPIC_A', data='{"message":"Hello World"}')
Send CloudEvents messages with a JSON payload:
from dapr.clients import DaprClient
import json
with DaprClient() as d:
    cloud_event = {
        'specversion': '1.0',
        'type': 'com.example.event',
        'source': 'my-service',
        'id': 'myid',
        'data': {'id': 1, 'message': 'hello world'},
        'datacontenttype': 'application/json',
    }
    # Set the data content type to 'application/cloudevents+json'
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic_name='TOPIC_CE',
        data=json.dumps(cloud_event),
        data_content_type='application/cloudevents+json',
    )
Publish CloudEvents messages with plain text payload:
from dapr.clients import DaprClient
import json
with DaprClient() as d:
    cloud_event = {
        'specversion': '1.0',
        'type': 'com.example.event',
        'source': 'my-service',
        'id': "myid",
        'data': 'hello world',
        'datacontenttype': 'text/plain',
    }
    # Set the data content type to 'application/cloudevents+json'
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic_name='TOPIC_CE',
        data=json.dumps(cloud_event),
        data_content_type='application/cloudevents+json',
    )
Subscribe to messages
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App, Rule
import json
app = App()
# Default subscription for a topic
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> None:
    data = json.loads(event.Data())
    # Both string parts are f-strings so that every placeholder is interpolated
    print(f'Received: id={data["id"]}, message="{data["message"]}"'
          f' content_type="{event.content_type}"', flush=True)
# Specific handler using Pub/Sub routing
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
               rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
    data = json.loads(event.Data())
    print(f'Received: id={data["id"]}, message="{data["message"]}"'
          f' content_type="{event.content_type}"', flush=True)
Streaming message subscription
You can create a streaming subscription to a PubSub topic using either the subscribe or subscribe_with_handler methods.
The subscribe method returns an iterable Subscription object, which allows you to pull messages from the stream by using a for loop (e.g. for message in subscription) or by calling the next_message method. This will block the main thread while waiting for messages. When done, you should call the close method to terminate the subscription and stop receiving messages.
The subscribe_with_handler method accepts a callback function that is executed for each message received from the stream. It runs in a separate thread, so it doesn’t block the main thread. The callback should return a TopicEventResponse (e.g. TopicEventResponse('success')), indicating whether the message was processed successfully, should be retried, or should be discarded. The method automatically manages message acknowledgements based on the returned status. The call to subscribe_with_handler returns a close function, which should be called to terminate the subscription when you’re done.
Here’s an example of using the subscribe method:
import time
from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError, StreamCancelledError
counter = 0
def process_message(message):
    global counter
    counter += 1
    # Process the message here
    print(f'Processing message: {message.data()} from {message.topic()}...')
    return 'success'
def main():
    with DaprClient() as client:
        global counter
        subscription = client.subscribe(
            pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD'
        )
        try:
            for message in subscription:
                if message is None:
                    print('No message received. The stream might have been cancelled.')
                    continue
                try:
                    response_status = process_message(message)
                    if response_status == 'success':
                        subscription.respond_success(message)
                    elif response_status == 'retry':
                        subscription.respond_retry(message)
                    elif response_status == 'drop':
                        subscription.respond_drop(message)
                    if counter >= 5:
                        break
                except StreamInactiveError:
                    print('Stream is inactive. Retrying...')
                    time.sleep(1)
                    continue
                except StreamCancelledError:
                    print('Stream was cancelled')
                    break
                except Exception as e:
                    print(f'Error occurred during message processing: {e}')
        finally:
            print('Closing subscription...')
            subscription.close()
if __name__ == '__main__':
    main()
And here’s an example of using the subscribe_with_handler method:
import time
from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse
counter = 0
def process_message(message):
    # Process the message here
    global counter
    counter += 1
    print(f'Processing message: {message.data()} from {message.topic()}...')
    return TopicEventResponse('success')
def main():
    with DaprClient() as client:
        # This will start a new thread that will listen for messages
        # and process them in the `process_message` function
        close_fn = client.subscribe_with_handler(
            pubsub_name='pubsub', topic='TOPIC_A', handler_fn=process_message,
            dead_letter_topic='TOPIC_A_DEAD'
        )
        while counter < 5:
            time.sleep(1)
        print("Closing subscription...")
        close_fn()
if __name__ == '__main__':
    main()
Conversation (Alpha)
Note
The Dapr Conversation API is currently in alpha.
Since version 1.15 Dapr offers developers the capability to securely and reliably interact with Large Language Models (LLM) through the Conversation API.
from dapr.clients import DaprClient
from dapr.clients.grpc._request import ConversationInput
with DaprClient() as d:
    inputs = [
        ConversationInput(content="What's Dapr?", role='user', scrub_pii=True),
        ConversationInput(content='Give a brief overview.', role='user', scrub_pii=True),
    ]
    metadata = {
        'model': 'foo',
        'key': 'authKey',
        'cacheTTL': '10m',
    }
    response = d.converse_alpha1(
        name='echo', inputs=inputs, temperature=0.7, context_id='chat-123', metadata=metadata
    )
    for output in response.outputs:
        print(f'Result: {output.result}')
Interact with output bindings
from dapr.clients import DaprClient
with DaprClient() as d:
    resp = d.invoke_binding(binding_name='kafkaBinding', operation='create', data='{"message":"Hello World"}')
Retrieve secrets
from dapr.clients import DaprClient
with DaprClient() as d:
    resp = d.get_secret(store_name='localsecretstore', key='secretKey')
Configuration
Get configuration
from dapr.clients import DaprClient
with DaprClient() as d:
    # Get Configuration
    configuration = d.get_configuration(store_name='configurationstore', keys=['orderId'], config_metadata={})
Subscribe to configuration
import asyncio
from time import sleep
from dapr.clients import DaprClient
async def executeConfiguration():
    with DaprClient() as d:
        storeName = 'configurationstore'
        key = 'orderId'
        # Wait for sidecar to be up within 20 seconds.
        d.wait(20)
        # Subscribe to configuration by key.
        configuration = await d.subscribe_configuration(store_name=storeName, keys=[key], config_metadata={})
        while True:
            if configuration is not None:
                items = configuration.get_items()
                for key, item in items:
                    print(f"Subscribe key={key} value={item.value} version={item.version}", flush=True)
            else:
                print("Nothing yet")
            sleep(5)
asyncio.run(executeConfiguration())
Distributed Lock
from dapr.clients import DaprClient
def main():
    # Lock parameters
    store_name = 'lockstore'  # as defined in components/lockstore.yaml
    resource_id = 'example-lock-resource'
    client_id = 'example-client-id'
    expiry_in_seconds = 60
    with DaprClient() as dapr:
        print('Will try to acquire a lock from lock store named [%s]' % store_name)
        print('The lock is for a resource named [%s]' % resource_id)
        print('The client identifier is [%s]' % client_id)
        print('The lock will expire in %s seconds.' % expiry_in_seconds)
        with dapr.try_lock(store_name, resource_id, client_id, expiry_in_seconds) as lock_result:
            assert lock_result.success, 'Failed to acquire the lock. Aborting.'
            print('Lock acquired successfully!!!')
        # At this point the lock was released - by magic of the `with` clause ;)
        unlock_result = dapr.unlock(store_name, resource_id, client_id)
        print('We already released the lock so unlocking will not work.')
        print('We tried to unlock it anyway and got back [%s]' % unlock_result.status)
Cryptography
from dapr.clients import DaprClient
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
message = 'The secret is "passw0rd"'
def main():
    with DaprClient() as d:
        resp = d.encrypt(
            data=message.encode(),
            options=EncryptOptions(
                component_name='crypto-localstorage',
                key_name='rsa-private-key.pem',
                key_wrap_algorithm='RSA',
            ),
        )
        encrypt_bytes = resp.read()
        resp = d.decrypt(
            data=encrypt_bytes,
            options=DecryptOptions(
                component_name='crypto-localstorage',
                key_name='rsa-private-key.pem',
            ),
        )
        decrypt_bytes = resp.read()
        print(decrypt_bytes.decode())  # The secret is "passw0rd"
Python SDK examples
2 - Getting started with the Dapr actor Python SDK
How to get up and running with the Dapr Python SDK
The Dapr actor package allows you to interact with Dapr virtual actors from a Python application.
Pre-requisites
Actor interface
The interface defines the actor contract that is shared between the actor implementation and the clients calling the actor. Because a client may depend on it, it typically makes sense to define it in a module that is separate from the actor implementation.
from dapr.actor import ActorInterface, actormethod
class DemoActorInterface(ActorInterface):
    @actormethod(name="GetMyData")
    async def get_my_data(self) -> object:
        ...
Actor services
An actor service hosts the virtual actor. It is implemented as a class that derives from the base type Actor and implements the interfaces defined in the actor interface.
Actors can be created using one of the Dapr actor extensions:
Actor client
An actor client contains the implementation of the actor client which calls the actor methods defined in the actor interface.
import asyncio
from dapr.actor import ActorProxy, ActorId
from demo_actor_interface import DemoActorInterface
async def main():
    # Create proxy client
    proxy = ActorProxy.create('DemoActor', ActorId('1'), DemoActorInterface)
    # Call method on client
    resp = await proxy.GetMyData()
Sample
Visit this page for a runnable actor sample.
Mock Actor Testing
The Dapr Python SDK provides the ability to create mock actors to unit test your actor methods and see how they interact with the actor state.
Sample Usage
from dapr.actor import Actor
from dapr.actor.runtime.mock_actor import create_mock_actor
# MyActorInterface is your own ActorInterface subclass, defined elsewhere
class MyActor(Actor, MyActorInterface):
    async def save_state(self, data) -> None:
        await self._state_manager.set_state('mystate', data)
        await self._state_manager.save_state()
async def test_save_state():
    mock_actor = create_mock_actor(MyActor, "id")
    await mock_actor.save_state(5)
    assert mock_actor._state_manager._mock_state['mystate'] == 5  # type: ignore
Mock actors are created by passing your actor class and an actor ID (a string) to the create_mock_actor function. This function returns an instance of the actor with many internal methods overridden. Instead of interacting with Dapr for tasks like saving state or managing timers, the mock actor uses in-memory state to simulate these behaviors.
This state can be accessed through the following variables:
IMPORTANT NOTE: Due to the type hinting issues discussed further down, these variables are not visible to type checkers and linters, which will treat them as invalid. You will need to mark their uses with # type: ignore to satisfy such tools.
- _state_manager._mock_state: A [str, object] dict where all the actor state is stored. Any variable saved via _state_manager.set_state(key, value), or any other state manager method, is stored in the dict as that key, value pair. Any value loaded via try_get_state or any other state manager method is taken from this dict.
- _state_manager._mock_timers: A [str, ActorTimerData] dict which holds the active actor timers. Any actor method which would add or remove a timer adds or pops the appropriate ActorTimerData object from this dict.
- _state_manager._mock_reminders: A [str, ActorReminderData] dict which holds the active actor reminders. Any actor method which would add or remove a reminder adds or pops the appropriate ActorReminderData object from this dict.
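The idea of backing actor state with a plain dict can be shown without Dapr at all. The sketch below is not the SDK's MockStateManager: InMemoryStateManager and CounterActor are hypothetical stand-ins, and the (found, value) return shape of try_get_state is an assumption made for this illustration.

```python
import asyncio
from typing import Any, Dict, Tuple


class InMemoryStateManager:
    """Hypothetical stand-in that keeps all state in a dict."""

    def __init__(self) -> None:
        self._mock_state: Dict[str, Any] = {}

    async def set_state(self, key: str, value: Any) -> None:
        self._mock_state[key] = value

    async def try_get_state(self, key: str) -> Tuple[bool, Any]:
        if key in self._mock_state:
            return True, self._mock_state[key]
        return False, None

    async def save_state(self) -> None:
        # Nothing to flush: the dict already is the store.
        return None


class CounterActor:
    """Toy actor that only talks to its state manager."""

    def __init__(self, state_manager: InMemoryStateManager) -> None:
        self._state_manager = state_manager

    async def increment(self) -> int:
        found, value = await self._state_manager.try_get_state("count")
        new_value = (value if found else 0) + 1
        await self._state_manager.set_state("count", new_value)
        await self._state_manager.save_state()
        return new_value


async def demo() -> Dict[str, Any]:
    actor = CounterActor(InMemoryStateManager())
    await actor.increment()
    await actor.increment()
    # A test can inspect the dict directly, just like _mock_state above.
    return actor._state_manager._mock_state


final_state = asyncio.run(demo())
```

After two increments, final_state is {"count": 2}, which is the kind of assertion a mock actor test would make against _mock_state.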
Note: The timers and reminders will never actually trigger. The dictionaries exist only so methods that should add or remove timers/reminders can be tested. If you need to test the callbacks they should activate, you should call them directly with the appropriate values:
result = await mock_actor.receive_reminder(name, state, due_time, period, _ttl)
# Test the result directly or test for side effects (like changing state) by querying `_state_manager._mock_state`
Usage and Limitations
To allow for more fine-grained control, the _on_activate method will not be called automatically the way it is when Dapr initializes a new Actor instance. You should call it manually as needed as part of your tests.
A current limitation of the mock actor system is that it does not call the _on_pre_actor_method and _on_post_actor_method methods. You can always call these methods manually as part of a test.
The __init__, register_timer, unregister_timer, register_reminder, and unregister_reminder methods are all overwritten by the MockActor class that gets applied as a mixin via create_mock_actor. If your actor itself overrides these methods, those modifications will themselves be overwritten and the actor will likely not behave as you expect.
Note: __init__ is a special case where you are expected to define it as
def __init__(self, ctx, actor_id):
    super().__init__(ctx, actor_id)
Mock actors work fine with this, but if you have added any extra logic into __init__, it will be overwritten. It is worth noting that the correct way to apply logic on initialization is via _on_activate (which can also be safely used with mock actors) instead of __init__.
If you have an actor which does override default Dapr actor methods, you can create a custom subclass of the MockActor class (from MockActor.py) which implements whatever custom logic you have along with interacting with _mock_state, _mock_timers, and _mock_reminders as normal, and then apply that custom class as a mixin via a create_mock_actor function you define yourself.
The actor _runtime_ctx variable is set to None. All the normal actor methods have been overwritten so as not to call it, but if your code itself interacts directly with _runtime_ctx, tests may fail.
The actor _state_manager is overwritten with an instance of MockStateManager. This has all the same methods and functionality as the base ActorStateManager, except that it uses the various _mock variables for storing data instead of the _runtime_ctx. If your code implements its own custom state manager, it will be overwritten and tests will likely fail.
Type Hinting
Because of Python’s lack of a unified method for type hinting type intersections (see: python/typing #213), type hinting unfortunately doesn’t work with Mock Actors. The return type is type hinted as “instance of Actor subclass T” when it should really be type hinted as “instance of MockActor subclass T” or “instance of type intersection [Actor subclass T, MockActor]” (where, it is worth noting, MockActor is itself a subclass of Actor).
This means that, for example, if you hover over mock_actor._state_manager in a code editor, it will come up as an instance of ActorStateManager (instead of MockStateManager), and various IDE helper functions (like VSCode’s Go to Definition, which will bring you to the definition of ActorStateManager instead of MockStateManager) won’t work properly.
For now, this issue is unfixable, so it’s merely something to be noted because of the confusion it might cause. If in the future it becomes possible to accurately type hint cases like this, feel free to open an issue about implementing it.
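The typing gap is easier to see in a stripped-down form. In the sketch below, ToyActor, ToyMockActor and create_toy_mock are hypothetical stand-ins for the SDK classes and for create_mock_actor; the factory builds a class that inherits from both the mock mixin and the user's actor, yet the only return type it can declare is T.

```python
from typing import Type, TypeVar


class ToyActor:
    """Stand-in for the SDK's Actor base class."""


class ToyMockActor(ToyActor):
    """Stand-in for the mock mixin; adds a mock-only method."""

    def mock_only(self) -> str:
        return "mock"


T = TypeVar("T", bound=ToyActor)


def create_toy_mock(actor_cls: Type[T]) -> T:
    # At runtime the instance is BOTH a ToyMockActor and an actor_cls,
    # but the annotation can only promise T, not the intersection.
    mixed = type(f"Mock{actor_cls.__name__}", (ToyMockActor, actor_cls), {})
    return mixed()


class MyToyActor(ToyActor):
    def greet(self) -> str:
        return "hi"


mock = create_toy_mock(MyToyActor)
greeting = mock.greet()        # known to the type checker via T
marker = mock.mock_only()      # type: ignore  # works at runtime only
```

A static checker accepts mock.greet() but rejects mock.mock_only() without the ignore comment, even though both calls succeed at runtime.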
3.1 - Getting started with the Dapr Python gRPC service extension
How to get up and running with the Dapr Python gRPC extension
The Dapr Python SDK provides a built-in gRPC server extension, dapr.ext.grpc, for creating Dapr services.
Installation
You can download and install the Dapr gRPC server extension with:
pip install dapr-ext-grpc
Note
The development package will contain features and behavior that will be compatible with the pre-release version of the Dapr runtime. Make sure to uninstall any stable versions of the Python SDK extension before installing the dapr-dev package.
pip3 install dapr-ext-grpc-dev
Examples
The App object can be used to create a server.
Listen for service invocation requests
The InvokeMethodRequest and InvokeMethodResponse objects can be used to handle incoming requests.
A simple service that will listen and respond to requests will look like:
from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse
app = App()
@app.method(name='my-method')
def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
    print(request.metadata, flush=True)
    print(request.text(), flush=True)
    return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")
app.run(50051)
A full sample can be found here.
Subscribe to a topic
When subscribing to a topic, you can tell Dapr whether the delivered event has been accepted, should be dropped, or should be retried later.
from typing import Optional
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App, Rule
from dapr.clients.grpc._response import TopicEventResponse
app = App()
# Default subscription for a topic
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> Optional[TopicEventResponse]:
    print(event.Data(), flush=True)
    # Returning None (or not doing a return explicitly) is equivalent
    # to returning a TopicEventResponse("success").
    # You can also return TopicEventResponse("retry") for dapr to log
    # the message and retry delivery later, or TopicEventResponse("drop")
    # for it to drop the message
    return TopicEventResponse("success")
# Specific handler using Pub/Sub routing
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
               rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
    print(event.Data(), flush=True)
# Handler with disabled topic validation
@app.subscribe(pubsub_name='pubsub-mqtt', topic='topic/#', disable_topic_validation=True)
def mytopic_wildcard(event: v1.Event) -> None:
    print(event.Data(), flush=True)
app.run(50051)
A full sample can be found here.
Set up an input binding trigger
from dapr.ext.grpc import App, BindingRequest
app = App()
@app.binding('kafkaBinding')
def binding(request: BindingRequest):
    print(request.text(), flush=True)
app.run(50051)
A full sample can be found here.
3.4.1 - Getting started with the Dapr Workflow Python SDK
How to get up and running with workflows using the Dapr Python SDK
Let’s create a Dapr workflow and invoke it using the console. With the provided workflow example, you will:
- Run a Python console application that demonstrates workflow orchestration with activities, child workflows, and external events
- Learn how to handle retries, timeouts, and workflow state management
- Use the Python workflow SDK to start, pause, resume, and purge workflow instances
This example uses the default configuration from dapr init in self-hosted mode.
In the Python example project, the simple.py file contains the setup of the app, including:
- The workflow definition
- The workflow activity definitions
- The registration of the workflow and workflow activities
Prerequisites
Set up the environment
Start by cloning the Python SDK repo.
git clone https://github.com/dapr/python-sdk.git
From the Python SDK root directory, navigate to the Dapr Workflow example.
Run the following command to install the requirements for running this workflow sample with the Dapr Python SDK.
pip3 install -r workflow/requirements.txt
Run the application locally
To run the Dapr application, you need to start the Python program and a Dapr sidecar. In the terminal, run:
dapr run --app-id wf-simple-example --dapr-grpc-port 50001 --resources-path components -- python3 simple.py
Note: Because python3.exe is not available by default on Windows, you may need to use python simple.py instead of python3 simple.py.
Expected output
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: Suspended"
- "== APP == Get response from hello_world_wf after resume call: Running"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: "Completed"
What happened?
When you run the application, several key workflow features are shown:
- Workflow and Activity Registration: The application uses Python decorators to automatically register workflows and activities with the runtime. This decorator-based approach provides a clean, declarative way to define your workflow components:
  @wfr.workflow(name='hello_world_wf')
  def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
      # Workflow definition...
  @wfr.activity(name='hello_act')
  def hello_act(ctx: WorkflowActivityContext, wf_input):
      # Activity definition...
- Runtime Setup: The application initializes the workflow runtime and client:
  wfr = WorkflowRuntime()
  wfr.start()
  wf_client = DaprWorkflowClient()
- Activity Execution: The workflow executes a series of activities that increment a counter:
  @wfr.workflow(name='hello_world_wf')
  def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
      yield ctx.call_activity(hello_act, input=1)
      yield ctx.call_activity(hello_act, input=10)
- Retry Logic: The workflow demonstrates error handling with a retry policy:
  retry_policy = RetryPolicy(
      first_retry_interval=timedelta(seconds=1),
      max_number_of_attempts=3,
      backoff_coefficient=2,
      max_retry_interval=timedelta(seconds=10),
      retry_timeout=timedelta(seconds=100),
  )
  yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
- Child Workflow: A child workflow is executed with its own retry policy:
  yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
- External Event Handling: The workflow waits for an external event with a timeout:
  event = ctx.wait_for_external_event(event_name)
  timeout = ctx.create_timer(timedelta(seconds=30))
  winner = yield when_any([event, timeout])
- Workflow Lifecycle Management: The example demonstrates how to pause and resume the workflow:
  wf_client.pause_workflow(instance_id=instance_id)
  metadata = wf_client.get_workflow_state(instance_id=instance_id)
  # ... check status ...
  wf_client.resume_workflow(instance_id=instance_id)
- Event Raising: After resuming, the workflow raises an event:
  wf_client.raise_workflow_event(
      instance_id=instance_id,
      event_name=event_name,
      data=event_data
  )
- Completion and Cleanup: Finally, the workflow waits for completion and cleans up:
  state = wf_client.wait_for_workflow_completion(
      instance_id,
      timeout_in_seconds=30
  )
  wf_client.purge_workflow(instance_id=instance_id)
Next steps