This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Dapr Python SDK

用于开发Dapr应用的Python SDK包

Dapr 提供了多种子包以帮助开发 Python 应用程序。通过这些子包,您可以使用 Dapr 创建 Python 客户端、服务器和虚拟 actor。

先决条件

安装

要开始使用 Python SDK,请安装主要的 Dapr Python SDK 包。

pip install dapr

注意: 开发包包含与 Dapr 运行时预发布版本兼容的功能和行为。在安装 dapr-dev 包之前,请确保卸载任何稳定版本的 Python SDK。

pip install dapr-dev

可用子包

SDK 导入

Python SDK 导入是随主 SDK 安装一起包含的子包,但在使用时需要导入。Dapr Python SDK 提供的常用导入包括:

Client

编写 Python 应用以与 Dapr sidecar 和其他 Dapr 应用交互,包括 Python 中的有状态虚拟 actor。

Actors

创建和与 Dapr 的 actor 框架交互。

了解 所有可用的 Dapr Python SDK 导入 的更多信息。

SDK 扩展

SDK 扩展主要用于接收 pub/sub 事件、程序化创建 pub/sub 订阅和处理输入绑定事件。虽然这些任务可以在没有扩展的情况下完成,但使用 Python SDK 扩展会更加方便。

gRPC

使用 gRPC 服务器扩展创建 Dapr 服务。

FastAPI

使用 Dapr FastAPI 扩展与 Dapr Python 虚拟 actor 和 pub/sub 集成。

Flask

使用 Dapr Flask 扩展与 Dapr Python 虚拟 actor 集成。

Workflow

编写与其他 Dapr API 一起工作的 Python 工作流。

了解 Dapr Python SDK 扩展 的更多信息。

试用

克隆 Python SDK 仓库。

git clone https://github.com/dapr/python-sdk.git

通过 Python 快速入门、教程和示例来体验 Dapr 的实际应用:

SDK 示例描述
快速入门使用 Python SDK 在几分钟内体验 Dapr 的 API 构建块。
SDK 示例克隆 SDK 仓库以尝试一些示例并开始。
绑定教程查看 Dapr Python SDK 如何与其他 Dapr SDK 一起工作以启用绑定。
分布式计算器教程使用 Dapr Python SDK 处理方法调用和状态持久化功能。
Hello World 教程学习如何在本地机器上使用 Python SDK 启动并运行 Dapr。
Hello Kubernetes 教程在 Kubernetes 集群中使用 Dapr Python SDK 启动并运行。
可观测性教程使用 Python SDK 探索 Dapr 的指标收集、跟踪、日志记录和健康检查功能。
Pub/sub 教程查看 Dapr Python SDK 如何与其他 Dapr SDK 一起工作以启用 pub/sub 应用。

更多信息

Serialization

了解有关 Dapr SDK 中的序列化的更多信息。

PyPI

Python 包索引

1 - 使用 Dapr 客户端 Python SDK 入门

如何使用 Dapr Python SDK 快速上手

Dapr 客户端包使您能够从 Python 应用程序与其他 Dapr 应用程序进行交互。

准备工作

在开始之前,安装 Dapr Python 包

导入客户端包

dapr 包包含 DaprClient,用于创建和使用客户端。

from dapr.clients import DaprClient

初始化客户端

您可以通过多种方式初始化 Dapr 客户端:

默认值:

如果不提供参数初始化客户端,它将使用 Dapr sidecar 实例的默认值 (127.0.0.1:50001)。

from dapr.clients import DaprClient

with DaprClient() as d:
    # 使用客户端

在初始化时指定端点:

在构造函数中传递参数时,gRPC 端点优先于任何配置或环境变量。

from dapr.clients import DaprClient

with DaprClient("mydomain:50051?tls=true") as d:
    # 使用客户端

配置选项:

Dapr Sidecar 端点

您可以使用标准化的 DAPR_GRPC_ENDPOINT 环境变量来指定 gRPC 端点。当设置了此变量时,可以在没有任何参数的情况下初始化客户端:

