This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Dapr Messaging .NET SDK

Get up and running with the Dapr Messaging .NET SDK

With the Dapr Messaging package, you can interact with the Dapr messaging APIs from a .NET application. In the v1.15 release, this package only contains the functionality corresponding to the streaming PubSub capability.

Future Dapr .NET SDK releases will migrate existing messaging capabilities out from Dapr.Client to this Dapr.Messaging package. This will be documented in the release notes, documentation and obsolete attributes in advance.

To get started, walk through the Dapr Messaging how-to guide and refer to best practices documentation for additional guidance.

1 - How to: Author and manage Dapr streaming subscriptions in the .NET SDK

Learn how to author and manage Dapr streaming subscriptions using the .NET SDK

Let’s create a subscription to a pub/sub topic or queue at using the streaming capability. We’ll use the simple example provided here, for the following demonstration and walk through it as an explainer of how you can configure message handlers at runtime and which do not require an endpoint to be pre-configured. In this guide, you will:

  • Deploy a .NET Web API application (StreamingSubscriptionExample)
  • Utilize the Dapr .NET Messaging SDK to subscribe dynamically to a pub/sub topic.

Prerequisites

Set up the environment

Clone the .NET SDK repo.

git clone https://github.com/dapr/dotnet-sdk.git

From the .NET SDK root directory, navigate to the Dapr streaming PubSub example.

cd examples/Client/PublishSubscribe

Run the application locally

To run the Dapr application, you need to start the .NET program and a Dapr sidecar. Navigate to the StreamingSubscriptionExample directory.

cd StreamingSubscriptionExample

We’ll run a command that starts both the Dapr sidecar and the .NET program at the same time.

dapr run --app-id pubsubapp --dapr-grpc-port 4001 --dapr-http-port 3500 -- dotnet run

Dapr listens for HTTP requests at http://localhost:3500 and internal Jobs gRPC requests at http://localhost:4001.

Register the Dapr PubSub client with dependency injection

The Dapr Messaging SDK provides an extension method to simplify the registration of the Dapr PubSub client. Before completing the dependency injection registration in Program.cs, add the following line:

var builder = WebApplication.CreateBuilder(args);

//Add anywhere between these two
builder.Services.AddDaprPubSubClient(); //That's it

var app = builder.Build();

It’s possible that you may want to provide some configuration options to the Dapr PubSub client that should be present with each call to the sidecar such as a Dapr API token, or you want to use a non-standard HTTP or gRPC endpoint. This be possible through use of an overload of the registration method that allows configuration of a DaprPublishSubscribeClientBuilder instance:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprPubSubClient((_, daprPubSubClientBuilder) => {
    daprPubSubClientBuilder.UseDaprApiToken("abc123");
    daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512"); //Non-standard sidecar HTTP endpoint
});

var app = builder.Build();

Still, it’s possible that whatever values you wish to inject need to be retrieved from some other source, itself registered as a dependency. There’s one more overload you can use to inject an IServiceProvider into the configuration action method. In the following example, we register a fictional singleton that can retrieve secrets from somewhere and pass it into the configuration method for AddDaprJobClient so we can retrieve our Dapr API token from somewhere else for registration here:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton<SecretRetriever>();
builder.Services.AddDaprPubSubClient((serviceProvider, daprPubSubClientBuilder) => {
    var secretRetriever = serviceProvider.GetRequiredService<SecretRetriever>();
    var daprApiToken = secretRetriever.GetSecret("DaprApiToken").Value;
    daprPubSubClientBuilder.UseDaprApiToken(daprApiToken);
    
    daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512");
});

var app = builder.Build();

Use the Dapr PubSub client using IConfiguration

It’s possible to configure the Dapr PubSub client using the values in your registered IConfiguration as well without explicitly specifying each of the value overrides using the DaprPublishSubscribeClientBuilder as demonstrated in the previous section. Rather, by populating an IConfiguration made available through dependency injection the AddDaprPubSubClient() registration will automatically use these values over their respective defaults.

