我们来创建一个 Dapr 工作流,并通过控制台调用它。通过提供的 hello world 工作流示例,您将会:
DaprClient
的 Python 控制台应用程序此示例使用 dapr init
的默认配置在本地模式下运行。
在 Python 示例项目中,app.py
文件包含应用程序的设置,其中包括:
运行以下命令以安装使用 Dapr Python SDK 运行此工作流示例的必要依赖。
pip3 install -r demo_workflow/requirements.txt
克隆 [Python SDK 仓库]。
git clone https://github.com/dapr/python-sdk.git
从 Python SDK 根目录导航到 Dapr 工作流示例。
cd examples/demo_workflow
要运行 Dapr 应用程序,您需要启动 Python 程序和一个 Dapr 辅助进程。在终端中运行:
dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resources-path components --placement-host-address localhost:50005 -- python3 app.py
注意: 由于 Windows 中未定义 Python3.exe,您可能需要使用
python app.py
而不是python3 app.py
。
预期输出
== APP == ==========根据输入开始计数器增加==========
== APP == start_resp exampleInstanceID
== APP == 你好,计数器!
== APP == 新的计数器值是:1!
== APP == 你好,计数器!
== APP == 新的计数器值是:11!
== APP == 你好,计数器!
== APP == 你好,计数器!
== APP == 在暂停调用后从 hello_world_wf 获取响应:已暂停
== APP == 你好,计数器!
== APP == 在恢复调用后从 hello_world_wf 获取响应:运行中
== APP == 你好,计数器!
== APP == 新的计数器值是:111!
== APP == 你好,计数器!
== APP == 实例成功清除
== APP == start_resp exampleInstanceID
== APP == 你好,计数器!
== APP == 新的计数器值是:1112!
== APP == 你好,计数器!
== APP == 新的计数器值是:1122!
== APP == 在终止调用后从 hello_world_wf 获取响应:已终止
== APP == 在终止调用后从 child_wf 获取响应:已终止
== APP == 实例成功清除
当您运行 dapr run
时,Dapr 客户端:
hello_world_wf
) 及其活动 (hello_act
)def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()
print("==========根据输入开始计数器增加==========")
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
然后 Dapr 暂停并恢复了工作流:
# 暂停
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"在暂停调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")
# 恢复
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"在恢复调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")
一旦工作流恢复,Dapr 触发了一个工作流事件并打印了新的计数器值:
# 触发事件
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)
为了从您的状态存储中清除工作流状态,Dapr 清除了工作流:
# 清除
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("实例成功清除")
然后示例演示了通过以下步骤终止工作流:
instanceId
启动一个新的工作流。 # 启动另一个工作流
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# 终止
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"在终止调用后从 {workflowName} 获取响应:{getResponse.runtime_status}")
# 清除
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("实例成功清除")