This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Dapr Jobs .NET SDK

快速上手使用 Dapr Jobs 和 Dapr .NET SDK

使用 Dapr Job 包,您可以在 .NET 应用程序中与 Dapr Job API 进行交互。通过预设的计划,您可以安排未来的操作,并可选择附带数据。

要开始使用,请查看 Dapr Jobs 指南,并参考 最佳实践文档 以获取更多指导。

1 - 如何:在 .NET SDK 中编写和管理 Dapr 任务

学习如何使用 .NET SDK 编写和管理 Dapr 任务

我们来创建一个端点,该端点将在 Dapr 任务触发时被调用,然后在同一个应用中调度该任务。我们将使用此处提供的简单示例,进行以下演示,并通过它来解释如何使用间隔或 Cron 表达式自行调度一次性或重复性任务。在本指南中,您将:

  • 部署一个 .NET Web API 应用程序 (JobsSample)
  • 利用 Dapr .NET 任务 SDK 调度任务调用并设置被触发的端点

在 .NET 示例项目中:

  • 主要的 Program.cs 文件是整个演示的核心。

前提条件

设置环境

克隆 .NET SDK 仓库

git clone https://github.com/dapr/dotnet-sdk.git

从 .NET SDK 根目录,导航到 Dapr 任务示例。

cd examples/Jobs

本地运行应用程序

要运行 Dapr 应用程序,您需要启动 .NET 程序和一个 Dapr sidecar。导航到 JobsSample 目录。

cd JobsSample

我们将运行一个命令,同时启动 Dapr sidecar 和 .NET 程序。

dapr run --app-id jobsapp --dapr-grpc-port 4001 --dapr-http-port 3500 -- dotnet run

Dapr 监听 HTTP 请求在 http://localhost:3500 和内部任务 gRPC 请求在 http://localhost:4001

使用依赖注入注册 Dapr 任务客户端

Dapr 任务 SDK 提供了一个扩展方法来简化 Dapr 任务客户端的注册。在 Program.cs 中完成依赖注入注册之前,添加以下行:

var builder = WebApplication.CreateBuilder(args);

//在这两行之间的任意位置添加
builder.Services.AddDaprJobsClient(); //这样就完成了

var app = builder.Build();

请注意,在当前的任务 API 实现中,调度任务的应用也将是接收触发通知的应用。换句话说,您不能调度一个触发器在另一个应用中运行。因此,虽然您不需要在应用中显式注册 Dapr 任务客户端来调度触发调用端点,但如果没有同一个应用以某种方式调度任务(无论是通过此 Dapr 任务 .NET SDK 还是对 sidecar 的 HTTP 调用),您的端点将永远不会被调用。

您可能希望为 Dapr 任务客户端提供一些配置选项,这些选项应在每次调用 sidecar 时存在,例如 Dapr API 令牌,或者您希望使用非标准的 HTTP 或 gRPC 端点。这可以通过使用允许配置 DaprJobsClientBuilder 实例的注册方法重载来实现:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprJobsClient((_, daprJobsClientBuilder) =>
{
    daprJobsClientBuilder.UseDaprApiToken("abc123");
    daprJobsClientBuilder.UseHttpEndpoint("http://localhost:8512"); //非标准 sidecar HTTP 端点
});

var app = builder.Build();

如果您需要从其他来源检索注入的值,这些来源本身注册为依赖项,您可以使用另一个重载来将 IServiceProvider 注入到配置操作方法中。在以下示例中,我们注册了一个虚构的单例,可以从某处检索 secret,并将其传递到 AddDaprJobClient 的配置方法中,以便我们可以从其他地方检索我们的 Dapr API 令牌以在此处注册:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton<SecretRetriever>();
builder.Services.AddDaprJobsClient((serviceProvider, daprJobsClientBuilder) =>
{
    var secretRetriever = serviceProvider.GetRequiredService<SecretRetriever>();
    var daprApiToken = secretRetriever.GetSecret("DaprApiToken").Value;
    daprJobsClientBuilder.UseDaprApiToken(daprApiToken);

    daprJobsClientBuilder.UseHttpEndpoint("http://localhost:8512");
});

var app = builder.Build();

使用 IConfiguration 配置 Dapr 任务客户端

可以使用注册的 IConfiguration 中的值来配置 Dapr 任务客户端,而无需显式指定每个值的重写,如前一节中使用 DaprJobsClientBuilder 所示。相反,通过填充通过依赖注入提供的 IConfigurationAddDaprJobsClient() 注册将自动使用这些值覆盖其各自的默认值。

首先在您的配置中填充值。这可以通过以下示例中的几种不同方式完成。

