This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

工作流

在各种微服务中进行逻辑编排

1 - 工作流概述

Dapr 工作流概述

Dapr 工作流让开发人员能够可靠地编写业务逻辑和集成。由于 Dapr 工作流是有状态的,它们支持长时间运行和容错应用程序,非常适合编排微服务。Dapr 工作流与其他 Dapr 构建块(如服务调用、发布订阅、状态管理和绑定)无缝协作。

Dapr 工作流的耐用性和弹性功能包括:

  • 提供内置的工作流运行时以驱动 Dapr 工作流执行。
  • 提供用于在代码中编写工作流的 SDK,支持多种编程语言。
  • 提供用于管理工作流(启动、查询、暂停/恢复、触发事件、终止、清除)的 HTTP 和 gRPC API。
  • 通过工作流组件与其他工作流运行时集成。
显示 Dapr 工作流基础的图示

Dapr 工作流可以应用于以下场景:

  • 涉及库存管理、支付系统和运输服务之间编排的订单处理。
  • 协调多个部门和参与者任务的人力资源入职工作流。
  • 在全国餐饮连锁店中协调数字菜单更新的推出。
  • 涉及基于 API 的分类和存储的图像处理工作流。

功能

工作流和活动

使用 Dapr 工作流,您可以编写活动,然后在工作流中编排这些活动。工作流活动是:

  • 工作流中的基本工作单元
  • 用于调用其他(Dapr)服务、与状态存储交互以及发布订阅代理。

了解更多关于工作流活动的信息。

子工作流

除了活动之外,您还可以编写工作流以调度其他工作流作为子工作流。子工作流具有独立于启动它的父工作流的实例 ID、历史记录和状态,除了终止父工作流会终止由其创建的所有子工作流这一事实。子工作流还支持自动重试策略。

了解更多关于子工作流的信息。

定时器和提醒

与 Dapr actor 相同,您可以为任何时间范围安排类似提醒的持久延迟。

了解更多关于工作流定时器提醒

使用 HTTP 调用管理工作流

当您使用工作流代码创建应用程序并使用 Dapr 运行它时,您可以调用驻留在应用程序中的特定工作流。每个单独的工作流可以:

  • 通过 POST 请求启动或终止
  • 通过 POST 请求触发以传递命名事件
  • 通过 POST 请求暂停然后恢复
  • 通过 POST 请求从您的状态存储中清除
  • 通过 GET 请求查询工作流状态

了解更多关于如何使用 HTTP 调用管理工作流的信息。

工作流模式

Dapr 工作流简化了微服务架构中复杂的、有状态的协调需求。以下部分描述了可以从 Dapr 工作流中受益的几种应用程序模式。

了解更多关于不同类型的工作流模式

工作流 SDK

Dapr 工作流 编写 SDK 是特定语言的 SDK,包含用于实现工作流逻辑的类型和函数。工作流逻辑存在于您的应用程序中,并由运行在 Dapr sidecar 中的 Dapr 工作流引擎通过 gRPC 流进行编排。

支持的 SDK

您可以使用以下 SDK 编写工作流。

语言栈
Pythondapr-ext-workflow
JavaScriptDaprWorkflowClient
.NETDapr.Workflow
Javaio.dapr.workflows
Goworkflow

试用工作流

快速入门和教程

想要测试工作流?通过以下快速入门和教程来查看工作流的实际应用:

快速入门/教程描述
工作流快速入门运行一个包含四个工作流活动的工作流应用程序,查看 Dapr 工作流的实际应用
工作流 Python SDK 示例了解如何使用 Python dapr-ext-workflow 包创建和调用 Dapr 工作流。
工作流 JavaScript SDK 示例了解如何使用 JavaScript SDK 创建和调用 Dapr 工作流。
工作流 .NET SDK 示例了解如何使用 ASP.NET Core web API 创建和调用 Dapr 工作流。
工作流 Java SDK 示例了解如何使用 Java io.dapr.workflows 包创建和调用 Dapr 工作流。
工作流 Go SDK 示例了解如何使用 Go workflow 包创建和调用 Dapr 工作流。

直接在您的应用中开始使用工作流

想要跳过快速入门?没问题。您可以直接在您的应用程序中试用工作流构建块。在Dapr 安装完成后,您可以开始使用工作流,从如何编写工作流开始。

限制

  • 状态存储: 由于某些数据库选择的底层限制,通常是 NoSQL 数据库,您可能会遇到存储内部状态的限制。例如,CosmosDB 在单个请求中最多只能有 100 个状态的单操作项限制。

观看演示

观看此视频以了解 Dapr 工作流的概述

下一步

工作流功能和概念 >>

相关链接

2 - 功能和概念

详细了解 Dapr 工作流的功能和概念

在您已经从高层次了解了工作流构建块之后,让我们深入探讨 Dapr 工作流引擎和 SDK 所包含的功能和概念。Dapr 工作流在所有支持的语言中都提供了几个核心功能和概念。

工作流

Dapr 工作流是您编写的函数,用于定义一系列按特定顺序执行的任务。Dapr 工作流引擎负责调度和执行这些任务,包括管理故障和重试。如果托管工作流的应用程序在多台机器上扩展,工作流引擎还可以在多台机器上负载均衡工作流及其任务的执行。

工作流可以调度多种类型的任务,包括:

  • 用于执行自定义逻辑的活动
  • 用于将工作流休眠任意时间长度的持久计时器
  • 用于将较大的工作流分解为较小部分的子工作流
  • 用于阻塞工作流直到接收到外部事件信号的外部事件等待器。这些任务在其相应的部分中有更详细的描述。

工作流标识

每个您定义的工作流都有一个类型名称,工作流的每次执行都需要一个唯一的_实例 ID_。工作流实例 ID 可以由您的应用程序代码生成,这在工作流对应于业务实体(如文档或作业)时很有用,或者可以是自动生成的 UUID。工作流的实例 ID 对于调试以及使用工作流 API管理工作流非常有用。

在任何给定时间,只能存在一个具有给定 ID 的工作流实例。然而,如果一个工作流实例完成或失败,其 ID 可以被新的工作流实例重用。但请注意,新工作流实例实际上会在配置的状态存储中替换旧的实例。

工作流重放

Dapr 工作流通过使用一种称为事件溯源的技术来维护其执行状态。工作流引擎不是将工作流的当前状态存储为快照,而是管理一个仅追加的历史事件日志,描述工作流所采取的各种步骤。当使用工作流 SDK 时,这些历史事件会在工作流“等待”计划任务的结果时自动存储。

当工作流“等待”计划任务时,它会从内存中卸载自己,直到任务完成。一旦任务完成,工作流引擎会再次调度工作流函数运行。此时的工作流函数执行被称为_重放_。

当工作流函数被重放时,它会从头开始再次运行。然而,当它遇到已经完成的任务时,工作流引擎不会再次调度该任务,而是:

  1. 将已完成任务的存储结果返回给工作流。
  2. 继续执行直到下一个“等待”点。

这种“重放”行为会持续到工作流函数完成或因错误而失败。

通过这种重放技术,工作流能够从任何“等待”点恢复执行,就像它从未从内存中卸载过一样。即使是先前运行的局部变量的值也可以恢复,而无需工作流引擎了解它们存储了什么数据。这种恢复状态的能力使 Dapr 工作流具有_持久性_和_容错性_。

无限循环和永恒工作流

工作流重放部分所述,工作流维护其所有操作的仅写事件溯源历史日志。为了避免资源使用失控,工作流必须限制其调度的操作数量。例如,确保您的工作流不会:

  • 在其实现中使用无限循环
  • 调度数千个任务。

您可以使用以下两种技术来编写可能需要调度极大量任务的工作流:

  1. 使用 continue-as-new API
    每个工作流 SDK 都公开了一个 continue-as-new API,工作流可以调用该 API 以使用新的输入和历史记录重新启动自己。continue-as-new API 特别适合实现“永恒工作流”,如监控代理,否则将使用 while (true) 类构造实现。使用 continue-as-new 是保持工作流历史记录小的好方法。

    continue-as-new API 会截断现有历史记录,并用新的历史记录替换它。

  2. 使用子工作流
    每个工作流 SDK 都公开了一个用于创建子工作流的 API。子工作流的行为与任何其他工作流相同,只是它由父工作流调度。子工作流具有:

    • 自己的历史记录
    • 在多台机器上分布工作流函数执行的好处。

    如果一个工作流需要调度数千个或更多任务,建议将这些任务分布在子工作流中,以免单个工作流的历史记录大小过大。

更新工作流代码

由于工作流是长时间运行且持久的,因此更新工作流代码必须非常小心。如工作流确定性限制部分所述,工作流代码必须是确定性的。如果系统中有任何未完成的工作流实例,更新工作流代码必须保留这种确定性。否则,更新工作流代码可能会导致下次这些工作流执行时出现运行时故障。

查看已知限制

工作流活动

工作流活动是工作流中的基本工作单元,是在业务流程中被编排的任务。例如,您可能会创建一个工作流来处理订单。任务可能涉及检查库存、向客户收费和创建发货。每个任务将是一个单独的活动。这些活动可以串行执行、并行执行或两者的某种组合。

与工作流不同,活动在您可以在其中执行的工作类型上没有限制。活动经常用于进行网络调用或运行 CPU 密集型操作。活动还可以将数据返回给工作流。

Dapr 工作流引擎保证每个被调用的活动在工作流的执行过程中至少执行一次。由于活动仅保证至少一次执行,建议尽可能将活动逻辑实现为幂等。

子工作流

除了活动之外,工作流还可以调度其他工作流作为_子工作流_。子工作流具有独立于启动它的父工作流的实例 ID、历史记录和状态。

子工作流有许多好处:

  • 您可以将大型工作流拆分为一系列较小的子工作流,使您的代码更易于维护。
  • 您可以在多个计算节点上同时分布工作流逻辑,这在您的工作流逻辑需要协调大量任务时很有用。
  • 您可以通过保持父工作流的历史记录较小来减少内存使用和 CPU 开销。

子工作流的返回值是其输出。如果子工作流因异常而失败,则该异常会像活动任务失败时一样显示给父工作流。子工作流还支持自动重试策略。

终止父工作流会终止由工作流实例创建的所有子工作流。有关更多信息,请参阅终止工作流 API

持久计时器

Dapr 工作流允许您为任何时间范围安排类似提醒的持久延迟,包括分钟、天甚至年。这些_持久计时器_可以由工作流安排以实现简单的延迟或为其他异步任务设置临时超时。更具体地说,持久计时器可以设置为在特定日期触发或在指定持续时间后触发。持久计时器的最大持续时间没有限制,它们在内部由内部 actor 提醒支持。例如,跟踪服务 30 天免费订阅的工作流可以使用在工作流创建后 30 天触发的持久计时器实现。工作流在等待持久计时器触发时可以安全地从内存中卸载。

重试策略

工作流支持活动和子工作流的持久重试策略。工作流重试策略与Dapr 弹性策略在以下方面是分开的和不同的。

  • 工作流重试策略由工作流作者在代码中配置,而 Dapr 弹性策略由应用程序操作员在 YAML 中配置。
  • 工作流重试策略是持久的,并在应用程序重启时保持其状态,而 Dapr 弹性策略不是持久的,必须在应用程序重启后重新应用。
  • 工作流重试策略由活动和子工作流中的未处理错误/异常触发,而 Dapr 弹性策略由操作超时和连接故障触发。

重试在内部使用持久计时器实现。这意味着工作流在等待重试触发时可以安全地从内存中卸载,从而节省系统资源。这也意味着重试之间的延迟可以任意长,包括分钟、小时甚至天。

可以同时使用工作流重试策略和 Dapr 弹性策略。例如,如果工作流活动使用 Dapr 客户端调用服务,则 Dapr 客户端使用配置的弹性策略。有关示例的更多信息,请参阅快速入门:服务到服务的弹性。但是,如果活动本身因任何原因失败,包括耗尽弹性策略的重试次数,则工作流的弹性策略会启动。

由于工作流重试策略是在代码中配置的,因此具体的开发者体验可能会因工作流 SDK 的版本而异。通常,工作流重试策略可以通过以下参数进行配置。

参数描述
最大尝试次数执行活动或子工作流的最大次数。
首次重试间隔第一次重试前的等待时间。
退避系数用于确定退避增长率的系数。例如,系数为 2 会使每次后续重试的等待时间加倍。
最大重试间隔每次后续重试前的最大等待时间。
重试超时重试的总体超时,无论配置的最大尝试次数如何。

