This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

使用 Dapr 可插拔组件 Go SDK 入门

如何使用 Dapr 可插拔组件 Go SDK 快速上手

Dapr 提供了一些工具包,帮助开发者创建 Go 可插拔组件。

前置条件

创建应用程序

要创建一个可插拔组件,首先需要一个空的 Go 应用程序。

mkdir example
cd component
go mod init example

导入 Dapr 包

导入 Dapr 可插拔组件 SDK 包。

go get github.com/dapr-sandbox/components-go-sdk@v0.1.0

创建主包

main.go 中,导入 Dapr 可插拔组件包并运行应用程序。

package main

import (
	dapr "github.com/dapr-sandbox/components-go-sdk"
)

func main() {
	dapr.MustRun()
}

这会创建一个没有组件的应用程序,您需要实现并注册一个或多个组件。

实现和注册组件

本地测试组件

创建 Dapr 组件套接字目录

Dapr 通过 Unix 域套接字文件在一个公共目录中与可插拔组件通信。默认情况下,Dapr 和可插拔组件都使用 /tmp/dapr-components-sockets 目录。如果该目录尚不存在,您应该创建它。

mkdir /tmp/dapr-components-sockets

启动可插拔组件

可以通过在命令行启动应用程序来测试可插拔组件。

要启动组件,在应用程序目录中:

go run main.go

配置 Dapr 使用可插拔组件

要配置 Dapr 使用组件,请在资源目录中创建一个组件 YAML 文件。例如,对于一个状态存储组件:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: <component name>
spec:
  type: state.<socket name>
  version: v1
  metadata:
  - name: key1
    value: value1
  - name: key2
    value: value2

任何 metadata 属性将在组件实例化时通过其 Store.Init(metadata state.Metadata) 方法传递给组件。

启动 Dapr

要启动 Dapr(以及可选的使用该服务的服务):

dapr run --app-id <app id> --resources-path <resources path> ...

此时,Dapr sidecar 将已启动并通过 Unix 域套接字连接到组件。然后,您可以通过以下方式与组件交互:

  • 通过使用该组件的服务(如果已启动),或
  • 直接使用 Dapr HTTP 或 gRPC API

创建容器

可插拔组件作为容器部署,作为应用程序的 sidecar 运行(如同 Dapr 本身)。一个典型的用于创建 Go 应用程序 Docker 镜像的 Dockerfile 可能如下所示:

FROM golang:1.20-alpine AS builder

WORKDIR /usr/src/app

# 下载依赖
COPY go.mod go.sum ./
RUN go mod download && go mod verify

# 构建应用程序
COPY . .
RUN go build -v -o /usr/src/bin/app .

FROM alpine:latest

# 设置非 root 用户和权限
RUN addgroup -S app && adduser -S app -G app
RUN mkdir /tmp/dapr-components-sockets && chown app /tmp/dapr-components-sockets

# 将应用程序复制到运行时镜像
COPY --from=builder --chown=app /usr/src/bin/app /app

USER app

CMD ["/app"]

构建镜像:

docker build -f Dockerfile -t <image name>:<tag> .

下一步

1 - 实现一个 Go pub/sub 组件

如何使用 Dapr 可插拔组件 Go SDK 创建一个 pub/sub 组件

创建一个 pub/sub 组件只需几个基本步骤。

导入 pub/sub 包

创建文件 components/pubsub.go 并添加 import 语句以导入与 pub/sub 相关的包。

package components

import (
	"context"
	"github.com/dapr/components-contrib/pubsub"
)

实现 PubSub 接口

创建一个实现 PubSub 接口的类型。

type MyPubSubComponent struct {
}

func (component *MyPubSubComponent) Init(metadata pubsub.Metadata) error {
	// 使用配置的元数据初始化组件...
}

func (component *MyPubSubComponent) Close() error {
    // 不用于可插拔组件...
	return nil
}

func (component *MyPubSubComponent) Features() []pubsub.Feature {
	// 返回组件支持的功能列表...
}

func (component *MyPubSubComponent) Publish(req *pubsub.PublishRequest) error {
	// 将消息发送到 "topic"...
}

func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
	// 设置一个长时间运行的机制来检索消息,直到取消为止,并将其传递给 Dapr 运行时...
}

调用 Subscribe() 方法时,需要设置一个长时间运行的机制来检索消息,并立即返回 nil(如果无法设置该机制,则返回错误)。该机制应在取消时结束(例如,通过 ctx.Done()ctx.Err() != nil)。消息的 “topic” 是通过 req 参数传递的,而传递给 Dapr 运行时的消息则通过 handler 回调来处理。回调在应用程序(由 Dapr 运行时服务)确认处理消息之前不会返回。