通过 ConfigurationBuilder 配置

应用程序设置可以在不使用配置源的情况下配置,而是通过填充 ConfigurationBuilder 实例中的值来实现:

var builder = WebApplication.CreateBuilder();

//创建配置
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string> {
            { "DAPR_HTTP_ENDPOINT", "http://localhost:54321" },
            { "DAPR_API_TOKEN", "abc123" }
        })
    .Build();

builder.Configuration.AddConfiguration(configuration);
builder.Services.AddDaprJobsClient(); //这将自动从 IConfiguration 中填充 HTTP 端点和 API 令牌值

通过环境变量配置

应用程序设置可以从应用程序可用的环境变量中访问。

以下环境变量将用于填充用于注册 Dapr 任务客户端的 HTTP 端点和 API 令牌。

KeyValue
DAPR_HTTP_ENDPOINThttp://localhost:54321
DAPR_API_TOKENabc123
var builder = WebApplication.CreateBuilder();

builder.Configuration.AddEnvironmentVariables();
builder.Services.AddDaprJobsClient();

Dapr 任务客户端将被配置为使用 HTTP 端点 http://localhost:54321 并用 API 令牌头 abc123 填充所有出站请求。

通过前缀环境变量配置

然而,在共享主机场景中,多个应用程序都在同一台机器上运行而不使用容器或在开发环境中,前缀环境变量并不罕见。以下示例假设 HTTP 端点和 API 令牌都将从前缀为 “myapp_” 的环境变量中提取。在此场景中使用的两个环境变量如下:

KeyValue
myapp_DAPR_HTTP_ENDPOINThttp://localhost:54321
myapp_DAPR_API_TOKENabc123

这些环境变量将在以下示例中加载到注册的配置中,并在没有附加前缀的情况下提供。

var builder = WebApplication.CreateBuilder();

builder.Configuration.AddEnvironmentVariables(prefix: "myapp_");
builder.Services.AddDaprJobsClient();

Dapr 任务客户端将被配置为使用 HTTP 端点 http://localhost:54321 并用 API 令牌头 abc123 填充所有出站请求。

不依赖于依赖注入使用 Dapr 任务客户端

虽然使用依赖注入简化了 .NET 中复杂类型的使用,并使处理复杂配置变得更容易,但您不需要以这种方式注册 DaprJobsClient。相反,您也可以选择从 DaprJobsClientBuilder 实例创建它的实例,如下所示:


public class MySampleClass
{
    public void DoSomething()
    {
        var daprJobsClientBuilder = new DaprJobsClientBuilder();
        var daprJobsClient = daprJobsClientBuilder.Build();

        //使用 `daprJobsClient` 做一些事情
    }
}

设置一个在任务触发时被调用的端点

如果您熟悉 ASP.NET Core 中的最小 API,那么设置一个任务端点很简单,因为两者的语法相同。

一旦完成依赖注入注册,按照您在 ASP.NET Core 中使用最小 API 功能处理 HTTP 请求映射的方式配置应用程序。实现为扩展方法,传递它应该响应的任务名称和一个委托。服务可以根据需要注入到委托的参数中,您可以选择传递 JobDetails 以获取有关已触发任务的信息(例如,访问其调度设置或负载)。

这里有两个委托可以使用。一个提供 IServiceProvider 以防您需要将其他服务注入处理程序:

//我们从上面的示例中得到了这个
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprJobsClient();

var app = builder.Build();

//添加我们的端点注册
app.MapDaprScheduledJob("myJob", (IServiceProvider serviceProvider, string? jobName, JobDetails? jobDetails) => {
    var logger = serviceProvider.GetService<ILogger>();
    logger?.LogInformation("Received trigger invocation for '{jobName}'", "myJob");

    //做一些事情...
});

app.Run();

如果不需要,委托的另一个重载不需要 IServiceProvider

//我们从上面的示例中得到了这个
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprJobsClient();

var app = builder.Build();

//添加我们的端点注册
app.MapDaprScheduledJob("myJob", (string? jobName, JobDetails? jobDetails) => {
    //做一些事情...
});

app.Run();

注册任务

最后,我们必须注册我们想要调度的任务。请注意,从这里开始,所有 SDK 方法都支持取消令牌,并在未设置时使用默认令牌。

有三种不同的方式来设置任务,具体取决于您想要如何配置调度:

一次性任务

一次性任务就是这样;它将在某个时间点运行,并且不会重复。这种方法要求您选择一个任务名称并指定一个触发时间。