外部事件

有时工作流需要等待由外部系统引发的事件。例如,如果订单处理工作流中的总成本超过某个阈值,审批工作流可能需要人类明确批准订单请求。另一个例子是一个问答游戏编排工作流,它在等待所有参与者提交他们对问答问题的答案时暂停。这些中间执行输入被称为_外部事件_。

外部事件具有_名称_和_负载_,并传递给单个工作流实例。工作流可以创建“等待外部事件”任务,订阅外部事件并_等待_这些任务以阻止执行,直到接收到事件。然后,工作流可以读取这些事件的负载,并决定采取哪些下一步。外部事件可以串行或并行处理。外部事件可以由其他工作流或工作流代码引发。

工作流还可以等待同名的多个外部事件信号,在这种情况下,它们会以先进先出 (FIFO) 的方式分派给相应的工作流任务。如果工作流接收到外部事件信号,但尚未创建“等待外部事件”任务,则事件将保存到工作流的历史记录中,并在工作流请求事件后立即消费。

了解有关外部系统交互的更多信息。

工作流后端

Dapr 工作流依赖于 Go 的持久任务框架(即 durabletask-go)作为执行工作流的核心引擎。该引擎设计为支持多种后端实现。例如,durabletask-go 仓库包括一个 SQLite 实现,Dapr 仓库包括一个 actor 实现。

默认情况下,Dapr 工作流支持 actor 后端,该后端稳定且可扩展。然而,您可以选择 Dapr 工作流中支持的其他后端。例如,SQLite(待定未来版本)可以是本地开发和测试的后端选项。

后端实现在很大程度上与您看到的工作流核心引擎或编程模型解耦。后端主要影响:

  • 如何存储工作流状态
  • 如何在副本之间协调工作流执行

从这个意义上说,它类似于 Dapr 的状态存储抽象,但专为工作流设计。无论使用哪个后端,所有 API 和编程模型功能都是相同的。

清除

工作流状态可以从状态存储中清除,清除其所有历史记录并删除与特定工作流实例相关的所有元数据。清除功能用于已运行到 COMPLETEDFAILEDTERMINATED 状态的工作流。

工作流 API 参考指南中了解更多信息。

限制

工作流确定性和代码限制

为了利用工作流重放技术,您的工作流代码需要是确定性的。为了使您的工作流代码确定性,您可能需要绕过一些限制。

工作流函数必须调用确定性 API。

生成随机数、随机 UUID 或当前日期的 API 是_非确定性_的。要解决此限制,您可以:

  • 在活动函数中使用这些 API,或
  • (首选)使用 SDK 提供的内置等效 API。例如,每个创作 SDK 都提供了一个以确定性方式检索当前时间的 API。

例如,不要这样做:

// 不要这样做!
DateTime currentTime = DateTime.UtcNow;
Guid newIdentifier = Guid.NewGuid();
string randomString = GetRandomString();
// 不要这样做!
Instant currentTime = Instant.now();
UUID newIdentifier = UUID.randomUUID();
String randomString = getRandomString();
// 不要这样做!
const currentTime = new Date();
const newIdentifier = uuidv4();
const randomString = getRandomString();
// 不要这样做!
const currentTime = time.Now()

这样做:

// 这样做!!
DateTime currentTime = context.CurrentUtcDateTime;
Guid newIdentifier = context.NewGuid();
string randomString = await context.CallActivityAsync<string>("GetRandomString");
// 这样做!!
Instant currentTime = context.getCurrentInstant();
Guid newIdentifier = context.newGuid();
String randomString = context.callActivity(GetRandomString.class.getName(), String.class).await();
// 这样做!!
const currentTime = context.getCurrentUtcDateTime();
const randomString = yield context.callActivity(getRandomString);
const currentTime = ctx.CurrentUTCDateTime()

工作流函数必须仅_间接_与外部状态交互。

外部数据包括任何不存储在工作流状态中的数据。工作流不得与全局变量、环境变量、文件系统交互或进行网络调用。

相反,工作流应通过工作流输入、活动任务和外部事件处理_间接_与外部状态交互。

例如,不要这样做:

// 不要这样做!
string configuration = Environment.GetEnvironmentVariable("MY_CONFIGURATION")!;
string data = await new HttpClient().GetStringAsync("https://example.com/api/data");
// 不要这样做!
String configuration = System.getenv("MY_CONFIGURATION");

HttpRequest request = HttpRequest.newBuilder().uri(new URI("https://postman-echo.com/post")).GET().build();
HttpResponse<String> response = HttpClient.newBuilder().build().send(request, HttpResponse.BodyHandlers.ofString());
// 不要这样做!
// 访问环境变量(Node.js)
const configuration = process.env.MY_CONFIGURATION;

fetch('https://postman-echo.com/get')
  .then(response => response.text())
  .then(data => {
    console.log(data);
  })
  .catch(error => {
    console.error('Error:', error);
  });
// 不要这样做!
resp, err := http.Get("http://example.com/api/data")

这样做:

// 这样做!!
string configuration = workflowInput.Configuration; // 假想的工作流输入参数
string data = await context.CallActivityAsync<string>("MakeHttpCall", "https://example.com/api/data");
// 这样做!!
String configuration = ctx.getInput(InputType.class).getConfiguration(); // 假想的工作流输入参数
String data = ctx.callActivity(MakeHttpCall.class, "https://example.com/api/data", String.class).await();
// 这样做!!
const configuration = workflowInput.getConfiguration(); // 假想的工作流输入参数
const data = yield ctx.callActivity(makeHttpCall, "https://example.com/api/data");
// 这样做!!
err := ctx.CallActivity(MakeHttpCallActivity, workflow.ActivityInput("https://example.com/api/data")).Await(&output)

工作流函数必须仅在工作流调度线程上执行。

每种语言 SDK 的实现要求所有工作流函数操作在函数被调度的同一线程(goroutine 等)上运行。工作流函数绝不能:

  • 调度后台线程,或
  • 使用调度回调函数在另一个线程上运行的 API。

不遵循此规则可能导致未定义的行为。任何后台处理都应委托给活动任务,这些任务可以串行或并行调度运行。

例如,不要这样做:

// 不要这样做!
Task t = Task.Run(() => context.CallActivityAsync("DoSomething"));
await context.CreateTimer(5000).ConfigureAwait(false);
// 不要这样做!
new Thread(() -> {
    ctx.callActivity(DoSomethingActivity.class.getName()).await();
}).start();
ctx.createTimer(Duration.ofSeconds(5)).await();

不要将 JavaScript 工作流声明为 async。Node.js 运行时不保证异步函数是确定性的。

// 不要这样做!
go func() {
  err := ctx.CallActivity(DoSomething).Await(nil)
}()
err := ctx.CreateTimer(time.Second).Await(nil)

这样做:

// 这样做!!
Task t = context.CallActivityAsync("DoSomething");
await context.CreateTimer(5000).ConfigureAwait(true);
// 这样做!!
ctx.callActivity(DoSomethingActivity.class.getName()).await();
ctx.createTimer(Duration.ofSeconds(5)).await();

由于 Node.js 运行时不保证异步函数是确定性的,因此始终将 JavaScript 工作流声明为同步生成器函数。

// 这样做!
task := ctx.CallActivity(DoSomething)
task.Await(nil)

更新工作流代码

确保您对工作流代码所做的更新保持其确定性。以下是可能破坏工作流确定性的代码更新示例:

  • 更改工作流函数签名
    更改工作流或活动函数的名称、输入或输出被视为重大更改,必须避免。

  • 更改工作流任务的数量或顺序
    更改工作流任务的数量或顺序会导致工作流实例的历史记录不再与代码匹配,可能导致运行时错误或其他意外行为。

要解决这些限制:

  • 不要更新现有工作流代码,而是保持现有工作流代码不变,并创建包含更新的新工作流定义。
  • 上游创建工作流的代码应仅更新以创建新工作流的实例。
  • 保留旧代码以确保现有工作流实例可以继续运行而不受干扰。如果已知旧工作流逻辑的所有实例都已完成,则可以安全地删除旧工作流代码。

下一步

工作流模式 >>

相关链接

3 - 工作流模式

编写不同类型的工作流模式

Dapr 工作流简化了微服务架构中复杂且有状态的协调需求。以下部分描述了几种可以从 Dapr 工作流中受益的应用程序模式。

任务链

在任务链模式中,工作流中的多个步骤按顺序运行,一个步骤的输出可以作为下一个步骤的输入。任务链工作流通常涉及创建一系列需要对某些数据执行的操作,例如过滤、转换和归约。

显示任务链工作流模式如何工作的图示

在某些情况下,工作流的步骤可能需要在多个微服务之间进行协调。为了提高可靠性和可扩展性,您还可能使用队列来触发各个步骤。

虽然模式简单,但实现中隐藏了许多复杂性。例如:

  • 如果某个微服务长时间不可用,会发生什么?
  • 可以自动重试失败的步骤吗?
  • 如果不能,如何促进先前完成步骤的回滚(如果适用)?
  • 除了实现细节之外,是否有办法可视化工作流,以便其他工程师可以理解它的作用和工作原理?

Dapr 工作流通过允许您在所选编程语言中将任务链模式简洁地实现为简单函数来解决这些复杂性,如以下示例所示。

import dapr.ext.workflow as wf


def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    try:
        result1 = yield ctx.call_activity(step1, input=wf_input)
        result2 = yield ctx.call_activity(step2, input=result1)
        result3 = yield ctx.call_activity(step3, input=result2)
    except Exception as e:
        yield ctx.call_activity(error_handler, input=str(e))
        raise
    return [result1, result2, result3]


def step1(ctx, activity_input):
    print(f'步骤 1: 接收到输入: {activity_input}.')
    # 执行一些操作
    return activity_input + 1


def step2(ctx, activity_input):
    print(f'步骤 2: 接收到输入: {activity_input}.')
    # 执行一些操作
    return activity_input * 2


def step3(ctx, activity_input):
    print(f'步骤 3: 接收到输入: {activity_input}.')
    # 执行一些操作
    return activity_input ^ 2


def error_handler(ctx, error):
    print(f'执行错误处理程序: {error}.')
    # 执行一些补偿操作

注意 工作流重试策略将在 Python SDK 的未来版本中提供。

import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowRuntime, TWorkflow } from "@dapr/dapr";