export DAPR_GRPC_ENDPOINT="mydomain:50051?tls=true"
from dapr.clients import DaprClient

with DaprClient() as d:
    # 客户端将使用环境变量中指定的端点

旧的环境变量 DAPR_RUNTIME_HOSTDAPR_HTTP_PORTDAPR_GRPC_PORT 也被支持,但 DAPR_GRPC_ENDPOINT 优先。

Dapr API 令牌

如果您的 Dapr 实例配置为需要 DAPR_API_TOKEN 环境变量,您可以在环境中设置它,客户端将自动使用它。
您可以在这里阅读更多关于 Dapr API 令牌认证的信息。

健康检查超时

客户端初始化时,会对 Dapr sidecar (/healthz/outbound) 进行健康检查。客户端将在 sidecar 启动并运行后继续。

默认的健康检查超时时间为 60 秒,但可以通过设置 DAPR_HEALTH_TIMEOUT 环境变量来覆盖。

重试和超时

如果从 sidecar 收到特定错误代码,Dapr 客户端可以重试请求。这可以通过 DAPR_API_MAX_RETRIES 环境变量进行配置,并自动获取,不需要任何代码更改。 DAPR_API_MAX_RETRIES 的默认值为 0,这意味着不会进行重试。

您可以通过创建 dapr.clients.retry.RetryPolicy 对象并将其传递给 DaprClient 构造函数来微调更多重试参数:

from dapr.clients.retry import RetryPolicy

retry = RetryPolicy(
    max_attempts=5, 
    initial_backoff=1, 
    max_backoff=20, 
    backoff_multiplier=1.5,
    retryable_http_status_codes=[408, 429, 500, 502, 503, 504],
    retryable_grpc_status_codes=[StatusCode.UNAVAILABLE, StatusCode.DEADLINE_EXCEEDED, ]
)

with DaprClient(retry_policy=retry) as d:
    ...

或对于 actor:

factory = ActorProxyFactory(retry_policy=RetryPolicy(max_attempts=3))
proxy = ActorProxy.create('DemoActor', ActorId('1'), DemoActorInterface, factory)

超时可以通过环境变量 DAPR_API_TIMEOUT_SECONDS 为所有调用设置。默认值为 60 秒。

注意:您可以通过将 timeout 参数传递给 invoke_method 方法来单独控制服务调用的超时。

错误处理

最初,Dapr 中的错误遵循 标准 gRPC 错误模型。然而,为了提供更详细和信息丰富的错误消息,在版本 1.13 中引入了一个增强的错误模型,与 gRPC 更丰富的错误模型 对齐。作为回应,Python SDK 实现了 DaprGrpcError,一个旨在改善开发者体验的自定义异常类。
需要注意的是,过渡到使用 DaprGrpcError 处理所有 gRPC 状态异常仍在进行中。目前,SDK 中的每个 API 调用尚未更新以利用此自定义异常。我们正在积极进行此增强,并欢迎社区的贡献。

使用 Dapr python-SDK 处理 DaprGrpcError 异常的示例:

try:
    d.save_state(store_name=storeName, key=key, value=value)
except DaprGrpcError as err:
    print(f'状态代码: {err.code()}')
    print(f"消息: {err.message()}")
    print(f"错误代码: {err.error_code()}")
    print(f"错误信息(原因): {err.error_info.reason}")
    print(f"资源信息 (资源类型): {err.resource_info.resource_type}")
    print(f"资源信息 (资源名称): {err.resource_info.resource_name}")
    print(f"错误请求 (字段): {err.bad_request.field_violations[0].field}")
    print(f"错误请求 (描述): {err.bad_request.field_violations[0].description}")

构建块

Python SDK 允许您与所有 Dapr 构建块 进行接口交互。

调用服务

Dapr Python SDK 提供了一个简单的 API,用于通过 HTTP 或 gRPC(已弃用)调用服务。可以通过设置 DAPR_API_METHOD_INVOCATION_PROTOCOL 环境变量来选择协议,默认情况下未设置时为 HTTP。Dapr 中的 GRPC 服务调用已弃用,建议使用 GRPC 代理作为替代。