参数名称类型描述必需
jobNamestring正在调度的任务的名称。
scheduledTimeDateTime任务应运行的时间点。
payloadReadOnlyMemory触发时提供给调用端点的任务数据。
cancellationTokenCancellationToken用于提前取消操作,例如由于操作超时。

可以从 Dapr 任务客户端调度一次性任务,如以下示例所示:

public class MyOperation(DaprJobsClient daprJobsClient)
{
    public async Task ScheduleOneTimeJobAsync(CancellationToken cancellationToken)
    {
        var today = DateTime.UtcNow;
        var threeDaysFromNow = today.AddDays(3);

        await daprJobsClient.ScheduleOneTimeJobAsync("myJobName", threeDaysFromNow, cancellationToken: cancellationToken);
    }
}

基于间隔的任务

基于间隔的任务是一个在配置为固定时间量的循环中运行的任务,不像今天在 actor 构建块中工作的提醒。这些任务也可以通过许多可选参数进行调度:

参数名称类型描述必需
jobNamestring正在调度的任务的名称。
intervalTimeSpan任务应触发的间隔。
startingFromDateTime任务调度应开始的时间点。
repeatsint任务应触发的最大次数。
ttl任务何时过期且不再触发。
payloadReadOnlyMemory触发时提供给调用端点的任务数据。
cancellationTokenCancellationToken用于提前取消操作,例如由于操作超时。

可以从 Dapr 任务客户端调度基于间隔的任务,如以下示例所示:

public class MyOperation(DaprJobsClient daprJobsClient)
{

    public async Task ScheduleIntervalJobAsync(CancellationToken cancellationToken)
    {
        var hourlyInterval = TimeSpan.FromHours(1);

        //每小时触发任务,但最多触发 5 次
        await daprJobsClient.ScheduleIntervalJobAsync("myJobName", hourlyInterval, repeats: 5), cancellationToken: cancellationToken;
    }
}

基于 Cron 的任务

基于 Cron 的任务是使用 Cron 表达式调度的。这提供了更多基于日历的控制,以便在任务触发时使用日历值在表达式中。与其他选项一样,这些任务也可以通过许多可选参数进行调度:

参数名称类型描述必需
jobNamestring正在调度的任务的名称。
cronExpressionstring指示任务应触发的 systemd 类似 Cron 表达式。
startingFromDateTime任务调度应开始的时间点。
repeatsint任务应触发的最大次数。
ttl任务何时过期且不再触发。
payloadReadOnlyMemory触发时提供给调用端点的任务数据。
cancellationTokenCancellationToken用于提前取消操作,例如由于操作超时。

可以从 Dapr 任务客户端调度基于 Cron 的任务,如下所示:

public class MyOperation(DaprJobsClient daprJobsClient)
{
    public async Task ScheduleCronJobAsync(CancellationToken cancellationToken)
    {
        //在每个月的第五天的每隔一小时的顶部
        const string cronSchedule = "0 */2 5 * *";

        //直到下个月才开始
        var now = DateTime.UtcNow;
        var oneMonthFromNow = now.AddMonths(1);
        var firstOfNextMonth = new DateTime(oneMonthFromNow.Year, oneMonthFromNow.Month, 1, 0, 0, 0);

        //每小时触发任务,但最多触发 5 次
        await daprJobsClient.ScheduleCronJobAsync("myJobName", cronSchedule, dueTime: firstOfNextMonth, cancellationToken: cancellationToken);
    }
}

获取已调度任务的详细信息

如果您知道已调度任务的名称,您可以在不等待其触发的情况下检索其元数据。返回的 JobDetails 提供了一些有用的属性,用于从 Dapr 任务 API 消费信息:

  • 如果 Schedule 属性包含 Cron 表达式,则 IsCronExpression 属性将为 true,并且表达式也将在 CronExpression 属性中可用。
  • 如果 Schedule 属性包含持续时间值,则 IsIntervalExpression 属性将为 true,并且该值将转换为 TimeSpan 值,可从 Interval 属性访问。

这可以通过使用以下方法完成:

public class MyOperation(DaprJobsClient daprJobsClient)
{
    public async Task<JobDetails> GetJobDetailsAsync(string jobName, CancellationToken cancellationToken)
    {
        var jobDetails = await daprJobsClient.GetJobAsync(jobName, canecllationToken);
        return jobDetails;
    }
}

删除已调度的任务

要删除已调度的任务,您需要知道其名称。从那里开始,只需在 Dapr 任务客户端上调用 DeleteJobAsync 方法即可:

public class MyOperation(DaprJobsClient daprJobsClient)
{
    public async Task DeleteJobAsync(string jobName, CancellationToken cancellationToken)
    {
        await daprJobsClient.DeleteJobAsync(jobName, cancellationToken);
    }
}