Start by populating the values in your configuration. This can be done in several different ways as demonstrated below.

Configuration via ConfigurationBuilder

Application settings can be configured without using a configuration source and by instead populating the value in-memory using a ConfigurationBuilder instance:

var builder = WebApplication.CreateBuilder();

//Create the configuration
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string> {
            { "DAPR_HTTP_ENDPOINT", "http://localhost:54321" },
            { "DAPR_API_TOKEN", "abc123" }
        })
    .Build();

builder.Configuration.AddConfiguration(configuration);
builder.Services.AddDaprPubSubClient(); //This will automatically populate the HTTP endpoint and API token values from the IConfiguration

Configuration via Environment Variables

Application settings can be accessed from environment variables available to your application.

The following environment variables will be used to populate both the HTTP endpoint and API token used to register the Dapr PubSub client.

Key Value
DAPR_HTTP_ENDPOINT http://localhost:54321
DAPR_API_TOKEN abc123
var builder = WebApplication.CreateBuilder();

builder.Configuration.AddEnvironmentVariables();
builder.Services.AddDaprPubSubClient();

The Dapr PubSub client will be configured to use both the HTTP endpoint http://localhost:54321 and populate all outbound requests with the API token header abc123.

Configuration via prefixed Environment Variables

However, in shared-host scenarios where there are multiple applications all running on the same machine without using containers or in development environments, it’s not uncommon to prefix environment variables. The following example assumes that both the HTTP endpoint and the API token will be pulled from environment variables prefixed with the value “myapp_”. The two environment variables used in this scenario are as follows:

Key Value
myapp_DAPR_HTTP_ENDPOINT http://localhost:54321
myapp_DAPR_API_TOKEN abc123

These environment variables will be loaded into the registered configuration in the following example and made available without the prefix attached.

var builder = WebApplication.CreateBuilder();

builder.Configuration.AddEnvironmentVariables(prefix: "myapp_");
builder.Services.AddDaprPubSubClient();

The Dapr PubSub client will be configured to use both the HTTP endpoint http://localhost:54321 and populate all outbound requests with the API token header abc123.

Use the Dapr PubSub client without relying on dependency injection

While the use of dependency injection simplifies the use of complex types in .NET and makes it easier to deal with complicated configurations, you’re not required to register the DaprPublishSubscribeClient in this way. Rather, you can also elect to create an instance of it from a DaprPublishSubscribeClientBuilder instance as demonstrated below:


public class MySampleClass
{
    public void DoSomething()
    {
        var daprPubSubClientBuilder = new DaprPublishSubscribeClientBuilder();
        var daprPubSubClient = daprPubSubClientBuilder.Build();

        //Do something with the `daprPubSubClient`
    }
}

Set up message handler

The streaming subscription implementation in Dapr gives you greater control over handling backpressure from events by leaving the messages in the Dapr runtime until your application is ready to accept them. The .NET SDK supports a high-performance queue for maintaining a local cache of these messages in your application while processing is pending. These messages will persist in the queue until processing either times out for each one or a response action is taken for each (typically after processing succeeds or fails). Until this response action is received by the Dapr runtime, the messages will be persisted by Dapr and made available in case of a service failure.

The various response actions available are as follows:

Response Action Description
Retry The event should be delivered again in the future.
Drop The event should be deleted (or forwarded to a dead letter queue, if configured) and not attempted again.
Success The event should be deleted as it was successfully processed.

The handler will receive only one message at a time and if a cancellation token is provided to the subscription, this token will be provided during the handler invocation.

The handler must be configured to return a Task<TopicResponseAction> indicating one of these operations, even if from a try/catch block. If an exception is not caught by your handler, the subscription will use the response action configured in the options during subscription registration.

