Getting started with the Dapr pluggable components Go SDK

How to get up and running with the Dapr pluggable components Go SDK

Dapr offers packages to help with the development of Go pluggable components.

Prerequisites

Application creation

Creating a pluggable component starts with an empty Go application.

mkdir example
cd example
go mod init example

Import Dapr packages

Import the Dapr pluggable components SDK package.

go get github.com/dapr-sandbox/components-go-sdk@v0.1.0

Create main package

In main.go, import the Dapr pluggable components package and run the application.

package main

import (
	dapr "github.com/dapr-sandbox/components-go-sdk"
)

func main() {
	dapr.MustRun()
}

This creates an application with no components. You will need to implement and register one or more components.

Implement and register components

Test components locally

Create the Dapr components socket directory

Dapr communicates with pluggable components via Unix Domain Socket files in a common directory. By default, both Dapr and pluggable components use the /tmp/dapr-components-sockets directory. Create this directory if it does not already exist.

mkdir /tmp/dapr-components-sockets

Start the pluggable component

Pluggable components can be tested by starting the application on the command line.

To start the component, in the application directory:

go run main.go

Configure Dapr to use the pluggable component

To configure Dapr to use the component, create a component YAML file in the resources directory. For example, for a state store component:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: <component name>
spec:
  type: state.<socket name>
  version: v1
  metadata:
  - name: key1
    value: value1
  - name: key2
    value: value2

Any metadata properties will be passed to the component via its Store.Init(metadata state.Metadata) method when the component is instantiated.
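As a minimal sketch of how a component might consume those properties, the example below reads key1 and key2 during initialization. The Metadata type here is a local stand-in (an assumption) so the snippet runs without the Dapr packages; it assumes the real metadata exposes the YAML name/value pairs as a string map. Treating key1 as required is purely illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// Metadata is a local stand-in for the SDK's metadata type. It assumes the
// configured name/value pairs arrive as a map of strings.
type Metadata struct {
	Properties map[string]string
}

// MyStateStore is a hypothetical component that remembers its configuration.
type MyStateStore struct {
	key1 string
	key2 string
}

// Init copies one required and one optional property out of the metadata.
func (s *MyStateStore) Init(meta Metadata) error {
	v, ok := meta.Properties["key1"]
	if !ok || v == "" {
		return errors.New("metadata property key1 is required")
	}
	s.key1 = v
	s.key2 = meta.Properties["key2"] // optional; empty when absent
	return nil
}

func main() {
	store := &MyStateStore{}
	err := store.Init(Metadata{Properties: map[string]string{
		"key1": "value1",
		"key2": "value2",
	}})
	fmt.Println(store.key1, store.key2, err)
}
```

Returning an error from initialization for missing required properties surfaces configuration mistakes when the component is instantiated rather than on first use.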

Start Dapr

To start Dapr (and, optionally, the service making use of the component):

dapr run --app-id <app id> --resources-path <resources path> ...

At this point, the Dapr sidecar will have started and connected via Unix Domain Socket to the component. You can then interact with the component either:

  • Through the service using the component (if started), or
  • By using the Dapr HTTP or gRPC API directly

Create container

Pluggable components are deployed as containers that run as sidecars to the application (like Dapr itself). A typical Dockerfile for creating a Docker image for a Go application might look like:

FROM golang:1.20-alpine AS builder

WORKDIR /usr/src/app

# Download dependencies
COPY go.mod go.sum ./
RUN go mod download && go mod verify

# Build the application
COPY . .
RUN go build -v -o /usr/src/bin/app .

FROM alpine:latest

# Setup non-root user and permissions
RUN addgroup -S app && adduser -S app -G app
RUN mkdir /tmp/dapr-components-sockets && chown app /tmp/dapr-components-sockets

# Copy application to runtime image
COPY --from=builder --chown=app /usr/src/bin/app /app

USER app

CMD ["/app"]

Build the image:

docker build -f Dockerfile -t <image name>:<tag> .

Next steps

1 - Implementing a Go input/output binding component

How to create an input/output binding with the Dapr pluggable components Go SDK

Creating a binding component requires just a few basic steps.

Import bindings packages

Create the file components/inputbinding.go and add import statements for the binding related packages.

package components

import (
	"context"
	"time"

	"github.com/dapr/components-contrib/bindings"
)

Input bindings: Implement the InputBinding interface

Create a type that implements the InputBinding interface.

type MyInputBindingComponent struct {
}

func (component *MyInputBindingComponent) Init(meta bindings.Metadata) error {
	// Called to initialize the component with its configured metadata...
}

func (component *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
	// Until canceled, check the underlying store for messages and deliver them to the Dapr runtime...
}

Calls to the Read() method are expected to set up a long-lived mechanism for retrieving messages and then return immediately with nil (or with an error, if that mechanism could not be set up). The mechanism should end when the context is canceled (for example, when ctx.Done() is closed or ctx.Err() != nil). As messages are read from the underlying store of the component, they are delivered to the Dapr runtime via the handler callback, which does not return until the application (served by the Dapr runtime) acknowledges processing of the message.

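func (component *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
	go func() {
		for {
			// Stop polling once the context has been canceled.
			if ctx.Err() != nil {
				return
			}

			var messages [][]byte // Poll for messages...

			for _, message := range messages {
				// The handler returns once the application acknowledges the message.
				handler(ctx, &bindings.ReadResponse{
					Data: message, // Set the message content...
				})
			}

			// Wait for the next poll or for cancellation, whichever comes first.
			select {
			case <-ctx.Done():
			case <-time.After(5 * time.Second):
			}
		}
	}()

	return nil
}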

Output bindings: Implement the OutputBinding interface

Create a type that implements the OutputBinding interface.

type MyOutputBindingComponent struct {
}

func (component *MyOutputBindingComponent) Init(meta bindings.Metadata) error {
	// Called to initialize the component with its configured metadata...
}

func (component *MyOutputBindingComponent) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
	// Called to invoke a specific operation...
}

func (component *MyOutputBindingComponent) Operations() []bindings.OperationKind {
	// Called to list the operations that can be invoked.
}

Input and output binding components

A component can be both an input and output binding. Simply implement both interfaces and register the component as both binding types.
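One way to keep such a dual-purpose type honest is a compile-time interface assertion. The sketch below uses simplified stand-in interfaces (assumptions, not the SDK's actual definitions) so that it runs on its own; MyBindingComponent is a hypothetical name.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the input and output binding interfaces.
type InputBinding interface {
	Read(ctx context.Context, handler func(data []byte)) error
}

type OutputBinding interface {
	Invoke(ctx context.Context, data []byte) ([]byte, error)
}

// MyBindingComponent is a hypothetical type serving both roles.
type MyBindingComponent struct{}

// These declarations fail to compile if either interface stops being satisfied.
var (
	_ InputBinding  = (*MyBindingComponent)(nil)
	_ OutputBinding = (*MyBindingComponent)(nil)
)

func (c *MyBindingComponent) Read(ctx context.Context, handler func(data []byte)) error {
	return nil // set up message retrieval here
}

func (c *MyBindingComponent) Invoke(ctx context.Context, data []byte) ([]byte, error) {
	return data, nil // echo the payload for the demo
}

func main() {
	// The same type can back both factory functions at registration time.
	inputFactory := func() InputBinding { return &MyBindingComponent{} }
	outputFactory := func() OutputBinding { return &MyBindingComponent{} }
	out, err := outputFactory().Invoke(context.Background(), []byte("ping"))
	fmt.Println(inputFactory() != nil, string(out), err)
}
```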

Register binding component

In the main application file (for example, main.go), register the binding component with the application.

package main

import (
	"example/components"
	dapr "github.com/dapr-sandbox/components-go-sdk"
	"github.com/dapr-sandbox/components-go-sdk/bindings/v1"
)

func main() {
	// Register an input binding...
	dapr.Register("my-inputbinding", dapr.WithInputBinding(func() bindings.InputBinding {
		return &components.MyInputBindingComponent{}
	}))

	// Register an output binding...
	dapr.Register("my-outputbinding", dapr.WithOutputBinding(func() bindings.OutputBinding {
		return &components.MyOutputBindingComponent{}
	}))

	dapr.MustRun()
}

Next steps

2 - Implementing a Go pub/sub component

How to create a pub/sub component with the Dapr pluggable components Go SDK

Creating a pub/sub component requires just a few basic steps.

Import pub/sub packages

Create the file components/pubsub.go and add import statements for the pub/sub related packages.

package components

import (
	"context"
	"time"

	"github.com/dapr/components-contrib/pubsub"
)

Implement the PubSub interface

Create a type that implements the PubSub interface.

type MyPubSubComponent struct {
}

func (component *MyPubSubComponent) Init(metadata pubsub.Metadata) error {
	// Called to initialize the component with its configured metadata...
}

func (component *MyPubSubComponent) Close() error {
	// Not used with pluggable components...
	return nil
}

func (component *MyPubSubComponent) Features() []pubsub.Feature {
	// Return a list of features supported by the component...
}

func (component *MyPubSubComponent) Publish(req *pubsub.PublishRequest) error {
	// Send the message to the "topic"...
}

func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
	// Until canceled, check the topic for messages and deliver them to the Dapr runtime...
}

Calls to the Subscribe() method are expected to set up a long-lived mechanism for retrieving messages and then return immediately with nil (or with an error, if that mechanism could not be set up). The mechanism should end when the context is canceled (for example, when ctx.Done() is closed or ctx.Err() != nil). The “topic” from which messages should be pulled is passed via the req argument, while delivery to the Dapr runtime is performed via the handler callback. The callback doesn’t return until the application (served by the Dapr runtime) acknowledges processing of the message.

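func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
	go func() {
		for {
			// Stop polling once the context has been canceled.
			if ctx.Err() != nil {
				return
			}

			var messages [][]byte // Poll the topic in req for messages...

			for _, message := range messages {
				// The handler returns once the application acknowledges the message.
				handler(ctx, &pubsub.NewMessage{
					Data: message, // Set the message content...
				})
			}

			// Wait for the next poll or for cancellation, whichever comes first.
			select {
			case <-ctx.Done():
			case <-time.After(5 * time.Second):
			}
		}
	}()

	return nil
}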

Register pub/sub component

In the main application file (for example, main.go), register the pub/sub component with the application.

package main

import (
	"example/components"
	dapr "github.com/dapr-sandbox/components-go-sdk"
	"github.com/dapr-sandbox/components-go-sdk/pubsub/v1"
)

func main() {
	dapr.Register("<socket name>", dapr.WithPubSub(func() pubsub.PubSub {
		return &components.MyPubSubComponent{}
	}))

	dapr.MustRun()
}

Next steps

3 - Implementing a Go state store component

How to create a state store with the Dapr pluggable components Go SDK

Creating a state store component requires just a few basic steps.

Import state store packages

Create the file components/statestore.go and add import statements for the state store related packages.

package components

import (
	"context"
	"github.com/dapr/components-contrib/state"
)

Implement the Store interface

Create a type that implements the Store interface.

type MyStateStoreComponent struct {
}

func (store *MyStateStoreComponent) Init(metadata state.Metadata) error {
	// Called to initialize the component with its configured metadata...
}

func (store *MyStateStoreComponent) GetComponentMetadata() map[string]string {
	// Not used with pluggable components...
	return map[string]string{}
}

func (store *MyStateStoreComponent) Features() []state.Feature {
	// Return a list of features supported by the state store...
}

func (store *MyStateStoreComponent) Delete(ctx context.Context, req *state.DeleteRequest) error {
	// Delete the requested key from the state store...
}

func (store *MyStateStoreComponent) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
	// Get the requested key value from the state store, else return an empty response...
}

func (store *MyStateStoreComponent) Set(ctx context.Context, req *state.SetRequest) error {
	// Set the requested key to the specified value in the state store...
}

func (store *MyStateStoreComponent) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
	// Get the requested key values from the state store...
}

func (store *MyStateStoreComponent) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
	// Delete the requested keys from the state store...
}

func (store *MyStateStoreComponent) BulkSet(ctx context.Context, req []state.SetRequest) error {
	// Set the requested keys to their specified values in the state store...
}

Register state store component

In the main application file (for example, main.go), register the state store with the application.

package main

import (
	"example/components"
	dapr "github.com/dapr-sandbox/components-go-sdk"
	"github.com/dapr-sandbox/components-go-sdk/state/v1"
)

func main() {
	dapr.Register("<socket name>", dapr.WithStateStore(func() state.Store {
		return &components.MyStateStoreComponent{}
	}))

	dapr.MustRun()
}

Bulk state stores

State stores are required to support the bulk operations, but their implementations can simply delegate sequentially to the individual operation methods.

Transactional state stores

State stores that intend to support transactions should implement the optional TransactionalStore interface. Its Multi() method receives a request with a sequence of delete and/or set operations to be performed within a transaction. The state store should iterate over the sequence and apply each operation.

func (store *MyStateStoreComponent) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
	// Start transaction...

	for _, operation := range request.Operations {
		switch operation.Operation {
		case state.Delete:
			deleteRequest := operation.Request.(state.DeleteRequest)
			// Process delete request...
		case state.Upsert:
			setRequest := operation.Request.(state.SetRequest)
			// Process set request...
		}
	}

	// End (or rollback) transaction...

	return nil
}
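For an in-memory store, one way to get all-or-nothing behavior is to apply the operations to a copy and swap it in only if every operation succeeds. The sketch below illustrates that idea with local stand-in types (assumptions); Operation, OperationType, and MemoryStore are hypothetical names and do not mirror the SDK's request types.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins for the transactional operation types.
type OperationType string

const (
	Upsert OperationType = "upsert"
	Delete OperationType = "delete"
)

type Operation struct {
	Type  OperationType
	Key   string
	Value []byte
}

// MemoryStore is a hypothetical in-memory state store.
type MemoryStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

// Multi applies all operations, or none of them if any operation is invalid.
func (s *MemoryStore) Multi(ops []Operation) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// "Start transaction": stage changes on a copy of the current data.
	staged := make(map[string][]byte, len(s.data))
	for k, v := range s.data {
		staged[k] = v
	}

	for _, op := range ops {
		switch op.Type {
		case Upsert:
			staged[op.Key] = op.Value
		case Delete:
			delete(staged, op.Key)
		default:
			// "Rollback": the staged copy is simply discarded.
			return fmt.Errorf("unsupported operation %q", op.Type)
		}
	}

	// "Commit": replace the live data with the staged copy.
	s.data = staged
	return nil
}

func main() {
	store := &MemoryStore{data: map[string][]byte{"a": []byte("1")}}
	err := store.Multi([]Operation{
		{Type: Upsert, Key: "b", Value: []byte("2")},
		{Type: Delete, Key: "a"},
	})
	fmt.Println(len(store.data), err)
}
```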

Queryable state stores

State stores that intend to support queries should implement the optional Querier interface. Its Query() method is passed details about the query, such as the filter(s), result limits, pagination, and sort order(s) of the results. The state store uses those details to generate a set of values to return as part of its response.

func (store *MyStateStoreComponent) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
	// Generate and return results...
}

ETag and other semantic error handling

The Dapr runtime has additional handling of certain error conditions resulting from some state store operations. State stores can indicate such conditions by returning specific errors from their operation logic:

Error                                   Applicable Operations                Description
NewETagError(state.ETagInvalid, ...)    Delete, Set, Bulk Delete, Bulk Set   When an ETag is invalid
NewETagError(state.ETagMismatch, ...)   Delete, Set, Bulk Delete, Bulk Set   When an ETag does not match an expected value
NewBulkDeleteRowMismatchError(...)      Bulk Delete                          When the number of affected rows does not match the expected rows

Next steps

4 - Advanced uses of the Dapr pluggable components Go SDK

How to use advanced techniques with the Dapr pluggable components Go SDK

Although most users will not need them, these guides show advanced ways you can configure your Go pluggable components.

Component lifetime

Pluggable components are registered by passing a “factory method” that is called for each configured Dapr component of that type associated with that socket. The method returns the instance associated with that Dapr component (whether shared or not). This allows multiple Dapr components of the same type to be configured with different sets of metadata, or to be kept isolated from one another when their operations must not interfere.
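The difference between a per-component and a shared instance can be sketched with two factories. The instantiate helper below is a hypothetical stand-in for the runtime calling the factory once per configured Dapr component, and MemoryStore is a placeholder type; neither is part of the SDK.

```go
package main

import "fmt"

// MemoryStore is a hypothetical component type.
type MemoryStore struct {
	data map[string]string
}

// instantiate imitates the factory being called once per configured component.
func instantiate(factory func() *MemoryStore, components int) []*MemoryStore {
	instances := make([]*MemoryStore, 0, components)
	for i := 0; i < components; i++ {
		instances = append(instances, factory())
	}
	return instances
}

func main() {
	// Isolated: every configured component gets its own instance.
	isolated := instantiate(func() *MemoryStore {
		return &MemoryStore{data: map[string]string{}}
	}, 2)

	// Shared: every configured component receives the same instance.
	shared := &MemoryStore{data: map[string]string{}}
	pooled := instantiate(func() *MemoryStore { return shared }, 2)

	fmt.Println(isolated[0] != isolated[1], pooled[0] == pooled[1])
}
```

With the isolated factory, each instance can be initialized with its own metadata; with the shared factory, initialization would need to tolerate being invoked more than once on the same instance.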

Registering multiple services

Each call to Register() binds a socket to a registered pluggable component. One of each component type (input/output binding, pub/sub, and state store) can be registered per socket.

func main() {
	dapr.Register("service-a", dapr.WithStateStore(func() state.Store {
		return &components.MyDatabaseStoreComponent{}
	}))

	dapr.Register("service-a", dapr.WithOutputBinding(func() bindings.OutputBinding {
		return &components.MyDatabaseOutputBindingComponent{}
	}))

	dapr.Register("service-b", dapr.WithStateStore(func() state.Store {
		return &components.MyDatabaseStoreComponent{}
	}))

	dapr.MustRun()
}

In the example above, a state store and an output binding are registered with the socket service-a, while another state store is registered with the socket service-b.

Configuring multiple components

Configuring Dapr to use the hosted components is the same as for any single component - the component YAML refers to the associated socket. For example, to configure Dapr state stores for the two components registered above (to sockets service-a and service-b), you create two configuration files, each referencing its respective socket.

#
# This component uses the state store associated with socket `service-a`
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-a
spec:
  type: state.service-a
  version: v1
  metadata: []

#
# This component uses the state store associated with socket `service-b`
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-b
spec:
  type: state.service-b
  version: v1
  metadata: []
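The link between the type field and a socket can be sketched as a small helper that splits a component type such as state.service-a into its kind and socket name. The ".sock" file suffix is an assumption not stated in this guide, and socketPath is a hypothetical helper, not part of the SDK.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// Default directory shared by Dapr and pluggable components.
const socketDir = "/tmp/dapr-components-sockets"

// socketPath derives the socket file a component type would refer to,
// assuming files are named "<socket name>.sock".
func socketPath(componentType string) (string, error) {
	parts := strings.SplitN(componentType, ".", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", fmt.Errorf("component type %q is not of the form <kind>.<socket name>", componentType)
	}
	return path.Join(socketDir, parts[1]+".sock"), nil
}

func main() {
	for _, t := range []string{"state.service-a", "state.service-b"} {
		p, err := socketPath(t)
		fmt.Println(t, p, err)
	}
}
```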

Next steps