2 - DaprJobsClient 使用指南

使用 DaprJobsClient 的基本技巧和建议

生命周期管理

DaprJobsClient 是专门用于与 Dapr Jobs API 交互的 Dapr 客户端。它可以与 DaprClient 和其他 Dapr 客户端一起注册而不会出现问题。

它通过 TCP 套接字与 Dapr sidecar 通信,并实现了 IDisposable 接口以便快速清理资源。

为了获得最佳性能,建议创建一个长生命周期的 DaprJobsClient 实例,并在整个应用程序中共享使用。DaprJobsClient 实例是线程安全的,适合在多个线程中共享。

可以通过依赖注入来实现这一点。注册方法支持以单例、作用域实例或瞬态方式注册,但也可以利用 IConfiguration 或其他注入服务中的值进行注册,这样就不需要在每个类中重新创建客户端。

避免为每个操作创建一个新的 DaprJobsClient 并在操作完成后销毁它。

通过 DaprJobsClientBuilder 配置 DaprJobsClient

可以通过 DaprJobsClientBuilder 类上的方法来配置 DaprJobsClient,然后调用 .Build() 来创建客户端。每个 DaprJobsClient 的设置是独立的,调用 .Build() 后无法更改。

var daprJobsClient = new DaprJobsClientBuilder()
    .UseDaprApiToken("abc123") // 指定用于验证到其他 Dapr sidecar 的 API 令牌
    .Build();

DaprJobsClientBuilder 包含以下设置:

  • Dapr sidecar 的 HTTP 端点
  • Dapr sidecar 的 gRPC 端点
  • 用于配置 JSON 序列化的 JsonSerializerOptions 对象
  • 用于配置 gRPC 的 GrpcChannelOptions 对象
  • 用于验证请求到 sidecar 的 API 令牌
  • 用于创建 SDK 使用的 HttpClient 实例的工厂方法
  • 用于在向 sidecar 发出请求时使用的 HttpClient 实例的超时

SDK 将读取以下环境变量来配置默认值:

  • DAPR_HTTP_ENDPOINT:用于查找 Dapr sidecar 的 HTTP 端点,例如:https://dapr-api.mycompany.com
  • DAPR_GRPC_ENDPOINT:用于查找 Dapr sidecar 的 gRPC 端点,例如:https://dapr-grpc-api.mycompany.com
  • DAPR_HTTP_PORT:如果未设置 DAPR_HTTP_ENDPOINT,则用于查找 Dapr sidecar 的本地 HTTP 端点
  • DAPR_GRPC_PORT:如果未设置 DAPR_GRPC_ENDPOINT,则用于查找 Dapr sidecar 的本地 gRPC 端点
  • DAPR_API_TOKEN:用于设置 API 令牌

配置 gRPC 通道选项

Dapr 使用 CancellationToken 进行取消依赖于 gRPC 通道选项的配置。如果您需要自行配置这些选项,请确保启用 ThrowOperationCanceledOnCancellation 设置

var daprJobsClient = new DaprJobsClientBuilder()
    .UseGrpcChannelOptions(new GrpcChannelOptions { ... ThrowOperationCanceledOnCancellation = true })
    .Build();

使用 DaprJobsClient 进行取消操作

DaprJobsClient 上的 API 执行异步操作并接受一个可选的 CancellationToken 参数。这遵循 .NET 的标准做法,用于可取消的操作。请注意,当取消发生时,不能保证远程端点停止处理请求,只能保证客户端已停止等待完成。

当操作被取消时,将抛出 OperationCancelledException

通过依赖注入配置 DaprJobsClient

使用内置的扩展方法在依赖注入容器中注册 DaprJobsClient 可以提供一次性注册长生命周期服务的好处,集中复杂配置并通过确保在可能的情况下重新利用类似的长生命周期资源(例如 HttpClient 实例)来提高性能。

有三种重载可用,以便开发人员在为其场景配置客户端时具有最大的灵活性。每个重载都会代表您注册 IHttpClientFactory(如果尚未注册),并在创建 HttpClient 实例时配置 DaprJobsClientBuilder 以尽可能多地重用相同的实例,避免套接字耗尽和其他问题。

在第一种方法中,开发人员没有进行任何配置,DaprJobsClient 使用默认设置进行配置。

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprJobsClient(); //根据需要注册 `DaprJobsClient` 以进行注入
var app = builder.Build();

