1 - 使用 Dapr 客户端 Python SDK 入门
如何使用 Dapr Python SDK 快速上手
Dapr 客户端包使您能够从 Python 应用程序与其他 Dapr 应用程序进行交互。
注意
如果您还没有尝试过,
请尝试其中一个快速入门,以快速了解如何使用 Dapr Python SDK 和 API 构建块。
准备工作
在开始之前,安装 Dapr Python 包。
导入客户端包
dapr
包包含 DaprClient
,用于创建和使用客户端。
from dapr.clients import DaprClient
初始化客户端
您可以通过多种方式初始化 Dapr 客户端:
默认值:
如果不提供参数初始化客户端,它将使用 Dapr sidecar 实例的默认值 (127.0.0.1:50001
)。
from dapr.clients import DaprClient
with DaprClient() as d:
# 使用客户端
在初始化时指定端点:
在构造函数中传递参数时,gRPC 端点优先于任何配置或环境变量。
from dapr.clients import DaprClient
with DaprClient("mydomain:50051?tls=true") as d:
# 使用客户端
配置选项:
Dapr Sidecar 端点
您可以使用标准化的 DAPR_GRPC_ENDPOINT
环境变量来指定 gRPC 端点。当设置了此变量时,可以在没有任何参数的情况下初始化客户端:
export DAPR_GRPC_ENDPOINT="mydomain:50051?tls=true"
from dapr.clients import DaprClient
with DaprClient() as d:
# 客户端将使用环境变量中指定的端点
旧的环境变量 DAPR_RUNTIME_HOST
、DAPR_HTTP_PORT
和 DAPR_GRPC_PORT
也被支持,但 DAPR_GRPC_ENDPOINT
优先。
Dapr API 令牌
如果您的 Dapr 实例配置为需要 DAPR_API_TOKEN
环境变量,您可以在环境中设置它,客户端将自动使用它。
您可以在这里阅读更多关于 Dapr API 令牌认证的信息。
健康检查超时
客户端初始化时,会对 Dapr sidecar (/healthz/outbound
) 进行健康检查。客户端将在 sidecar 启动并运行后继续。
默认的健康检查超时时间为 60 秒,但可以通过设置 DAPR_HEALTH_TIMEOUT
环境变量来覆盖。
重试和超时
如果从 sidecar 收到特定错误代码,Dapr 客户端可以重试请求。这可以通过 DAPR_API_MAX_RETRIES
环境变量进行配置,并自动获取,不需要任何代码更改。
DAPR_API_MAX_RETRIES
的默认值为 0
,这意味着不会进行重试。
您可以通过创建 dapr.clients.retry.RetryPolicy
对象并将其传递给 DaprClient 构造函数来微调更多重试参数:
from dapr.clients.retry import RetryPolicy
retry = RetryPolicy(
max_attempts=5,
initial_backoff=1,
max_backoff=20,
backoff_multiplier=1.5,
retryable_http_status_codes=[408, 429, 500, 502, 503, 504],
retryable_grpc_status_codes=[StatusCode.UNAVAILABLE, StatusCode.DEADLINE_EXCEEDED, ]
)
with DaprClient(retry_policy=retry) as d:
...
或对于 actor:
factory = ActorProxyFactory(retry_policy=RetryPolicy(max_attempts=3))
proxy = ActorProxy.create('DemoActor', ActorId('1'), DemoActorInterface, factory)
超时可以通过环境变量 DAPR_API_TIMEOUT_SECONDS
为所有调用设置。默认值为 60 秒。
注意:您可以通过将 timeout
参数传递给 invoke_method
方法来单独控制服务调用的超时。
错误处理
最初,Dapr 中的错误遵循 标准 gRPC 错误模型。然而,为了提供更详细和信息丰富的错误消息,在版本 1.13 中引入了一个增强的错误模型,与 gRPC 更丰富的错误模型 对齐。作为回应,Python SDK 实现了 DaprGrpcError
,一个旨在改善开发者体验的自定义异常类。
需要注意的是,过渡到使用 DaprGrpcError
处理所有 gRPC 状态异常仍在进行中。目前,SDK 中的每个 API 调用尚未更新以利用此自定义异常。我们正在积极进行此增强,并欢迎社区的贡献。
使用 Dapr python-SDK 处理 DaprGrpcError
异常的示例:
try:
d.save_state(store_name=storeName, key=key, value=value)
except DaprGrpcError as err:
print(f'状态代码: {err.code()}')
print(f"消息: {err.message()}")
print(f"错误代码: {err.error_code()}")
print(f"错误信息(原因): {err.error_info.reason}")
print(f"资源信息 (资源类型): {err.resource_info.resource_type}")
print(f"资源信息 (资源名称): {err.resource_info.resource_name}")
print(f"错误请求 (字段): {err.bad_request.field_violations[0].field}")
print(f"错误请求 (描述): {err.bad_request.field_violations[0].description}")
构建块
Python SDK 允许您与所有 Dapr 构建块 进行接口交互。
调用服务
Dapr Python SDK 提供了一个简单的 API,用于通过 HTTP 或 gRPC(已弃用)调用服务。可以通过设置 DAPR_API_METHOD_INVOCATION_PROTOCOL
环境变量来选择协议,默认情况下未设置时为 HTTP。Dapr 中的 GRPC 服务调用已弃用,建议使用 GRPC 代理作为替代。
from dapr.clients import DaprClient
with DaprClient() as d:
# 调用方法 (gRPC 或 HTTP GET)
resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"message":"Hello World"}')
# 对于其他 HTTP 动词,必须指定动词
# 调用 'POST' 方法 (仅限 HTTP)
resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"id":"100", "FirstName":"Value", "LastName":"Value"}', http_verb='post')
HTTP API 调用的基本端点在 DAPR_HTTP_ENDPOINT
环境变量中指定。
如果未设置此变量,则端点值从 DAPR_RUNTIME_HOST
和 DAPR_HTTP_PORT
变量派生,其默认值分别为 127.0.0.1
和 3500
。
gRPC 调用的基本端点是用于客户端初始化的端点(如上所述)。
保存和获取应用程序状态
from dapr.clients import DaprClient
with DaprClient() as d:
# 保存状态
d.save_state(store_name="statestore", key="key1", value="value1")
# 获取状态
data = d.get_state(store_name="statestore", key="key1").data
# 删除状态
d.delete_state(store_name="statestore", key="key1")
查询应用程序状态 (Alpha)
from dapr import DaprClient
query = '''
{
"filter": {
"EQ": { "state": "CA" }
},
"sort": [
{
"key": "person.id",
"order": "DESC"
}
]
}
'''
with DaprClient() as d:
resp = d.query_state(
store_name='state_store',
query=query,
states_metadata={"metakey": "metavalue"}, # 可选
)
发布和订阅
发布消息
from dapr.clients import DaprClient
with DaprClient() as d:
resp = d.publish_event(pubsub_name='pubsub', topic_name='TOPIC_A', data='{"message":"Hello World"}')
订阅消息
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
import json
app = App()
# 默认订阅一个主题
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> None:
data = json.loads(event.Data())
print(f'接收到: id={data["id"]}, message="{data ["message"]}"'
' content_type="{event.content_type}"',flush=True)
# 使用 Pub/Sub 路由的特定处理程序
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
data = json.loads(event.Data())
print(f'接收到: id={data["id"]}, message="{data ["message"]}"'
' content_type="{event.content_type}"',flush=True)
流式消息订阅
您可以使用 subscribe
或 subscribe_handler
方法创建对 PubSub 主题的流式订阅。
subscribe
方法返回一个 Subscription
对象,允许您通过调用 next_message
方法从流中提取消息。这将在等待消息时阻塞主线程。完成后,您应该调用 close 方法以终止订阅并停止接收消息。
subscribe_with_handler
方法接受一个回调函数,该函数针对从流中接收到的每条消息执行。它在单独的线程中运行,因此不会阻塞主线程。回调应返回一个 TopicEventResponse
(例如 TopicEventResponse('success')
),指示消息是否已成功处理、应重试或应丢弃。该方法将根据返回的状态自动管理消息确认。对 subscribe_with_handler
方法的调用返回一个关闭函数,完成后应调用该函数以终止订阅。
以下是使用 subscribe
方法的示例:
import time
from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError
counter = 0
def process_message(message):
global counter
counter += 1
# 在此处处理消息
print(f'处理消息: {message.data()} 来自 {message.topic()}...')
return 'success'
def main():
with DaprClient() as client:
global counter
subscription = client.subscribe(
pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD'
)
try:
while counter < 5:
try:
message = subscription.next_message()
except StreamInactiveError as e:
print('流不活跃。重试...')
time.sleep(1)
continue
if message is None:
print('在超时时间内未收到消息。')
continue
# 处理消息
response_status = process_message(message)
if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)
finally:
print("关闭订阅...")
subscription.close()
if __name__ == '__main__':
main()
以下是使用 subscribe_with_handler
方法的示例:
import time
from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse
counter = 0
def process_message(message):
# 在此处处理消息
global counter
counter += 1
print(f'处理消息: {message.data()} 来自 {message.topic()}...')
return TopicEventResponse('success')
def main():
with (DaprClient() as client):
# 这将启动一个新线程,该线程将监听消息
# 并在 `process_message` 函数中处理它们
close_fn = client.subscribe_with_handler(
pubsub_name='pubsub', topic='TOPIC_A', handler_fn=process_message,
dead_letter_topic='TOPIC_A_DEAD'
)
while counter < 5:
time.sleep(1)
print("关闭订阅...")
close_fn()
if __name__ == '__main__':
main()
与输出绑定交互
from dapr.clients import DaprClient
with DaprClient() as d:
resp = d.invoke_binding(binding_name='kafkaBinding', operation='create', data='{"message":"Hello World"}')
检索秘密
from dapr.clients import DaprClient
with DaprClient() as d:
resp = d.get_secret(store_name='localsecretstore', key='secretKey')
配置
获取配置
from dapr.clients import DaprClient
with DaprClient() as d:
# 获取配置
configuration = d.get_configuration(store_name='configurationstore', keys=['orderId'], config_metadata={})
订阅配置
import asyncio
from time import sleep
from dapr.clients import DaprClient
async def executeConfiguration():
with DaprClient() as d:
storeName = 'configurationstore'
key = 'orderId'
# 在 20 秒内等待 sidecar 启动。
d.wait(20)
# 通过键订阅配置。
configuration = await d.subscribe_configuration(store_name=storeName, keys=[key], config_metadata={})
while True:
if configuration != None:
items = configuration.get_items()
for key, item in items:
print(f"订阅键={key} 值={item.value} 版本={item.version}", flush=True)
else:
print("尚无内容")
sleep(5)
asyncio.run(executeConfiguration())
分布式锁
from dapr.clients import DaprClient
def main():
# 锁参数
store_name = 'lockstore' # 在 components/lockstore.yaml 中定义
resource_id = 'example-lock-resource'
client_id = 'example-client-id'
expiry_in_seconds = 60
with DaprClient() as dapr:
print('将尝试从名为 [%s] 的锁存储中获取锁' % store_name)
print('锁是为名为 [%s] 的资源准备的' % resource_id)
print('客户端标识符是 [%s]' % client_id)
print('锁将在 %s 秒后过期。' % expiry_in_seconds)
with dapr.try_lock(store_name, resource_id, client_id, expiry_in_seconds) as lock_result:
assert lock_result.success, '获取锁失败。中止。'
print('锁获取成功!!!')
# 此时锁已释放 - 通过 `with` 子句的魔力 ;)
unlock_result = dapr.unlock(store_name, resource_id, client_id)
print('我们已经释放了锁,因此解锁将不起作用。')
print('我们仍然尝试解锁它,并得到了 [%s]' % unlock_result.status)
加密
from dapr.clients import DaprClient
message = 'The secret is "passw0rd"'
def main():
with DaprClient() as d:
resp = d.encrypt(
data=message.encode(),
options=EncryptOptions(
component_name='crypto-localstorage',
key_name='rsa-private-key.pem',
key_wrap_algorithm='RSA',
),
)
encrypt_bytes = resp.read()
resp = d.decrypt(
data=encrypt_bytes,
options=DecryptOptions(
component_name='crypto-localstorage',
key_name='rsa-private-key.pem',
),
)
decrypt_bytes = resp.read()
print(decrypt_bytes.decode()) # The secret is "passw0rd"
工作流
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient
instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
eventName = "event1"
eventData = "eventData"
def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()
# 启动工作流
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# ...
# 暂停测试
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"从 {workflowName} 获取暂停调用后的响应: {getResponse.runtime_status}")
# 恢复测试
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"从 {workflowName} 获取恢复调用后的响应: {getResponse.runtime_status}")
sleep(1)
# 触发事件
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)
sleep(5)
# 清除测试
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("实例成功清除")
# 启动另一个工作流以进行终止
# 这也将测试在旧实例被清除后在新工作流上使用相同的实例 ID
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# 终止测试
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"从 {workflowName} 获取终止调用后的响应: {getResponse.runtime_status}")
# 清除测试
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("实例成功清除")
workflowRuntime.shutdown()
相关链接
Python SDK 示例
2 - 使用 Dapr actor Python SDK 入门
如何使用 Dapr Python SDK 快速上手
Dapr actor 包使您能够从 Python 应用程序与 Dapr 虚拟 actor 交互。
先决条件
actor 接口
接口定义了 actor 实现和调用 actor 的客户端之间共享的协议。由于客户端可能依赖于此协议,通常将其定义在与 actor 实现分开的模块中是有意义的。
from dapr.actor import ActorInterface, actormethod
class DemoActorInterface(ActorInterface):
@actormethod(name="GetMyData")
async def get_my_data(self) -> object:
...
actor 服务
actor 服务负责托管虚拟 actor。它是一个从基类 Actor
派生并实现 actor 接口中定义的类。
可以使用以下 Dapr actor 扩展之一创建 actor:
actor 客户端
actor 客户端用于实现调用 actor 接口中定义的方法。
import asyncio
from dapr.actor import ActorProxy, ActorId
from demo_actor_interface import DemoActorInterface
async def main():
# 创建代理客户端
proxy = ActorProxy.create('DemoActor', ActorId('1'), DemoActorInterface)
# 在客户端上调用方法
resp = await proxy.GetMyData()
示例
访问此页面获取可运行的 actor 示例。
3.1 - 开始使用 Dapr Python gRPC 服务扩展
如何启动并运行 Dapr Python gRPC 扩展
Dapr Python SDK 提供了一个用于创建 Dapr 服务的内置 gRPC 服务器扩展 dapr.ext.grpc
。
安装
您可以通过以下命令下载并安装 Dapr gRPC 服务器扩展:
pip install dapr-ext-grpc
注意
开发包包含与 Dapr 运行时预发布版本兼容的功能和行为。在安装 <code>dapr-dev</code> 包之前,请确保卸载任何稳定版本的 Python SDK 扩展。
pip3 install dapr-ext-grpc-dev
示例
您可以使用 App
对象来创建一个服务器。
监听服务调用请求
可以使用 InvokeMethodRequest
和 InvokeMethodResponse
对象来处理传入的请求。
以下是一个简单的服务示例,它会监听并响应请求:
from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse
app = App()
@app.method(name='my-method')
def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
print(request.metadata, flush=True)
print(request.text(), flush=True)
return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")
app.run(50051)
完整示例可以在这里找到。
订阅主题
在订阅主题时,您可以指示 dapr 事件是否已被接受,或者是否应该丢弃或稍后重试。
from typing import Optional
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
from dapr.clients.grpc._response import TopicEventResponse
app = App()
# 默认的主题订阅
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> Optional[TopicEventResponse]:
print(event.Data(), flush=True)
# 返回 None(或不显式返回)等同于返回 TopicEventResponse("success")。
# 您还可以返回 TopicEventResponse("retry") 以便 dapr 记录消息并稍后重试交付,
# 或者返回 TopicEventResponse("drop") 以丢弃消息
return TopicEventResponse("success")
# 使用发布/订阅路由的特定处理程序
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
print(event.Data(), flush=True)
# 禁用主题验证的处理程序
@app.subscribe(pubsub_name='pubsub-mqtt', topic='topic/#', disable_topic_validation=True,)
def mytopic_wildcard(event: v1.Event) -> None:
print(event.Data(), flush=True)
app.run(50051)
完整示例可以在这里找到。
设置输入绑定触发器
from dapr.ext.grpc import App, BindingRequest
app = App()
@app.binding('kafkaBinding')
def binding(request: BindingRequest):
print(request.text(), flush=True)
app.run(50051)
完整示例可以在这里找到。
相关链接
3.2 - Dapr Python SDK 与 FastAPI 集成指南
如何使用 FastAPI 扩展创建 Dapr Python actor 和发布订阅功能
Dapr Python SDK 通过 dapr-ext-fastapi
扩展实现与 FastAPI 的集成。
安装
您可以通过以下命令下载并安装 Dapr FastAPI 扩展:
pip install dapr-ext-fastapi
注意
开发版包含与 Dapr 运行时预发布版本兼容的功能。在安装 <code>dapr-dev</code> 包之前,请先卸载任何稳定版本的 Python SDK 扩展。
pip install dapr-ext-fastapi-dev
示例
订阅不同类型的事件
import uvicorn
from fastapi import Body, FastAPI
from dapr.ext.fastapi import DaprApp
from pydantic import BaseModel
class RawEventModel(BaseModel):
body: str
class User(BaseModel):
id: int
name = 'Jane Doe'
class CloudEventModel(BaseModel):
data: User
datacontenttype: str
id: str
pubsubname: str
source: str
specversion: str
topic: str
traceid: str
traceparent: str
tracestate: str
type: str
app = FastAPI()
dapr_app = DaprApp(app)
# 处理任意结构的事件(简单但不够可靠)
# dapr publish --publish-app-id sample --topic any_topic --pubsub pubsub --data '{"id":"7", "desc": "good", "size":"small"}'
@dapr_app.subscribe(pubsub='pubsub', topic='any_topic')
def any_event_handler(event_data = Body()):
print(event_data)
# 为了更稳健,根据发布者是否使用 CloudEvents 选择以下之一
# 处理使用 CloudEvents 发送的事件
# dapr publish --publish-app-id sample --topic cloud_topic --pubsub pubsub --data '{"id":"7", "name":"Bob Jones"}'
@dapr_app.subscribe(pubsub='pubsub', topic='cloud_topic')
def cloud_event_handler(event_data: CloudEventModel):
print(event_data)
# 处理未使用 CloudEvents 发送的原始事件
# curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/raw_topic?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"body": "345"}'
@dapr_app.subscribe(pubsub='pubsub', topic='raw_topic')
def raw_event_handler(event_data: RawEventModel):
print(event_data)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=30212)
创建一个 actor
from fastapi import FastAPI
from dapr.ext.fastapi import DaprActor
from demo_actor import DemoActor
app = FastAPI(title=f'{DemoActor.__name__}服务')
# 添加 Dapr actor 扩展
actor = DaprActor(app)
@app.on_event("startup")
async def startup_event():
# 注册 DemoActor
await actor.register_actor(DemoActor)
@app.get("/GetMyData")
def get_my_data():
return "{'message': 'myData'}"
3.3 - Dapr Python SDK 与 Flask 集成
如何使用 Flask 扩展创建 Dapr Python 虚拟 actor
Dapr Python SDK 使用 flask-dapr
扩展来实现与 Flask 的集成。
安装
您可以通过以下命令下载并安装 Dapr Flask 扩展:
注意
开发版包含与 Dapr 运行时预发布版本兼容的功能和行为。在安装 <code>dapr-dev</code> 包之前,请确保卸载任何已安装的稳定版 Python SDK 扩展。
pip install flask-dapr-dev
示例
from flask import Flask
from flask_dapr.actor import DaprActor
from dapr.conf import settings
from demo_actor import DemoActor
app = Flask(f'{DemoActor.__name__}Service')
# 启用 DaprActor Flask 扩展
actor = DaprActor(app)
# 注册 DemoActor
actor.register_actor(DemoActor)
# 设置方法路由
@app.route('/GetMyData', methods=['GET'])
def get_my_data():
return {'message': 'myData'}, 200
# 运行应用程序
if __name__ == '__main__':
app.run(port=settings.HTTP_APP_PORT)
3.4 - Dapr Python SDK 与 Dapr Workflow 扩展集成
如何使用 Dapr Workflow 扩展快速上手
注意
Dapr Workflow 目前处于初始测试阶段(alpha)。Dapr Python SDK 内置了一个 Dapr Workflow 扩展,dapr.ext.workflow
,用于创建 Dapr 服务。
安装
您可以通过以下命令下载并安装 Dapr Workflow 扩展:
pip install dapr-ext-workflow
注意
开发包包含与 Dapr 运行时预发布版本兼容的功能和行为。在安装 <code>dapr-dev</code> 包之前,请确保卸载任何已安装的稳定版 Python SDK 扩展。
pip3 install dapr-ext-workflow-dev
下一步
开始使用 Dapr Workflow Python SDK3.4.1 - 使用 Dapr Workflow Python SDK 入门
如何使用 Dapr Python SDK 开始并运行工作流
注意
Dapr Workflow 目前处于 alpha 阶段。我们来创建一个 Dapr 工作流,并通过控制台调用它。通过提供的 hello world 工作流示例,您将会:
此示例使用 dapr init
的默认配置在本地模式下运行。
在 Python 示例项目中,app.py
文件包含应用程序的设置,其中包括:
先决条件
设置环境
运行以下命令以安装使用 Dapr Python SDK 运行此工作流示例的必要依赖。
pip3 install -r demo_workflow/requirements.txt
克隆 [Python SDK 仓库]。
git clone https://github.com/dapr/python-sdk.git
从 Python SDK 根目录导航到 Dapr 工作流示例。
cd examples/demo_workflow
本地运行应用程序
要运行 Dapr 应用程序,您需要启动 Python 程序和一个 Dapr 辅助进程。在终端中运行:
dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py
注意: 由于 Windows 中未定义 Python3.exe,您可能需要使用 python app.py
而不是 python3 app.py
。
预期输出
== APP == ==========根据输入开始计数器增加==========
== APP == start_resp exampleInstanceID
== APP == 你好,计数器!
== APP == 新的计数器值是:1!
== APP == 你好,计数器!
== APP == 新的计数器值是:11!
== APP == 你好,计数器!
== APP == 你好,计数器!
== APP == 在暂停调用后从 hello_world_wf 获取响应:已暂停
== APP == 你好,计数器!
== APP == 在恢复调用后从 hello_world_wf 获取响应:运行中
== APP == 你好,计数器!
== APP == 新的计数器值是:111!
== APP == 你好,计数器!
== APP == 实例成功清除
== APP == start_resp exampleInstanceID
== APP == 你好,计数器!
== APP == 新的计数器值是:1112!
== APP == 你好,计数器!
== APP == 新的计数器值是:1122!
== APP == 在终止调用后从 hello_world_wf 获取响应:已终止
== APP == 在终止调用后从 child_wf 获取响应:已终止
== APP == 实例成功清除
发生了什么?
当您运行 dapr run
时,Dapr 客户端:
- 注册了工作流 (
hello_world_wf
) 及其活动 (hello_act
) - 启动了工作流引擎
def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()
print("==========根据输入开始计数器增加==========")
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
然后 Dapr 暂停并恢复了工作流:
# 暂停
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"在暂停调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")
# 恢复
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"在恢复调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")
一旦工作流恢复,Dapr 触发了一个工作流事件并打印了新的计数器值:
# 触发事件
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)
为了从您的状态存储中清除工作流状态,Dapr 清除了工作流:
# 清除
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("实例成功清除")
然后示例演示了通过以下步骤终止工作流:
- 使用与已清除工作流相同的
instanceId
启动一个新的工作流。 - 在关闭工作流之前终止并清除工作流。
# 启动另一个工作流
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# 终止
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"在终止调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")
# 清除
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("实例成功清除")
下一步