This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

任务

管理任务的调度与编排

1 - 作业概述

作业API构建模块概述

许多应用程序需要作业调度,或者需要在未来执行某些操作。作业API是一个用于管理和安排这些未来作业的工具,可以在特定时间或间隔执行。

作业API不仅帮助您安排作业,Dapr内部还利用调度服务来安排actor提醒。

在Dapr中,作业包括:

查看示例场景。

显示调度器控制平面服务和作业API的图示

工作原理

作业API是一个作业调度器,而不是作业的执行者。设计上保证作业至少执行一次,注重可靠性和可扩展性,而非精确性。这意味着:

  • 保证: 作业不会在计划时间之前被调用。
  • 不保证: 作业在到期时间之后被调用的具体时间。

所有计划作业的详细信息和用户相关数据都存储在调度器服务的Etcd数据库中。 您可以使用作业来:

  • 延迟您的pubsub消息传递 您可以在未来的特定时间发布消息(例如:一周后,或特定的UTC日期/时间)。
  • 调度应用程序之间的服务调用方法。

场景

作业调度在以下场景中可能会有所帮助:

  • 自动化数据库备份: 确保数据库每天备份以防止数据丢失。安排一个备份脚本在每晚2点运行,创建数据库备份并将其存储在安全位置。

  • 定期数据处理和ETL(提取、转换、加载): 处理和转换来自各种来源的原始数据并将其加载到数据仓库中。安排ETL作业在特定时间运行(例如:每小时、每天)以获取新数据、处理并更新数据仓库中的信息。

  • 电子邮件通知和报告: 通过电子邮件接收每日销售报告和每周性能摘要。安排一个作业生成所需的报告并在每天早上6点通过电子邮件发送每日报告,每周一早上8点发送每周摘要。

  • 维护任务和系统更新: 执行定期维护任务,如清理临时文件、更新软件和检查系统健康状况。安排各种维护脚本在非高峰时段运行,如周末或深夜,以尽量减少对用户的干扰。

  • 金融交易的批处理: 处理需要在每个工作日结束时批处理和结算的大量交易。安排批处理作业在每个工作日下午5点运行,汇总当天的交易并执行必要的结算和对账。

Dapr的作业API确保这些场景中表示的任务在没有人工干预的情况下始终如一地执行,提高效率并减少错误风险。

特性

作业API提供了多种特性,使您可以轻松调度作业。

在多个副本之间调度作业

调度器服务支持在多个副本之间扩展作业调度,同时保证作业仅由一个调度器服务实例触发。

试用作业API

您可以在应用程序中试用作业API。在Dapr安装完成后,您可以开始使用作业API,从如何:调度作业指南开始。

下一步

2 - 操作指南:调度和处理触发的作业

学习如何使用作业API来调度和处理触发的作业

现在您已经了解了作业构建块提供的功能,让我们来看一个如何使用API的示例。下面的代码示例描述了一个为数据库备份应用程序调度作业并在触发时处理它们的应用程序,也就是作业因到达其到期时间而被返回到应用程序的时间。

启动调度器服务

当您在本地托管模式或Kubernetes上运行dapr init时,Dapr调度器服务会启动。

设置作业API

在您的代码中,配置并调度应用程序内的作业。

以下.NET SDK代码示例调度名为prod-db-backup的作业。作业数据包含有关您将定期备份的数据库的信息。在本示例中,您将:

  • 定义在示例其余部分中使用的类型
  • 在应用程序启动期间注册一个端点,以处理服务上的所有作业触发调用
  • 向Dapr注册作业