有时,开发人员需要使用上面详细介绍的各种配置选项来配置创建的客户端。这是通过传入 DaprJobsClientBuiler 并公开用于配置必要选项的方法的重载来完成的。

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprJobsClient((_, daprJobsClientBuilder) => {
   //设置 API 令牌
   daprJobsClientBuilder.UseDaprApiToken("abc123");
   //指定非标准 HTTP 端点
   daprJobsClientBuilder.UseHttpEndpoint("http://dapr.my-company.com");
});

var app = builder.Build();

最后,开发人员可能需要从其他服务中检索信息以填充这些配置值。该值可以从 DaprClient 实例、供应商特定的 SDK 或某些本地服务中提供,但只要它也在 DI 中注册,就可以通过最后一个重载将其注入到此配置操作中:

var builder = WebApplication.CreateBuilder(args);

//注册一个虚构的服务,从某处检索 secret
builder.Services.AddSingleton<SecretService>();

builder.Services.AddDaprJobsClient((serviceProvider, daprJobsClientBuilder) => {
    //从服务提供者中检索 `SecretService` 的实例
    var secretService = serviceProvider.GetRequiredService<SecretService>();
    var daprApiToken = secretService.GetSecret("DaprApiToken").Value;

    //配置 `DaprJobsClientBuilder`
    daprJobsClientBuilder.UseDaprApiToken(daprApiToken);
});

var app = builder.Build();

理解 DaprJobsClient 上的负载序列化

虽然 DaprClient 上有许多方法可以使用 System.Text.Json 序列化器自动序列化和反序列化数据,但此 SDK 采用不同的理念。相反,相关方法接受一个可选的 ReadOnlyMemory<byte> 负载,这意味着序列化是留给开发人员的练习,通常不由 SDK 处理。

话虽如此,每种调度方法都有一些可用的辅助扩展方法。如果您知道要使用 JSON 可序列化的类型,可以使用每种调度类型的 Schedule*WithPayloadAsync 方法,该方法接受一个 object 作为负载和一个可选的 JsonSerializerOptions 用于序列化值。这将为您将值转换为 UTF-8 编码的字节,作为一种便利。以下是调度 Cron 表达式时可能的示例:

public sealed record Doodad (string Name, int Value);

//...
var doodad = new Doodad("Thing", 100);
await daprJobsClient.ScheduleCronJobWithPayloadAsync("myJob", "5 * * * *", doodad);

同样,如果您有一个普通的字符串值,可以使用相同方法的重载来序列化字符串类型的负载,JSON 序列化步骤将被跳过,只会被编码为 UTF-8 编码字节的数组。以下是调度一次性 job 时可能的示例:

var now = DateTime.UtcNow;
var oneWeekFromNow = now.AddDays(7);
await daprJobsClient.ScheduleOneTimeJobWithPayloadAsync("myOtherJob", oneWeekFromNow, "This is a test!");

JobDetails 类型将数据返回为 ReadOnlyMemory<byte>?,因此开发人员可以根据需要进行反序列化,但同样有两个包含的辅助扩展,可以将其反序列化为 JSON 兼容类型或字符串。这两种方法都假设开发人员对最初调度的 job 进行了编码(可能使用辅助序列化方法),因为这些方法不会强制字节表示它们不是的东西。

要将字节反序列化为字符串,可以使用以下辅助方法:

if (jobDetails.Payload is not null)
{
    string payloadAsString = jobDetails.Payload.DeserializeToString(); //如果成功,返回一个具有值的字符串
}

要将 JSON 编码的 UTF-8 字节反序列化为相应的类型,可以使用以下辅助方法。提供了一个重载参数,允许开发人员传入自己的 JsonSerializerOptions 以在反序列化期间应用。

public sealed record Doodad (string Name, int Value);

//...
if (jobDetails.Payload is not null)
{
    var deserializedDoodad = jobDetails.Payload.DeserializeFromJsonBytes<Doodad>();
}

错误处理

DaprJobsClient 上的方法如果在 SDK 和运行在 Dapr sidecar 上的 Jobs API 服务之间遇到问题,将抛出 DaprJobsServiceException。如果由于通过此 SDK 向 Jobs API 服务发出的请求格式不正确而遇到失败,将抛出 DaprMalformedJobException。在非法参数值的情况下,将抛出适当的标准异常(例如 ArgumentOutOfRangeExceptionArgumentNullException),并附上有问题的参数名称。对于其他任何情况,将抛出 DaprException

最常见的失败情况将与以下内容相关:

  • 在与 Jobs API 交互时参数格式不正确
  • 瞬态故障,例如网络问题
  • 无效数据,例如无法将值反序列化为其最初未从中序列化的类型

在任何这些情况下,您都可以通过 .InnerException 属性检查更多异常详细信息。