The following demonstrates the sample message handler provided in the example:

Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
    try
    {
        //Do something with the message
        Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
        return Task.FromResult(TopicResponseAction.Success);
    }
    catch
    {
        return Task.FromResult(TopicResponseAction.Retry);
    }
}

Configure and subscribe to the PubSub topic

Configuration of the streaming subscription requires the name of the PubSub component registered with Dapr, the name of the topic or queue being subscribed to, the DaprSubscriptionOptions providing the configuration for the subscription, the message handler and an optional cancellation token. The only required argument to the DaprSubscriptionOptions is the default MessageHandlingPolicy which consists of a per-event timeout and the TopicResponseAction to take when that timeout occurs.

Other options are as follows:

Property Name Description
Metadata Additional subscription metadata
DeadLetterTopic The optional name of the dead-letter topic to send dropped messages to.
MaximumQueuedMessages By default, there is no maximum boundary enforced for the internal queue, but setting this
property would impose an upper limit.
MaximumCleanupTimeout When the subscription is disposed of or the token flags a cancellation request, this specifies
the maximum amount of time available to process the remaining messages in the internal queue.

Subscription is then configured as in the following example:

var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();

var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(60)); //Override the default of 30 seconds
var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry));
var subscription = await messagingClient.SubscribeAsync("pubsub", "mytopic", options, HandleMessageAsync, cancellationTokenSource.Token);

Terminate and clean up subscription

When you’ve finished with your subscription and wish to stop receiving new events, simply await a call to DisposeAsync() on your subscription instance. This will cause the client to unregister from additional events and proceed to finish processing all the events still leftover in the backpressure queue, if any, before disposing of any internal resources. This cleanup will be limited to the timeout interval provided in the DaprSubscriptionOptions when the subscription was registered and by default, this is set to 30 seconds.

2 - DaprPublishSubscribeClient usage

Essential tips and advice for using DaprPublishSubscribeClient

Lifetime management

A DaprPublishSubscribeClient is a version of the Dapr client that is dedicated to interacting with the Dapr Messaging API. It can be registered alongside a DaprClient and other Dapr clients without issue.

It maintains access to networking resources in the form of TCP sockets used to communicate with the Dapr sidecar and implements IAsyncDisposable to support the eager cleanup of resources.

For best performance, create a single long-lived instance of DaprPublishSubscribeClient and provide access to that shared instance throughout your application. DaprPublishSubscribeClient instances are thread-safe and intended to be shared.

This can be aided by utilizing the dependency injection functionality. The registration method supports registration using as a singleton, a scoped instance or as transient (meaning it’s recreated every time it’s injected), but also enables registration to utilize values from an IConfiguration or other injected service in a way that’s impractical when creating the client from scratch in each of your classes.

Avoid creating a DaprPublishSubscribeClient for each operation and disposing it when the operation is complete. It’s intended that the DaprPublishSubscribeClient should only be disposed when you no longer wish to receive events on the subscription as disposing it will cancel the ongoing receipt of new events.

Configuring DaprPublishSubscribeClient via the DaprPublishSubscribeClientBuilder

A DaprPublishSubscribeClient can be configured by invoking methods on the DaprPublishSubscribeClientBuilder class before calling .Build() to create the client itself. The settings for each DaprPublishSubscribeClient are separate and cannot be changed after calling .Build().

var daprPubsubClient = new DaprPublishSubscribeClientBuilder()
    .UseDaprApiToken("abc123") // Specify the API token used to authenticate to other Dapr sidecars
    .Build();

The DaprPublishSubscribeClientBuilder contains settings for:

  • The HTTP endpoint of the Dapr sidecar
  • The gRPC endpoint of the Dapr sidecar
  • The JsonSerializerOptions object used to configure JSON serialization
  • The GrpcChannelOptions object used to configure gRPC
  • The API token used to authenticate requests to the sidecar
  • The factory method used to create the HttpClient instance used by the SDK
  • The timeout used for the HttpClient instance when making requests to the sidecar

