1 - 开始使用 Dapr Python gRPC 服务扩展

如何启动并运行 Dapr Python gRPC 扩展

Dapr Python SDK 提供了一个用于创建 Dapr 服务的内置 gRPC 服务器扩展 dapr.ext.grpc

安装

您可以通过以下命令下载并安装 Dapr gRPC 服务器扩展:

pip install dapr-ext-grpc
pip3 install dapr-ext-grpc-dev

示例

您可以使用 App 对象来创建一个服务器。

监听服务调用请求

可以使用 InvokeMethodRequestInvokeMethodResponse 对象来处理传入的请求。

以下是一个简单的服务示例,它会监听并响应请求:

from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse

app = App()

@app.method(name='my-method')
def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
    print(request.metadata, flush=True)
    print(request.text(), flush=True)

    return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")

app.run(50051)

完整示例可以在这里找到。

订阅主题

在订阅主题时,您可以指示 dapr 事件是否已被接受,或者是否应该丢弃或稍后重试。

from typing import Optional
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
from dapr.clients.grpc._response import TopicEventResponse

app = App()

# 默认的主题订阅
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> Optional[TopicEventResponse]:
    print(event.Data(), flush=True)
    # 返回 None(或不显式返回)等同于返回 TopicEventResponse("success")。
    # 您还可以返回 TopicEventResponse("retry") 以便 dapr 记录消息并稍后重试交付,
    # 或者返回 TopicEventResponse("drop") 以丢弃消息
    return TopicEventResponse("success")

# 使用发布/订阅路由的特定处理程序
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
               rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
    print(event.Data(), flush=True)

# 禁用主题验证的处理程序
@app.subscribe(pubsub_name='pubsub-mqtt', topic='topic/#', disable_topic_validation=True,)
def mytopic_wildcard(event: v1.Event) -> None:
    print(event.Data(), flush=True)

app.run(50051)

完整示例可以在这里找到。

设置输入绑定触发器

from dapr.ext.grpc import App, BindingRequest

app = App()

@app.binding('kafkaBinding')
def binding(request: BindingRequest):
    print(request.text(), flush=True)

app.run(50051)

完整示例可以在这里找到。

相关链接

2 - Dapr Python SDK 与 FastAPI 集成指南

如何使用 FastAPI 扩展创建 Dapr Python actor 和发布订阅功能

Dapr Python SDK 通过 dapr-ext-fastapi 扩展实现与 FastAPI 的集成。

安装

您可以通过以下命令下载并安装 Dapr FastAPI 扩展:

pip install dapr-ext-fastapi
pip install dapr-ext-fastapi-dev

示例

订阅不同类型的事件

import uvicorn
from fastapi import Body, FastAPI
from dapr.ext.fastapi import DaprApp
from pydantic import BaseModel

class RawEventModel(BaseModel):
    body: str

class User(BaseModel):
    id: int
    name = 'Jane Doe'

class CloudEventModel(BaseModel):
    data: User
    datacontenttype: str
    id: str
    pubsubname: str
    source: str
    specversion: str
    topic: str
    traceid: str
    traceparent: str
    tracestate: str
    type: str    
    
app = FastAPI()
dapr_app = DaprApp(app)

# 处理任意结构的事件(简单但不够可靠)
# dapr publish --publish-app-id sample --topic any_topic --pubsub pubsub --data '{"id":"7", "desc": "good", "size":"small"}'
@dapr_app.subscribe(pubsub='pubsub', topic='any_topic')
def any_event_handler(event_data = Body()):
    print(event_data)    

# 为了更稳健,根据发布者是否使用 CloudEvents 选择以下之一

# 处理使用 CloudEvents 发送的事件
# dapr publish --publish-app-id sample --topic cloud_topic --pubsub pubsub --data '{"id":"7", "name":"Bob Jones"}'
@dapr_app.subscribe(pubsub='pubsub', topic='cloud_topic')
def cloud_event_handler(event_data: CloudEventModel):
    print(event_data)   

# 处理未使用 CloudEvents 发送的原始事件
# curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/raw_topic?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"body": "345"}'
@dapr_app.subscribe(pubsub='pubsub', topic='raw_topic')
def raw_event_handler(event_data: RawEventModel):
    print(event_data)    

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=30212)

创建一个 actor

from fastapi import FastAPI
from dapr.ext.fastapi import DaprActor
from demo_actor import DemoActor

app = FastAPI(title=f'{DemoActor.__name__}服务')

# 添加 Dapr actor 扩展
actor = DaprActor(app)

@app.on_event("startup")
async def startup_event():
    # 注册 DemoActor
    await actor.register_actor(DemoActor)

@app.get("/GetMyData")
def get_my_data():
    return "{'message': 'myData'}"

3 - Dapr Python SDK 与 Flask 集成

如何使用 Flask 扩展创建 Dapr Python 虚拟 actor