在以下示例中,您将创建记录,序列化并与作业一起注册,以便在将来作业被触发时可以使用这些信息:

  • 备份任务的名称(db-backup
  • 备份任务的Metadata,包括:
    • 数据库名称(DBName
    • 数据库位置(BackupLocation

创建一个ASP.NET Core项目,并从NuGet添加最新版本的Dapr.Jobs

注意: 虽然您的项目不严格需要使用Microsoft.NET.Sdk.Web SDK来创建作业,但在撰写本文档时,只有调度作业的服务会接收到其触发调用。由于这些调用期望有一个可以处理作业触发的端点,并且需要Microsoft.NET.Sdk.Web SDK,因此建议您为此目的使用ASP.NET Core项目。

首先定义类型以持久化我们的备份作业数据,并将我们自己的JSON属性名称属性应用于属性,以便它们与其他语言示例保持一致。

//定义我们将用来表示作业数据的类型
internal sealed record BackupJobData([property: JsonPropertyName("task")] string Task, [property: JsonPropertyName("metadata")] BackupMetadata Metadata);
internal sealed record BackupMetadata([property: JsonPropertyName("DBName")]string DatabaseName, [property: JsonPropertyName("BackupLocation")] string BackupLocation);

接下来,作为应用程序设置的一部分,设置一个处理程序,该处理程序将在作业在您的应用程序上被触发时调用。此处理程序负责根据提供的作业名称识别应如何处理作业。

这通过在ASP.NET Core中注册一个处理程序来实现,路径为/job/<job-name>,其中<job-name>是参数化的,并传递给此处理程序委托,以满足Dapr期望有一个端点可用于处理触发的命名作业。

在您的Program.cs文件中填入以下内容:

using System.Text;
using System.Text.Json;
using Dapr.Jobs;
using Dapr.Jobs.Extensions;
using Dapr.Jobs.Models;
using Dapr.Jobs.Models.Responses;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprJobsClient();
var app = builder.Build();

//注册一个端点以接收和处理触发的作业
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
app.MapDaprScheduledJobHandler((string jobName, DaprJobDetails jobDetails, ILogger logger, CancellationToken cancellationToken) => {
  logger?.LogInformation("Received trigger invocation for job '{jobName}'", jobName);
  switch (jobName)
  {
    case "prod-db-backup":
      // 反序列化作业负载元数据
      var jobData = JsonSerializer.Deserialize<BackupJobData>(jobDetails.Payload);
      
      // 处理备份操作 - 我们假设这在您的代码中已实现
      await BackupDatabaseAsync(jobData, cancellationToken);
      break;
  }
}, cancellationTokenSource.Token);

await app.RunAsync();

最后,作业本身需要在Dapr中注册,以便可以在以后触发。您可以通过将DaprJobsClient注入到类中并作为应用程序的入站操作的一部分执行此操作,但为了本示例的目的,它将放在您上面开始的Program.cs文件的底部。因为您将使用依赖注入注册的DaprJobsClient,所以首先创建一个范围以便可以访问它。

//创建一个范围以便可以访问注册的DaprJobsClient
await using scope = app.Services.CreateAsyncScope();
var daprJobsClient = scope.ServiceProvider.GetRequiredService<DaprJobsClient>();

//创建我们希望与未来作业触发一起呈现的负载
var jobData = new BackupJobData("db-backup", new BackupMetadata("my-prod-db", "/backup-dir")); 

//将我们的负载序列化为UTF-8字节
var serializedJobData = JsonSerializer.SerializeToUtf8Bytes(jobData);

//调度我们的备份作业每分钟运行一次,但只重复10次
await daprJobsClient.ScheduleJobAsync("prod-db-backup", DaprJobSchedule.FromDuration(TimeSpan.FromMinutes(1)),
    serializedJobData, repeats: 10);

以下Go SDK代码示例调度名为prod-db-backup的作业。作业数据存储在备份数据库("my-prod-db")中,并使用ScheduleJobAlpha1进行调度。这提供了jobData,其中包括:

  • 备份Task名称
  • 备份任务的Metadata,包括:
    • 数据库名称(DBName
    • 数据库位置(BackupLocation
package main

import (
    //...

	daprc "github.com/dapr/go-sdk/client"
	"github.com/dapr/go-sdk/examples/dist-scheduler/api"
	"github.com/dapr/go-sdk/service/common"
	daprs "github.com/dapr/go-sdk/service/grpc"
)

func main() {
    // 初始化服务器
	server, err := daprs.NewService(":50070")
    // ...

	if err = server.AddJobEventHandler("prod-db-backup", prodDBBackupHandler); err != nil {
		log.Fatalf("failed to register job event handler: %v", err)
	}

	log.Println("starting server")
	go func() {
		if err = server.Start(); err != nil {
			log.Fatalf("failed to start server: %v", err)
		}
	}()
    // ...

    // 设置备份位置
	jobData, err := json.Marshal(&api.DBBackup{
		Task: "db-backup",
		Metadata: api.Metadata{
			DBName:         "my-prod-db",
			BackupLocation: "/backup-dir",
		},
	},
	)
	// ...
}

作业是通过设置Schedule和所需的Repeats数量来调度的。这些设置决定了作业应被触发并发送回应用程序的最大次数。

在此示例中,在触发时间,即根据Schedule@every 1s,此作业被触发并最多发送回应用程序Repeats10)次。

    // ...
    // 设置作业
	job := daprc.Job{
		Name:     "prod-db-backup",
		Schedule: "@every 1s",
		Repeats:  10,
		Data: &anypb.Any{
			Value: jobData,
		},
	}

在触发时间,调用prodDBBackupHandler函数,在触发时间执行此作业的所需业务逻辑。例如:

HTTP

当您使用Dapr的作业API创建作业时,Dapr会自动假定在/job/<job-name>有一个可用的端点。例如,如果您调度一个名为test的作业,Dapr期望您的应用程序在/job/test监听作业事件。确保您的应用程序为此端点设置了一个处理程序,以便在作业被触发时处理它。例如:

注意:以下示例是用Go编写的,但适用于任何编程语言。


func main() {
    ...
    http.HandleFunc("/job/", handleJob)
	http.HandleFunc("/job/<job-name>", specificJob)
    ...
}

func specificJob(w http.ResponseWriter, r *http.Request) {
    // 处理特定触发的作业
}

func handleJob(w http.ResponseWriter, r *http.Request) {
    // 处理触发的作业
}

gRPC

当作业到达其计划的触发时间时,触发的作业通过以下回调函数发送回应用程序:

注意:以下示例是用Go编写的,但适用于任何支持gRPC的编程语言。

import rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
...
func (s *JobService) OnJobEventAlpha1(ctx context.Context, in *rtv1.JobEventRequest) (*rtv1.JobEventResponse, error) {
    // 处理触发的作业
}

此函数在您的gRPC服务器上下文中处理触发的作业。当您设置服务器时,确保注册回调服务器,当作业被触发时将调用此函数:

...
js := &JobService{}
rtv1.RegisterAppCallbackAlphaServer(server, js)

在此设置中,您可以完全控制如何接收和处理触发的作业,因为它们直接通过此gRPC方法路由。

SDKs

对于SDK用户,处理触发的作业更简单。当作业被触发时,Dapr会自动将作业路由到您在服务器初始化期间设置的事件处理程序。例如,在Go中,您可以这样注册事件处理程序:

...
if err = server.AddJobEventHandler("prod-db-backup", prodDBBackupHandler); err != nil {
    log.Fatalf("failed to register job event handler: %v", err)
}

Dapr负责底层路由。当作业被触发时,您的prodDBBackupHandler函数将被调用,并带有触发的作业数据。以下是处理触发作业的示例:

// ...

// 在作业触发时调用此函数
func prodDBBackupHandler(ctx context.Context, job *common.JobEvent) error {
	var jobData common.Job
	if err := json.Unmarshal(job.Data, &jobData); err != nil {
		// ...
	}

	var jobPayload api.DBBackup
	if err := json.Unmarshal(job.Data, &jobPayload); err != nil {
		// ...
	}
	fmt.Printf("job %d received:\n type: %v \n typeurl: %v\n value: %v\n extracted payload: %v\n", jobCount, job.JobType, jobData.TypeURL, jobData.Value, jobPayload)
	jobCount++
	return nil
}

运行Dapr sidecar

一旦您在应用程序中设置了作业API,在终端窗口中使用以下命令运行Dapr sidecar。

dapr run --app-id=distributed-scheduler \
                --metrics-port=9091 \
                --dapr-grpc-port 50001 \
                --app-port 50070 \
                --app-protocol grpc \
                --log-level debug \
                go run ./main.go

下一步