from dapr.clients import DaprClient

with DaprClient() as d:
    # 调用方法 (gRPC 或 HTTP GET)    
    resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"message":"Hello World"}')

    # 对于其他 HTTP 动词,必须指定动词
    # 调用 'POST' 方法 (仅限 HTTP)    
    resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"id":"100", "FirstName":"Value", "LastName":"Value"}', http_verb='post')

HTTP API 调用的基本端点在 DAPR_HTTP_ENDPOINT 环境变量中指定。 如果未设置此变量,则端点值从 DAPR_RUNTIME_HOSTDAPR_HTTP_PORT 变量派生,其默认值分别为 127.0.0.13500

gRPC 调用的基本端点是用于客户端初始化的端点(如上所述)。

保存和获取应用程序状态

from dapr.clients import DaprClient

with DaprClient() as d:
    # 保存状态
    d.save_state(store_name="statestore", key="key1", value="value1")

    # 获取状态
    data = d.get_state(store_name="statestore", key="key1").data

    # 删除状态
    d.delete_state(store_name="statestore", key="key1")

查询应用程序状态 (Alpha)

    from dapr import DaprClient

    query = '''
    {
        "filter": {
            "EQ": { "state": "CA" }
        },
        "sort": [
            {
                "key": "person.id",
                "order": "DESC"
            }
        ]
    }
    '''

    with DaprClient() as d:
        resp = d.query_state(
            store_name='state_store',
            query=query,
            states_metadata={"metakey": "metavalue"},  # 可选
        )

发布和订阅

发布消息

from dapr.clients import DaprClient

with DaprClient() as d:
    resp = d.publish_event(pubsub_name='pubsub', topic_name='TOPIC_A', data='{"message":"Hello World"}')

订阅消息

from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
import json

app = App()

# 默认订阅一个主题
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> None:
    data = json.loads(event.Data())
    print(f'接收到: id={data["id"]}, message="{data ["message"]}"' 
          ' content_type="{event.content_type}"',flush=True)

# 使用 Pub/Sub 路由的特定处理程序
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
               rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
    data = json.loads(event.Data())
    print(f'接收到: id={data["id"]}, message="{data ["message"]}"' 
          ' content_type="{event.content_type}"',flush=True)

流式消息订阅

您可以使用 subscribesubscribe_handler 方法创建对 PubSub 主题的流式订阅。

subscribe 方法返回一个 Subscription 对象,允许您通过调用 next_message 方法从流中提取消息。这将在等待消息时阻塞主线程。完成后,您应该调用 close 方法以终止订阅并停止接收消息。

subscribe_with_handler 方法接受一个回调函数,该函数针对从流中接收到的每条消息执行。它在单独的线程中运行,因此不会阻塞主线程。回调应返回一个 TopicEventResponse(例如 TopicEventResponse('success')),指示消息是否已成功处理、应重试或应丢弃。该方法将根据返回的状态自动管理消息确认。对 subscribe_with_handler 方法的调用返回一个关闭函数,完成后应调用该函数以终止订阅。

以下是使用 subscribe 方法的示例:

import time

from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError

counter = 0


def process_message(message):
    global counter
    counter += 1
    # 在此处处理消息
    print(f'处理消息: {message.data()} 来自 {message.topic()}...')
    return 'success'


def main():
    with DaprClient() as client:
        global counter

        subscription = client.subscribe(
            pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD'
        )

        try:
            while counter < 5:
                try:
                    message = subscription.next_message()

                except StreamInactiveError as e:
                    print('流不活跃。重试...')
                    time.sleep(1)
                    continue
                if message is None:
                    print('在超时时间内未收到消息。')
                    continue

                # 处理消息
                response_status = process_message(message)

                if response_status == 'success':
                    subscription.respond_success(message)
                elif response_status == 'retry':
                    subscription.respond_retry(message)
                elif response_status == 'drop':
                    subscription.respond_drop(message)

        finally:
            print("关闭订阅...")
            subscription.close()


if __name__ == '__main__':
    main()

