The Dapr SDKs are the easiest way for you to create pluggable components. Choose your favorite language and start creating components in minutes.
Pluggable components SDKs
Language | Status |
---|---|
Go | In development |
.NET | In development |
This is the multi-page printable view of this section. Click here to print.
Dapr offers NuGet packages to help with the development of .NET pluggable components.
Creating a pluggable component starts with an empty ASP.NET project.
dotnet new web --name <project name>
Add the Dapr .NET pluggable components NuGet package.
dotnet add package Dapr.PluggableComponents.AspNetCore
Creating a Dapr pluggable component application is similar to creating an ASP.NET application. In Program.cs
, replace the WebApplication
related code with the Dapr DaprPluggableComponentsApplication
equivalent.
using Dapr.PluggableComponents;
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"<socket name>",
serviceBuilder =>
{
// Register one or more components with this service.
});
app.Run();
This creates an application with a single service. Each service:
Pluggable components can be tested by starting the application on the command line and configuring a Dapr sidecar to use it.
To start the component, in the application directory:
dotnet run
To configure Dapr to use the component, in the resources path directory:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: <component name>
spec:
type: state.<socket name>
version: v1
metadata:
- name: key1
value: value1
- name: key2
value: value2
Any metadata
properties will be passed to the component via its IPluggableComponent.InitAsync()
method when the component is instantiated.
To start Dapr (and, optionally, the service making use of the service):
dapr run --app-id <app id> --resources-path <resources path> ...
At this point, the Dapr sidecar will have started and connected via Unix Domain Socket to the component. You can then interact with the component either:
There are several ways to create a container for your component for eventual deployment.
The .NET 7 and later SDKs enable you to create a .NET-based container for your application without a Dockerfile
, even for those targeting earlier versions of the .NET SDK. This is probably the simplest way of generating a container for your component today.
Currently, the .NET 7 SDK requires Docker Desktop on the local machine, a special NuGet package, and Docker Desktop on the local machine to build containers. Future versions of .NET SDK plan to eliminate those requirements.
Multiple versions of the .NET SDK can be installed on the local machine at the same time.
Add the Microsoft.NET.Build.Containers
NuGet package to the component project.
dotnet add package Microsoft.NET.Build.Containers
Publish the application as a container:
dotnet publish --os linux --arch x64 /t:PublishContainer -c Release
--arch x64
matches that of the component’s ultimate deployment target. By default, the architecture of the generated container matches that of the local machine. For example, if the local machine is ARM64-based (for example, a M1 or M2 Mac) and the argument is omitted, an ARM64 container will be generated which may not be compatible with deployment targets expecting an AMD64 container.For more configuration options, such as controlling the container name, tag, and base image, see the .NET publish as container guide.
While there are tools that can generate a Dockerfile
for a .NET application, the .NET SDK itself does not. A typical Dockerfile
might look like:
FROM mcr.microsoft.com/dotnet/aspnet:<runtime> AS base
WORKDIR /app
# Creates a non-root user with an explicit UID and adds permission to access the /app folder
# For more info, please refer to https://aka.ms/vscode-docker-dotnet-configure-containers
RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /app
USER appuser
FROM mcr.microsoft.com/dotnet/sdk:<runtime> AS build
WORKDIR /src
COPY ["<application>.csproj", "<application folder>/"]
RUN dotnet restore "<application folder>/<application>.csproj"
COPY . .
WORKDIR "/src/<application folder>"
RUN dotnet build "<application>.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "<application>.csproj" -c Release -o /app/publish /p:UseAppHost=false
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "<application>.dll"]
Build the image:
docker build -f Dockerfile -t <image name>:<tag> .
COPY
operations in the Dockerfile
are relative to the Docker context passed when building the image, while the Docker context itself will vary depending on the needs of the project being built (for example, if it has referenced projects). In the example above, the assumption is that the Docker context is the component project directory.Watch this video for a demo on building pluggable components with .NET:
Creating a binding component requires just a few basic steps.
Add using
statements for the bindings related namespaces.
using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.Bindings;
IInputBinding
Create a class that implements the IInputBinding
interface.
internal sealed class MyBinding : IInputBinding
{
public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
{
// Called to initialize the component with its configured metadata...
}
public async Task ReadAsync(MessageDeliveryHandler<InputBindingReadRequest, InputBindingReadResponse> deliveryHandler, CancellationToken cancellationToken = default)
{
// Until canceled, check the underlying store for messages and deliver them to the Dapr runtime...
}
}
Calls to the ReadAsync()
method are “long-lived”, in that the method is not expected to return until canceled (for example, via the cancellationToken
). As messages are read from the underlying store of the component, they are delivered to the Dapr runtime via the deliveryHandler
callback. Delivery allows the component to receive notification if/when the application (served by the Dapr runtime) acknowledges processing of the message.
public async Task ReadAsync(MessageDeliveryHandler<InputBindingReadRequest, InputBindingReadResponse> deliveryHandler, CancellationToken cancellationToken = default)
{
TimeSpan pollInterval = // Polling interval (e.g. from initalization metadata)...
// Poll the underlying store until canceled...
while (!cancellationToken.IsCancellationRequested)
{
var messages = // Poll underlying store for messages...
foreach (var message in messages)
{
// Deliver the message to the Dapr runtime...
await deliveryHandler(
new InputBindingReadResponse
{
// Set the message content...
},
// Callback invoked when application acknowledges the message...
async request =>
{
// Process response data or error message...
})
}
// Wait for the next poll (or cancellation)...
await Task.Delay(pollInterval, cancellationToken);
}
}
IOutputBinding
Create a class that implements the IOutputBinding
interface.
internal sealed class MyBinding : IOutputBinding
{
public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
{
// Called to initialize the component with its configured metadata...
}
public Task<OutputBindingInvokeResponse> InvokeAsync(OutputBindingInvokeRequest request, CancellationToken cancellationToken = default)
{
// Called to invoke a specific operation...
}
public Task<string[]> ListOperationsAsync(CancellationToken cancellationToken = default)
{
// Called to list the operations that can be invoked.
}
}
A component can be both an input and output binding, simply by implementing both interfaces.
internal sealed class MyBinding : IInputBinding, IOutputBinding
{
// IInputBinding Implementation...
// IOutputBinding Implementation...
}
In the main program file (for example, Program.cs
), register the binding component in an application service.
using Dapr.PluggableComponents;
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"<socket name>",
serviceBuilder =>
{
serviceBuilder.RegisterBinding<MyBinding>();
});
app.Run();
IInputBinding
and IOutputBinding
will be registered as both an input and output binding.Creating a pub/sub component requires just a few basic steps.
Add using
statements for the pub/sub related namespaces.
using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.PubSub;
IPubSub
Create a class that implements the IPubSub
interface.
internal sealed class MyPubSub : IPubSub
{
public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
{
// Called to initialize the component with its configured metadata...
}
public Task PublishAsync(PubSubPublishRequest request, CancellationToken cancellationToken = default)
{
// Send the message to the "topic"...
}
public Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
{
// Until canceled, check the topic for messages and deliver them to the Dapr runtime...
}
}
Calls to the PullMessagesAsync()
method are “long-lived”, in that the method is not expected to return until canceled (for example, via the cancellationToken
). The “topic” from which messages should be pulled is passed via the topic
argument, while the delivery to the Dapr runtime is performed via the deliveryHandler
callback. Delivery allows the component to receive notification if/when the application (served by the Dapr runtime) acknowledges processing of the message.
public async Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
{
TimeSpan pollInterval = // Polling interval (e.g. from initalization metadata)...
// Poll the topic until canceled...
while (!cancellationToken.IsCancellationRequested)
{
var messages = // Poll topic for messages...
foreach (var message in messages)
{
// Deliver the message to the Dapr runtime...
await deliveryHandler(
new PubSubPullMessagesResponse(topicName)
{
// Set the message content...
},
// Callback invoked when application acknowledges the message...
async errorMessage =>
{
// An empty message indicates the application successfully processed the message...
if (String.IsNullOrEmpty(errorMessage))
{
// Delete the message from the topic...
}
})
}
// Wait for the next poll (or cancellation)...
await Task.Delay(pollInterval, cancellationToken);
}
}
In the main program file (for example, Program.cs
), register the pub/sub component with an application service.
using Dapr.PluggableComponents;
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"<socket name>",
serviceBuilder =>
{
serviceBuilder.RegisterPubSub<MyPubSub>();
});
app.Run();
Creating a state store component requires just a few basic steps.
Add using
statements for the state store related namespaces.
using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.StateStore;
IStateStore
Create a class that implements the IStateStore
interface.
internal sealed class MyStateStore : IStateStore
{
public Task DeleteAsync(StateStoreDeleteRequest request, CancellationToken cancellationToken = default)
{
// Delete the requested key from the state store...
}
public Task<StateStoreGetResponse?> GetAsync(StateStoreGetRequest request, CancellationToken cancellationToken = default)
{
// Get the requested key value from from the state store, else return null...
}
public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
{
// Called to initialize the component with its configured metadata...
}
public Task SetAsync(StateStoreSetRequest request, CancellationToken cancellationToken = default)
{
// Set the requested key to the specified value in the state store...
}
}
In the main program file (for example, Program.cs
), register the state store with an application service.
using Dapr.PluggableComponents;
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"<socket name>",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyStateStore>();
});
app.Run();
State stores that intend to support bulk operations should implement the optional IBulkStateStore
interface. Its methods mirror those of the base IStateStore
interface, but include multiple requested values.
IBulkStateStore
by calling its operations individually.internal sealed class MyStateStore : IStateStore, IBulkStateStore
{
// ...
public Task BulkDeleteAsync(StateStoreDeleteRequest[] requests, CancellationToken cancellationToken = default)
{
// Delete all of the requested values from the state store...
}
public Task<StateStoreBulkStateItem[]> BulkGetAsync(StateStoreGetRequest[] requests, CancellationToken cancellationToken = default)
{
// Return the values of all of the requested values from the state store...
}
public Task BulkSetAsync(StateStoreSetRequest[] requests, CancellationToken cancellationToken = default)
{
// Set all of the values of the requested keys in the state store...
}
}
State stores that intend to support transactions should implement the optional ITransactionalStateStore
interface. Its TransactAsync()
method is passed a request with a sequence of delete and/or set operations to be performed within a transaction. The state store should iterate over the sequence and call each operation’s Visit()
method, passing callbacks that represent the action to take for each type of operation.
internal sealed class MyStateStore : IStateStore, ITransactionalStateStore
{
// ...
public async Task TransactAsync(StateStoreTransactRequest request, CancellationToken cancellationToken = default)
{
// Start transaction...
try
{
foreach (var operation in request.Operations)
{
await operation.Visit(
async deleteRequest =>
{
// Process delete request...
},
async setRequest =>
{
// Process set request...
});
}
}
catch
{
// Rollback transaction...
throw;
}
// Commit transaction...
}
}
State stores that intend to support queries should implement the optional IQueryableStateStore
interface. Its QueryAsync()
method is passed details about the query, such as the filter(s), result limits and pagination, and sort order(s) of the results. The state store should use those details to generate a set of values to return as part of its response.
internal sealed class MyStateStore : IStateStore, IQueryableStateStore
{
// ...
public Task<StateStoreQueryResponse> QueryAsync(StateStoreQueryRequest request, CancellationToken cancellationToken = default)
{
// Generate and return results...
}
}
The Dapr runtime has additional handling of certain error conditions resulting from some state store operations. State stores can indicate such conditions by throwing specific exceptions from its operation logic:
Exception | Applicable Operations | Description |
---|---|---|
ETagInvalidException | Delete, Set, Bulk Delete, Bulk Set | When an ETag is invalid |
ETagMismatchException | Delete, Set, Bulk Delete, Bulk Set | When an ETag does not match an expected value |
BulkDeleteRowMismatchException | Bulk Delete | When the number of affected rows does not match the expected rows |
While not typically needed by most, these guides show advanced ways to can configure your .NET pluggable components.
A .NET Dapr pluggable component application can be configured for dependency injection, logging, and configuration values similarly to ASP.NET applications. The DaprPluggableComponentsApplication
exposes a similar set of configuration properties to that exposed by WebApplicationBuilder
.
Components registered with services can participate in dependency injection. Arguments in the components constructor will be injected during creation, assuming those types have been registered with the application. You can register them through the IServiceCollection
exposed by DaprPluggableComponentsApplication
.
var app = DaprPluggableComponentsApplication.Create();
// Register MyService as the singleton implementation of IService.
app.Services.AddSingleton<IService, MyService>();
app.RegisterService(
"<service name>",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyStateStore>();
});
app.Run();
interface IService
{
// ...
}
class MyService : IService
{
// ...
}
class MyStateStore : IStateStore
{
// Inject IService on creation of the state store.
public MyStateStore(IService service)
{
// ...
}
// ...
}
IServiceCollection.AddScoped()
is not recommended. Such instances’ lifetimes are bound to a single gRPC method call, which does not match the lifetime of an individual component instance..NET Dapr pluggable components can use the standard .NET logging mechanisms. The DaprPluggableComponentsApplication
exposes an ILoggingBuilder
, through which it can be configured.
ILogger<T>
) are pre-registered.var app = DaprPluggableComponentsApplication.Create();
// Reset the default loggers and setup new ones.
app.Logging.ClearProviders();
app.Logging.AddConsole();
app.RegisterService(
"<service name>",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyStateStore>();
});
app.Run();
class MyStateStore : IStateStore
{
// Inject a logger on creation of the state store.
public MyStateStore(ILogger<MyStateStore> logger)
{
// ...
}
// ...
}
Since .NET pluggable components are built on ASP.NET, they can use its standard configuration mechanisms and default to the same set of pre-registered providers. The DaprPluggableComponentsApplication
exposes an IConfigurationManager
through which it can be configured.
var app = DaprPluggableComponentsApplication.Create();
// Reset the default configuration providers and add new ones.
((IConfigurationBuilder)app.Configuration).Sources.Clear();
app.Configuration.AddEnvironmentVariables();
// Get configuration value on startup.
const value = app.Configuration["<name>"];
app.RegisterService(
"<service name>",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyStateStore>();
});
app.Run();
class MyStateStore : IStateStore
{
// Inject the configuration on creation of the state store.
public MyStateStore(IConfiguration configuration)
{
// ...
}
// ...
}
There are two ways to register a component:
Components registered by type are singletons: one instance will serve all configured components of that type associated with that socket. This approach is best when only a single component of that type exists and is shared amongst Dapr applications.
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"service-a",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<SingletonStateStore>();
});
app.Run();
class SingletonStateStore : IStateStore
{
// ...
}
Components can be registered by passing a “factory method”. This method will be called for each configured component of that type associated with that socket. The method returns the instance to associate with that component (whether shared or not). This approach is best when multiple components of the same type may be configured with different sets of metadata, when component operations need to be isolated from one another, etc.
The factory method will be passed context, such as the ID of the configured Dapr component, that can be used to differentiate component instances.
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"service-a",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore(
context =>
{
return new MultiStateStore(context.InstanceId);
});
});
app.Run();
class MultiStateStore : IStateStore
{
private readonly string instanceId;
public MultiStateStore(string instanceId)
{
this.instanceId = instanceId;
}
// ...
}
A pluggable component can host multiple components of varying types. You might do this:
Each Unix Domain Socket can manage calls to one component of each type. To host multiple components of the same type, you can spread those types across multiple sockets. The SDK binds each socket to a “service”, with each service composed of one or more component types.
Each call to RegisterService()
binds a socket to a set of registered components, where one of each type of component can be registered per service.
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"service-a",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyDatabaseStateStore>();
serviceBuilder.RegisterBinding<MyDatabaseOutputBinding>();
});
app.RegisterService(
"service-b",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<AnotherStateStore>();
});
app.Run();
class MyDatabaseStateStore : IStateStore
{
// ...
}
class MyDatabaseOutputBinding : IOutputBinding
{
// ...
}
class AnotherStateStore : IStateStore
{
// ...
}
Configuring Dapr to use the hosted components is the same as for any single component - the component YAML refers to the associated socket.
#
# This component uses the state store associated with socket `state-store-a`
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: state-store-a
spec:
type: state.service-a
version: v1
metadata: []
#
# This component uses the state store associated with socket `state-store-b`
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: state-store-b
spec:
type: state.service-b
version: v1
metadata: []
Dapr offers packages to help with the development of Go pluggable components.
Creating a pluggable component starts with an empty Go application.
mkdir example
cd example
go mod init example
Import the Dapr pluggable components SDK package.
go get github.com/dapr-sandbox/components-go-sdk@v0.1.0
In main.go
, import the Dapr plugggable components package and run the application.
package main
import (
dapr "github.com/dapr-sandbox/components-go-sdk"
)
func main() {
dapr.MustRun()
}
This creates an application with no components. You will need to implement and register one or more components.
Dapr communicates with pluggable components via Unix Domain Sockets files in a common directory. By default, both Dapr and pluggable components use the /tmp/dapr-components-sockets
directory. You should create this directory if it does not already exist.
mkdir /tmp/dapr-components-sockets
Pluggable components can be tested by starting the application on the command line.
To start the component, in the application directory:
go run main.go
To configure Dapr to use the component, create a component YAML file in the resources directory. For example, for a state store component:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: <component name>
spec:
type: state.<socket name>
version: v1
metadata:
- name: key1
value: value1
- name: key2
value: value2
Any metadata
properties will be passed to the component via its Store.Init(metadata state.Metadata)
method when the component is instantiated.
To start Dapr (and, optionally, the service making use of the service):
dapr run --app-id <app id> --resources-path <resources path> ...
At this point, the Dapr sidecar will have started and connected via Unix Domain Socket to the component. You can then interact with the component either:
Pluggable components are deployed as containers that run as sidecars to the application (like Dapr itself). A typical Dockerfile
for creating a Docker image for a Go application might look like:
FROM golang:1.20-alpine AS builder
WORKDIR /usr/src/app
# Download dependencies
COPY go.mod go.sum ./
RUN go mod download && go mod verify
# Build the application
COPY . .
RUN go build -v -o /usr/src/bin/app .
FROM alpine:latest
# Setup non-root user and permissions
RUN addgroup -S app && adduser -S app -G app
RUN mkdir /tmp/dapr-components-sockets && chown app /tmp/dapr-components-sockets
# Copy application to runtime image
COPY --from=builder --chown=app /usr/src/bin/app /app
USER app
CMD ["/app"]
Build the image:
docker build -f Dockerfile -t <image name>:<tag> .
COPY
operations in the Dockerfile
are relative to the Docker context passed when building the image, while the Docker context itself will vary depending on the needs of the application being built. In the example above, the assumption is that the Docker context is the component application directory.Creating a binding component requires just a few basic steps.
Create the file components/inputbinding.go
and add import
statements for the state store related packages.
package components
import (
"context"
"github.com/dapr/components-contrib/bindings"
)
InputBinding
interfaceCreate a type that implements the InputBinding
interface.
type MyInputBindingComponent struct {
}
func (component *MyInputBindingComponent) Init(meta bindings.Metadata) error {
// Called to initialize the component with its configured metadata...
}
func (component *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
// Until canceled, check the underlying store for messages and deliver them to the Dapr runtime...
}
Calls to the Read()
method are expected to set up a long-lived mechanism for retrieving messages but immediately return nil
(or an error, if that mechanism could not be set up). The mechanism should end when canceled (for example, via the ctx.Done() or ctx.Err() != nil
). As messages are read from the underlying store of the component, they are delivered to the Dapr runtime via the handler
callback, which does not return until the application (served by the Dapr runtime) acknowledges processing of the message.
func (b *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
go func() {
for {
err := ctx.Err()
if err != nil {
return
}
messages := // Poll for messages...
for _, message := range messages {
handler(ctx, &bindings.ReadResponse{
// Set the message content...
})
}
select {
case <-ctx.Done():
case <-time.After(5 * time.Second):
}
}
}()
return nil
}
OutputBinding
interfaceCreate a type that implements the OutputBinding
interface.
type MyOutputBindingComponent struct {
}
func (component *MyOutputBindingComponent) Init(meta bindings.Metadata) error {
// Called to initialize the component with its configured metadata...
}
func (component *MyOutputBindingComponent) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
// Called to invoke a specific operation...
}
func (component *MyOutputBindingComponent) Operations() []bindings.OperationKind {
// Called to list the operations that can be invoked.
}
A component can be both an input and output binding. Simply implement both interfaces and register the component as both binding types.
In the main application file (for example, main.go
), register the binding component with the application.
package main
import (
"example/components"
dapr "github.com/dapr-sandbox/components-go-sdk"
"github.com/dapr-sandbox/components-go-sdk/bindings/v1"
)
func main() {
// Register an import binding...
dapr.Register("my-inputbinding", dapr.WithInputBinding(func() bindings.InputBinding {
return &components.MyInputBindingComponent{}
}))
// Register an output binding...
dapr.Register("my-outputbinding", dapr.WithOutputBinding(func() bindings.OutputBinding {
return &components.MyOutputBindingComponent{}
}))
dapr.MustRun()
}
Creating a pub/sub component requires just a few basic steps.
Create the file components/pubsub.go
and add import
statements for the pub/sub related packages.
package components
import (
"context"
"github.com/dapr/components-contrib/pubsub"
)
PubSub
interfaceCreate a type that implements the PubSub
interface.
type MyPubSubComponent struct {
}
func (component *MyPubSubComponent) Init(metadata pubsub.Metadata) error {
// Called to initialize the component with its configured metadata...
}
func (component *MyPubSubComponent) Close() error {
// Not used with pluggable components...
return nil
}
func (component *MyPubSubComponent) Features() []pubsub.Feature {
// Return a list of features supported by the component...
}
func (component *MyPubSubComponent) Publish(req *pubsub.PublishRequest) error {
// Send the message to the "topic"...
}
func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
// Until canceled, check the topic for messages and deliver them to the Dapr runtime...
}
Calls to the Subscribe()
method are expected to set up a long-lived mechanism for retrieving messages but immediately return nil
(or an error, if that mechanism could not be set up). The mechanism should end when canceled (for example, via the ctx.Done()
or ctx.Err() != nil
). The “topic” from which messages should be pulled is passed via the req
argument, while the delivery to the Dapr runtime is performed via the handler
callback. The callback doesn’t return until the application (served by the Dapr runtime) acknowledges processing of the message.
func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
go func() {
for {
err := ctx.Err()
if err != nil {
return
}
messages := // Poll for messages...
for _, message := range messages {
handler(ctx, &pubsub.NewMessage{
// Set the message content...
})
}
select {
case <-ctx.Done():
case <-time.After(5 * time.Second):
}
}
}()
return nil
}
In the main application file (for example, main.go
), register the pub/sub component with the application.
package main
import (
"example/components"
dapr "github.com/dapr-sandbox/components-go-sdk"
"github.com/dapr-sandbox/components-go-sdk/pubsub/v1"
)
func main() {
dapr.Register("<socket name>", dapr.WithPubSub(func() pubsub.PubSub {
return &components.MyPubSubComponent{}
}))
dapr.MustRun()
}
Creating a state store component requires just a few basic steps.
Create the file components/statestore.go
and add import
statements for the state store related packages.
package components
import (
"context"
"github.com/dapr/components-contrib/state"
)
Store
interfaceCreate a type that implements the Store
interface.
type MyStateStore struct {
}
func (store *MyStateStore) Init(metadata state.Metadata) error {
// Called to initialize the component with its configured metadata...
}
func (store *MyStateStore) GetComponentMetadata() map[string]string {
// Not used with pluggable components...
return map[string]string{}
}
func (store *MyStateStore) Features() []state.Feature {
// Return a list of features supported by the state store...
}
func (store *MyStateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
// Delete the requested key from the state store...
}
func (store *MyStateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
// Get the requested key value from the state store, else return an empty response...
}
func (store *MyStateStore) Set(ctx context.Context, req *state.SetRequest) error {
// Set the requested key to the specified value in the state store...
}
func (store *MyStateStore) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
// Get the requested key values from the state store...
}
func (store *MyStateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
// Delete the requested keys from the state store...
}
func (store *MyStateStore) BulkSet(ctx context.Context, req []state.SetRequest) error {
// Set the requested keys to their specified values in the state store...
}
In the main application file (for example, main.go
), register the state store with an application service.
package main
import (
"example/components"
dapr "github.com/dapr-sandbox/components-go-sdk"
"github.com/dapr-sandbox/components-go-sdk/state/v1"
)
func main() {
dapr.Register("<socket name>", dapr.WithStateStore(func() state.Store {
return &components.MyStateStoreComponent{}
}))
dapr.MustRun()
}
While state stores are required to support the bulk operations, their implementations sequentially delegate to the individual operation methods.
State stores that intend to support transactions should implement the optional TransactionalStore
interface. Its Multi()
method receives a request with a sequence of delete
and/or set
operations to be performed within a transaction. The state store should iterate over the sequence and apply each operation.
func (store *MyStateStoreComponent) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
// Start transaction...
for _, operation := range request.Operations {
switch operation.Operation {
case state.Delete:
deleteRequest := operation.Request.(state.DeleteRequest)
// Process delete request...
case state.Upsert:
setRequest := operation.Request.(state.SetRequest)
// Process set request...
}
}
// End (or rollback) transaction...
return nil
}
State stores that intend to support queries should implement the optional Querier
interface. Its Query()
method is passed details about the query, such as the filter(s), result limits, pagination, and sort order(s) of the results. The state store uses those details to generate a set of values to return as part of its response.
func (store *MyStateStoreComponent) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
// Generate and return results...
}
The Dapr runtime has additional handling of certain error conditions resulting from some state store operations. State stores can indicate such conditions by returning specific errors from its operation logic:
Error | Applicable Operations | Description |
---|---|---|
NewETagError(state.ETagInvalid, ...) | Delete, Set, Bulk Delete, Bulk Set | When an ETag is invalid |
NewETagError(state.ETagMismatch, ...) | Delete, Set, Bulk Delete, Bulk Set | When an ETag does not match an expected value |
NewBulkDeleteRowMismatchError(...) | Bulk Delete | When the number of affected rows does not match the expected rows |
While not typically needed by most, these guides show advanced ways you can configure your Go pluggable components.
Pluggable components are registered by passing a “factory method” that is called for each configured Dapr component of that type associated with that socket. The method returns the instance associated with that Dapr component (whether shared or not). This allows multiple Dapr components of the same type to be configured with different sets of metadata, when component operations need to be isolated from one another, etc.
Each call to Register()
binds a socket to a registered pluggable component. One of each component type (input/output binding, pub/sub, and state store) can be registered per socket.
func main() {
dapr.Register("service-a", dapr.WithStateStore(func() state.Store {
return &components.MyDatabaseStoreComponent{}
}))
dapr.Register("service-a", dapr.WithOutputBinding(func() bindings.OutputBinding {
return &components.MyDatabaseOutputBindingComponent{}
}))
dapr.Register("service-b", dapr.WithStateStore(func() state.Store {
return &components.MyDatabaseStoreComponent{}
}))
dapr.MustRun()
}
In the example above, a state store and output binding is registered with the socket service-a
while another state store is registered with the socket service-b
.
Configuring Dapr to use the hosted components is the same as for any single component - the component YAML refers to the associated socket. For example, to configure Dapr state stores for the two components registered above (to sockets service-a
and service-b
), you create two configuration files, each referencing their respective socket.
#
# This component uses the state store associated with socket `service-a`
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: state-store-a
spec:
type: state.service-a
version: v1
metadata: []
#
# This component uses the state store associated with socket `service-b`
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: state-store-b
spec:
type: state.service-b
version: v1
metadata: []