This is the multi-page printable view of this section. Click here to print.
Dapr Python SDK 插件
1 - 开始使用 Dapr Python gRPC 服务扩展
Dapr Python SDK 提供了一个用于创建 Dapr 服务的内置 gRPC 服务器扩展 dapr.ext.grpc
。
安装
您可以通过以下命令下载并安装 Dapr gRPC 服务器扩展:
pip install dapr-ext-grpc
注意
开发包包含与 Dapr 运行时预发布版本兼容的功能和行为。在安装 <code>dapr-dev</code> 包之前,请确保卸载任何稳定版本的 Python SDK 扩展。
pip3 install dapr-ext-grpc-dev
示例
您可以使用 App
对象来创建一个服务器。
监听服务调用请求
可以使用 InvokeMethodRequest
和 InvokeMethodResponse
对象来处理传入的请求。
以下是一个简单的服务示例,它会监听并响应请求:
from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse
app = App()
@app.method(name='my-method')
def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
print(request.metadata, flush=True)
print(request.text(), flush=True)
return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")
app.run(50051)
完整示例可以在这里找到。
订阅主题
在订阅主题时,您可以指示 dapr 事件是否已被接受,或者是否应该丢弃或稍后重试。
from typing import Optional
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
from dapr.clients.grpc._response import TopicEventResponse
app = App()
# 默认的主题订阅
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> Optional[TopicEventResponse]:
print(event.Data(), flush=True)
# 返回 None(或不显式返回)等同于返回 TopicEventResponse("success")。
# 您还可以返回 TopicEventResponse("retry") 以便 dapr 记录消息并稍后重试交付,
# 或者返回 TopicEventResponse("drop") 以丢弃消息
return TopicEventResponse("success")
# 使用发布/订阅路由的特定处理程序
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
print(event.Data(), flush=True)
# 禁用主题验证的处理程序
@app.subscribe(pubsub_name='pubsub-mqtt', topic='topic/#', disable_topic_validation=True,)
def mytopic_wildcard(event: v1.Event) -> None:
print(event.Data(), flush=True)
app.run(50051)
完整示例可以在这里找到。
设置输入绑定触发器
from dapr.ext.grpc import App, BindingRequest
app = App()
@app.binding('kafkaBinding')
def binding(request: BindingRequest):
print(request.text(), flush=True)
app.run(50051)
完整示例可以在这里找到。
相关链接
2 - Dapr Python SDK 与 FastAPI 集成指南
Dapr Python SDK 通过 dapr-ext-fastapi
扩展实现与 FastAPI 的集成。
安装
您可以通过以下命令下载并安装 Dapr FastAPI 扩展:
pip install dapr-ext-fastapi
注意
开发版包含与 Dapr 运行时预发布版本兼容的功能。在安装 <code>dapr-dev</code> 包之前,请先卸载任何稳定版本的 Python SDK 扩展。
pip install dapr-ext-fastapi-dev
示例
订阅不同类型的事件
import uvicorn
from fastapi import Body, FastAPI
from dapr.ext.fastapi import DaprApp
from pydantic import BaseModel
class RawEventModel(BaseModel):
body: str
class User(BaseModel):
id: int
name = 'Jane Doe'
class CloudEventModel(BaseModel):
data: User
datacontenttype: str
id: str
pubsubname: str
source: str
specversion: str
topic: str
traceid: str
traceparent: str
tracestate: str
type: str
app = FastAPI()
dapr_app = DaprApp(app)
# 处理任意结构的事件(简单但不够可靠)
# dapr publish --publish-app-id sample --topic any_topic --pubsub pubsub --data '{"id":"7", "desc": "good", "size":"small"}'
@dapr_app.subscribe(pubsub='pubsub', topic='any_topic')
def any_event_handler(event_data = Body()):
print(event_data)
# 为了更稳健,根据发布者是否使用 CloudEvents 选择以下之一
# 处理使用 CloudEvents 发送的事件
# dapr publish --publish-app-id sample --topic cloud_topic --pubsub pubsub --data '{"id":"7", "name":"Bob Jones"}'
@dapr_app.subscribe(pubsub='pubsub', topic='cloud_topic')
def cloud_event_handler(event_data: CloudEventModel):
print(event_data)
# 处理未使用 CloudEvents 发送的原始事件
# curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/raw_topic?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"body": "345"}'
@dapr_app.subscribe(pubsub='pubsub', topic='raw_topic')
def raw_event_handler(event_data: RawEventModel):
print(event_data)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=30212)
创建一个 actor
from fastapi import FastAPI
from dapr.ext.fastapi import DaprActor
from demo_actor import DemoActor
app = FastAPI(title=f'{DemoActor.__name__}服务')
# 添加 Dapr actor 扩展
actor = DaprActor(app)
@app.on_event("startup")
async def startup_event():
# 注册 DemoActor
await actor.register_actor(DemoActor)
@app.get("/GetMyData")
def get_my_data():
return "{'message': 'myData'}"
3 - Dapr Python SDK 与 Flask 集成
Dapr Python SDK 使用 flask-dapr
扩展来实现与 Flask 的集成。
安装
您可以通过以下命令下载并安装 Dapr Flask 扩展:
pip install flask-dapr
注意
开发版包含与 Dapr 运行时预发布版本兼容的功能和行为。在安装 <code>dapr-dev</code> 包之前,请确保卸载任何已安装的稳定版 Python SDK 扩展。
pip install flask-dapr-dev
示例
from flask import Flask
from flask_dapr.actor import DaprActor
from dapr.conf import settings
from demo_actor import DemoActor
app = Flask(f'{DemoActor.__name__}Service')
# 启用 DaprActor Flask 扩展
actor = DaprActor(app)
# 注册 DemoActor
actor.register_actor(DemoActor)
# 设置方法路由
@app.route('/GetMyData', methods=['GET'])
def get_my_data():
return {'message': 'myData'}, 200
# 运行应用程序
if __name__ == '__main__':
app.run(port=settings.HTTP_APP_PORT)
4 - Dapr Python SDK 与 Dapr Workflow 扩展集成
注意
Dapr Workflow 目前处于初始测试阶段(alpha)。Dapr Python SDK 内置了一个 Dapr Workflow 扩展,dapr.ext.workflow
,用于创建 Dapr 服务。
安装
您可以通过以下命令下载并安装 Dapr Workflow 扩展:
pip install dapr-ext-workflow
注意
开发包包含与 Dapr 运行时预发布版本兼容的功能和行为。在安装 <code>dapr-dev</code> 包之前,请确保卸载任何已安装的稳定版 Python SDK 扩展。
pip3 install dapr-ext-workflow-dev
下一步
开始使用 Dapr Workflow Python SDK4.1 - 使用 Dapr Workflow Python SDK 入门
注意
Dapr Workflow 目前处于 alpha 阶段。我们来创建一个 Dapr 工作流,并通过控制台调用它。通过提供的 hello world 工作流示例,您将会:
- 运行一个使用
DaprClient
的 Python 控制台应用程序 - 利用 Python 工作流 SDK 和 API 调用来启动、暂停、恢复、终止和清除工作流实例
此示例使用 dapr init
的默认配置在本地模式下运行。
在 Python 示例项目中,app.py
文件包含应用程序的设置,其中包括:
- 工作流定义
- 工作流活动定义
- 工作流和工作流活动的注册
先决条件
- 已安装 Dapr CLI
- 已初始化 Dapr 环境
- 已安装 Python 3.8+
- 已安装 Dapr Python 包 和 工作流扩展
- 确保您使用的是最新的 proto 绑定(proto 绑定是用于定义服务接口的协议缓冲区文件)
设置环境
运行以下命令以安装使用 Dapr Python SDK 运行此工作流示例的必要依赖。
pip3 install -r demo_workflow/requirements.txt
克隆 [Python SDK 仓库]。
git clone https://github.com/dapr/python-sdk.git
从 Python SDK 根目录导航到 Dapr 工作流示例。
cd examples/demo_workflow
本地运行应用程序
要运行 Dapr 应用程序,您需要启动 Python 程序和一个 Dapr 辅助进程。在终端中运行:
dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py
注意: 由于 Windows 中未定义 Python3.exe,您可能需要使用
python app.py
而不是python3 app.py
。
预期输出
== APP == ==========根据输入开始计数器增加==========
== APP == start_resp exampleInstanceID
== APP == 你好,计数器!
== APP == 新的计数器值是:1!
== APP == 你好,计数器!
== APP == 新的计数器值是:11!
== APP == 你好,计数器!
== APP == 你好,计数器!
== APP == 在暂停调用后从 hello_world_wf 获取响应:已暂停
== APP == 你好,计数器!
== APP == 在恢复调用后从 hello_world_wf 获取响应:运行中
== APP == 你好,计数器!
== APP == 新的计数器值是:111!
== APP == 你好,计数器!
== APP == 实例成功清除
== APP == start_resp exampleInstanceID
== APP == 你好,计数器!
== APP == 新的计数器值是:1112!
== APP == 你好,计数器!
== APP == 新的计数器值是:1122!
== APP == 在终止调用后从 hello_world_wf 获取响应:已终止
== APP == 在终止调用后从 child_wf 获取响应:已终止
== APP == 实例成功清除
发生了什么?
当您运行 dapr run
时,Dapr 客户端:
- 注册了工作流 (
hello_world_wf
) 及其活动 (hello_act
) - 启动了工作流引擎
def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()
print("==========根据输入开始计数器增加==========")
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
然后 Dapr 暂停并恢复了工作流:
# 暂停
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"在暂停调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")
# 恢复
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"在恢复调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")
一旦工作流恢复,Dapr 触发了一个工作流事件并打印了新的计数器值:
# 触发事件
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)
为了从您的状态存储中清除工作流状态,Dapr 清除了工作流:
# 清除
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("实例成功清除")
然后示例演示了通过以下步骤终止工作流:
- 使用与已清除工作流相同的
instanceId
启动一个新的工作流。 - 在关闭工作流之前终止并清除工作流。
# 启动另一个工作流
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# 终止
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"在终止调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")
# 清除
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("实例成功清除")