以下是使用 subscribe_with_handler 方法的示例:

import time

from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse

counter = 0


def process_message(message):
    # 在此处处理消息
    global counter
    counter += 1
    print(f'处理消息: {message.data()} 来自 {message.topic()}...')
    return TopicEventResponse('success')


def main():
    with (DaprClient() as client):
        # 这将启动一个新线程,该线程将监听消息
        # 并在 `process_message` 函数中处理它们
        close_fn = client.subscribe_with_handler(
            pubsub_name='pubsub', topic='TOPIC_A', handler_fn=process_message,
            dead_letter_topic='TOPIC_A_DEAD'
        )

        while counter < 5:
            time.sleep(1)

        print("关闭订阅...")
        close_fn()


if __name__ == '__main__':
    main()

与输出绑定交互

from dapr.clients import DaprClient

with DaprClient() as d:
    resp = d.invoke_binding(binding_name='kafkaBinding', operation='create', data='{"message":"Hello World"}')

检索秘密

from dapr.clients import DaprClient

with DaprClient() as d:
    resp = d.get_secret(store_name='localsecretstore', key='secretKey')

配置

获取配置

from dapr.clients import DaprClient

with DaprClient() as d:
    # 获取配置
    configuration = d.get_configuration(store_name='configurationstore', keys=['orderId'], config_metadata={})

订阅配置

import asyncio
from time import sleep
from dapr.clients import DaprClient

async def executeConfiguration():
    with DaprClient() as d:
        storeName = 'configurationstore'

        key = 'orderId'

        # 在 20 秒内等待 sidecar 启动。
        d.wait(20)

        # 通过键订阅配置。
        configuration = await d.subscribe_configuration(store_name=storeName, keys=[key], config_metadata={})
        while True:
            if configuration != None:
                items = configuration.get_items()
                for key, item in items:
                    print(f"订阅键={key} 值={item.value} 版本={item.version}", flush=True)
            else:
                print("尚无内容")
        sleep(5)

asyncio.run(executeConfiguration())

分布式锁

from dapr.clients import DaprClient

def main():
    # 锁参数
    store_name = 'lockstore'  # 在 components/lockstore.yaml 中定义
    resource_id = 'example-lock-resource'
    client_id = 'example-client-id'
    expiry_in_seconds = 60

    with DaprClient() as dapr:
        print('将尝试从名为 [%s] 的锁存储中获取锁' % store_name)
        print('锁是为名为 [%s] 的资源准备的' % resource_id)
        print('客户端标识符是 [%s]' % client_id)
        print('锁将在 %s 秒后过期。' % expiry_in_seconds)

        with dapr.try_lock(store_name, resource_id, client_id, expiry_in_seconds) as lock_result:
            assert lock_result.success, '获取锁失败。中止。'
            print('锁获取成功!!!')

        # 此时锁已释放 - 通过 `with` 子句的魔力 ;)
        unlock_result = dapr.unlock(store_name, resource_id, client_id)
        print('我们已经释放了锁,因此解锁将不起作用。')
        print('我们仍然尝试解锁它,并得到了 [%s]' % unlock_result.status)

加密

from dapr.clients import DaprClient

message = 'The secret is "passw0rd"'

def main():
    with DaprClient() as d:
        resp = d.encrypt(
            data=message.encode(),
            options=EncryptOptions(
                component_name='crypto-localstorage',
                key_name='rsa-private-key.pem',
                key_wrap_algorithm='RSA',
            ),
        )
        encrypt_bytes = resp.read()

        resp = d.decrypt(
            data=encrypt_bytes,
            options=DecryptOptions(
                component_name='crypto-localstorage',
                key_name='rsa-private-key.pem',
            ),
        )
        decrypt_bytes = resp.read()

        print(decrypt_bytes.decode())  # The secret is "passw0rd"

工作流

from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient

instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
eventName = "event1"
eventData = "eventData"