Dapr Python SDK 使用 flask-dapr 扩展来实现与 Flask 的集成。

安装

您可以通过以下命令下载并安装 Dapr Flask 扩展:

pip install flask-dapr
pip install flask-dapr-dev

示例

from flask import Flask
from flask_dapr.actor import DaprActor

from dapr.conf import settings
from demo_actor import DemoActor

app = Flask(f'{DemoActor.__name__}Service')

# 启用 DaprActor Flask 扩展
actor = DaprActor(app)

# 注册 DemoActor
actor.register_actor(DemoActor)

# 设置方法路由
@app.route('/GetMyData', methods=['GET'])
def get_my_data():
    return {'message': 'myData'}, 200

# 运行应用程序
if __name__ == '__main__':
    app.run(port=settings.HTTP_APP_PORT)

4 - Dapr Python SDK 与 Dapr Workflow 扩展集成

如何使用 Dapr Workflow 扩展快速上手

Dapr Python SDK 内置了一个 Dapr Workflow 扩展,dapr.ext.workflow,用于创建 Dapr 服务。

安装

您可以通过以下命令下载并安装 Dapr Workflow 扩展:

pip install dapr-ext-workflow
pip3 install dapr-ext-workflow-dev

下一步

开始使用 Dapr Workflow Python SDK

4.1 - 使用 Dapr Workflow Python SDK 入门

如何使用 Dapr Python SDK 开始并运行工作流

我们来创建一个 Dapr 工作流,并通过控制台调用它。通过提供的 hello world 工作流示例,您将会:

此示例使用 dapr init 的默认配置在本地模式下运行。

在 Python 示例项目中,app.py 文件包含应用程序的设置,其中包括:

  • 工作流定义
  • 工作流活动定义
  • 工作流和工作流活动的注册

先决条件

设置环境

运行以下命令以安装使用 Dapr Python SDK 运行此工作流示例的必要依赖。

pip3 install -r demo_workflow/requirements.txt

克隆 [Python SDK 仓库]。

git clone https://github.com/dapr/python-sdk.git

从 Python SDK 根目录导航到 Dapr 工作流示例。

cd examples/demo_workflow

本地运行应用程序

要运行 Dapr 应用程序,您需要启动 Python 程序和一个 Dapr 辅助进程。在终端中运行:

dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py

注意: 由于 Windows 中未定义 Python3.exe,您可能需要使用 python app.py 而不是 python3 app.py

预期输出

== APP == ==========根据输入开始计数器增加==========

== APP == start_resp exampleInstanceID

== APP == 你好,计数器!
== APP == 新的计数器值是:1!

== APP == 你好,计数器!
== APP == 新的计数器值是:11!

== APP == 你好,计数器!
== APP == 你好,计数器!
== APP == 在暂停调用后从 hello_world_wf 获取响应:已暂停

== APP == 你好,计数器!
== APP == 在恢复调用后从 hello_world_wf 获取响应:运行中

== APP == 你好,计数器!
== APP == 新的计数器值是:111!

== APP == 你好,计数器!
== APP == 实例成功清除

== APP == start_resp exampleInstanceID

== APP == 你好,计数器!
== APP == 新的计数器值是:1112!

== APP == 你好,计数器!
== APP == 新的计数器值是:1122!

== APP == 在终止调用后从 hello_world_wf 获取响应:已终止
== APP == 在终止调用后从 child_wf 获取响应:已终止
== APP == 实例成功清除

发生了什么?

当您运行 dapr run 时,Dapr 客户端:

  1. 注册了工作流 (hello_world_wf) 及其活动 (hello_act)
  2. 启动了工作流引擎
def main():
    with DaprClient() as d:
        host = settings.DAPR_RUNTIME_HOST
        port = settings.DAPR_GRPC_PORT
        workflowRuntime = WorkflowRuntime(host, port)
        workflowRuntime = WorkflowRuntime()
        workflowRuntime.register_workflow(hello_world_wf)
        workflowRuntime.register_activity(hello_act)
        workflowRuntime.start()

        print("==========根据输入开始计数器增加==========")
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

然后 Dapr 暂停并恢复了工作流:

       # 暂停
        d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"在暂停调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")

        # 恢复
        d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"在恢复调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")

一旦工作流恢复,Dapr 触发了一个工作流事件并打印了新的计数器值:

        # 触发事件
        d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
                    event_name=eventName, event_data=eventData)

为了从您的状态存储中清除工作流状态,Dapr 清除了工作流:

        # 清除
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("实例成功清除")

然后示例演示了通过以下步骤终止工作流:

  • 使用与已清除工作流相同的 instanceId 启动一个新的工作流。
  • 在关闭工作流之前终止并清除工作流。
        # 启动另一个工作流
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

        # 终止
        d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        sleep(1)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"在终止调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")

        # 清除
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("实例成功清除")

下一步