The SDK will read the following environment variables to configure the default values:

  • DAPR_HTTP_ENDPOINT: used to find the HTTP endpoint of the Dapr sidecar, example: https://dapr-api.mycompany.com
  • DAPR_GRPC_ENDPOINT: used to find the gRPC endpoint of the Dapr sidecar, example: https://dapr-grpc-api.mycompany.com
  • DAPR_HTTP_PORT: if DAPR_HTTP_ENDPOINT is not set, this is used to find the HTTP local endpoint of the Dapr sidecar
  • DAPR_GRPC_PORT: if DAPR_GRPC_ENDPOINT is not set, this is used to find the gRPC local endpoint of the Dapr sidecar
  • DAPR_API_TOKEN: used to set the API token

Configuring gRPC channel options

Dapr’s use of CancellationToken for cancellation relies on the configuration of the gRPC channel options. If you need to configure these options yourself, make sure to enable the ThrowOperationCanceledOnCancellation setting.

var daprPubsubClient = new DaprPublishSubscribeClientBuilder()
    .UseGrpcChannelOptions(new GrpcChannelOptions { ... ThrowOperationCanceledOnCancellation = true })
    .Build();

Using cancellation with DaprPublishSubscribeClient

The APIs on DaprPublishSubscribeClient perform asynchronous operations and accept an optional CancellationToken parameter. This follows a standard .NET practice for cancellable operations. Note that when cancellation occurs, there is no guarantee that the remote endpoint stops processing the request, only that the client has stopped waiting for completion.

When an operation is cancelled, it will throw an OperationCancelledException.

Configuring DaprPublishSubscribeClient via dependency injection

Using the built-in extension methods for registering the DaprPublishSubscribeClient in a dependency injection container can provide the benefit of registering the long-lived service a single time, centralize complex configuration and improve performance by ensuring similarly long-lived resources are re-purposed when possible (e.g. HttpClient instances).

There are three overloads available to give the developer the greatest flexibility in configuring the client for their scenario. Each of these will register the IHttpClientFactory on your behalf if not already registered, and configure the DaprPublishSubscribeClientBuilder to use it when creating the HttpClient instance in order to re-use the same instance as much as possible and avoid socket exhaustion and other issues.

In the first approach, there’s no configuration done by the developer and the DaprPublishSubscribeClient is configured with the default settings.

var builder = WebApplication.CreateBuilder(args);

builder.Services.DaprPublishSubscribeClient(); //Registers the `DaprPublishSubscribeClient` to be injected as needed
var app = builder.Build();

Sometimes the developer will need to configure the created client using the various configuration options detailed above. This is done through an overload that passes in the DaprJobsClientBuiler and exposes methods for configuring the necessary options.

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDaprJobsClient((_, daprPubSubClientBuilder) => {
   //Set the API token
   daprPubSubClientBuilder.UseDaprApiToken("abc123");
   //Specify a non-standard HTTP endpoint
   daprPubSubClientBuilder.UseHttpEndpoint("http://dapr.my-company.com");
});

var app = builder.Build();

Finally, it’s possible that the developer may need to retrieve information from another service in order to populate these configuration values. That value may be provided from a DaprClient instance, a vendor-specific SDK or some local service, but as long as it’s also registered in DI, it can be injected into this configuration operation via the last overload:

var builder = WebApplication.CreateBuilder(args);

//Register a fictional service that retrieves secrets from somewhere
builder.Services.AddSingleton<SecretService>();

builder.Services.AddDaprPublishSubscribeClient((serviceProvider, daprPubSubClientBuilder) => {
    //Retrieve an instance of the `SecretService` from the service provider
    var secretService = serviceProvider.GetRequiredService<SecretService>();
    var daprApiToken = secretService.GetSecret("DaprApiToken").Value;

    //Configure the `DaprPublishSubscribeClientBuilder`
    daprPubSubClientBuilder.UseDaprApiToken(daprApiToken);
});

var app = builder.Build();