def main():
    with DaprClient() as d:
        host = settings.DAPR_RUNTIME_HOST
        port = settings.DAPR_GRPC_PORT
        workflowRuntime = WorkflowRuntime(host, port)
        workflowRuntime = WorkflowRuntime()
        workflowRuntime.register_workflow(hello_world_wf)
        workflowRuntime.register_activity(hello_act)
        workflowRuntime.start()

        # 启动工作流
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

        # ...

        # 暂停测试
        d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"从 {workflowName} 获取暂停调用后的响应: {getResponse.runtime_status}")

        # 恢复测试
        d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"从 {workflowName} 获取恢复调用后的响应: {getResponse.runtime_status}")
        
        sleep(1)
        # 触发事件
        d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
                    event_name=eventName, event_data=eventData)

        sleep(5)
        # 清除测试
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("实例成功清除")

        
        # 启动另一个工作流以进行终止
        # 这也将测试在旧实例被清除后在新工作流上使用相同的实例 ID
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

        # 终止测试
        d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        sleep(1)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"从 {workflowName} 获取终止调用后的响应: {getResponse.runtime_status}")

        # 清除测试
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("实例成功清除")

        workflowRuntime.shutdown()

相关链接

Python SDK 示例

2 - 使用 Dapr actor Python SDK 入门

如何使用 Dapr Python SDK 快速上手

Dapr actor 包使您能够从 Python 应用程序与 Dapr 虚拟 actor 交互。

先决条件

actor 接口

接口定义了 actor 实现和调用 actor 的客户端之间共享的协议。由于客户端可能依赖于此协议,通常将其定义在与 actor 实现分开的模块中是有意义的。

from dapr.actor import ActorInterface, actormethod

class DemoActorInterface(ActorInterface):
    @actormethod(name="GetMyData")
    async def get_my_data(self) -> object:
        ...

actor 服务

actor 服务负责托管虚拟 actor。它是一个从基类 Actor 派生并实现 actor 接口中定义的类。

可以使用以下 Dapr actor 扩展之一创建 actor:

actor 客户端

actor 客户端用于实现调用 actor 接口中定义的方法。

import asyncio

from dapr.actor import ActorProxy, ActorId
from demo_actor_interface import DemoActorInterface

async def main():
    # 创建代理客户端
    proxy = ActorProxy.create('DemoActor', ActorId('1'), DemoActorInterface)

    # 在客户端上调用方法
    resp = await proxy.GetMyData()

示例

访问此页面获取可运行的 actor 示例。

3 - Dapr Python SDK 插件

用于开发 Dapr 应用的 Python SDK 工具

3.1 - 开始使用 Dapr Python gRPC 服务扩展

如何启动并运行 Dapr Python gRPC 扩展

Dapr Python SDK 提供了一个用于创建 Dapr 服务的内置 gRPC 服务器扩展 dapr.ext.grpc

安装

您可以通过以下命令下载并安装 Dapr gRPC 服务器扩展:

pip install dapr-ext-grpc
pip3 install dapr-ext-grpc-dev

示例

您可以使用 App 对象来创建一个服务器。

监听服务调用请求

可以使用 InvokeMethodRequestInvokeMethodResponse 对象来处理传入的请求。

以下是一个简单的服务示例,它会监听并响应请求:

from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse

app = App()

@app.method(name='my-method')
def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
    print(request.metadata, flush=True)
    print(request.text(), flush=True)

    return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")

app.run(50051)

完整示例可以在这里找到。

订阅主题

在订阅主题时,您可以指示 dapr 事件是否已被接受,或者是否应该丢弃或稍后重试。

from typing import Optional
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
from dapr.clients.grpc._response import TopicEventResponse

app = App()

# 默认的主题订阅
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> Optional[TopicEventResponse]:
    print(event.Data(), flush=True)
    # 返回 None(或不显式返回)等同于返回 TopicEventResponse("success")。
    # 您还可以返回 TopicEventResponse("retry") 以便 dapr 记录消息并稍后重试交付,
    # 或者返回 TopicEventResponse("drop") 以丢弃消息
    return TopicEventResponse("success")

# 使用发布/订阅路由的特定处理程序
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
               rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
    print(event.Data(), flush=True)