async function start() {
  // 更新 gRPC 客户端和工作者以使用本地地址和端口
  const daprHost = "localhost";
  const daprPort = "50001";
  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  const hello = async (_: WorkflowActivityContext, name: string) => {
    return `Hello ${name}!`;
  };

  const sequence: TWorkflow = async function* (ctx: WorkflowContext): any {
    const cities: string[] = [];

    const result1 = yield ctx.callActivity(hello, "Tokyo");
    cities.push(result1);
    const result2 = yield ctx.callActivity(hello, "Seattle");
    cities.push(result2);
    const result3 = yield ctx.callActivity(hello, "London");
    cities.push(result3);

    return cities;
  };

  workflowRuntime.registerWorkflow(sequence).registerActivity(hello);

  // 将工作者启动包装在 try-catch 块中以处理启动期间的任何错误
  try {
    await workflowRuntime.start();
    console.log("工作流运行时启动成功");
  } catch (error) {
    console.error("启动工作流运行时时出错:", error);
  }

  // 调度新的编排
  try {
    const id = await workflowClient.scheduleNewWorkflow(sequence);
    console.log(`编排已调度,ID: ${id}`);

    // 等待编排完成
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);

    console.log(`编排完成!结果: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("调度或等待编排时出错:", error);
  }

  await workflowRuntime.stop();
  await workflowClient.stop();

  // 停止 dapr sidecar
  process.exit(0);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
// 支持长时间中断的指数退避重试策略
var retryOptions = new WorkflowTaskOptions
{
    RetryPolicy = new WorkflowRetryPolicy(
        firstRetryInterval: TimeSpan.FromMinutes(1),
        backoffCoefficient: 2.0,
        maxRetryInterval: TimeSpan.FromHours(1),
        maxNumberOfAttempts: 10),
};

try
{
    var result1 = await context.CallActivityAsync<string>("Step1", wfInput, retryOptions);
    var result2 = await context.CallActivityAsync<byte[]>("Step2", result1, retryOptions);
    var result3 = await context.CallActivityAsync<long[]>("Step3", result2, retryOptions);
    return string.Join(", ", result4);
}
catch (TaskFailedException) // 任务失败会作为 TaskFailedException 显示
{
    // 重试过期 - 应用自定义补偿逻辑
    await context.CallActivityAsync<long[]>("MyCompensation", options: retryOptions);
    throw;
}

注意 在上面的示例中,"Step1""Step2""Step3""MyCompensation" 代表工作流活动,它们是您代码中实际实现工作流步骤的函数。为了简洁起见,这些活动实现未包含在此示例中。

public class ChainWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            StringBuilder sb = new StringBuilder();
            String wfInput = ctx.getInput(String.class);
            String result1 = ctx.callActivity("Step1", wfInput, String.class).await();
            String result2 = ctx.callActivity("Step2", result1, String.class).await();
            String result3 = ctx.callActivity("Step3", result2, String.class).await();
            String result = sb.append(result1).append(',').append(result2).append(',').append(result3).toString();
            ctx.complete(result);
        };
    }
}

    class Step1 implements WorkflowActivity {

        @Override
        public Object run(WorkflowActivityContext ctx) {
            Logger logger = LoggerFactory.getLogger(Step1.class);
            logger.info("Starting Activity: " + ctx.getName());
            // Do some work
            return null;
        }
    }

    class Step2 implements WorkflowActivity {

        @Override
        public Object run(WorkflowActivityContext ctx) {
            Logger logger = LoggerFactory.getLogger(Step2.class);
            logger.info("Starting Activity: " + ctx.getName());
            // Do some work
            return null;
        }
    }

    class Step3 implements WorkflowActivity {

        @Override
        public Object run(WorkflowActivityContext ctx) {
            Logger logger = LoggerFactory.getLogger(Step3.class);
            logger.info("Starting Activity: " + ctx.getName());
            // Do some work
            return null;
        }
    }
func TaskChainWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	var result1 int
	if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result1); err != nil {
		return nil, err
	}
	var result2 int
	if err := ctx.CallActivity(Step2, workflow.ActivityInput(input)).Await(&result2); err != nil {
		return nil, err
	}
	var result3 int
	if err := ctx.CallActivity(Step3, workflow.ActivityInput(input)).Await(&result3); err != nil {
		return nil, err
	}
	return []int{result1, result2, result3}, nil
}
func Step1(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	fmt.Printf("步骤 1: 接收到输入: %s", input)
	return input + 1, nil
}
func Step2(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	fmt.Printf("步骤 2: 接收到输入: %s", input)
	return input * 2, nil
}
func Step3(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	fmt.Printf("步骤 3: 接收到输入: %s", input)
	return int(math.Pow(float64(input), 2)), nil
}

如您所见,工作流被表达为所选编程语言中的简单语句序列。这使得组织中的任何工程师都可以快速理解端到端的流程,而不必了解端到端的系统架构。

在幕后,Dapr 工作流运行时:

  • 负责执行工作流并确保其运行到完成。
  • 自动保存进度。
  • 如果工作流进程本身因任何原因失败,自动从上次完成的步骤恢复工作流。
  • 允许在目标编程语言中自然地表达错误处理,使您可以轻松实现补偿逻辑。
  • 提供内置的重试配置原语,以简化为工作流中的各个步骤配置复杂重试策略的过程。

扇出/扇入

在扇出/扇入设计模式中,您可以在多个工作者上同时执行多个任务,等待它们完成,并对结果进行一些聚合。

显示扇出/扇入工作流模式如何工作的图示

除了前一个模式中提到的挑战外,在手动实现扇出/扇入模式时还有几个重要问题需要考虑:

  • 如何控制并行度?
  • 如何知道何时触发后续聚合步骤?
  • 如果并行步骤的数量是动态的怎么办?

Dapr 工作流提供了一种将扇出/扇入模式表达为简单函数的方法,如以下示例所示:

import time
from typing import List
import dapr.ext.workflow as wf


def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    # 获取一批 N 个工作项以并行处理
    work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)

    # 调度 N 个并行任务以处理工作项并等待所有任务完成
    parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch]
    outputs = yield wf.when_all(parallel_tasks)

    # 聚合结果并将其发送到另一个活动
    total = sum(outputs)
    yield ctx.call_activity(process_results, input=total)


def get_work_batch(ctx, batch_size: int) -> List[int]:
    return [i + 1 for i in range(batch_size)]


def process_work_item(ctx, work_item: int) -> int:
    print(f'处理工作项: {work_item}.')
    time.sleep(5)
    result = work_item * 2
    print(f'工作项 {work_item} 已处理. 结果: {result}.')
    return result


def process_results(ctx, final_result: int):
    print(f'最终结果: {final_result}.')
import {
  Task,
  DaprWorkflowClient,
  WorkflowActivityContext,
  WorkflowContext,
  WorkflowRuntime,
  TWorkflow,
} from "@dapr/dapr";

// 将整个代码包装在一个立即调用的异步函数中
async function start() {
  // 更新 gRPC 客户端和工作者以使用本地地址和端口
  const daprHost = "localhost";
  const daprPort = "50001";
  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  function getRandomInt(min: number, max: number): number {
    return Math.floor(Math.random() * (max - min + 1)) + min;
  }

  async function getWorkItemsActivity(_: WorkflowActivityContext): Promise<string[]> {
    const count: number = getRandomInt(2, 10);
    console.log(`生成 ${count} 个工作项...`);

    const workItems: string[] = Array.from({ length: count }, (_, i) => `工作项 ${i}`);
    return workItems;
  }

  function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  async function processWorkItemActivity(context: WorkflowActivityContext, item: string): Promise<number> {
    console.log(`处理工作项: ${item}`);

    // 模拟一些需要可变时间的工作
    const sleepTime = Math.random() * 5000;
    await sleep(sleepTime);

    // 返回给定工作项的结果,在这种情况下也是一个随机数
    // 有关工作流中随机数的更多信息,请查看
    // https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-code-constraints?tabs=csharp#random-numbers
    return Math.floor(Math.random() * 11);
  }

  const workflow: TWorkflow = async function* (ctx: WorkflowContext): any {
    const tasks: Task<any>[] = [];
    const workItems = yield ctx.callActivity(getWorkItemsActivity);
    for (const workItem of workItems) {
      tasks.push(ctx.callActivity(processWorkItemActivity, workItem));
    }
    const results: number[] = yield ctx.whenAll(tasks);
    const sum: number = results.reduce((accumulator, currentValue) => accumulator + currentValue, 0);
    return sum;
  };

  workflowRuntime.registerWorkflow(workflow);
  workflowRuntime.registerActivity(getWorkItemsActivity);
  workflowRuntime.registerActivity(processWorkItemActivity);

  // 将工作者启动包装在 try-catch 块中以处理启动期间的任何错误
  try {
    await workflowRuntime.start();
    console.log("工作者启动成功");
  } catch (error) {
    console.error("启动工作者时出错:", error);
  }

  // 调度新的编排
  try {
    const id = await workflowClient.scheduleNewWorkflow(workflow);
    console.log(`编排已调度,ID: ${id}`);

    // 等待编排完成
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);

    console.log(`编排完成!结果: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("调度或等待编排时出错:", error);
  }

  // 停止工作者和客户端
  await workflowRuntime.stop();
  await workflowClient.stop();

  // 停止 dapr sidecar
  process.exit(0);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
// 获取要并行处理的 N 个工作项的列表。
object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

// 调度并行任务,但不等待它们完成。
var parallelTasks = new List<Task<int>>(workBatch.Length);
for (int i = 0; i < workBatch.Length; i++)
{
    Task<int> task = context.CallActivityAsync<int>("ProcessWorkItem", workBatch[i]);
    parallelTasks.Add(task);
}

// 一切都已调度。在此处等待,直到所有并行任务完成。
await Task.WhenAll(parallelTasks);

// 聚合所有 N 个输出并发布结果。
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("PostResults", sum);
public class FaninoutWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            // 获取要并行处理的 N 个工作项的列表。
            Object[] workBatch = ctx.callActivity("GetWorkBatch", Object[].class).await();
            // 调度并行任务,但不等待它们完成。
            List<Task<Integer>> tasks = Arrays.stream(workBatch)
                    .map(workItem -> ctx.callActivity("ProcessWorkItem", workItem, int.class))
                    .collect(Collectors.toList());
            // 一切都已调度。在此处等待,直到所有并行任务完成。
            List<Integer> results = ctx.allOf(tasks).await();
            // 聚合所有 N 个输出并发布结果。
            int sum = results.stream().mapToInt(Integer::intValue).sum();
            ctx.complete(sum);
        };
    }
}
func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return 0, err
	}
	var workBatch []int
	if err := ctx.CallActivity(GetWorkBatch, workflow.ActivityInput(input)).Await(&workBatch); err != nil {
		return 0, err
	}
	parallelTasks := workflow.NewTaskSlice(len(workBatch))
	for i, workItem := range workBatch {
		parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.ActivityInput(workItem))
	}
	var outputs int
	for _, task := range parallelTasks {
		var output int
		err := task.Await(&output)
		if err == nil {
			outputs += output
		} else {
			return 0, err
		}
	}
	if err := ctx.CallActivity(ProcessResults, workflow.ActivityInput(outputs)).Await(nil); err != nil {
		return 0, err
	}
	return 0, nil
}
func GetWorkBatch(ctx workflow.ActivityContext) (any, error) {
	var batchSize int
	if err := ctx.GetInput(&batchSize); err != nil {
		return 0, err
	}
	batch := make([]int, batchSize)
	for i := 0; i < batchSize; i++ {
		batch[i] = i
	}
	return batch, nil
}
func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) {
	var workItem int
	if err := ctx.GetInput(&workItem); err != nil {
		return 0, err
	}
	fmt.Printf("处理工作项: %d\n", workItem)
	time.Sleep(time.Second * 5)
	result := workItem * 2
	fmt.Printf("工作项 %d 已处理. 结果: %d\n", workItem, result)
	return result, nil
}
func ProcessResults(ctx workflow.ActivityContext) (any, error) {
	var finalResult int
	if err := ctx.GetInput(&finalResult); err != nil {
		return 0, err
	}
	fmt.Printf("最终结果: %d\n", finalResult)
	return finalResult, nil
}

此示例的关键要点是:

  • 扇出/扇入模式可以使用普通编程构造表达为简单函数
  • 并行任务的数量可以是静态的或动态的
  • 工作流本身能够聚合并行执行的结果

此外,工作流的执行是持久的。如果一个工作流启动了 100 个并行任务执行,并且只有 40 个在进程崩溃前完成,工作流会自动重新启动并仅调度剩余的 60 个任务。

可以进一步使用简单的、特定语言的构造来限制并发度。下面的示例代码说明了如何将扇出的程度限制为仅 5 个并发活动执行:


// 回顾之前的示例...
// 获取要并行处理的 N 个工作项的列表。
object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

const int MaxParallelism = 5;
var results = new List<int>();
var inFlightTasks = new HashSet<Task<int>>();
foreach(var workItem in workBatch)
{
  if (inFlightTasks.Count >= MaxParallelism)
  {
    var finishedTask = await Task.WhenAny(inFlightTasks);
    results.Add(finishedTask.Result);
    inFlightTasks.Remove(finishedTask);
  }

  inFlightTasks.Add(context.CallActivityAsync<int>("ProcessWorkItem", workItem));
}
results.AddRange(await Task.WhenAll(inFlightTasks));

var sum = results.Sum(t => t);
await context.CallActivityAsync("PostResults", sum);

以这种方式限制并发度对于限制对共享资源的争用可能很有用。例如,如果活动需要调用具有自身并发限制的外部资源(如数据库或外部 API),则确保不超过指定数量的活动同时调用该资源可能很有用。

异步 HTTP API

异步 HTTP API 通常使用异步请求-回复模式实现。传统上实现此模式涉及以下步骤:

  1. 客户端向 HTTP API 端点(启动 API)发送请求
  2. 启动 API 将消息写入后端队列,从而触发长时间运行操作的开始
  3. 在调度后端操作后,启动 API 立即向客户端返回 HTTP 202 响应,其中包含可用于轮询状态的标识符
  4. 状态 API 查询包含长时间运行操作状态的数据库
  5. 客户端重复轮询 状态 API,直到某个超时到期或收到“完成”响应

以下图示说明了端到端流程。

显示异步请求响应模式如何工作的图示

