Let’s create a Dapr workflow and invoke it using the console. With the provided workflow example, you will:
This example uses the default configuration from `dapr init` in self-hosted mode.
In the Python example project, the `simple.py` file contains the setup of the app, including:
Start by cloning the [Python SDK repo].
git clone https://github.com/dapr/python-sdk.git
From the Python SDK root directory, navigate to the Dapr Workflow example.
cd examples/workflow
Run the following command to install the requirements for running this workflow sample with the Dapr Python SDK.
pip3 install -r requirements.txt
To run the Dapr application, you need to start the Python program and a Dapr sidecar. In the terminal, run:
dapr run --app-id wf-simple-example --dapr-grpc-port 50001 --resources-path components -- python3 simple.py
Note: Since `python3.exe` is not defined in Windows, you may need to use `python simple.py` instead of `python3 simple.py`.
Expected output
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: Suspended"
- "== APP == Get response from hello_world_wf after resume call: Running"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: "Completed"
When you run the application, several key workflow features are shown:
Workflow and Activity Registration: The application uses Python decorators to automatically register workflows and activities with the runtime. This decorator-based approach provides a clean, declarative way to define your workflow components:
@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
    # Workflow definition...

@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
    # Activity definition...
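To make the decorator-based registration more concrete, here is a minimal, standard-library-only sketch of how such a registry could work in principle. `ToyRuntime` and everything inside it are hypothetical names invented for illustration. This is not how the Dapr `WorkflowRuntime` is implemented, only a toy model of the pattern:

```python
from typing import Callable, Dict, Optional


class ToyRuntime:
    """Toy stand-in for a workflow runtime; NOT the Dapr WorkflowRuntime."""

    def __init__(self) -> None:
        self.workflows: Dict[str, Callable] = {}
        self.activities: Dict[str, Callable] = {}

    def workflow(self, name: Optional[str] = None) -> Callable[[Callable], Callable]:
        def decorator(fn: Callable) -> Callable:
            # Register under the explicit name, falling back to the function name.
            self.workflows[name or fn.__name__] = fn
            return fn  # the function itself is returned unchanged
        return decorator

    def activity(self, name: Optional[str] = None) -> Callable[[Callable], Callable]:
        def decorator(fn: Callable) -> Callable:
            self.activities[name or fn.__name__] = fn
            return fn
        return decorator


toy = ToyRuntime()


@toy.workflow(name='hello_world_wf')
def hello_world_wf(ctx, wf_input):
    return f'workflow got {wf_input}'


@toy.activity(name='hello_act')
def hello_act(ctx, wf_input):
    return f'activity got {wf_input}'


# Both functions are now discoverable by name, with no explicit register call.
print(sorted(toy.workflows), sorted(toy.activities))
```

The point of the pattern is that defining a function and registering it happen in one place, so the two cannot drift apart.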
Runtime Setup: The application initializes the workflow runtime and client:
wfr = WorkflowRuntime()
wfr.start()
wf_client = DaprWorkflowClient()
Activity Execution: The workflow executes a series of activities that increment a counter:
@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
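The counter values in the expected output (1, 11, 111, 1111) are consistent with each activity call adding its input to a shared counter. The sketch below reproduces that arithmetic in plain Python. It is an assumption about the activity body, which is not shown here: the real `hello_act` also receives a context argument, and the later inputs 100 and 1000 are inferred from the output rather than taken from the snippet above.

```python
counter = 0


def hello_act(wf_input: int) -> int:
    """Hypothetical activity body: add the input to a module-level counter."""
    global counter
    counter += wf_input
    print(f'New counter value is: {counter}!')
    return counter


# Inputs 1 and 10 appear in the workflow snippet; 100 and 1000 are assumed
# from the later "New counter value" lines in the expected output.
values = [hello_act(n) for n in (1, 10, 100, 1000)]
```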
Retry Logic: The workflow demonstrates error handling with a retry policy:
retry_policy = RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    backoff_coefficient=2,
    max_retry_interval=timedelta(seconds=10),
    retry_timeout=timedelta(seconds=100),
)
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
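To see what these parameters imply, the following sketch computes a wait schedule under the conventional reading of exponential backoff: each wait is the first interval multiplied by the coefficient once per previous retry, capped at the maximum interval. `retry_delays` is a hypothetical helper written for this illustration, not part of the Dapr SDK, and the runtime's exact timing may differ. The overall `retry_timeout` is not modeled.

```python
from datetime import timedelta
from typing import List


def retry_delays(
    first_retry_interval: timedelta,
    backoff_coefficient: float,
    max_retry_interval: timedelta,
    max_number_of_attempts: int,
) -> List[timedelta]:
    """Return the waits between attempts under plain exponential backoff."""
    delays = []
    # N attempts leave room for at most N - 1 waits between them.
    for retry_index in range(max_number_of_attempts - 1):
        delay = first_retry_interval * (backoff_coefficient ** retry_index)
        delays.append(min(delay, max_retry_interval))
    return delays


# Values from the retry policy above: 3 attempts, so at most 2 waits.
schedule = retry_delays(timedelta(seconds=1), 2, timedelta(seconds=10), 3)
print([d.total_seconds() for d in schedule])

# With more attempts (an assumed value), the 10-second cap becomes visible.
longer = retry_delays(timedelta(seconds=1), 2, timedelta(seconds=10), 6)
print([d.total_seconds() for d in longer])
```

Under this reading, the policy above waits about 1 second before the second attempt and about 2 seconds before the third.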
Child Workflow: A child workflow is called with a retry policy:
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
External Event Handling: The workflow waits for an external event with a timeout:
event = ctx.wait_for_external_event(event_name)
timeout = ctx.create_timer(timedelta(seconds=30))
winner = yield when_any([event, timeout])
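The "first of event or timer wins" pattern can be illustrated outside of Dapr with the standard library. The sketch below is only an analogy built on `asyncio`. It does not use the Dapr workflow API, and `wait_for_event_or_timeout` and `demo` are hypothetical names chosen for this example:

```python
import asyncio


async def wait_for_event_or_timeout(event: asyncio.Event, timeout_seconds: float) -> str:
    """Return 'event' if the event fires first, otherwise 'timeout'."""
    event_task = asyncio.create_task(event.wait())
    timer_task = asyncio.create_task(asyncio.sleep(timeout_seconds))
    done, pending = await asyncio.wait(
        {event_task, timer_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # the loser is no longer needed
    return 'event' if event_task in done else 'timeout'


async def demo():
    # Case 1: the event is raised well before the timer expires.
    raised = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, raised.set)
    first = await wait_for_event_or_timeout(raised, 1.0)

    # Case 2: nobody raises the event, so the timer wins.
    never_raised = asyncio.Event()
    second = await wait_for_event_or_timeout(never_raised, 0.01)
    return first, second


outcomes = asyncio.run(demo())
print(outcomes)
```

In the workflow itself, the analogous check is comparing `winner` against the `timeout` task to decide whether the event arrived in time.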
Workflow Lifecycle Management: The example demonstrates how to pause and resume the workflow:
wf_client.pause_workflow(instance_id=instance_id)
metadata = wf_client.get_workflow_state(instance_id=instance_id)
# ... check status ...
wf_client.resume_workflow(instance_id=instance_id)
Event Raising: After resuming, the client raises an event to the workflow:
wf_client.raise_workflow_event(
    instance_id=instance_id,
    event_name=event_name,
    data=event_data
)
Completion and Cleanup: Finally, the client waits for the workflow to complete and then purges its state:
state = wf_client.wait_for_workflow_completion(
    instance_id,
    timeout_in_seconds=30
)
wf_client.purge_workflow(instance_id=instance_id)