# 禁用主题验证的处理程序
@app.subscribe(pubsub_name='pubsub-mqtt', topic='topic/#', disable_topic_validation=True,)
def mytopic_wildcard(event: v1.Event) -> None:
    print(event.Data(), flush=True)

app.run(50051)

完整示例可以在这里找到。

设置输入绑定触发器

from dapr.ext.grpc import App, BindingRequest

app = App()

@app.binding('kafkaBinding')
def binding(request: BindingRequest):
    print(request.text(), flush=True)

app.run(50051)

完整示例可以在这里找到。

相关链接

3.2 - Dapr Python SDK 与 FastAPI 集成指南

如何使用 FastAPI 扩展创建 Dapr Python actor 和发布订阅功能

Dapr Python SDK 通过 dapr-ext-fastapi 扩展实现与 FastAPI 的集成。

安装

您可以通过以下命令下载并安装 Dapr FastAPI 扩展:

pip install dapr-ext-fastapi
pip install dapr-ext-fastapi-dev

示例

订阅不同类型的事件

import uvicorn
from fastapi import Body, FastAPI
from dapr.ext.fastapi import DaprApp
from pydantic import BaseModel

class RawEventModel(BaseModel):
    body: str

class User(BaseModel):
    id: int
    name = 'Jane Doe'

class CloudEventModel(BaseModel):
    data: User
    datacontenttype: str
    id: str
    pubsubname: str
    source: str
    specversion: str
    topic: str
    traceid: str
    traceparent: str
    tracestate: str
    type: str    
    
app = FastAPI()
dapr_app = DaprApp(app)

# 处理任意结构的事件(简单但不够可靠)
# dapr publish --publish-app-id sample --topic any_topic --pubsub pubsub --data '{"id":"7", "desc": "good", "size":"small"}'
@dapr_app.subscribe(pubsub='pubsub', topic='any_topic')
def any_event_handler(event_data = Body()):
    print(event_data)    

# 为了更稳健,根据发布者是否使用 CloudEvents 选择以下之一

# 处理使用 CloudEvents 发送的事件
# dapr publish --publish-app-id sample --topic cloud_topic --pubsub pubsub --data '{"id":"7", "name":"Bob Jones"}'
@dapr_app.subscribe(pubsub='pubsub', topic='cloud_topic')
def cloud_event_handler(event_data: CloudEventModel):
    print(event_data)   

# 处理未使用 CloudEvents 发送的原始事件
# curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/raw_topic?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"body": "345"}'
@dapr_app.subscribe(pubsub='pubsub', topic='raw_topic')
def raw_event_handler(event_data: RawEventModel):
    print(event_data)    

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=30212)

创建一个 actor

from fastapi import FastAPI
from dapr.ext.fastapi import DaprActor
from demo_actor import DemoActor

app = FastAPI(title=f'{DemoActor.__name__}服务')

# 添加 Dapr actor 扩展
actor = DaprActor(app)

@app.on_event("startup")
async def startup_event():
    # 注册 DemoActor
    await actor.register_actor(DemoActor)

@app.get("/GetMyData")
def get_my_data():
    return "{'message': 'myData'}"

3.3 - Dapr Python SDK 与 Flask 集成

如何使用 Flask 扩展创建 Dapr Python 虚拟 actor

Dapr Python SDK 使用 flask-dapr 扩展来实现与 Flask 的集成。

安装

您可以通过以下命令下载并安装 Dapr Flask 扩展:

pip install flask-dapr
pip install flask-dapr-dev

示例

from flask import Flask
from flask_dapr.actor import DaprActor

from dapr.conf import settings
from demo_actor import DemoActor

app = Flask(f'{DemoActor.__name__}Service')

# 启用 DaprActor Flask 扩展
actor = DaprActor(app)

# 注册 DemoActor
actor.register_actor(DemoActor)

# 设置方法路由
@app.route('/GetMyData', methods=['GET'])
def get_my_data():
    return {'message': 'myData'}, 200

# 运行应用程序
if __name__ == '__main__':
    app.run(port=settings.HTTP_APP_PORT)