实现异步请求-回复模式的挑战在于它涉及使用多个 API 和状态存储。它还涉及正确实现协议,以便客户端知道如何自动轮询状态并知道操作何时完成。

Dapr 工作流 HTTP API 开箱即支持异步请求-回复模式,无需编写任何代码或进行任何状态管理。

以下 curl 命令说明了工作流 API 如何支持此模式。

curl -X POST http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 -d '{"Name":"Paperclips","Quantity":1,"TotalCost":9.95}'

上一个命令将导致以下响应 JSON:

{"instanceID":"12345678"}

HTTP 客户端然后可以使用工作流实例 ID 构建状态查询 URL,并反复轮询,直到在负载中看到“COMPLETE”、“FAILURE”或“TERMINATED”状态。

curl http://localhost:3500/v1.0/workflows/dapr/12345678

以下是进行中的工作流状态可能的样子。

{
  "instanceID": "12345678",
  "workflowName": "OrderProcessingWorkflow",
  "createdAt": "2023-05-03T23:22:11.143069826Z",
  "lastUpdatedAt": "2023-05-03T23:22:22.460025267Z",
  "runtimeStatus": "RUNNING",
  "properties": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}"
  }
}

如上例所示,工作流的运行时状态为 RUNNING,这让客户端知道它应该继续轮询。

如果工作流已完成,状态可能如下所示。

{
  "instanceID": "12345678",
  "workflowName": "OrderProcessingWorkflow",
  "createdAt": "2023-05-03T23:30:11.381146313Z",
  "lastUpdatedAt": "2023-05-03T23:30:52.923870615Z",
  "runtimeStatus": "COMPLETED",
  "properties": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}",
    "dapr.workflow.output": "{\"Processed\":true}"
  }
}

如上例所示,工作流的运行时状态现在为 COMPLETED,这意味着客户端可以停止轮询更新。

监控

监控模式是一个通常包括以下步骤的重复过程:

  1. 检查系统状态
  2. 根据该状态采取某些行动 - 例如发送通知
  3. 休眠一段时间
  4. 重复

下图提供了此模式的粗略说明。

显示监控模式如何工作的图示

根据业务需求,可能只有一个监控器,也可能有多个监控器,每个业务实体(例如股票)一个。此外,休眠时间可能需要根据情况进行更改。这些要求使得使用基于 cron 的调度系统不切实际。

Dapr 工作流通过允许您实现_永恒工作流_本地支持此模式。Dapr 工作流公开了一个 continue-as-new API,工作流作者可以使用该 API 从头开始使用新输入重新启动工作流函数,而不是编写无限循环(这是一种反模式)。

from dataclasses import dataclass
from datetime import timedelta
import random
import dapr.ext.workflow as wf


@dataclass
class JobStatus:
    job_id: str
    is_healthy: bool


def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
    # 轮询与此 job 关联的状态端点
    status = yield ctx.call_activity(check_status, input=job)
    if not ctx.is_replaying:
        print(f"Job '{job.job_id}' is {status}.")

    if status == "healthy":
        job.is_healthy = True
        next_sleep_interval = 60  # 在健康状态下检查频率较低
    else:
        if job.is_healthy:
            job.is_healthy = False
            ctx.call_activity(send_alert, input=f"Job '{job.job_id}' is unhealthy!")
        next_sleep_interval = 5  # 在不健康状态下检查频率较高

    yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(minutes=next_sleep_interval))

    # 使用新的 JobStatus 输入从头开始重新启动
    ctx.continue_as_new(job)


def check_status(ctx, _) -> str:
    return random.choice(["healthy", "unhealthy"])


def send_alert(ctx, message: str):
    print(f'*** Alert: {message}')
const statusMonitorWorkflow: TWorkflow = async function* (ctx: WorkflowContext): any {
    let duration;
    const status = yield ctx.callActivity(checkStatusActivity);
    if (status === "healthy") {
      // 在健康状态下检查频率较低
      // 设置持续时间为 1 小时
      duration = 60 * 60;
    } else {
      yield ctx.callActivity(alertActivity, "job unhealthy");
      // 在不健康状态下检查频率较高
      // 设置持续时间为 5 分钟
      duration = 5 * 60;
    }

    // 将工作流置于休眠状态,直到确定的时间
    ctx.createTimer(duration);

    // 使用更新的状态从头开始重新启动
    ctx.continueAsNew();
  };
public override async Task<object> RunAsync(WorkflowContext context, MyEntityState myEntityState)
{
    TimeSpan nextSleepInterval;

    var status = await context.CallActivityAsync<string>("GetStatus");
    if (status == "healthy")
    {
        myEntityState.IsHealthy = true;

        // 在健康状态下检查频率较低
        nextSleepInterval = TimeSpan.FromMinutes(60);
    }
    else
    {
        if (myEntityState.IsHealthy)
        {
            myEntityState.IsHealthy = false;
            await context.CallActivityAsync("SendAlert", myEntityState);
        }

        // 在不健康状态下检查频率较高
        nextSleepInterval = TimeSpan.FromMinutes(5);
    }

    // 将工作流置于休眠状态,直到确定的时间
    await context.CreateTimer(nextSleepInterval);

    // 使用更新的状态从头开始重新启动
    context.ContinueAsNew(myEntityState);
    return null;
}

此示例假设您有一个预定义的 MyEntityState 类,其中包含一个布尔 IsHealthy 属性。

public class MonitorWorkflow extends Workflow {

  @Override
  public WorkflowStub create() {
    return ctx -> {

      Duration nextSleepInterval;

      var status = ctx.callActivity(DemoWorkflowStatusActivity.class.getName(), DemoStatusActivityOutput.class).await();
      var isHealthy = status.getIsHealthy();

      if (isHealthy) {
        // 在健康状态下检查频率较低
        nextSleepInterval = Duration.ofMinutes(60);
      } else {

        ctx.callActivity(DemoWorkflowAlertActivity.class.getName()).await();

        // 在不健康状态下检查频率较高
        nextSleepInterval = Duration.ofMinutes(5);
      }

      // 将工作流置于休眠状态,直到确定的时间
      try {
        ctx.createTimer(nextSleepInterval);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }

      // 使用更新的状态从头开始重新启动
      ctx.continueAsNew();
    }
  }
}
type JobStatus struct {
	JobID     string `json:"job_id"`
	IsHealthy bool   `json:"is_healthy"`
}
func StatusMonitorWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var sleepInterval time.Duration
	var job JobStatus
	if err := ctx.GetInput(&job); err != nil {
		return "", err
	}
	var status string
	if err := ctx.CallActivity(CheckStatus, workflow.ActivityInput(job)).Await(&status); err != nil {
		return "", err
	}
	if status == "healthy" {
		job.IsHealthy = true
		sleepInterval = time.Minutes * 60
	} else {
		if job.IsHealthy {
			job.IsHealthy = false
			err := ctx.CallActivity(SendAlert, workflow.ActivityInput(fmt.Sprintf("Job '%s' is unhealthy!", job.JobID))).Await(nil)
			if err != nil {
				return "", err
			}
		}
		sleepInterval = time.Minutes * 5
	}
	if err := ctx.CreateTimer(sleepInterval).Await(nil); err != nil {
		return "", err
	}
	ctx.ContinueAsNew(job, false)
	return "", nil
}
func CheckStatus(ctx workflow.ActivityContext) (any, error) {
	statuses := []string{"healthy", "unhealthy"}
	return statuses[rand.Intn(1)], nil
}
func SendAlert(ctx workflow.ActivityContext) (any, error) {
	var message string
	if err := ctx.GetInput(&message); err != nil {
		return "", err
	}
	fmt.Printf("*** Alert: %s", message)
	return "", nil
}

实现监控模式的工作流可以永远循环,也可以通过不调用 continue-as-new 来优雅地终止自身。

外部系统交互

在某些情况下,工作流可能需要暂停并等待外部系统执行某些操作。例如,工作流可能需要暂停并等待接收到付款。在这种情况下,支付系统可能会在收到付款时将事件发布到 pub/sub 主题,并且该主题上的侦听器可以使用触发事件工作流 API向工作流触发事件。

另一个非常常见的场景是工作流需要暂停并等待人类,例如在批准采购订单时。Dapr 工作流通过外部事件功能支持此事件模式。

以下是涉及人类的采购订单工作流示例:

  1. 收到采购订单时触发工作流。
  2. 工作流中的规则确定需要人类执行某些操作。例如,采购订单成本超过某个自动批准阈值。
  3. 工作流发送请求人类操作的通知。例如,它向指定的审批人发送带有批准链接的电子邮件。
  4. 工作流暂停并等待人类通过点击链接批准或拒绝订单。
  5. 如果在指定时间内未收到批准,工作流将恢复并执行某些补偿逻辑,例如取消订单。

下图说明了此流程。

显示外部系统交互模式如何与人类交互的图示

以下示例代码显示了如何使用 Dapr 工作流实现此模式。

from dataclasses import dataclass
from datetime import timedelta
import dapr.ext.workflow as wf


@dataclass
class Order:
    cost: float
    product: str
    quantity: int

    def __str__(self):
        return f'{self.product} ({self.quantity})'


@dataclass
class Approval:
    approver: str

    @staticmethod
    def from_dict(dict):
        return Approval(**dict)


def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
    # 低于 $1000 的订单自动批准
    if order.cost < 1000:
        return "Auto-approved"

    # $1000 或以上的订单需要经理批准
    yield ctx.call_activity(send_approval_request, input=order)

    # 必须在 24 小时内收到批准,否则将被取消。
    approval_event = ctx.wait_for_external_event("approval_received")
    timeout_event = ctx.create_timer(timedelta(hours=24))
    winner = yield wf.when_any([approval_event, timeout_event])
    if winner == timeout_event:
        return "Cancelled"

    # 订单已获批准
    yield ctx.call_activity(place_order, input=order)
    approval_details = Approval.from_dict(approval_event.get_result())
    return f"Approved by '{approval_details.approver}'"


def send_approval_request(_, order: Order) -> None:
    print(f'*** 发送审批请求: {order}')


def place_order(_, order: Order) -> None:
    print(f'*** 下订单: {order}')
import {
  Task,
  DaprWorkflowClient,
  WorkflowActivityContext,
  WorkflowContext,
  WorkflowRuntime,
  TWorkflow,
} from "@dapr/dapr";
import * as readlineSync from "readline-sync";

