1 - How to: Author and manage Dapr streaming subscriptions in the .NET SDK
Learn how to author and manage Dapr streaming subscriptions using the .NET SDK
Let’s create a subscription to a pub/sub topic or queue using the streaming capability. We’ll use the
simple example provided in the Dapr .NET SDK repository for the following demonstration and walk through it to explain
how you can configure message handlers at runtime without requiring an endpoint to be pre-configured. In this guide, you will:
- Deploy a .NET Web API application (StreamingSubscriptionExample)
- Utilize the Dapr .NET Messaging SDK to subscribe dynamically to a pub/sub topic.
Prerequisites
Note
While .NET 6 is the minimum supported version of .NET in Dapr v1.15, only .NET 8 and .NET 9 will continue to be supported by Dapr in v1.16 and later.
Set up the environment
Clone the .NET SDK repo.
git clone https://github.com/dapr/dotnet-sdk.git
From the .NET SDK root directory, navigate to the Dapr streaming PubSub example.
cd examples/Client/PublishSubscribe
Run the application locally
To run the Dapr application, you need to start the .NET program and a Dapr sidecar. Navigate to the StreamingSubscriptionExample directory.
cd StreamingSubscriptionExample
We’ll run a command that starts both the Dapr sidecar and the .NET program at the same time.
dapr run --app-id pubsubapp --dapr-grpc-port 4001 --dapr-http-port 3500 -- dotnet run
Dapr listens for HTTP requests at http://localhost:3500 and gRPC requests at http://localhost:4001.
Register the Dapr PubSub client with dependency injection
The Dapr Messaging SDK provides an extension method to simplify the registration of the Dapr PubSub client. Before
completing the dependency injection registration in Program.cs, add the following line:
var builder = WebApplication.CreateBuilder(args);
//Add anywhere between these two
builder.Services.AddDaprPubSubClient(); //That's it
var app = builder.Build();
You may want to provide configuration options to the Dapr PubSub client that
should be present with each call to the sidecar, such as a Dapr API token, or you may want to use a non-standard
HTTP or gRPC endpoint. This is possible through an overload of the registration method that allows configuration
of a DaprPublishSubscribeClientBuilder instance:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient((_, daprPubSubClientBuilder) => {
daprPubSubClientBuilder.UseDaprApiToken("abc123");
daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512"); //Non-standard sidecar HTTP endpoint
});
var app = builder.Build();
Still, it’s possible that the values you wish to inject need to be retrieved from some other source, itself registered as a dependency. There’s one more overload you can use to inject an IServiceProvider into the configuration action method. In the following example, we register a fictional singleton that can retrieve secrets from somewhere and pass it into the configuration method for AddDaprPubSubClient so we can retrieve our Dapr API token from somewhere else for registration here:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<SecretRetriever>();
builder.Services.AddDaprPubSubClient((serviceProvider, daprPubSubClientBuilder) => {
var secretRetriever = serviceProvider.GetRequiredService<SecretRetriever>();
var daprApiToken = secretRetriever.GetSecret("DaprApiToken").Value;
daprPubSubClientBuilder.UseDaprApiToken(daprApiToken);
daprPubSubClientBuilder.UseHttpEndpoint("http://localhost:8512");
});
var app = builder.Build();
Use the Dapr PubSub client using IConfiguration
It’s also possible to configure the Dapr PubSub client using the values in your registered IConfiguration without
explicitly specifying each of the value overrides using the DaprPublishSubscribeClientBuilder as demonstrated in the previous
section. When an IConfiguration is populated and made available through dependency injection, the AddDaprPubSubClient()
registration will automatically use these values over their respective defaults.
Start by populating the values in your configuration. This can be done in several different ways as demonstrated below.
Configuration via ConfigurationBuilder
Application settings can be configured without using an external configuration source by instead populating the values in-memory
using a ConfigurationBuilder instance:
var builder = WebApplication.CreateBuilder();
//Create the configuration
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string> {
{ "DAPR_HTTP_ENDPOINT", "http://localhost:54321" },
{ "DAPR_API_TOKEN", "abc123" }
})
.Build();
builder.Configuration.AddConfiguration(configuration);
builder.Services.AddDaprPubSubClient(); //This will automatically populate the HTTP endpoint and API token values from the IConfiguration
Configuration via Environment Variables
Application settings can be accessed from environment variables available to your application.
The following environment variables will be used to populate both the HTTP endpoint and API token used to register the
Dapr PubSub client.
| Key | Value |
| --- | --- |
| DAPR_HTTP_ENDPOINT | http://localhost:54321 |
| DAPR_API_TOKEN | abc123 |
var builder = WebApplication.CreateBuilder();
builder.Configuration.AddEnvironmentVariables();
builder.Services.AddDaprPubSubClient();
The Dapr PubSub client will be configured to use the HTTP endpoint http://localhost:54321 and to populate all outbound
requests with the API token header abc123.
Configuration via prefixed Environment Variables
However, in shared-host scenarios where there are multiple applications all running on the same machine without using
containers or in development environments, it’s not uncommon to prefix environment variables. The following example
assumes that both the HTTP endpoint and the API token will be pulled from environment variables prefixed with the
value “myapp_”. The two environment variables used in this scenario are as follows:
| Key | Value |
| --- | --- |
| myapp_DAPR_HTTP_ENDPOINT | http://localhost:54321 |
| myapp_DAPR_API_TOKEN | abc123 |
These environment variables will be loaded into the registered configuration in the following example and made available
without the prefix attached.
var builder = WebApplication.CreateBuilder();
builder.Configuration.AddEnvironmentVariables(prefix: "myapp_");
builder.Services.AddDaprPubSubClient();
The Dapr PubSub client will be configured to use the HTTP endpoint http://localhost:54321 and to populate all outbound
requests with the API token header abc123.
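The prefix handling described above comes from the standard .NET environment variable configuration provider rather than from Dapr itself. The following is a minimal sketch that sets the two prefixed variables for the current process only (using the illustrative values from the table) and reads them back without the prefix. It assumes the Microsoft.Extensions.Configuration.EnvironmentVariables package is available, which is the case in an ASP.NET Core project.

```csharp
using System;
using Microsoft.Extensions.Configuration;

// Simulate the prefixed variables for this process only (illustrative values).
Environment.SetEnvironmentVariable("myapp_DAPR_HTTP_ENDPOINT", "http://localhost:54321");
Environment.SetEnvironmentVariable("myapp_DAPR_API_TOKEN", "abc123");

// Load only the variables that start with "myapp_".
var configuration = new ConfigurationBuilder()
    .AddEnvironmentVariables(prefix: "myapp_")
    .Build();

// The provider strips the prefix, so the keys match the unprefixed names.
Console.WriteLine(configuration["DAPR_HTTP_ENDPOINT"]);
Console.WriteLine(configuration["DAPR_API_TOKEN"]);
```

Because the keys arrive without the prefix, a registration that looks up DAPR_HTTP_ENDPOINT and DAPR_API_TOKEN in the IConfiguration can find them unchanged.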
Use the Dapr PubSub client without relying on dependency injection
While the use of dependency injection simplifies the use of complex types in .NET and makes it easier to
deal with complicated configurations, you’re not required to register the DaprPublishSubscribeClient in this way.
Rather, you can also elect to create an instance of it from a DaprPublishSubscribeClientBuilder instance as
demonstrated below:
public class MySampleClass
{
public void DoSomething()
{
var daprPubSubClientBuilder = new DaprPublishSubscribeClientBuilder();
var daprPubSubClient = daprPubSubClientBuilder.Build();
//Do something with the `daprPubSubClient`
}
}
Set up message handler
The streaming subscription implementation in Dapr gives you greater control over handling backpressure from events by
leaving the messages in the Dapr runtime until your application is ready to accept them. The .NET SDK supports a
high-performance queue for maintaining a local cache of these messages in your application while processing is pending.
These messages will persist in the queue until processing either times out for each one or a response action is taken
for each (typically after processing succeeds or fails). Until this response action is received by the Dapr runtime,
the messages will be persisted by Dapr and made available in case of a service failure.
The various response actions available are as follows:
| Response Action | Description |
| --- | --- |
| Retry | The event should be delivered again in the future. |
| Drop | The event should be deleted (or forwarded to a dead letter queue, if configured) and not attempted again. |
| Success | The event should be deleted as it was successfully processed. |
The handler will receive only one message at a time and, if a cancellation token is provided to the subscription,
this token will be provided during the handler invocation.
The handler must be configured to return a Task<TopicResponseAction> indicating one of these operations, even if from
a try/catch block. If an exception is not caught by your handler, the subscription will use the response action configured
in the options during subscription registration.
The following demonstrates the sample message handler provided in the example:
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
try
{
//Do something with the message
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
return Task.FromResult(TopicResponseAction.Success);
}
catch
{
return Task.FromResult(TopicResponseAction.Retry);
}
}
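The sample handler only returns Success or Retry. As a hedged illustration of using all three response actions, the sketch below drops payloads that can never be processed and retries anything else that fails. The class name OrderMessageHandler, the Decide helper, and the rule that only JSON objects are valid events are hypothetical; the Dapr.Messaging.PublishSubscribe namespace is assumed to be where TopicMessage and TopicResponseAction live.

```csharp
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Messaging.PublishSubscribe; // Assumed namespace for TopicMessage and TopicResponseAction

public static class OrderMessageHandler // Hypothetical name, for illustration only
{
    // Decides the response action from the raw payload alone, which keeps the logic easy to test.
    public static TopicResponseAction Decide(ReadOnlyMemory<byte> payload)
    {
        try
        {
            using var document = JsonDocument.Parse(payload);

            // Hypothetical rule: only JSON objects are valid events for this handler.
            return document.RootElement.ValueKind == JsonValueKind.Object
                ? TopicResponseAction.Success
                : TopicResponseAction.Drop;
        }
        catch (JsonException)
        {
            // A malformed payload will not parse on a later attempt either, so drop it.
            return TopicResponseAction.Drop;
        }
        catch (Exception)
        {
            // Anything else is treated as transient here and handed back for redelivery.
            return TopicResponseAction.Retry;
        }
    }

    // Matches the handler shape used by the sample: one message in, one response action out.
    public static Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
        => Task.FromResult(Decide(message.Data));
}
```

Separating the decision from the handler is a design choice rather than an SDK requirement. It lets you exercise the Retry, Drop and Success paths in unit tests without a running sidecar.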
Configuration of the streaming subscription requires the name of the PubSub component registered with Dapr, the name
of the topic or queue being subscribed to, the DaprSubscriptionOptions providing the configuration for the subscription,
the message handler and an optional cancellation token. The only required argument to the DaprSubscriptionOptions is
the default MessageHandlingPolicy, which consists of a per-event timeout and the TopicResponseAction to take when
that timeout occurs.
Other options are as follows:
| Property Name | Description |
| --- | --- |
| Metadata | Additional subscription metadata |
| DeadLetterTopic | The optional name of the dead-letter topic to send dropped messages to. |
| MaximumQueuedMessages | By default, there is no maximum boundary enforced for the internal queue, but setting this property would impose an upper limit. |
| MaximumCleanupTimeout | When the subscription is disposed of or the token flags a cancellation request, this specifies the maximum amount of time available to process the remaining messages in the internal queue. |
Subscription is then configured as in the following example:
var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();
//Cancel the subscription automatically after 60 seconds
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(60));
//Allow each message 10 seconds of handling time, then ask Dapr to retry it
var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry));
var subscription = await messagingClient.SubscribeAsync("pubsub", "mytopic", options, HandleMessageAsync, cancellationTokenSource.Token);
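The optional properties listed in the table above can be set alongside the required policy. The following is a minimal sketch, assuming that DaprSubscriptionOptions exposes these properties as settable in an object initializer and that the types live in the Dapr.Messaging.PublishSubscribe namespace. The dead-letter topic name and the numeric limits are illustrative values, not recommendations.

```csharp
using System;
using Dapr.Messaging.PublishSubscribe; // Assumed namespace for the options and policy types

// Required: the per-event timeout and the action to take when that timeout occurs.
var policy = new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry);

var options = new DaprSubscriptionOptions(policy)
{
    // Illustrative topic name for messages that are dropped.
    DeadLetterTopic = "mytopic-deadletter",
    // Cap the internal queue instead of leaving it unbounded.
    MaximumQueuedMessages = 500,
    // Allow up to 15 seconds to drain the queue on disposal or cancellation.
    MaximumCleanupTimeout = TimeSpan.FromSeconds(15)
};

Console.WriteLine(options.DeadLetterTopic);
```

The resulting options instance is passed to SubscribeAsync in the same position as in the example above.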
Terminate and clean up subscription
When you’ve finished with your subscription and wish to stop receiving new events, simply await a call to
DisposeAsync() on your subscription instance. This will cause the client to unregister from additional events and
proceed to finish processing all the events still left in the backpressure queue, if any, before disposing of any
internal resources. This cleanup is limited to the MaximumCleanupTimeout interval provided in the DaprSubscriptionOptions when
the subscription was registered. By default, this is set to 30 seconds.
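One way to tie this cleanup to the application lifetime is to own the subscription from a hosted service. The following is a sketch under stated assumptions: SubscriptionWorker is a hypothetical class name, the Dapr.Messaging.PublishSubscribe namespace is assumed, and the component and topic names are reused from the example above.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Messaging.PublishSubscribe; // Assumed namespace for the client and related types
using Microsoft.Extensions.Hosting;

public sealed class SubscriptionWorker : BackgroundService // Hypothetical name
{
    private readonly DaprPublishSubscribeClient _client;

    public SubscriptionWorker(DaprPublishSubscribeClient client) => _client = client;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var options = new DaprSubscriptionOptions(
            new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry));

        var subscription = await _client.SubscribeAsync("pubsub", "mytopic", options, HandleMessageAsync, stoppingToken);
        try
        {
            // Keep the subscription open until the host begins shutting down.
            await Task.Delay(Timeout.InfiniteTimeSpan, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host signals shutdown.
        }
        finally
        {
            // Stop receiving new events and release the subscription's resources.
            await subscription.DisposeAsync();
        }
    }

    private static Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
    {
        Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
        return Task.FromResult(TopicResponseAction.Success);
    }
}
```

The worker would be registered with builder.Services.AddHostedService<SubscriptionWorker>() after the Dapr PubSub client registration, so the client can be injected through the constructor.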
2 - DaprPublishSubscribeClient usage
Essential tips and advice for using DaprPublishSubscribeClient
Lifetime management
A DaprPublishSubscribeClient is a version of the Dapr client that is dedicated to interacting with the Dapr Messaging API.
It can be registered alongside a DaprClient and other Dapr clients without issue.
It maintains access to networking resources in the form of TCP sockets used to communicate with the Dapr sidecar and implements
IAsyncDisposable to support the eager cleanup of resources.
For best performance, create a single long-lived instance of DaprPublishSubscribeClient and provide access to that shared
instance throughout your application. DaprPublishSubscribeClient instances are thread-safe and intended to be shared.
This can be aided by utilizing the dependency injection functionality. The registration method supports registration
as a singleton, a scoped instance or as transient (meaning it’s recreated every time it’s injected), but also enables
registration to utilize values from an IConfiguration or other injected service in a way that’s impractical when
creating the client from scratch in each of your classes.
Avoid creating a DaprPublishSubscribeClient for each operation and disposing it when the operation is complete. It’s
intended that the DaprPublishSubscribeClient should only be disposed when you no longer wish to receive events on the
subscription, as disposing it will cancel the ongoing receipt of new events.
Configuring DaprPublishSubscribeClient via the DaprPublishSubscribeClientBuilder
A DaprPublishSubscribeClient can be configured by invoking methods on the DaprPublishSubscribeClientBuilder class
before calling .Build() to create the client itself. The settings for each DaprPublishSubscribeClient are separate
and cannot be changed after calling .Build().
var daprPubsubClient = new DaprPublishSubscribeClientBuilder()
.UseDaprApiToken("abc123") // Specify the API token used to authenticate requests to the Dapr sidecar
.Build();
The DaprPublishSubscribeClientBuilder contains settings for:
- The HTTP endpoint of the Dapr sidecar
- The gRPC endpoint of the Dapr sidecar
- The JsonSerializerOptions object used to configure JSON serialization
- The GrpcChannelOptions object used to configure gRPC
- The API token used to authenticate requests to the sidecar
- The factory method used to create the HttpClient instance used by the SDK
- The timeout used for the HttpClient instance when making requests to the sidecar
The SDK will read the following environment variables to configure the default values:
- DAPR_HTTP_ENDPOINT: used to find the HTTP endpoint of the Dapr sidecar, example: https://dapr-api.mycompany.com
- DAPR_GRPC_ENDPOINT: used to find the gRPC endpoint of the Dapr sidecar, example: https://dapr-grpc-api.mycompany.com
- DAPR_HTTP_PORT: if DAPR_HTTP_ENDPOINT is not set, this is used to find the HTTP local endpoint of the Dapr sidecar
- DAPR_GRPC_PORT: if DAPR_GRPC_ENDPOINT is not set, this is used to find the gRPC local endpoint of the Dapr sidecar
- DAPR_API_TOKEN: used to set the API token
Configuring gRPC channel options
Dapr’s use of CancellationToken for cancellation relies on the configuration of the gRPC channel options. If you
need to configure these options yourself, make sure to enable the ThrowOperationCanceledOnCancellation setting.
var daprPubsubClient = new DaprPublishSubscribeClientBuilder()
.UseGrpcChannelOptions(new GrpcChannelOptions { ... ThrowOperationCanceledOnCancellation = true })
.Build();
Using cancellation with DaprPublishSubscribeClient
The APIs on DaprPublishSubscribeClient perform asynchronous operations and accept an optional CancellationToken
parameter. This follows a standard .NET practice for cancellable operations. Note that when cancellation occurs, there is
no guarantee that the remote endpoint stops processing the request, only that the client has stopped waiting for completion.
When an operation is cancelled, it will throw an OperationCanceledException.
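The calling pattern is the same as for any cancellable .NET operation. The following sketch uses a hypothetical SlowOperationAsync helper as a stand-in for an SDK call, so it runs without a sidecar. Only the try/catch shape is meant to carry over to real client calls.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for any call that accepts a CancellationToken (hypothetical helper, not part of Dapr).
static async Task<string> SlowOperationAsync(CancellationToken cancellationToken)
{
    await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
    return "completed";
}

// Request cancellation automatically after 100 milliseconds.
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));

string outcome;
try
{
    outcome = await SlowOperationAsync(cancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
    // TaskCanceledException derives from OperationCanceledException, so this catch covers both.
    outcome = "cancelled";
}

Console.WriteLine(outcome);
```

Here the token fires long before the 30 second delay elapses, so the catch block runs. Catching OperationCanceledException separately from other exceptions keeps an expected cancellation from being reported as a failure.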
Configuring DaprPublishSubscribeClient via dependency injection
Using the built-in extension methods for registering the DaprPublishSubscribeClient in a dependency injection container
provides the benefit of registering the long-lived service a single time, centralizes complex configuration and improves
performance by ensuring similarly long-lived resources are reused when possible (e.g. HttpClient instances).
There are three overloads available to give the developer the greatest flexibility in configuring the client for their
scenario. Each of these will register the IHttpClientFactory on your behalf if not already registered, and configure
the DaprPublishSubscribeClientBuilder to use it when creating the HttpClient instance in order to reuse the same
instance as much as possible and avoid socket exhaustion and other issues.
In the first approach, there’s no configuration done by the developer and the DaprPublishSubscribeClient is configured with
the default settings.
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient(); //Registers the `DaprPublishSubscribeClient` to be injected as needed
var app = builder.Build();
Sometimes the developer will need to configure the created client using the various configuration options detailed above. This is done through an overload that passes in the DaprPublishSubscribeClientBuilder and exposes methods for configuring the necessary options.
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient((_, daprPubSubClientBuilder) => {
//Set the API token
daprPubSubClientBuilder.UseDaprApiToken("abc123");
//Specify a non-standard HTTP endpoint
daprPubSubClientBuilder.UseHttpEndpoint("http://dapr.my-company.com");
});
var app = builder.Build();
Finally, it’s possible that the developer may need to retrieve information from another service in order to populate these configuration values. That value may be provided from a DaprClient instance, a vendor-specific SDK or some local service, but as long as it’s also registered in DI, it can be injected into this configuration operation via the last overload:
var builder = WebApplication.CreateBuilder(args);
//Register a fictional service that retrieves secrets from somewhere
builder.Services.AddSingleton<SecretService>();
builder.Services.AddDaprPubSubClient((serviceProvider, daprPubSubClientBuilder) => {
//Retrieve an instance of the `SecretService` from the service provider
var secretService = serviceProvider.GetRequiredService<SecretService>();
var daprApiToken = secretService.GetSecret("DaprApiToken").Value;
//Configure the `DaprPublishSubscribeClientBuilder`
daprPubSubClientBuilder.UseDaprApiToken(daprApiToken);
});
var app = builder.Build();