3.4 - Dapr Python SDK 与 Dapr Workflow 扩展集成

如何使用 Dapr Workflow 扩展快速上手

Dapr Python SDK 内置了一个 Dapr Workflow 扩展,dapr.ext.workflow,用于创建 Dapr 服务。

安装

您可以通过以下命令下载并安装 Dapr Workflow 扩展:

pip install dapr-ext-workflow
pip3 install dapr-ext-workflow-dev

下一步

开始使用 Dapr Workflow Python SDK

3.4.1 - 使用 Dapr Workflow Python SDK 入门

如何使用 Dapr Python SDK 开始并运行工作流

我们来创建一个 Dapr 工作流,并通过控制台调用它。通过提供的 hello world 工作流示例,您将会:

此示例使用 dapr init 的默认配置在本地模式下运行。

在 Python 示例项目中,app.py 文件包含应用程序的设置,其中包括:

  • 工作流定义
  • 工作流活动定义
  • 工作流和工作流活动的注册

先决条件

设置环境

运行以下命令以安装使用 Dapr Python SDK 运行此工作流示例的必要依赖。

pip3 install -r demo_workflow/requirements.txt

克隆 [Python SDK 仓库]。

git clone https://github.com/dapr/python-sdk.git

从 Python SDK 根目录导航到 Dapr 工作流示例。

cd examples/demo_workflow

本地运行应用程序

要运行 Dapr 应用程序,您需要启动 Python 程序和一个 Dapr 辅助进程。在终端中运行:

dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py

注意: 由于 Windows 中未定义 Python3.exe,您可能需要使用 python app.py 而不是 python3 app.py

预期输出

== APP == ==========根据输入开始计数器增加==========

== APP == start_resp exampleInstanceID

== APP == 你好,计数器!
== APP == 新的计数器值是:1!

== APP == 你好,计数器!
== APP == 新的计数器值是:11!

== APP == 你好,计数器!
== APP == 你好,计数器!
== APP == 在暂停调用后从 hello_world_wf 获取响应:已暂停

== APP == 你好,计数器!
== APP == 在恢复调用后从 hello_world_wf 获取响应:运行中

== APP == 你好,计数器!
== APP == 新的计数器值是:111!

== APP == 你好,计数器!
== APP == 实例成功清除

== APP == start_resp exampleInstanceID

== APP == 你好,计数器!
== APP == 新的计数器值是:1112!

== APP == 你好,计数器!
== APP == 新的计数器值是:1122!

== APP == 在终止调用后从 hello_world_wf 获取响应:已终止
== APP == 在终止调用后从 child_wf 获取响应:已终止
== APP == 实例成功清除

发生了什么?

当您运行 dapr run 时,Dapr 客户端:

  1. 注册了工作流 (hello_world_wf) 及其活动 (hello_act)
  2. 启动了工作流引擎
def main():
    with DaprClient() as d:
        host = settings.DAPR_RUNTIME_HOST
        port = settings.DAPR_GRPC_PORT
        workflowRuntime = WorkflowRuntime(host, port)
        workflowRuntime = WorkflowRuntime()
        workflowRuntime.register_workflow(hello_world_wf)
        workflowRuntime.register_activity(hello_act)
        workflowRuntime.start()

        print("==========根据输入开始计数器增加==========")
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

然后 Dapr 暂停并恢复了工作流:

       # 暂停
        d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"在暂停调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")

        # 恢复
        d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"在恢复调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")

一旦工作流恢复,Dapr 触发了一个工作流事件并打印了新的计数器值:

        # 触发事件
        d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
                    event_name=eventName, event_data=eventData)

为了从您的状态存储中清除工作流状态,Dapr 清除了工作流:

        # 清除
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("实例成功清除")

然后示例演示了通过以下步骤终止工作流:

  • 使用与已清除工作流相同的 instanceId 启动一个新的工作流。
  • 在关闭工作流之前终止并清除工作流。
        # 启动另一个工作流
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

        # 终止
        d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        sleep(1)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"在终止调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")

        # 清除
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("实例成功清除")

下一步