// 将整个代码包装在一个立即调用的异步函数中
async function start() {
  class Order {
    cost: number;
    product: string;
    quantity: number;
    constructor(cost: number, product: string, quantity: number) {
      this.cost = cost;
      this.product = product;
      this.quantity = quantity;
    }
  }

  function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  // 更新 gRPC 客户端和工作者以使用本地地址和端口
  const daprHost = "localhost";
  const daprPort = "50001";
  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  // 发送审批请求给经理的活动函数
  const sendApprovalRequest = async (_: WorkflowActivityContext, order: Order) => {
    // 模拟一些需要时间的工作
    await sleep(3000);
    console.log(`发送审批请求: ${order.product}`);
  };

  // 下订单的活动函数
  const placeOrder = async (_: WorkflowActivityContext, order: Order) => {
    console.log(`下订单: ${order.product}`);
  };

  // 表示采购订单工作流的编排函数
  const purchaseOrderWorkflow: TWorkflow = async function* (ctx: WorkflowContext, order: Order): any {
    // 低于 $1000 的订单自动批准
    if (order.cost < 1000) {
      return "Auto-approved";
    }

    // $1000 或以上的订单需要经理批准
    yield ctx.callActivity(sendApprovalRequest, order);

    // 必须在 24 小时内收到批准,否则将被取消。
    const tasks: Task<any>[] = [];
    const approvalEvent = ctx.waitForExternalEvent("approval_received");
    const timeoutEvent = ctx.createTimer(24 * 60 * 60);
    tasks.push(approvalEvent);
    tasks.push(timeoutEvent);
    const winner = ctx.whenAny(tasks);

    if (winner == timeoutEvent) {
      return "Cancelled";
    }

    yield ctx.callActivity(placeOrder, order);
    const approvalDetails = approvalEvent.getResult();
    return `Approved by ${approvalDetails.approver}`;
  };

  workflowRuntime
    .registerWorkflow(purchaseOrderWorkflow)
    .registerActivity(sendApprovalRequest)
    .registerActivity(placeOrder);

  // 将工作者启动包装在 try-catch 块中以处理启动期间的任何错误
  try {
    await workflowRuntime.start();
    console.log("工作者启动成功");
  } catch (error) {
    console.error("启动工作者时出错:", error);
  }

  // 调度新的编排
  try {
    const cost = readlineSync.questionInt("输入订单金额:");
    const approver = readlineSync.question("输入审批人:");
    const timeout = readlineSync.questionInt("输入订单超时时间(秒):");
    const order = new Order(cost, "MyProduct", 1);
    const id = await workflowClient.scheduleNewWorkflow(purchaseOrderWorkflow, order);
    console.log(`编排已调度,ID: ${id}`);

    // 异步提示批准
    promptForApproval(approver, workflowClient, id);

    // 等待编排完成
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, timeout + 2);

    console.log(`编排完成!结果: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("调度或等待编排时出错:", error);
  }

  // 停止工作者和客户端
  await workflowRuntime.stop();
  await workflowClient.stop();

  // 停止 dapr sidecar
  process.exit(0);
}

async function promptForApproval(approver: string, workflowClient: DaprWorkflowClient, id: string) {
  if (readlineSync.keyInYN("按 [Y] 批准订单... Y/是, N/否")) {
    const approvalEvent = { approver: approver };
    await workflowClient.raiseEvent(id, "approval_received", approvalEvent);
  } else {
    return "订单被拒绝";
  }
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
    // ...(其他步骤)...

    // 需要对超过某个阈值的订单进行批准
    if (order.TotalCost > OrderApprovalThreshold)
    {
        try
        {
            // 请求人类批准此订单
            await context.CallActivityAsync(nameof(RequestApprovalActivity), order);

            // 暂停并等待人类批准订单
            ApprovalResult approvalResult = await context.WaitForExternalEventAsync<ApprovalResult>(
                eventName: "ManagerApproval",
                timeout: TimeSpan.FromDays(3));
            if (approvalResult == ApprovalResult.Rejected)
            {
                // 订单被拒绝,在此结束工作流
                return new OrderResult(Processed: false);
            }
        }
        catch (TaskCanceledException)
        {
            // 批准超时会导致自动取消订单
            return new OrderResult(Processed: false);
        }
    }

    // ...(其他步骤)...

    // 以成功结果结束工作流
    return new OrderResult(Processed: true);
}

注意 在上面的示例中,RequestApprovalActivity 是要调用的工作流活动的名称,ApprovalResult 是由工作流应用程序定义的枚举。为了简洁起见,这些定义未包含在示例代码中。

public class ExternalSystemInteractionWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            // ...其他步骤...
            Integer orderCost = ctx.getInput(int.class);
            // 需要对超过某个阈值的订单进行批准
            if (orderCost > ORDER_APPROVAL_THRESHOLD) {
                try {
                    // 请求人类批准此订单
                    ctx.callActivity("RequestApprovalActivity", orderCost, Void.class).await();
                    // 暂停并等待人类批准订单
                    boolean approved = ctx.waitForExternalEvent("ManagerApproval", Duration.ofDays(3), boolean.class).await();
                    if (!approved) {
                        // 订单被拒绝,在此结束工作流
                        ctx.complete("Process reject");
                    }
                } catch (TaskCanceledException e) {
                    // 批准超时会导致自动取消订单
                    ctx.complete("Process cancel");
                }
            }
            // ...其他步骤...

            // 以成功结果结束工作流
            ctx.complete("Process approved");
        };
    }
}
type Order struct {
	Cost     float64 `json:"cost"`
	Product  string  `json:"product"`
	Quantity int     `json:"quantity"`
}
type Approval struct {
	Approver string `json:"approver"`
}
func PurchaseOrderWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var order Order
	if err := ctx.GetInput(&order); err != nil {
		return "", err
	}
	// 低于 $1000 的订单自动批准
	if order.Cost < 1000 {
		return "Auto-approved", nil
	}
	// $1000 或以上的订单需要经理批准
	if err := ctx.CallActivity(SendApprovalRequest, workflow.ActivityInput(order)).Await(nil); err != nil {
		return "", err
	}
	// 必须在 24 小时内收到批准,否则将被取消
	var approval Approval
	if err := ctx.WaitForExternalEvent("approval_received", time.Hour*24).Await(&approval); err != nil {
		// 假设发生了超时 - 无论如何;一个错误。
		return "error/cancelled", err
	}
	// 订单已获批准
	if err := ctx.CallActivity(PlaceOrder, workflow.ActivityInput(order)).Await(nil); err != nil {
		return "", err
	}
	return fmt.Sprintf("Approved by %s", approval.Approver), nil
}
func SendApprovalRequest(ctx workflow.ActivityContext) (any, error) {
	var order Order
	if err := ctx.GetInput(&order); err != nil {
		return "", err
	}
	fmt.Printf("*** 发送审批请求: %v\n", order)
	return "", nil
}
func PlaceOrder(ctx workflow.ActivityContext) (any, error) {
	var order Order
	if err := ctx.GetInput(&order); err != nil {
		return "", err
	}
	fmt.Printf("*** 下订单: %v", order)
	return "", nil
}

恢复工作流执行的事件的代码在工作流之外。可以使用触发事件工作流管理 API 将工作流事件传递给等待的工作流实例,如以下示例所示:

from dapr.clients import DaprClient
from dataclasses import asdict

with DaprClient() as d:
    d.raise_workflow_event(
        instance_id=instance_id,
        workflow_component="dapr",
        event_name="approval_received",
        event_data=asdict(Approval("Jane Doe")))
import { DaprClient } from "@dapr/dapr";

  public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) {
    this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
  }
// 向等待的工作流触发工作流事件
await daprClient.RaiseWorkflowEventAsync(
    instanceId: orderId,
    workflowComponent: "dapr",
    eventName: "ManagerApproval",
    eventData: ApprovalResult.Approved);
System.out.println("**SendExternalMessage: RestartEvent**");
client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");
func raiseEvent() {
  daprClient, err := client.NewClient()
  if err != nil {
    log.Fatalf("failed to initialize the client")
  }
  err = daprClient.RaiseEventWorkflow(context.Background(), &client.RaiseEventWorkflowRequest{
    InstanceID: "instance_id",
    WorkflowComponent: "dapr",
    EventName: "approval_received",
    EventData: Approval{
      Approver: "Jane Doe",
    },
  })
  if err != nil {
    log.Fatalf("failed to raise event on workflow")
  }
  log.Println("raised an event on specified workflow")
}

外部事件不必由人类直接触发。它们也可以由其他系统触发。例如,工作流可能需要暂停并等待接收到付款。在这种情况下,支付系统可能会在收到付款时将事件发布到 pub/sub 主题,并且该主题上的侦听器可以使用触发事件工作流 API 向工作流触发事件。

下一步

工作流架构 >>

相关链接

4 - 工作流架构

Dapr 工作流引擎架构

Dapr 工作流 允许开发者使用多种编程语言的普通代码定义工作流。工作流引擎运行在 Dapr sidecar 内部,并协调作为应用程序一部分部署的工作流代码。本文描述了:

  • Dapr 工作流引擎的架构
  • 工作流引擎如何与应用程序代码交互
  • 工作流引擎如何融入整体 Dapr 架构
  • 不同的工作流后端如何与工作流引擎协作

有关如何在应用程序中编写 Dapr 工作流的更多信息,请参见 如何:编写工作流

Dapr 工作流引擎的内部支持来自于 Dapr 的 actor 运行时。下图展示了 Kubernetes 模式下的 Dapr 工作流架构:

展示 Kubernetes 模式下工作流架构如何工作的图示

要使用 Dapr 工作流构建块,您需要在应用程序中使用 Dapr 工作流 SDK 编写工作流代码,该 SDK 内部通过 gRPC 流连接到 sidecar。这会注册工作流和任何工作流活动,或工作流可以调度的任务。

引擎直接嵌入在 sidecar 中,并通过 durabletask-go 框架库实现。此框架允许您更换不同的存储提供者,包括为 Dapr 创建的存储提供者,该提供者在幕后利用内部 actor。由于 Dapr 工作流使用 actor,您可以将工作流状态存储在状态存储中。

Sidecar 交互

当工作流应用程序启动时,它使用工作流编写 SDK 向 Dapr sidecar 发送 gRPC 请求,并根据 服务器流式 RPC 模式 获取工作流工作项流。这些工作项可以是从“启动一个新的 X 工作流”(其中 X 是工作流的类型)到“调度活动 Y,输入 Z 以代表工作流 X 运行”的任何内容。

工作流应用程序执行相应的工作流代码,然后将执行结果通过 gRPC 请求发送回 sidecar。

Dapr 工作流引擎协议

所有交互都通过单个 gRPC 通道进行,并由应用程序发起,这意味着应用程序不需要打开任何入站端口。这些交互的细节由特定语言的 Dapr 工作流编写 SDK 内部处理。

工作流和 actor sidecar 交互的区别

如果您熟悉 Dapr actor,您可能会注意到工作流与 actor 的 sidecar 交互方式有一些不同。

Actor工作流
Actor 可以使用 HTTP 或 gRPC 与 sidecar 交互。工作流仅使用 gRPC。由于工作流 gRPC 协议的复杂性,实现工作流时需要一个 SDK。
Actor 操作从 sidecar 推送到应用程序代码。这需要应用程序在特定的 应用端口 上监听。对于工作流,操作是由应用程序使用流协议从 sidecar 拉取的。应用程序不需要监听任何端口即可运行工作流。
Actor 明确地向 sidecar 注册自己。工作流不向 sidecar 注册自己。嵌入的引擎不跟踪工作流类型。这一责任被委托给工作流应用程序及其 SDK。

工作流分布式追踪

工作流引擎使用 durabletask-go 核心通过 Open Telemetry SDKs 写入分布式追踪。这些追踪由 Dapr sidecar 自动捕获并导出到配置的 Open Telemetry 提供者,例如 Zipkin。

引擎管理的每个工作流实例都表示为一个或多个跨度。有一个单一的父跨度表示完整的工作流执行,以及各种任务的子跨度,包括活动任务执行和持久计时器的跨度。

工作流活动代码目前无法访问追踪上下文。

内部工作流 actor

在 Dapr sidecar 内部注册了两种类型的 actor,以支持工作流引擎:

  • dapr.internal.{namespace}.{appID}.workflow
  • dapr.internal.{namespace}.{appID}.activity

{namespace} 值是 Dapr 命名空间,如果没有配置命名空间,则默认为 default{appID} 值是应用程序的 ID。例如,如果您有一个名为 “wfapp” 的工作流应用程序,那么工作流 actor 的类型将是 dapr.internal.default.wfapp.workflow,活动 actor 的类型将是 dapr.internal.default.wfapp.activity

下图展示了在 Kubernetes 场景中内部工作流 actor 如何操作:

展示跨集群内部注册的 actor 的图示

与用户定义的 actor 一样,内部工作流 actor 由 actor 放置服务分布在集群中。它们也维护自己的状态并使用提醒。然而,与存在于应用程序代码中的 actor 不同,这些 内部 actor 嵌入在 Dapr sidecar 中。应用程序代码完全不知道这些 actor 的存在。

工作流 actor

工作流 actor 负责管理应用程序中运行的所有工作流的状态和放置。每当创建一个工作流实例时,就会激活一个新的工作流 actor 实例。工作流 actor 的 ID 是工作流的 ID。这个内部 actor 存储工作流的状态,并通过 actor 放置服务确定工作流代码执行的节点。

每个工作流 actor 使用以下键在配置的状态存储中保存其状态:

描述
inbox-NNNNNN工作流的收件箱实际上是一个驱动工作流执行的 消息 的 FIFO 队列。示例消息包括工作流创建消息、活动任务完成消息等。每条消息都存储在状态存储中的一个键中,名称为 inbox-NNNNNN,其中 NNNNNN 是一个 6 位数,表示消息的顺序。这些状态键在相应的消息被工作流消费后被移除。
history-NNNNNN工作流的历史是一个有序的事件列表,表示工作流的执行历史。历史中的每个键保存单个历史事件的数据。像一个只追加的日志一样,工作流历史事件只会被添加而不会被移除(除非工作流执行“继续为新”操作,这会清除所有历史并使用新输入重新启动工作流)。
customStatus包含用户定义的工作流状态值。每个工作流 actor 实例只有一个 customStatus 键。
metadata以 JSON blob 形式包含有关工作流的元信息,包括收件箱的长度、历史的长度以及表示工作流生成的 64 位整数(用于实例 ID 被重用的情况)。长度信息用于确定在加载或保存工作流状态更新时需要读取或写入哪些键。

下图展示了工作流 actor 的典型生命周期。

Dapr 工作流 Actor 流程图

总结:

  1. 当工作流 actor 收到新消息时被激活。
  2. 新消息触发相关的工作流代码(在您的应用程序中)运行,并将执行结果返回给工作流 actor。
  3. 一旦收到结果,actor 会根据需要调度任何任务。
  4. 调度后,actor 在状态存储中更新其状态。
  5. 最后,actor 进入空闲状态,直到收到另一条消息。在此空闲时间内,sidecar 可能决定从内存中卸载工作流 actor。

活动 actor

活动 actor 负责管理所有工作流活动调用的状态和放置。每当工作流调度一个活动任务时,就会激活一个新的活动 actor 实例。活动 actor 的 ID 是工作流的 ID 加上一个序列号(序列号从 0 开始)。例如,如果一个工作流的 ID 是 876bf371,并且是工作流调度的第三个活动,它的 ID 将是 876bf371::2,其中 2 是序列号。

每个活动 actor 将单个键存储到状态存储中:

描述
activityState键包含活动调用负载,其中包括序列化的活动输入数据。此键在活动调用完成后自动删除。

下图展示了活动 actor 的典型生命周期。

工作流活动 Actor 流程图

活动 actor 是短暂的:

  1. 当工作流 actor 调度一个活动任务时,活动 actor 被激活。
  2. 活动 actor 然后立即调用工作流应用程序以调用相关的活动代码。
  3. 一旦活动代码完成运行并返回其结果,活动 actor 将执行结果的消息发送给父工作流 actor。
  4. 一旦结果被发送,工作流被触发以继续其下一步。

提醒使用和执行保证

Dapr 工作流通过使用 actor 提醒 来确保工作流的容错性,以从瞬态系统故障中恢复。在调用应用程序工作流代码之前,工作流或活动 actor 将创建一个新的提醒。如果应用程序代码执行没有中断,提醒将被删除。然而,如果托管相关工作流或活动的节点或 sidecar 崩溃,提醒将重新激活相应的 actor 并重试执行。

展示调用工作流 actor 过程的图示

状态存储使用

Dapr 工作流在内部使用 actor 来驱动工作流的执行。像任何 actor 一样,这些内部工作流 actor 将其状态存储在配置的状态存储中。任何支持 actor 的状态存储都隐式支持 Dapr 工作流。

工作流 actor 部分所述,工作流通过追加到历史日志中增量保存其状态。工作流的历史日志分布在多个状态存储键中,以便每个“检查点”只需追加最新的条目。

每个检查点的大小由工作流在进入空闲状态之前调度的并发操作数决定。顺序工作流 因此将对状态存储进行较小的批量更新,而 扇出/扇入工作流 将需要更大的批量。批量的大小还受到工作流 调用活动子工作流 时输入和输出大小的影响。

工作流 actor 状态存储交互图示

不同的状态存储实现可能隐式对您可以编写的工作流类型施加限制。例如,Azure Cosmos DB 状态存储将项目大小限制为 2 MB 的 UTF-8 编码 JSON(来源)。活动或子工作流的输入或输出负载作为状态存储中的单个记录存储,因此 2 MB 的项目限制意味着工作流和活动的输入和输出不能超过 2 MB 的 JSON 序列化数据。

同样,如果状态存储对批量事务的大小施加限制,这可能会限制工作流可以调度的并行操作数。

工作流状态可以从状态存储中清除,包括其所有历史记录。每个 Dapr SDK 都公开用于清除特定工作流实例的所有元数据的 API。

工作流可扩展性

由于 Dapr 工作流在内部使用 actor 实现,Dapr 工作流具有与 actor 相同的可扩展性特征。放置服务:

  • 不区分工作流 actor 和您在应用程序中定义的 actor
  • 将使用与 actor 相同的算法对工作流进行负载均衡

工作流的预期可扩展性由以下因素决定:

  • 用于托管工作流应用程序的机器数量
  • 运行工作流的机器上可用的 CPU 和内存资源
  • 为 actor 配置的状态存储的可扩展性
  • actor 放置服务和提醒子系统的可扩展性

目标应用程序中工作流代码的实现细节也在个别工作流实例的可扩展性中起作用。每个工作流实例一次在单个节点上执行,但工作流可以调度在其他节点上运行的活动和子工作流。

工作流还可以调度这些活动和子工作流以并行运行,允许单个工作流可能将计算任务分布在集群中的所有可用节点上。

跨多个 Dapr 实例扩展的工作流和活动 actor 图示

工作流不控制负载在集群中的具体分布方式。例如,如果一个工作流调度 10 个活动任务并行运行,所有 10 个任务可能在多达 10 个不同的计算节点上运行,也可能在少至一个计算节点上运行。实际的扩展行为由 actor 放置服务决定,该服务管理表示工作流每个任务的 actor 的分布。

工作流后端

工作流后端负责协调和保存工作流的状态。在任何给定时间,只能支持一个后端。您可以将工作流后端配置为一个组件,类似于 Dapr 中的任何其他组件。配置要求:

  1. 指定工作流后端的类型。
  2. 提供特定于该后端的配置。

例如,以下示例演示了如何定义一个 actor 后端组件。Dapr 工作流目前默认仅支持 actor 后端,用户不需要定义 actor 后端组件即可使用它。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: actorbackend
spec:
  type: workflowbackend.actor
  version: v1

工作流延迟

为了提供关于持久性和弹性的保证,Dapr 工作流频繁地写入状态存储并依赖提醒来驱动执行。因此,Dapr 工作流可能不适合对延迟敏感的工作负载。预期的高延迟来源包括:

  • 在持久化工作流状态时来自状态存储的延迟。
  • 在使用大型历史记录重新加载工作流时来自状态存储的延迟。
  • 集群中过多活动提醒导致的延迟。
  • 集群中高 CPU 使用率导致的延迟。

有关工作流 actor 设计如何影响执行延迟的更多详细信息,请参见 提醒使用和执行保证部分

下一步

编写工作流 >>

相关链接

5 - 如何:编写一个工作流

学习如何使用Dapr工作流引擎开发和编写工作流

本文提供了如何编写由Dapr工作流引擎执行的工作流的高级概述。

以代码形式编写工作流

Dapr工作流逻辑是通过通用编程语言实现的,这使您可以:

  • 使用您喜欢的编程语言(无需学习新的DSL或YAML模式)。
  • 访问语言的标准库。
  • 构建您自己的库和抽象。
  • 使用调试器并检查本地变量。
  • 为您的工作流编写单元测试,就像应用程序逻辑的其他部分一样。

Dapr sidecar不加载任何工作流定义。相反,sidecar仅负责驱动工作流的执行,而所有具体的工作流任务则由应用程序的一部分来处理。

编写工作流任务

工作流任务是工作流中的基本工作单元,是在业务流程中被编排的任务。

定义您希望工作流执行的工作流任务。任务是一个函数定义,可以接受输入并返回输出。以下示例创建了一个名为hello_act的任务,用于打印当前计数器的值。hello_act是一个从WorkflowActivityContext类派生的函数。

def hello_act(ctx: WorkflowActivityContext, input):
    global counter
    counter += input
    print(f'New counter value is: {counter}!', flush=True)

查看上下文中的hello_act工作流任务。

定义您希望工作流执行的工作流任务。任务被封装在实现工作流任务的WorkflowActivityContext类中。

export default class WorkflowActivityContext {
  private readonly _innerContext: ActivityContext;
  constructor(innerContext: ActivityContext) {
    if (!innerContext) {
      throw new Error("ActivityContext cannot be undefined");
    }
    this._innerContext = innerContext;
  }

  public getWorkflowInstanceId(): string {
    return this._innerContext.orchestrationId;
  }

  public getWorkflowActivityId(): number {
    return this._innerContext.taskId;
  }
}

查看上下文中的工作流任务。

定义您希望工作流执行的工作流任务。任务是一个类定义,可以接受输入并返回输出。任务还可以通过依赖注入与Dapr客户端进行交互。

以下示例中调用的任务是:

  • NotifyActivity:接收新订单的通知。
  • ReserveInventoryActivity:检查是否有足够的库存来满足新订单。
  • ProcessPaymentActivity:处理订单的付款。包括NotifyActivity以发送成功订单的通知。

NotifyActivity

public class NotifyActivity : WorkflowActivity<Notification, object>
{
    //...

    public NotifyActivity(ILoggerFactory loggerFactory)
    {
        this.logger = loggerFactory.CreateLogger<NotifyActivity>();
    }

    //...
}

查看完整的NotifyActivity.cs工作流任务示例。

ReserveInventoryActivity

public class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
{
    //...

    public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
    {
        this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
        this.client = client;
    }

    //...

}

查看完整的ReserveInventoryActivity.cs工作流任务示例。

ProcessPaymentActivity

public class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
{
    //...
    public ProcessPaymentActivity(ILoggerFactory loggerFactory)
    {
        this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
    }

    //...

}

查看完整的ProcessPaymentActivity.cs工作流任务示例。

定义您希望工作流执行的工作流任务。任务被封装在实现工作流任务的公共DemoWorkflowActivity类中。

@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class DemoWorkflowActivity implements WorkflowActivity {

  @Override
  public DemoActivityOutput run(WorkflowActivityContext ctx) {
    Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class);
    logger.info("Starting Activity: " + ctx.getName());

    var message = ctx.getInput(DemoActivityInput.class).getMessage();
    var newMessage = message + " World!, from Activity";
    logger.info("Message Received from input: " + message);
    logger.info("Sending message to output: " + newMessage);

    logger.info("Sleeping for 5 seconds to simulate long running operation...");

    try {
      TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }

    logger.info("Activity finished");

    var output = new DemoActivityOutput(message, newMessage);
    logger.info("Activity returned: " + output);

    return output;
  }
}

查看上下文中的Java SDK工作流任务示例。

定义您希望工作流执行的每个工作流任务。任务输入可以通过ctx.GetInput从上下文中解组。任务应定义为接受ctx workflow.ActivityContext参数并返回接口和错误。

func TestActivity(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	
	// Do something here
	return "result", nil
}

查看上下文中的Go SDK工作流任务示例。

编写工作流

接下来,在工作流中注册并调用任务。

hello_world_wf函数是从一个名为DaprWorkflowContext的类派生的,具有输入和输出参数类型。它还包括一个yield语句,该语句完成工作流的繁重工作并调用工作流任务。

def hello_world_wf(ctx: DaprWorkflowContext, input):
    print(f'{input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.wait_for_external_event("event1")
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)

查看上下文中的hello_world_wf工作流。

接下来,使用WorkflowRuntime类注册工作流并启动工作流运行时。

export default class WorkflowRuntime {

  //..
  // Register workflow implementation for handling orchestrations
  public registerWorkflow(workflow: TWorkflow): WorkflowRuntime {
    const name = getFunctionName(workflow);
    const workflowWrapper = (ctx: OrchestrationContext, input: any): any => {
      const workflowContext = new WorkflowContext(ctx);
      return workflow(workflowContext, input);
    };
    this.worker.addNamedOrchestrator(name, workflowWrapper);
    return this;
  }

  // Register workflow activities
  public registerActivity(fn: TWorkflowActivity<TInput, TOutput>): WorkflowRuntime {
    const name = getFunctionName(fn);
    const activityWrapper = (ctx: ActivityContext, intput: TInput): TOutput => {
      const wfActivityContext = new WorkflowActivityContext(ctx);
      return fn(wfActivityContext, intput);
    };
    this.worker.addNamedActivity(name, activityWrapper);
    return this;
  }

  // Start the workflow runtime processing items and block.
  public async start() {
    await this.worker.start();
  }

}

查看上下文中的WorkflowRuntime

OrderProcessingWorkflow类是从一个名为Workflow的基类派生的,具有输入和输出参数类型。它还包括一个RunAsync方法,该方法完成工作流的繁重工作并调用工作流任务。

 class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
    {
        public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
        {
            //...

            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}"));

            //...

            InventoryResult result = await context.CallActivityAsync<InventoryResult>(
                nameof(ReserveInventoryActivity),
                new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
            //...
            
            await context.CallActivityAsync(
                nameof(ProcessPaymentActivity),
                new PaymentRequest(RequestId: orderId, order.TotalCost, "USD"));

            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Order {orderId} processed successfully!"));

            // End the workflow with a success result
            return new OrderResult(Processed: true);
        }
    }

查看OrderProcessingWorkflow.cs中的完整工作流示例。

接下来,使用WorkflowRuntimeBuilder注册工作流并启动工作流运行时。

public class DemoWorkflowWorker {

  public static void main(String[] args) throws Exception {

    // Register the Workflow with the builder.
    WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class);
    builder.registerActivity(DemoWorkflowActivity.class);

    // Build and then start the workflow runtime pulling and executing tasks
    try (WorkflowRuntime runtime = builder.build()) {
      System.out.println("Start workflow runtime");
      runtime.start();
    }

    System.exit(0);
  }
}

查看上下文中的Java SDK工作流。

定义您的工作流函数,参数为ctx *workflow.WorkflowContext,返回任何和错误。从您的工作流中调用您定义的任务。

func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return nil, err
	}
	var output string
	if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
		return nil, err
	}
	if err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output); err != nil {
		return nil, err
	}
	
	if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
		return nil, nil
	}
	return output, nil
}

查看上下文中的Go SDK工作流。

编写应用程序

最后,使用工作流编写应用程序。

在以下示例中,对于使用Python SDK的基本Python hello world应用程序,您的项目代码将包括:

  • 一个名为DaprClient的Python包,用于接收Python SDK功能。
  • 一个带有扩展的构建器,称为:
  • API调用。在下面的示例中,这些调用启动、暂停、恢复、清除和终止工作流。
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient

# ...

def main():
    with DaprClient() as d:
        host = settings.DAPR_RUNTIME_HOST
        port = settings.DAPR_GRPC_PORT
        workflowRuntime = WorkflowRuntime(host, port)
        workflowRuntime = WorkflowRuntime()
        workflowRuntime.register_workflow(hello_world_wf)
        workflowRuntime.register_activity(hello_act)
        workflowRuntime.start()

        # Start workflow
        print("==========Start Counter Increase as per Input:==========")
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

        # ...

        # Pause workflow
        d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")

        # Resume workflow
        d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
        
        sleep(1)
        # Raise workflow
        d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
                    event_name=eventName, event_data=eventData)

        sleep(5)
        # Purge workflow
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("Instance Successfully Purged")

        # Kick off another workflow for termination purposes 
        start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
        print(f"start_resp {start_resp.instance_id}")

        # Terminate workflow
        d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        sleep(1)
        getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")

        # Purge workflow
        d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        try:
            getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
        except DaprInternalError as err:
            if nonExistentIDError in err._message:
                print("Instance Successfully Purged")

        workflowRuntime.shutdown()

if __name__ == '__main__':
    main()

以下示例是一个使用JavaScript SDK的基本JavaScript应用程序。在此示例中,您的项目代码将包括:

  • 一个带有扩展的构建器,称为:
  • API调用。在下面的示例中,这些调用启动、终止、获取状态、暂停、恢复、引发事件和清除工作流。
import { TaskHubGrpcClient } from "@microsoft/durabletask-js";
import { WorkflowState } from "./WorkflowState";
import { generateApiTokenClientInterceptors, generateEndpoint, getDaprApiToken } from "../internal/index";
import { TWorkflow } from "../../types/workflow/Workflow.type";
import { getFunctionName } from "../internal";
import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption";

/** DaprWorkflowClient类定义了管理工作流实例的客户端操作。 */

export default class DaprWorkflowClient {
  private readonly _innerClient: TaskHubGrpcClient;

  /** 初始化DaprWorkflowClient的新实例。
   */
  constructor(options: Partial<WorkflowClientOptions> = {}) {
    const grpcEndpoint = generateEndpoint(options);
    options.daprApiToken = getDaprApiToken(options);
    this._innerClient = this.buildInnerClient(grpcEndpoint.endpoint, options);
  }

  private buildInnerClient(hostAddress: string, options: Partial<WorkflowClientOptions>): TaskHubGrpcClient {
    let innerOptions = options?.grpcOptions;
    if (options.daprApiToken !== undefined && options.daprApiToken !== "") {
      innerOptions = {
        ...innerOptions,
        interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])],
      };
    }
    return new TaskHubGrpcClient(hostAddress, innerOptions);
  }

  /**
   * 使用DurableTask客户端调度新的工作流。
   */
  public async scheduleNewWorkflow(
    workflow: TWorkflow | string,
    input?: any,
    instanceId?: string,
    startAt?: Date,
  ): Promise<string> {
    if (typeof workflow === "string") {
      return await this._innerClient.scheduleNewOrchestration(workflow, input, instanceId, startAt);
    }
    return await this._innerClient.scheduleNewOrchestration(getFunctionName(workflow), input, instanceId, startAt);
  }

  /**
   * 终止与提供的实例ID关联的工作流。
   *
   * @param {string} workflowInstanceId - 要终止的工作流实例ID。
   * @param {any} output - 为终止的工作流实例设置的可选输出。
   */
  public async terminateWorkflow(workflowInstanceId: string, output: any) {
    await this._innerClient.terminateOrchestration(workflowInstanceId, output);
  }

  /**
   * 从配置的持久存储中获取工作流实例元数据。
   */
  public async getWorkflowState(
    workflowInstanceId: string,
    getInputsAndOutputs: boolean,
  ): Promise<WorkflowState | undefined> {
    const state = await this._innerClient.getOrchestrationState(workflowInstanceId, getInputsAndOutputs);
    if (state !== undefined) {
      return new WorkflowState(state);
    }
  }

  /**
   * 等待工作流开始运行
   */
  public async waitForWorkflowStart(
    workflowInstanceId: string,
    fetchPayloads = true,
    timeoutInSeconds = 60,
  ): Promise<WorkflowState | undefined> {
    const state = await this._innerClient.waitForOrchestrationStart(
      workflowInstanceId,
      fetchPayloads,
      timeoutInSeconds,
    );
    if (state !== undefined) {
      return new WorkflowState(state);
    }
  }

  /**
   * 等待工作流完成运行
   */
  public async waitForWorkflowCompletion(
    workflowInstanceId: string,
    fetchPayloads = true,
    timeoutInSeconds = 60,
  ): Promise<WorkflowState | undefined> {
    const state = await this._innerClient.waitForOrchestrationCompletion(
      workflowInstanceId,
      fetchPayloads,
      timeoutInSeconds,
    );
    if (state != undefined) {
      return new WorkflowState(state);
    }
  }

  /**
   * 向等待的工作流实例发送事件通知消息
   */
  public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) {
    this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
  }

  /**
   * 从工作流状态存储中清除工作流实例状态。
   */
  public async purgeWorkflow(workflowInstanceId: string): Promise<boolean> {
    const purgeResult = await this._innerClient.purgeOrchestration(workflowInstanceId);
    if (purgeResult !== undefined) {
      return purgeResult.deletedInstanceCount > 0;
    }
    return false;
  }

  /**
   * 关闭内部DurableTask客户端并关闭GRPC通道。
   */
  public async stop() {
    await this._innerClient.stop();
  }
}

在以下Program.cs示例中,对于使用.NET SDK的基本ASP.NET订单处理应用程序,您的项目代码将包括:

  • 一个名为Dapr.Workflow的NuGet包,用于接收.NET SDK功能
  • 一个带有扩展方法的构建器,称为AddDaprWorkflow
    • 这将允许您注册工作流和工作流任务(工作流可以调度的任务)
  • HTTP API调用
    • 一个用于提交新订单
    • 一个用于检查现有订单的状态
using Dapr.Workflow;
//...

// Dapr工作流作为服务配置的一部分注册
builder.Services.AddDaprWorkflow(options =>
{
    // 请注意,也可以将lambda函数注册为工作流或任务实现,而不是类。
    options.RegisterWorkflow<OrderProcessingWorkflow>();

    // 这些是由工作流调用的任务。
    options.RegisterActivity<NotifyActivity>();
    options.RegisterActivity<ReserveInventoryActivity>();
    options.RegisterActivity<ProcessPaymentActivity>();
});

WebApplication app = builder.Build();

// POST启动新的订单工作流实例
app.MapPost("/orders", async (DaprWorkflowClient client, [FromBody] OrderPayload orderInfo) =>
{
    if (orderInfo?.Name == null)
    {
        return Results.BadRequest(new
        {
            message = "Order data was missing from the request",
            example = new OrderPayload("Paperclips", 99.95),
        });
    }

//...
});

// GET获取订单工作流的状态以报告状态
app.MapGet("/orders/{orderId}", async (string orderId, DaprWorkflowClient client) =>
{
    WorkflowState state = await client.GetWorkflowStateAsync(orderId, true);
    if (!state.Exists)
    {
        return Results.NotFound($"No order with ID = '{orderId}' was found.");
    }

    var httpResponsePayload = new
    {
        details = state.ReadInputAs<OrderPayload>(),
        status = state.RuntimeStatus.ToString(),
        result = state.ReadOutputAs<OrderResult>(),
    };

//...
}).WithName("GetOrderInfoEndpoint");

app.Run();

如以下示例所示,使用Java SDK和Dapr工作流的hello-world应用程序将包括:

  • 一个名为io.dapr.workflows.client的Java包,用于接收Java SDK客户端功能。
  • 导入io.dapr.workflows.Workflow
  • 扩展WorkflowDemoWorkflow
  • 使用输入和输出创建工作流。
  • API调用。在下面的示例中,这些调用启动并调用工作流任务。
package io.dapr.examples.workflows;

import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/**
 * DemoWorkflow的服务器端实现。
 */
public class DemoWorkflow extends Workflow {
  @Override
  public WorkflowStub create() {
    return ctx -> {
      ctx.getLogger().info("Starting Workflow: " + ctx.getName());
      // ...
      ctx.getLogger().info("Calling Activity...");
      var input = new DemoActivityInput("Hello Activity!");
      var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await();
      // ...
    };
  }
}

查看上下文中的完整Java SDK工作流示例。

如以下示例所示,使用Go SDK和Dapr工作流的hello-world应用程序将包括:

  • 一个名为client的Go包,用于接收Go SDK客户端功能。
  • TestWorkflow方法
  • 使用输入和输出创建工作流。
  • API调用。在下面的示例中,这些调用启动并调用工作流任务。
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/dapr/go-sdk/client"
	"github.com/dapr/go-sdk/workflow"
)

var stage = 0

const (
	workflowComponent = "dapr"
)

func main() {
	w, err := workflow.NewWorker()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("Worker initialized")

	if err := w.RegisterWorkflow(TestWorkflow); err != nil {
		log.Fatal(err)
	}
	fmt.Println("TestWorkflow registered")

	if err := w.RegisterActivity(TestActivity); err != nil {
		log.Fatal(err)
	}
	fmt.Println("TestActivity registered")

	// Start workflow runner
	if err := w.Start(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("runner started")

	daprClient, err := client.NewClient()
	if err != nil {
		log.Fatalf("failed to intialise client: %v", err)
	}
	defer daprClient.Close()
	ctx := context.Background()

	// Start workflow test
	respStart, err := daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
		WorkflowName:      "TestWorkflow",
		Options:           nil,
		Input:             1,
		SendRawInput:      false,
	})
	if err != nil {
		log.Fatalf("failed to start workflow: %v", err)
	}
	fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)

	// Pause workflow test
	err = daprClient.PauseWorkflow(ctx, &client.PauseWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})

	if err != nil {
		log.Fatalf("failed to pause workflow: %v", err)
	}

	respGet, err := daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}

	if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
		log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
	}

	fmt.Printf("workflow paused\n")

	// Resume workflow test
	err = daprClient.ResumeWorkflow(ctx, &client.ResumeWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})

	if err != nil {
		log.Fatalf("failed to resume workflow: %v", err)
	}

	respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}

	if respGet.RuntimeStatus != workflow.StatusRunning.String() {
		log.Fatalf("workflow not running")
	}

	fmt.Println("workflow resumed")

	fmt.Printf("stage: %d\n", stage)

	// Raise Event Test

	err = daprClient.RaiseEventWorkflow(ctx, &client.RaiseEventWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
		EventName:         "testEvent",
		EventData:         "testData",
		SendRawData:       false,
	})

	if err != nil {
		fmt.Printf("failed to raise event: %v", err)
	}

	fmt.Println("workflow event raised")

	time.Sleep(time.Second) // allow workflow to advance

	fmt.Printf("stage: %d\n", stage)

	respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}

	fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)

	// Purge workflow test
	err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})
	if err != nil {
		log.Fatalf("failed to purge workflow: %v", err)
	}

	respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})
	if err != nil && respGet != nil {
		log.Fatal("failed to purge workflow")
	}

	fmt.Println("workflow purged")

	fmt.Printf("stage: %d\n", stage)

	// Terminate workflow test
	respStart, err = daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
		WorkflowName:      "TestWorkflow",
		Options:           nil,
		Input:             1,
		SendRawInput:      false,
	})
	if err != nil {
		log.Fatalf("failed to start workflow: %v", err)
	}

	fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)

	err = daprClient.TerminateWorkflow(ctx, &client.TerminateWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})
	if err != nil {
		log.Fatalf("failed to terminate workflow: %v", err)
	}

	respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}
	if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
		log.Fatal("failed to terminate workflow")
	}

	fmt.Println("workflow terminated")

	err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})

	respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
		InstanceID:        "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
		WorkflowComponent: workflowComponent,
	})
	if err == nil || respGet != nil {
		log.Fatalf("failed to purge workflow: %v", err)
	}

	fmt.Println("workflow purged")

	stage = 0
	fmt.Println("workflow client test")

	wfClient, err := workflow.NewClient()
	if err != nil {
		log.Fatalf("[wfclient] faield to initialize: %v", err)
	}

	id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
	if err != nil {
		log.Fatalf("[wfclient] failed to start workflow: %v", err)
	}

	fmt.Printf("[wfclient] started workflow with id: %s\n", id)

	metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
	if err != nil {
		log.Fatalf("[wfclient] failed to get worfklow: %v", err)
	}

	fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())

	if stage != 1 {
		log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
	}

	fmt.Printf("[wfclient] stage: %d\n", stage)

	// raise event

	if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
		log.Fatalf("[wfclient] failed to raise event: %v", err)
	}

	fmt.Println("[wfclient] event raised")

	// Sleep to allow the workflow to advance
	time.Sleep(time.Second)

	if stage != 2 {
		log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
	}

	fmt.Printf("[wfclient] stage: %d\n", stage)

	// stop workflow
	if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
		log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
	}

	fmt.Println("[wfclient] workflow terminated")

	if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
		log.Fatalf("[wfclient] failed to purge workflow: %v", err)
	}

	fmt.Println("[wfclient] workflow purged")

	// stop workflow runtime
	if err := w.Shutdown(); err != nil {
		log.Fatalf("failed to shutdown runtime: %v", err)
	}

	fmt.Println("workflow worker successfully shutdown")
}

func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return nil, err
	}
	var output string
	if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
		return nil, err
	}

	err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
	if err != nil {
		return nil, err
	}

	if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
		return nil, err
	}

	return output, nil
}

func TestActivity(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}

	stage += input

	return fmt.Sprintf("Stage: %d", stage), nil
}

查看上下文中的完整Go SDK工作流示例。

下一步

现在您已经编写了一个工作流,学习如何管理它。

管理工作流 >>

相关链接

6 - 如何:管理工作流

管理和运行工作流

现在您已经在应用程序中编写了工作流及其活动,您可以使用HTTP API调用来启动、终止和获取工作流的信息。有关更多信息,请阅读工作流API参考

在代码中管理您的工作流。在编写工作流指南中的工作流示例中,工作流通过以下API在代码中注册:

  • start_workflow: 启动工作流的一个实例
  • get_workflow: 获取工作流状态的信息
  • pause_workflow: 暂停或挂起一个工作流实例,稍后可以恢复
  • resume_workflow: 恢复一个暂停的工作流实例
  • raise_workflow_event: 在工作流上触发一个事件
  • purge_workflow: 删除与特定工作流实例相关的所有元数据
  • terminate_workflow: 终止或停止特定的工作流实例
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient

# 合适的参数
instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
eventName = "event1"
eventData = "eventData"

# 启动工作流
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
                        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)

# 获取工作流信息
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)

# 暂停工作流
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)

# 恢复工作流
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)

# 在工作流上触发一个事件
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
                    event_name=eventName, event_data=eventData)

# 清除工作流
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)

# 终止工作流
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)

在代码中管理您的工作流。在编写工作流指南中的工作流示例中,工作流通过以下API在代码中注册:

  • client.workflow.start: 启动工作流的一个实例
  • client.workflow.get: 获取工作流状态的信息
  • client.workflow.pause: 暂停或挂起一个工作流实例,稍后可以恢复
  • client.workflow.resume: 恢复一个暂停的工作流实例
  • client.workflow.purge: 删除与特定工作流实例相关的所有元数据
  • client.workflow.terminate: 终止或停止特定的工作流实例
import { DaprClient } from "@dapr/dapr";

async function printWorkflowStatus(client: DaprClient, instanceId: string) {
  const workflow = await client.workflow.get(instanceId);
  console.log(
    `工作流 ${workflow.workflowName}, 创建于 ${workflow.createdAt.toUTCString()}, 状态为 ${
      workflow.runtimeStatus
    }`,
  );
  console.log(`附加属性: ${JSON.stringify(workflow.properties)}`);
  console.log("--------------------------------------------------\n\n");
}

async function start() {
  const client = new DaprClient();

  // 启动一个新的工作流实例
  const instanceId = await client.workflow.start("OrderProcessingWorkflow", {
    Name: "Paperclips",
    TotalCost: 99.95,
    Quantity: 4,
  });
  console.log(`已启动工作流实例 ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // 暂停一个工作流实例
  await client.workflow.pause(instanceId);
  console.log(`已暂停工作流实例 ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // 恢复一个工作流实例
  await client.workflow.resume(instanceId);
  console.log(`已恢复工作流实例 ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // 终止一个工作流实例
  await client.workflow.terminate(instanceId);
  console.log(`已终止工作流实例 ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // 等待工作流完成,30秒!
  await new Promise((resolve) => setTimeout(resolve, 30000));
  await printWorkflowStatus(client, instanceId);

  // 清除一个工作流实例
  await client.workflow.purge(instanceId);
  console.log(`已清除工作流实例 ${instanceId}`);
  // 这将抛出一个错误,因为工作流实例不再存在。
  await printWorkflowStatus(client, instanceId);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});

在代码中管理您的工作流。在编写工作流指南中的OrderProcessingWorkflow示例中,工作流在代码中注册。您现在可以启动、终止并获取正在运行的工作流的信息:

string orderId = "exampleOrderId";
string workflowComponent = "dapr";
string workflowName = "OrderProcessingWorkflow";
OrderPayload input = new OrderPayload("Paperclips", 99.95);
Dictionary<string, string> workflowOptions; // 这是一个可选参数

// 启动工作流。这将返回一个"StartWorkflowResponse",其中包含特定工作流实例的实例ID。
StartWorkflowResponse startResponse = await daprClient.StartWorkflowAsync(orderId, workflowComponent, workflowName, input, workflowOptions);

// 获取工作流的信息。此响应包含工作流的状态、启动时间等信息!
GetWorkflowResponse getResponse = await daprClient.GetWorkflowAsync(orderId, workflowComponent, eventName);

// 终止工作流
await daprClient.TerminateWorkflowAsync(orderId, workflowComponent);

// 触发一个事件(一个传入的采购订单),您的工作流将等待此事件。这将返回等待购买的项目。
await daprClient.RaiseWorkflowEventAsync(orderId, workflowComponent, workflowName, input);

// 暂停
await daprClient.PauseWorkflowAsync(orderId, workflowComponent);

// 恢复
await daprClient.ResumeWorkflowAsync(orderId, workflowComponent);

// 清除工作流,删除与关联实例的所有收件箱和历史信息
await daprClient.PurgeWorkflowAsync(orderId, workflowComponent);

在代码中管理您的工作流。在Java SDK中的工作流示例中,工作流通过以下API在代码中注册:

  • scheduleNewWorkflow: 启动一个新的工作流实例
  • getInstanceState: 获取工作流状态的信息
  • waitForInstanceStart: 暂停或挂起一个工作流实例,稍后可以恢复
  • raiseEvent: 为正在运行的工作流实例触发事件/任务
  • waitForInstanceCompletion: 等待工作流完成其任务
  • purgeInstance: 删除与特定工作流实例相关的所有元数据
  • terminateWorkflow: 终止工作流
  • purgeInstance: 删除与特定工作流相关的所有元数据
package io.dapr.examples.workflows;

import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;

// ...
public class DemoWorkflowClient {

  // ...
  public static void main(String[] args) throws InterruptedException {
    DaprWorkflowClient client = new DaprWorkflowClient();

    try (client) {
      // 启动工作流
      String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
      
      // 获取工作流的状态信息
      WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);

      // 等待或暂停工作流实例启动
      try {
        WorkflowInstanceStatus waitForInstanceStartResult =
            client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
      }

      // 为工作流触发一个事件;您可以并行触发多个事件
      client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");
      client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
      client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
      client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");

      // 等待工作流完成任务
      try {
        WorkflowInstanceStatus waitForInstanceCompletionResult =
            client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
      } 

      // 清除工作流实例,删除与其相关的所有元数据
      boolean purgeResult = client.purgeInstance(instanceId);

      // 终止工作流实例
      client.terminateWorkflow(instanceToTerminateId, null);

    System.exit(0);
  }
}

在代码中管理您的工作流。在Go SDK中的工作流示例中,工作流通过以下API在代码中注册:

  • StartWorkflow: 启动一个新的工作流实例
  • GetWorkflow: 获取工作流状态的信息
  • PauseWorkflow: 暂停或挂起一个工作流实例,稍后可以恢复
  • RaiseEventWorkflow: 为正在运行的工作流实例触发事件/任务
  • ResumeWorkflow: 等待工作流完成其任务
  • PurgeWorkflow: 删除与特定工作流实例相关的所有元数据
  • TerminateWorkflow: 终止工作流
// 启动工作流
type StartWorkflowRequest struct {
	InstanceID        string // 可选实例标识符
	WorkflowComponent string
	WorkflowName      string
	Options           map[string]string // 可选元数据
	Input             any               // 可选输入
	SendRawInput      bool              // 设置为True以禁用输入的序列化
}

type StartWorkflowResponse struct {
	InstanceID string
}

// 获取工作流状态
type GetWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

type GetWorkflowResponse struct {
	InstanceID    string
	WorkflowName  string
	CreatedAt     time.Time
	LastUpdatedAt time.Time
	RuntimeStatus string
	Properties    map[string]string
}

// 清除工作流
type PurgeWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// 终止工作流
type TerminateWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// 暂停工作流
type PauseWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// 恢复工作流
type ResumeWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// 为正在运行的工作流触发一个事件
type RaiseEventWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
	EventName         string
	EventData         any
	SendRawData       bool // 设置为True以禁用数据的序列化
}

使用HTTP调用管理您的工作流。下面的示例将编写工作流示例中的属性与一个随机实例ID号结合使用。

启动工作流

要使用ID 12345678启动您的工作流,请运行:

POST http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678

请注意,工作流实例ID只能包含字母数字字符、下划线和破折号。

终止工作流

要使用ID 12345678终止您的工作流,请运行:

POST http://localhost:3500/v1.0/workflows/dapr/12345678/terminate

触发一个事件

对于支持订阅外部事件的工作流组件,例如Dapr工作流引擎,您可以使用以下“触发事件”API将命名事件传递给特定的工作流实例。

POST http://localhost:3500/v1.0/workflows/<workflowComponentName>/<instanceID>/raiseEvent/<eventName>

eventName可以是任何自定义的事件名称。

暂停或恢复工作流

为了计划停机时间、等待输入等,您可以暂停然后恢复工作流。要暂停ID为12345678的工作流,直到触发恢复,请运行:

POST http://localhost:3500/v1.0/workflows/dapr/12345678/pause

要恢复ID为12345678的工作流,请运行:

POST http://localhost:3500/v1.0/workflows/dapr/12345678/resume

清除工作流

清除API可用于从底层状态存储中永久删除工作流元数据,包括任何存储的输入、输出和工作流历史记录。这通常对于实施数据保留策略和释放资源很有用。

只有处于COMPLETED、FAILED或TERMINATED状态的工作流实例可以被清除。如果工作流处于其他状态,调用清除将返回错误。

POST http://localhost:3500/v1.0/workflows/dapr/12345678/purge

获取工作流信息

要获取ID为12345678的工作流信息(输出和输入),请运行:

GET http://localhost:3500/v1.0/workflows/dapr/12345678

工作流API参考指南中了解更多关于这些HTTP调用的信息。

下一步