func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
	go func() {
		for {
			if ctx.Err() != nil {
				return
			}
	
			messages := // 轮询消息...

            for _, message := range messages {
                handler(ctx, &pubsub.NewMessage{
                    // 设置消息内容...
                })
            }

			select {
				case <-ctx.Done():
				case <-time.After(5 * time.Second):
			} 
		}
	}()

	return nil
}

注册 pub/sub 组件

在主应用程序文件中(例如,main.go),注册 pub/sub 组件。

package main

import (
	"example/components"
	dapr "github.com/dapr-sandbox/components-go-sdk"
	"github.com/dapr-sandbox/components-go-sdk/pubsub/v1"
)

func main() {
	dapr.Register("<socket name>", dapr.WithPubSub(func() pubsub.PubSub {
		return &components.MyPubSubComponent{}
	}))

	dapr.MustRun()
}

下一步

2 - 实现一个Go输入/输出绑定组件

如何使用Dapr可插拔组件Go SDK创建一个输入/输出绑定

创建绑定组件只需几个基本步骤。

导入绑定包

创建文件 components/inputbinding.go 并添加与状态存储相关的包的 import 语句。

package components

import (
	"context"
	"github.com/dapr/components-contrib/bindings"
)

输入绑定:实现 InputBinding 接口

创建一个实现 InputBinding 接口的类型。

type MyInputBindingComponent struct {
}

func (component *MyInputBindingComponent) Init(meta bindings.Metadata) error {
	// 用于初始化组件的配置元数据...
}

func (component *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
	// 设置一个长期机制来检索消息,直到取消为止...
}

调用 Read() 方法时,应该设置一个长期运行的机制来检索消息,并立即返回 nil(如果无法设置该机制,则返回错误)。当取消时(例如,通过 ctx.Done()ctx.Err() != nil),该机制应停止。当从组件的底层存储读取消息时,它们通过 handler 回调传递给Dapr运行时,直到应用程序(由Dapr运行时服务)确认消息处理后才返回。

func (b *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
	go func() {
		for {
			if ctx.Err() != nil {
				return
			}
	
			messages := // 轮询消息...

            for _, message := range messages {
                handler(ctx, &bindings.ReadResponse{
                    // 设置消息内容...
                })
            }

			select {
				case <-ctx.Done():
				case <-time.After(5 * time.Second):
			} 
		}
	}()

	return nil
}

输出绑定:实现 OutputBinding 接口

创建一个实现 OutputBinding 接口的类型。

type MyOutputBindingComponent struct {
}

func (component *MyOutputBindingComponent) Init(meta bindings.Metadata) error {
	// 用于初始化组件的配置元数据...
}

func (component *MyOutputBindingComponent) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
	// 调用特定操作时执行...
}

func (component *MyOutputBindingComponent) Operations() []bindings.OperationKind {
	// 列出可以调用的操作。
}

输入和输出绑定组件

一个组件可以同时作为输入和输出绑定。只需实现两个接口,并将组件注册为两种绑定类型即可。

注册绑定组件

在主应用程序文件中(例如,main.go),将绑定组件注册到应用程序中。

package main

import (
	"example/components"
	dapr "github.com/dapr-sandbox/components-go-sdk"
	"github.com/dapr-sandbox/components-go-sdk/bindings/v1"
)

func main() {
	// 注册一个输入绑定...
	dapr.Register("my-inputbinding", dapr.WithInputBinding(func() bindings.InputBinding {
		return &components.MyInputBindingComponent{}
	}))

	// 注册一个输出绑定...
	dapr.Register("my-outputbinding", dapr.WithOutputBinding(func() bindings.OutputBinding {
		return &components.MyOutputBindingComponent{}
	}))

	dapr.MustRun()
}

下一步

3 - 实现一个 Go 状态存储组件

如何使用 Dapr 可插拔组件 Go SDK 创建一个状态存储

创建状态存储组件只需几个基本步骤。

导入状态存储包

创建文件 components/statestore.go 并添加与状态存储相关的包的 import 语句。

package components

import (
	"context"
	"github.com/dapr/components-contrib/state"
)

实现 Store 接口

创建一个实现 Store 接口的类型。

type MyStateStore struct {
}

func (store *MyStateStore) Init(metadata state.Metadata) error {
	// 使用配置的元数据初始化组件...
}

func (store *MyStateStore) GetComponentMetadata() map[string]string {
    // 不用于可插拔组件...
	return map[string]string{}
}

func (store *MyStateStore) Features() []state.Feature {
	// 返回状态存储支持的功能列表...
}

func (store *MyStateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
	// 从状态存储中删除请求的键...
}

func (store *MyStateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
	// 从状态存储中获取请求的键值,否则返回空响应...
}

