Dapr Python SDK extensions
- 1: Getting started with the Dapr Python gRPC service extension
- 2: Dapr Python SDK integration with FastAPI
- 3: Dapr Python SDK integration with Flask
- 4: Dapr Python SDK integration with Dapr Workflow extension
1 - Getting started with the Dapr Python gRPC service extension
The Dapr Python SDK provides a built-in gRPC server extension, dapr.ext.grpc, for creating Dapr services.
Installation
You can download and install the Dapr gRPC server extension with:
pip install dapr-ext-grpc
Note
The development package contains features and behavior that are compatible with the pre-release version of the Dapr runtime. Make sure to uninstall any stable versions of the Python SDK extension before installing the dapr-ext-grpc-dev package.
pip3 install dapr-ext-grpc-dev
Examples
The App object can be used to create a server.
Listen for service invocation requests
The InvokeMethodRequest and InvokeMethodResponse objects can be used to handle incoming requests.
A simple service that listens and responds to requests looks like this:
from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse

app = App()

@app.method(name='my-method')
def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
    print(request.metadata, flush=True)
    print(request.text(), flush=True)

    return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")

app.run(50051)
A full sample can be found here.
Subscribe to a topic
When subscribing to a topic, you can instruct Dapr whether the delivered event was processed successfully, or whether it should be dropped or retried later.
from typing import Optional

from cloudevents.sdk.event import v1
from dapr.ext.grpc import App, Rule
from dapr.clients.grpc._response import TopicEventResponse

app = App()

# Default subscription for a topic
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> Optional[TopicEventResponse]:
    print(event.Data(), flush=True)
    # Returning None (or not doing a return explicitly) is equivalent
    # to returning a TopicEventResponse("success").
    # You can also return TopicEventResponse("retry") for Dapr to log
    # the message and retry delivery later, or TopicEventResponse("drop")
    # for it to drop the message.
    return TopicEventResponse("success")

# Specific handler using Pub/Sub routing
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
               rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
    print(event.Data(), flush=True)

# Handler with disabled topic validation
@app.subscribe(pubsub_name='pubsub-mqtt', topic='topic/#', disable_topic_validation=True)
def mytopic_wildcard(event: v1.Event) -> None:
    print(event.Data(), flush=True)

app.run(50051)
A full sample can be found here.
Set up an input binding trigger
from dapr.ext.grpc import App, BindingRequest

app = App()

@app.binding('kafkaBinding')
def binding(request: BindingRequest):
    print(request.text(), flush=True)

app.run(50051)
A full sample can be found here.
2 - Dapr Python SDK integration with FastAPI
The Dapr Python SDK provides integration with FastAPI using the dapr-ext-fastapi extension.
Installation
You can download and install the Dapr FastAPI extension with:
pip install dapr-ext-fastapi
Note
The development package contains features and behavior that are compatible with the pre-release version of the Dapr runtime. Make sure to uninstall any stable versions of the Python SDK extension before installing the dapr-ext-fastapi-dev package.
pip install dapr-ext-fastapi-dev
Example
Subscribing to events of different types
import uvicorn
from fastapi import Body, FastAPI
from pydantic import BaseModel

from dapr.ext.fastapi import DaprApp

class RawEventModel(BaseModel):
    body: str

class User(BaseModel):
    id: int
    name: str

class CloudEventModel(BaseModel):
    data: User
    datacontenttype: str
    id: str
    pubsubname: str
    source: str
    specversion: str
    topic: str
    traceid: str
    traceparent: str
    tracestate: str
    type: str
app = FastAPI()
dapr_app = DaprApp(app)

# Allow handling events with any structure (easiest, but least robust)
# dapr publish --publish-app-id sample --topic any_topic --pubsub pubsub --data '{"id":"7", "desc": "good", "size":"small"}'
@dapr_app.subscribe(pubsub='pubsub', topic='any_topic')
def any_event_handler(event_data=Body()):
    print(event_data)

# For robustness, choose one of the below based on whether the publisher is using CloudEvents

# Handle events sent with CloudEvents
# dapr publish --publish-app-id sample --topic cloud_topic --pubsub pubsub --data '{"id":"7", "name":"Bob Jones"}'
@dapr_app.subscribe(pubsub='pubsub', topic='cloud_topic')
def cloud_event_handler(event_data: CloudEventModel):
    print(event_data)

# Handle raw events sent without CloudEvents
# curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/raw_topic?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"body": "345"}'
@dapr_app.subscribe(pubsub='pubsub', topic='raw_topic')
def raw_event_handler(event_data: RawEventModel):
    print(event_data)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=30212)
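To make the raw-vs-CloudEvents distinction concrete, here is a stdlib-only sketch of the two payload shapes the handlers above receive. The field values are illustrative; the CloudEvents envelope mirrors the fields of CloudEventModel:

```python
import json

# Illustrative CloudEvents envelope that Dapr wraps around published data,
# matching the fields of CloudEventModel above
cloud_event = {
    "data": {"id": 7, "name": "Bob Jones"},
    "datacontenttype": "application/json",
    "id": "event-1",
    "pubsubname": "pubsub",
    "source": "sample",
    "specversion": "1.0",
    "topic": "cloud_topic",
    "traceid": "00-abc-def-01",
    "traceparent": "00-abc-def-01",
    "tracestate": "",
    "type": "com.dapr.event.sent",
}

# Raw payload published with metadata.rawPayload=true,
# matching RawEventModel above
raw_event = {"body": "345"}

print(json.dumps(cloud_event["data"]))
print(json.dumps(raw_event))
```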
Creating an actor
from fastapi import FastAPI

from dapr.ext.fastapi import DaprActor
from demo_actor import DemoActor

app = FastAPI(title=f'{DemoActor.__name__}Service')

# Add Dapr Actor Extension
actor = DaprActor(app)

@app.on_event("startup")
async def startup_event():
    # Register DemoActor
    await actor.register_actor(DemoActor)

@app.get("/GetMyData")
def get_my_data():
    return "{'message': 'myData'}"
3 - Dapr Python SDK integration with Flask
The Dapr Python SDK provides integration with Flask using the flask-dapr extension.
Installation
You can download and install the Dapr Flask extension with:
pip install flask-dapr
Note
The development package contains features and behavior that are compatible with the pre-release version of the Dapr runtime. Make sure to uninstall any stable versions of the Python SDK extension before installing the flask-dapr-dev package.
pip install flask-dapr-dev
Example
from flask import Flask
from flask_dapr.actor import DaprActor

from dapr.conf import settings
from demo_actor import DemoActor

app = Flask(f'{DemoActor.__name__}Service')

# Enable DaprActor Flask extension
actor = DaprActor(app)

# Register DemoActor
actor.register_actor(DemoActor)

# Setup method route
@app.route('/GetMyData', methods=['GET'])
def get_my_data():
    return {'message': 'myData'}, 200

# Run application
if __name__ == '__main__':
    app.run(port=settings.HTTP_APP_PORT)
4 - Dapr Python SDK integration with Dapr Workflow extension
The Dapr Python SDK provides a built-in Dapr Workflow extension, dapr.ext.workflow, for creating Dapr services.
Installation
You can download and install the Dapr Workflow extension with:
pip install dapr-ext-workflow
Note
The development package contains features and behavior that are compatible with the pre-release version of the Dapr runtime. Make sure to uninstall any stable versions of the Python SDK extension before installing the dapr-ext-workflow-dev package.
pip install dapr-ext-workflow-dev
Example
from time import sleep

import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()

@wfr.workflow(name='random_workflow')
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    try:
        result1 = yield ctx.call_activity(step1, input=wf_input)
        result2 = yield ctx.call_activity(step2, input=result1)
    except Exception as e:
        yield ctx.call_activity(error_handler, input=str(e))
        raise
    return [result1, result2]

@wfr.activity(name='step1')
def step1(ctx, activity_input):
    print(f'Step 1: Received input: {activity_input}.')
    # Do some work
    return activity_input + 1

@wfr.activity
def step2(ctx, activity_input):
    print(f'Step 2: Received input: {activity_input}.')
    # Do some work
    return activity_input * 2

@wfr.activity
def error_handler(ctx, error):
    print(f'Executing error handler: {error}.')
    # Do some compensating work

if __name__ == '__main__':
    wfr.start()
    sleep(10)  # wait for workflow runtime to start

    wf_client = wf.DaprWorkflowClient()
    instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
    print(f'Workflow started. Instance ID: {instance_id}')

    state = wf_client.wait_for_workflow_completion(instance_id)
    print(f'Workflow completed! Status: {state.runtime_status}')

    wfr.shutdown()
- Learn more about authoring and managing workflows
- Visit the Python SDK examples for code samples and instructions to try out Dapr Workflow
Next steps
Getting started with the Dapr Workflow Python SDK

4.1 - Getting started with the Dapr Workflow Python SDK
Let’s create a Dapr workflow and invoke it using the console. With the provided workflow example, you will:
- Run a Python console application that demonstrates workflow orchestration with activities, child workflows, and external events
- Learn how to handle retries, timeouts, and workflow state management
- Use the Python workflow SDK to start, pause, resume, and purge workflow instances
This example uses the default configuration from dapr init in self-hosted mode.

In the Python example project, the simple.py file contains the setup of the app, including:
- The workflow definition
- The workflow activity definitions
- The registration of the workflow and workflow activities
Prerequisites
- Dapr CLI installed
- Initialized Dapr environment
- Python 3.9+ installed
- Dapr Python package and the workflow extension installed
- Verify you’re using the latest proto bindings
Set up the environment
Start by cloning the Python SDK repo.
git clone https://github.com/dapr/python-sdk.git
From the Python SDK root directory, navigate to the Dapr Workflow example.
cd examples/workflow
Run the following command to install the requirements for running this workflow sample with the Dapr Python SDK.
pip3 install -r workflow/requirements.txt
Run the application locally
To run the Dapr application, you need to start the Python program and a Dapr sidecar. In the terminal, run:
dapr run --app-id wf-simple-example --dapr-grpc-port 50001 --resources-path components -- python3 simple.py
Note: Since python3.exe is not defined on Windows, you may need to run python simple.py instead of python3 simple.py.
Expected output
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: Suspended"
- "== APP == Get response from hello_world_wf after resume call: Running"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: Completed"
What happened?
When you run the application, several key workflow features are shown:
- Workflow and Activity Registration: The application uses Python decorators to automatically register workflows and activities with the runtime. This decorator-based approach provides a clean, declarative way to define your workflow components:

  @wfr.workflow(name='hello_world_wf')
  def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
      # Workflow definition...

  @wfr.activity(name='hello_act')
  def hello_act(ctx: WorkflowActivityContext, wf_input):
      # Activity definition...
- Runtime Setup: The application initializes the workflow runtime and client:

  wfr = WorkflowRuntime()
  wfr.start()
  wf_client = DaprWorkflowClient()
- Activity Execution: The workflow executes a series of activities that increment a counter:

  @wfr.workflow(name='hello_world_wf')
  def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
      yield ctx.call_activity(hello_act, input=1)
      yield ctx.call_activity(hello_act, input=10)
- Retry Logic: The workflow demonstrates error handling with a retry policy:

  retry_policy = RetryPolicy(
      first_retry_interval=timedelta(seconds=1),
      max_number_of_attempts=3,
      backoff_coefficient=2,
      max_retry_interval=timedelta(seconds=10),
      retry_timeout=timedelta(seconds=100),
  )
  yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
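The retry policy above implies a concrete backoff schedule: the delay before each retry grows geometrically from first_retry_interval by backoff_coefficient, capped at max_retry_interval. A stdlib-only sketch of that schedule (an illustration of the semantics, not the SDK's internals):

```python
from datetime import timedelta

def backoff_schedule(first_retry_interval: timedelta,
                     backoff_coefficient: float,
                     max_retry_interval: timedelta,
                     max_number_of_attempts: int) -> list:
    """Hypothetical helper: the delay before each retry attempt."""
    delays = []
    delay = first_retry_interval
    # One delay per retry; the first attempt happens immediately.
    for _ in range(max_number_of_attempts - 1):
        delays.append(min(delay, max_retry_interval))
        delay = timedelta(seconds=delay.total_seconds() * backoff_coefficient)
    return delays

# With the policy above (3 attempts): delays of 1s, then 2s
print(backoff_schedule(timedelta(seconds=1), 2, timedelta(seconds=10), 3))
```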
- Child Workflow: A child workflow is executed with its own retry policy:

  yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
- External Event Handling: The workflow waits for an external event with a timeout:

  event = ctx.wait_for_external_event(event_name)
  timeout = ctx.create_timer(timedelta(seconds=30))
  winner = yield when_any([event, timeout])
- Workflow Lifecycle Management: The example demonstrates how to pause and resume the workflow:

  wf_client.pause_workflow(instance_id=instance_id)
  metadata = wf_client.get_workflow_state(instance_id=instance_id)
  # ... check status ...
  wf_client.resume_workflow(instance_id=instance_id)
- Event Raising: After resuming, the workflow raises an event:

  wf_client.raise_workflow_event(
      instance_id=instance_id,
      event_name=event_name,
      data=event_data
  )
- Completion and Cleanup: Finally, the workflow waits for completion and cleans up:

  state = wf_client.wait_for_workflow_completion(
      instance_id,
      timeout_in_seconds=30
  )
  wf_client.purge_workflow(instance_id=instance_id)