This is the multi-page printable view of this section. Click here to print.
Dapr Workflow .NET SDK
1 - DaprWorkflowClient 使用
生命周期管理
DaprWorkflowClient
可以访问网络资源,这些资源通过 TCP 套接字与 Dapr sidecar 以及其他用于管理和操作工作流的类型进行通信。DaprWorkflowClient
实现了 IAsyncDisposable
接口,以便快速清理资源。
依赖注入
AddDaprWorkflow()
方法用于通过 ASP.NET Core 的依赖注入机制注册 Dapr 工作流服务。此方法需要一个选项委托,用于定义您希望在应用程序中注册和使用的每个工作流和活动。
注意
此方法会尝试注册一个DaprClient
实例,但仅在尚未以其他生命周期注册的情况下才有效。例如,如果之前以单例生命周期调用了 AddDaprClient()
,那么无论为工作流客户端选择何种生命周期,都会始终使用单例。DaprClient
实例用于与 Dapr sidecar 通信,如果尚未注册,则在 AddDaprWorkflow()
注册期间提供的生命周期将用于注册 DaprWorkflowClient
及其依赖项。单例注册
默认情况下,AddDaprWorkflow
方法会以单例生命周期注册 DaprWorkflowClient
和相关服务。这意味着服务只会被实例化一次。
以下是在典型的 Program.cs
文件中注册 DaprWorkflowClient
的示例:
builder.Services.AddDaprWorkflow(options => {
options.RegisterWorkflow<YourWorkflow>();
options.RegisterActivity<YourActivity>();
});
var app = builder.Build();
await app.RunAsync();
作用域注册
虽然默认的单例注册通常适用,但您可能希望指定不同的生命周期。这可以通过在 AddDaprWorkflow
中传递一个 ServiceLifetime
参数来实现。例如,您可能需要将另一个作用域服务注入到 ASP.NET Core 处理管道中,该管道需要 DaprClient
使用的上下文,如果前者服务注册为单例,则无法使用。
以下示例演示了这一点:
builder.Services.AddDaprWorkflow(options => {
options.RegisterWorkflow<YourWorkflow>();
options.RegisterActivity<YourActivity>();
}, ServiceLifecycle.Scoped);
var app = builder.Build();
await app.RunAsync();
瞬态注册
最后,Dapr 服务也可以使用瞬态生命周期注册,这意味着每次注入时都会重新初始化。这在以下示例中演示:
builder.Services.AddDaprWorkflow(options => {
options.RegisterWorkflow<YourWorkflow>();
options.RegisterActivity<YourActivity>();
}, ServiceLifecycle.Transient);
var app = builder.Build();
await app.RunAsync();
将服务注入到工作流活动中
工作流活动支持现代 C# 应用程序中常用的依赖注入。假设在启动时进行了适当的注册,任何此类类型都可以注入到工作流活动的构造函数中,并在工作流执行期间使用。这使得通过注入的 ILogger
添加日志记录或通过注入 DaprClient
或 DaprJobsClient
访问其他 Dapr 组件变得简单。
internal sealed class SquareNumberActivity : WorkflowActivity<int, int>
{
private readonly ILogger _logger;
public MyActivity(ILogger logger)
{
this._logger = logger;
}
public override Task<int> RunAsync(WorkflowActivityContext context, int input)
{
this._logger.LogInformation("Squaring the value {number}", input);
var result = input * input;
this._logger.LogInformation("Got a result of {squareResult}", result);
return Task.FromResult(result);
}
}
在工作流中使用 ILogger
由于工作流必须是确定性的,因此不能将任意服务注入其中。例如,如果您能够将标准 ILogger
注入到工作流中,并且由于错误需要重放它,日志记录的重复操作可能会导致混淆,因为这些操作实际上并没有再次发生。为了解决这个问题,工作流中提供了一种重放安全的日志记录器。它只会在工作流第一次运行时记录事件,而在重放时不会记录任何内容。
这种日志记录器可以通过工作流实例上的 WorkflowContext
中的方法获取,并可以像使用 ILogger
实例一样使用。
一个展示此功能的完整示例可以在 .NET SDK 仓库 中找到,以下是该示例的简要摘录。
public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;
var logger = context.CreateReplaySafeLogger<OrderProcessingWorkflow>(); //使用此方法访问日志记录器实例
logger.LogInformation("Received order {orderId} for {quantity} {name} at ${totalCost}", orderId, order.Quantity, order.Name, order.TotalCost);
//...
}
}
2 - 如何:在 .NET SDK 中编写和管理 Dapr 工作流
我们来创建一个 Dapr 工作流并通过控制台调用它。在提供的订单处理工作流示例中,控制台会提示如何进行购买和补货。在本指南中,您将:
- 部署一个 .NET 控制台应用程序 (WorkflowConsoleApp)。
- 使用 .NET 工作流 SDK 和 API 调用来启动和查询工作流实例。
在 .NET 示例项目里:
- 主要的
Program.cs
文件包含应用程序的设置,包括工作流和工作流活动的注册。 - 工作流定义位于
Workflows
目录中。 - 工作流活动定义位于
Activities
目录中。
先决条件
注意
Dapr.Workflows 在 v1.15 中支持 .NET 7 或更高版本。然而,从 Dapr v1.16 开始,仅支持 .NET 8 和 .NET 9。设置环境
克隆 .NET SDK 仓库。
git clone https://github.com/dapr/dotnet-sdk.git
从 .NET SDK 根目录,导航到 Dapr 工作流示例。
cd examples/Workflow
本地运行应用程序
要运行 Dapr 应用程序,您需要启动 .NET 程序和一个 Dapr sidecar。导航到 WorkflowConsoleApp
目录。
cd WorkflowConsoleApp
启动程序。
dotnet run
在一个新的终端中,再次导航到 WorkflowConsoleApp
目录,并在程序旁边运行 Dapr sidecar。
dapr run --app-id wfapp --dapr-grpc-port 4001 --dapr-http-port 3500
Dapr 会监听 HTTP 请求在
http://localhost:3500
和内部工作流 gRPC 请求在http://localhost:4001
。
启动工作流
要启动工作流,您有两种选择:
- 按照控制台提示的指示。
- 使用工作流 API 并直接向 Dapr 发送请求。
本指南重点介绍工作流 API 选项。
注意
- 您可以在
WorkflowConsoleApp
/demo.http
文件中找到以下命令。 - curl 请求的主体是作为工作流输入的采购订单信息。
- 命令中的 “12345678” 表示工作流的唯一标识符,可以替换为您选择的任何标识符。
运行以下命令以启动工作流。
curl -i -X POST http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 \
-H "Content-Type: application/json" \
-d '{"Name": "Paperclips", "TotalCost": 99.95, "Quantity": 1}'
curl -i -X POST http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 `
-H "Content-Type: application/json" `
-d '{"Name": "Paperclips", "TotalCost": 99.95, "Quantity": 1}'
如果成功,您应该会看到如下响应:
{"instanceID":"12345678"}
发送 HTTP 请求以获取已启动工作流的状态:
curl -i -X GET http://localhost:3500/v1.0/workflows/dapr/12345678
工作流设计为需要几秒钟才能完成。如果在您发出 HTTP 请求时工作流尚未完成,您将看到以下 JSON 响应(为便于阅读而格式化),工作流状态为 RUNNING
:
{
"instanceID": "12345678",
"workflowName": "OrderProcessingWorkflow",
"createdAt": "2023-05-10T00:42:03.911444105Z",
"lastUpdatedAt": "2023-05-10T00:42:06.142214153Z",
"runtimeStatus": "RUNNING",
"properties": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "{\"Name\": \"Paperclips\", \"TotalCost\": 99.95, \"Quantity\": 1}"
}
}
一旦工作流完成运行,您应该会看到以下输出,表明它已达到 COMPLETED
状态:
{
"instanceID": "12345678",
"workflowName": "OrderProcessingWorkflow",
"createdAt": "2023-05-10T00:42:03.911444105Z",
"lastUpdatedAt": "2023-05-10T00:42:18.527704176Z",
"runtimeStatus": "COMPLETED",
"properties": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "{\"Name\": \"Paperclips\", \"TotalCost\": 99.95, \"Quantity\": 1}",
"dapr.workflow.output": "{\"Processed\":true}"
}
}
当工作流完成时,工作流应用程序的标准输出应如下所示:
info: WorkflowConsoleApp.Activities.NotifyActivity[0]
Received order 12345678 for Paperclips at $99.95
info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
Reserving inventory: 12345678, Paperclips, 1
info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
Processing payment: 12345678, 99.95, USD
info: WorkflowConsoleApp.Activities.NotifyActivity[0]
Order 12345678 processed successfully!
如果您在本地机器上为 Dapr 配置了 Zipkin,那么您可以在 Zipkin Web UI(通常在 http://localhost:9411/zipkin/)中查看工作流跟踪跨度。
演示
观看此视频演示 .NET 工作流: