This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Dapr Messaging .NET SDK

快速上手使用 Dapr Messaging .NET SDK

使用 Dapr Messaging 包,您可以在 .NET 应用程序中与 Dapr 消息 API 进行交互。在 v1.15 版本中,该包仅支持流式 pubsub 功能

未来的 Dapr .NET SDK 版本将会把现有的消息功能从 Dapr.Client 迁移到 Dapr.Messaging 包中。这一变更将在发布说明、文档和相关的技术说明中提前告知。

要开始使用,请查看 Dapr Messaging 指南,并参考最佳实践文档以获取更多指导。

1 - 如何:在 .NET SDK 中编写和管理 Dapr 流式订阅

学习如何使用 .NET SDK 编写和管理 Dapr 流式订阅

我们来创建一个使用流式功能的发布/订阅主题或队列的订阅。我们将使用此处提供的简单示例,进行演示,并逐步讲解如何在运行时配置消息处理程序,而无需预先配置端点。在本指南中,您将会学习如何:

前提条件

设置环境

克隆 .NET SDK 仓库

git clone https://github.com/dapr/dotnet-sdk.git

从 .NET SDK 根目录,导航到 Dapr 流式发布/订阅示例。

cd examples/Client/PublishSubscribe

本地运行应用程序

要运行 Dapr 应用程序,您需要启动 .NET 程序和一个 Dapr sidecar。导航到 StreamingSubscriptionExample 目录。

cd StreamingSubscriptionExample

我们将运行一个命令,同时启动 Dapr sidecar 和 .NET 程序。

dapr run --app-id pubsubapp --dapr-grpc-port 4001 --dapr-http-port 3500 -- dotnet run

Dapr 监听 HTTP 请求在 http://localhost:3500,而 gRPC 请求在 http://localhost:4001

使用依赖注入注册 Dapr PubSub 客户端

Dapr Messaging SDK 提供了一个扩展方法来简化 Dapr PubSub 客户端的注册。在 Program.cs 中完成依赖注入注册之前,添加以下行:

var builder = WebApplication.CreateBuilder(args);

//可以在这两行之间的任何位置添加
builder.Services.AddDaprPubSubClient(); //就是这样

var app = builder.Build();

您可能希望为 Dapr PubSub 客户端提供一些配置选项,这些选项应在每次调用 sidecar 时存在,例如 Dapr API 令牌,或者您希望使用非标准的 HTTP 或 gRPC 端点。这可以通过使用允许配置 DaprPublishSubscribeClientBuilder 实例的注册方法重载来实现:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprPubSubClient((_, daprPubSubClientBuilder) => {
    daprPubSubClientBuilder.UseDaprApiToken("abc123");
    daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512"); //非标准 sidecar HTTP 端点
});

var app = builder.Build();

尽管如此,您可能希望注入的任何值需要从其他来源检索,该来源本身注册为依赖项。您可以使用另一个重载将 IServiceProvider 注入到配置操作方法中。在以下示例中,我们注册了一个虚构的单例,可以从某处检索 secret 并将其传递到 AddDaprJobClient 的配置方法中,以便我们可以从其他地方检索我们的 Dapr API 令牌以在此处注册:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton<SecretRetriever>();
builder.Services.AddDaprPubSubClient((serviceProvider, daprPubSubClientBuilder) => {
    var secretRetriever = serviceProvider.GetRequiredService<SecretRetriever>();
    var daprApiToken = secretRetriever.GetSecret("DaprApiToken").Value;
    daprPubSubClientBuilder.UseDaprApiToken(daprApiToken);
    
    daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512");
});

var app = builder.Build();

使用 IConfiguration 使用 Dapr PubSub 客户端

可以使用注册的 IConfiguration 中的值配置 Dapr PubSub 客户端,而无需显式指定每个值覆盖,如前一节中使用 DaprPublishSubscribeClientBuilder 所示。相反,通过填充通过依赖注入提供的 IConfigurationAddDaprPubSubClient() 注册将自动使用这些值覆盖其各自的默认值。

首先在您的配置中填充值。这可以通过多种不同的方式完成,如下所示。

通过 ConfigurationBuilder 配置

应用程序设置可以在不使用配置源的情况下配置,而是通过使用 ConfigurationBuilder 实例在内存中填充值:

var builder = WebApplication.CreateBuilder();

//创建配置
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string> {
            { "DAPR_HTTP_ENDPOINT", "http://localhost:54321" },
            { "DAPR_API_TOKEN", "abc123" }
        })
    .Build();

builder.Configuration.AddConfiguration(configuration);
builder.Services.AddDaprPubSubClient(); //这将自动从 IConfiguration 填充 HTTP 端点和 API 令牌值

通过环境变量配置

应用程序设置可以从可用于您的应用程序的环境变量中访问。

以下环境变量将用于填充用于注册 Dapr PubSub 客户端的 HTTP 端点和 API 令牌。

DAPR_HTTP_ENDPOINThttp://localhost:54321
DAPR_API_TOKENabc123
var builder = WebApplication.CreateBuilder();

builder.Configuration.AddEnvironmentVariables();
builder.Services.AddDaprPubSubClient();

Dapr PubSub 客户端将被配置为使用 HTTP 端点 http://localhost:54321 并用 API 令牌头 abc123 填充所有出站请求。

通过前缀环境变量配置

然而,在共享主机场景中,多个应用程序都在同一台机器上运行而不使用容器或在开发环境中,前缀环境变量并不罕见。以下示例假设 HTTP 端点和 API 令牌都将从前缀为 “myapp_” 的环境变量中提取。在此场景中使用的两个环境变量如下:

myapp_DAPR_HTTP_ENDPOINThttp://localhost:54321
myapp_DAPR_API_TOKENabc123

这些环境变量将在以下示例中加载到注册的配置中,并在没有附加前缀的情况下提供。

var builder = WebApplication.CreateBuilder();

builder.Configuration.AddEnvironmentVariables(prefix: "myapp_");
builder.Services.AddDaprPubSubClient();

Dapr PubSub 客户端将被配置为使用 HTTP 端点 http://localhost:54321 并用 API 令牌头 abc123 填充所有出站请求。

不依赖于依赖注入使用 Dapr PubSub 客户端

虽然使用依赖注入简化了 .NET 中复杂类型的使用,并使处理复杂配置变得更容易,但您不需要以这种方式注册 DaprPublishSubscribeClient。相反,您还可以选择从 DaprPublishSubscribeClientBuilder 实例创建它的实例,如下所示:


public class MySampleClass
{
    public void DoSomething()
    {
        var daprPubSubClientBuilder = new DaprPublishSubscribeClientBuilder();
        var daprPubSubClient = daprPubSubClientBuilder.Build();

        //使用 `daprPubSubClient` 做一些事情
    }
}

设置消息处理程序

Dapr 中的流式订阅实现使您可以更好地控制事件的背压处理,通过在您的应用程序准备好接受它们之前将消息保留在 Dapr 运行时中。 .NET SDK 支持一个高性能队列,用于在处理挂起时在您的应用程序中维护这些消息的本地缓存。这些消息将保留在队列中,直到每个消息的处理超时或采取响应操作(通常在处理成功或失败后)。在 Dapr 运行时收到此响应操作之前,消息将由 Dapr 保留,并在服务故障时可用。

可用的各种响应操作如下:

响应操作描述
重试事件应在将来再次传递。
丢弃事件应被删除(或转发到死信队列,如果已配置)并且不再尝试。
成功事件应被删除,因为它已成功处理。

处理程序将一次只接收一条消息,如果为订阅提供了取消令牌,则将在处理程序调用期间提供此令牌。

处理程序必须配置为返回一个 Task<TopicResponseAction>,指示这些操作之一,即使是从 try/catch 块中返回。如果您的处理程序未捕获异常,订阅将在订阅注册期间配置的选项中使用响应操作。

以下演示了示例中提供的示例消息处理程序:

Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
    try
    {
        //对消息做一些事情
        Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
        return Task.FromResult(TopicResponseAction.Success);
    }
    catch
    {
        return Task.FromResult(TopicResponseAction.Retry);
    }
}

配置并订阅 PubSub 主题

流式订阅的配置需要在 Dapr 中注册的 PubSub 组件的名称、要订阅的主题或队列的名称、提供订阅配置的 DaprSubscriptionOptions、消息处理程序和可选的取消令牌。 DaprSubscriptionOptions 的唯一必需参数是默认的 MessageHandlingPolicy,它由每个事件的超时和超时时要采取的 TopicResponseAction 组成。

其他选项如下:

属性名称描述
Metadata额外的订阅元数据
DeadLetterTopic发送丢弃消息的死信主题的可选名称。
MaximumQueuedMessages默认情况下,内部队列没有强制的最大边界,但设置此属性将施加上限。
MaximumCleanupTimeout当订阅被处理或令牌标记取消请求时,这指定了处理内部队列中剩余消息的最大时间。

然后按以下示例配置订阅:

var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();

var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(60)); //覆盖默认的30秒
var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry));
var subscription = await messagingClient.SubscribeAsync("pubsub", "mytopic", options, HandleMessageAsync, cancellationTokenSource.Token);

终止并清理订阅

当您完成订阅并希望停止接收新事件时,只需等待对订阅实例的 DisposeAsync() 调用。这将导致客户端取消注册其他事件,并在处理所有仍在背压队列中的事件(如果有)后,处理任何内部资源。此清理将限于在注册订阅时提供的 DaprSubscriptionOptions 中的超时间隔,默认情况下设置为 30 秒。

2 - DaprPublishSubscribeClient 使用指南

使用 DaprPublishSubscribeClient 的基本提示和建议

生命周期管理

DaprPublishSubscribeClient 是 Dapr 客户端的一个版本,专门用于与 Dapr 消息 API 交互。它可以与 DaprClient 和其他 Dapr 客户端一起注册而不会出现问题。

它通过 TCP 套接字与 Dapr sidecar 通信,维护对网络资源的访问,并实现了 IAsyncDisposable 接口以支持资源的快速清理。

为了获得最佳性能,建议创建一个长生命周期的 DaprPublishSubscribeClient 实例,并在整个应用程序中共享使用。DaprPublishSubscribeClient 实例是线程安全的,适合共享使用。

可以通过依赖注入来实现这一点。注册方法支持以单例、作用域实例或瞬态(每次注入时重新创建)的方式进行注册,但也可以通过 IConfiguration 或其他注入服务的值来注册,这在每个类中从头创建客户端时是不切实际的。

避免为每个操作创建一个 DaprPublishSubscribeClient 并在操作完成后销毁它。DaprPublishSubscribeClient 应仅在您不再希望接收订阅事件时才被销毁,因为销毁它将取消正在进行的新事件接收。

通过 DaprPublishSubscribeClientBuilder 配置 DaprPublishSubscribeClient

可以通过在 DaprPublishSubscribeClientBuilder 类上调用方法来配置 DaprPublishSubscribeClient,然后调用 .Build() 来创建客户端本身。每个 DaprPublishSubscribeClient 的设置是独立的,并且在调用 .Build() 之后无法更改。

var daprPubsubClient = new DaprPublishSubscribeClientBuilder()
    .UseDaprApiToken("abc123") // 指定用于认证到其他 Dapr sidecar 的 API 令牌
    .Build();

DaprPublishSubscribeClientBuilder 包含以下设置:

  • Dapr sidecar 的 HTTP 端点
  • Dapr sidecar 的 gRPC 端点
  • 用于配置 JSON 序列化的 JsonSerializerOptions 对象
  • 用于配置 gRPC 的 GrpcChannelOptions 对象
  • 用于认证请求到 sidecar 的 API 令牌
  • 用于创建 SDK 使用的 HttpClient 实例的工厂方法
  • 用于在向 sidecar 发出请求时使用的 HttpClient 实例的超时

SDK 将读取以下环境变量来配置默认值:

  • DAPR_HTTP_ENDPOINT:用于查找 Dapr sidecar 的 HTTP 端点,例如:https://dapr-api.mycompany.com
  • DAPR_GRPC_ENDPOINT:用于查找 Dapr sidecar 的 gRPC 端点,例如:https://dapr-grpc-api.mycompany.com
  • DAPR_HTTP_PORT:如果未设置 DAPR_HTTP_ENDPOINT,则用于查找 Dapr sidecar 的本地 HTTP 端点
  • DAPR_GRPC_PORT:如果未设置 DAPR_GRPC_ENDPOINT,则用于查找 Dapr sidecar 的本地 gRPC 端点
  • DAPR_API_TOKEN:用于设置 API 令牌

配置 gRPC 通道选项

Dapr 使用 CancellationToken 进行取消依赖于 gRPC 通道选项的配置。如果您需要自行配置这些选项,请确保启用 ThrowOperationCanceledOnCancellation 设置

var daprPubsubClient = new DaprPublishSubscribeClientBuilder()
    .UseGrpcChannelOptions(new GrpcChannelOptions { ... ThrowOperationCanceledOnCancellation = true })
    .Build();

使用 DaprPublishSubscribeClient 进行取消操作

DaprPublishSubscribeClient 上的 API 执行异步操作并接受一个可选的 CancellationToken 参数。这遵循 .NET 的标准做法,用于可取消的操作。请注意,当取消发生时,不能保证远程端点停止处理请求,只能保证客户端已停止等待完成。

当操作被取消时,它将抛出一个 OperationCancelledException

通过依赖注入配置 DaprPublishSubscribeClient

使用内置的扩展方法在依赖注入容器中注册 DaprPublishSubscribeClient 可以提供注册长生命周期服务一次的好处,集中复杂的配置并通过确保在可能的情况下重新利用类似的长生命周期资源(例如 HttpClient 实例)来提高性能。

有三种重载可用,以便为开发人员提供最大的灵活性来配置客户端以适应他们的场景。每个重载都会代表您注册 IHttpClientFactory(如果尚未注册),并配置 DaprPublishSubscribeClientBuilder 以在创建 HttpClient 实例时使用它,以便尽可能多地重用相同的实例并避免套接字耗尽和其他问题。

在第一种方法中,开发人员没有进行任何配置,DaprPublishSubscribeClient 使用默认设置进行配置。

var builder = WebApplication.CreateBuilder(args);

builder.Services.DaprPublishSubscribeClient(); //根据需要注册 `DaprPublishSubscribeClient` 以进行注入
var app = builder.Build();

有时,开发人员需要使用上述各种配置选项来配置创建的客户端。这是通过传入 DaprJobsClientBuiler 的重载来完成的,并公开用于配置必要选项的方法。

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprJobsClient((_, daprPubSubClientBuilder) => {
   //设置 API 令牌
   daprPubSubClientBuilder.UseDaprApiToken("abc123");
   //指定非标准 HTTP 端点
   daprPubSubClientBuilder.UseHttpEndpoint("http://dapr.my-company.com");
});

var app = builder.Build();

最后,开发人员可能需要从其他服务中检索信息以填充这些配置值。该值可以从 DaprClient 实例、供应商特定的 SDK 或某些本地服务中提供,但只要它也在 DI 中注册,就可以通过最后一个重载将其注入到此配置操作中:

var builder = WebApplication.CreateBuilder(args);

//注册一个虚构的服务,从某处检索秘密
builder.Services.AddSingleton<SecretService>();

builder.Services.AddDaprPublishSubscribeClient((serviceProvider, daprPubSubClientBuilder) => {
    //从服务提供者中检索 `SecretService` 的实例
    var secretService = serviceProvider.GetRequiredService<SecretService>();
    var daprApiToken = secretService.GetSecret("DaprApiToken").Value;

    //配置 `DaprPublishSubscribeClientBuilder`
    daprPubSubClientBuilder.UseDaprApiToken(daprApiToken);
});

var app = builder.Build();