func (store *MyStateStore) Set(ctx context.Context, req *state.SetRequest) error {
	// 在状态存储中将请求的键设置为指定的值...
}

func (store *MyStateStore) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
	// 从状态存储中获取请求的键值...
}

func (store *MyStateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
	// 从状态存储中删除请求的键...
}

func (store *MyStateStore) BulkSet(ctx context.Context, req []state.SetRequest) error {
	// 在状态存储中将请求的键设置为其指定的值...
}

注册状态存储组件

在主应用程序文件(例如,main.go)中,将状态存储注册到应用程序服务中。

package main

import (
	"example/components"
	dapr "github.com/dapr-sandbox/components-go-sdk"
	"github.com/dapr-sandbox/components-go-sdk/state/v1"
)

func main() {
	dapr.Register("<socket name>", dapr.WithStateStore(func() state.Store {
		return &components.MyStateStoreComponent{}
	}))

	dapr.MustRun()
}

批量操作的状态存储

虽然状态存储需要支持批量操作,但它们的实现会顺序委托给各个操作方法。

事务性状态存储

如果状态存储计划支持事务,则应实现可选的 TransactionalStore 接口。其 Multi() 方法接收一个包含一系列 delete 和/或 set 操作的请求,以在事务中执行。状态存储应遍历序列并应用每个操作。

func (store *MyStateStoreComponent) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
    // 开始事务...

    for _, operation := range request.Operations {
		switch operation.Operation {
		case state.Delete:
			deleteRequest := operation.Request.(state.DeleteRequest)
			// 处理删除请求...
		case state.Upsert:
			setRequest := operation.Request.(state.SetRequest)
			// 处理设置请求...
		}
	}

    // 结束(或回滚)事务...

	return nil
}

可查询的状态存储

如果状态存储计划支持查询,则应实现可选的 Querier 接口。其 Query() 方法传递有关查询的详细信息,例如过滤器、结果限制、分页和结果的排序顺序。状态存储使用这些详细信息生成一组值作为响应的一部分返回。

func (store *MyStateStoreComponent) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
	// 生成并返回结果...
}

ETag 和其他错误处理

Dapr 运行时对某些状态存储操作导致的特定错误条件有额外的处理。状态存储可以通过从其操作逻辑中返回特定错误来指示这些条件:

错误适用操作描述
NewETagError(state.ETagInvalid, ...)Delete, Set, Bulk Delete, Bulk Set当 ETag 无效时
NewETagError(state.ETagMismatch, ...)Delete, Set, Bulk Delete, Bulk Set当 ETag 与预期值不匹配时
NewBulkDeleteRowMismatchError(...)Bulk Delete当受影响的行数与预期行数不匹配时

下一步

4 - Dapr 可插拔组件 Go SDK 的高级用法

如何使用 Dapr 可插拔组件 Go SDK 的高级技术

尽管大多数情况下不需要使用这些高级配置方法,但本指南将展示如何在 Go 中配置 Dapr 的可插拔组件。

组件生命周期

可插拔组件通过传递一个“工厂方法”进行注册,该方法会在每个与 socket 关联的 Dapr 组件配置中被调用。这个方法返回与该 Dapr 组件相关联的实例(无论是否共享)。这使得多个相同类型的 Dapr 组件可以使用不同的元数据集进行配置,尤其是在需要组件操作相互隔离的情况下。

注册多个服务

每次调用 Register() 都会将一个 socket 绑定到一个注册的可插拔组件。每种组件类型(输入/输出绑定、pub/sub 和状态存储)可以在每个 socket 上注册一个。

func main() {
	dapr.Register("service-a", dapr.WithStateStore(func() state.Store {
		return &components.MyDatabaseStoreComponent{}
	}))

	dapr.Register("service-a", dapr.WithOutputBinding(func() bindings.OutputBinding {
		return &components.MyDatabaseOutputBindingComponent{}
	}))

	dapr.Register("service-b", dapr.WithStateStore(func() state.Store {
		return &components.MyDatabaseStoreComponent{}
	}))

	dapr.MustRun()
}

在上面的示例中,状态存储和输出绑定被注册到 socket service-a,而另一个状态存储被注册到 socket service-b

配置多个组件

配置 Dapr 使用托管组件与配置单个组件的方式相同 - 组件的 YAML 文件中需要指明关联的 socket。例如,要为上面注册的两个组件(分别注册到 socket service-aservice-b)配置 Dapr 状态存储,您需要创建两个配置文件,每个文件引用其各自的 socket。

#
# 此组件使用与 socket `service-a` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-a
spec:
  type: state.service-a
  version: v1
  metadata: []
#
# 此组件使用与 socket `service-b` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-b
spec:
  type: state.service-b
  version: v1
  metadata: []

下一步