1 - 可插拔组件概述

可插拔组件的结构和支持的组件类型概述

可插拔组件是指那些不包含在运行时中的组件,与 dapr init 中的内置组件相对。您可以配置 Dapr 使用这些可插拔组件,它们利用构建块 API,但注册方式与内置 Dapr 组件不同。

可插拔组件与内置组件

Dapr 提供了两种注册和创建组件的方法:

这两种注册选项虽然都利用了 Dapr 的构建块 API,但实现过程不同。

组件详情内置组件可插拔组件
语言只能用 Go 编写可以用任何支持 gRPC 的语言编写
运行位置作为 Dapr 运行时可执行文件的一部分作为 pod 中的独立进程或容器运行,与 Dapr 本身分开运行。
与 Dapr 的注册方式包含在 Dapr 代码库中通过 Unix 域套接字(使用 gRPC)与 Dapr 注册
分发随 Dapr 版本发布。组件的新功能与 Dapr 版本保持一致独立于 Dapr 本身分发。可以在需要时添加新功能,并遵循自己的发布周期。
组件激活方式Dapr 启动运行组件(自动)用户启动组件(手动)

为什么创建可插拔组件?

在以下场景中,可插拔组件非常有用:

  • 您需要一个私有组件。
  • 您希望将组件与 Dapr 的发布过程分开。
  • 您对 Go 不太熟悉,或者用 Go 实现组件并不理想。

特性

实现可插拔组件

要实现可插拔组件,您需要在组件中实现一个 gRPC 服务。实现 gRPC 服务需要三个步骤:

  1. 找到 proto 定义文件
  2. 创建服务脚手架
  3. 定义服务

了解更多关于如何开发和实现可插拔组件

为组件利用多个构建块

除了从同一组件实现多个 gRPC 服务(例如 StateStoreQueriableStateStoreTransactionalStateStore 等),可插拔组件还可以为其他组件接口提供实现。这意味着单个可插拔组件可以同时作为状态存储、pub/sub 和输入或输出绑定工作。换句话说,您可以将多个组件接口实现到一个可插拔组件中,并将其作为 gRPC 服务公开。

虽然在同一个可插拔组件上公开多个组件接口降低了部署多个组件的操作负担,但这使得实现和调试组件变得更加困难。如果不确定,请坚持“关注点分离”,仅在必要时将多个组件接口合并到同一个可插拔组件中。

如何使可插拔组件投入使用

内置组件和可插拔组件有一个共同点:都需要一个组件规范。内置组件不需要任何额外步骤即可使用:Dapr 已准备好自动使用它们。

相反,可插拔组件在与 Dapr 通信之前需要额外的步骤。您需要首先运行组件并促进 Dapr-组件通信以启动注册过程。

下一步

2 - 如何:实现可插拔组件

学习如何编写和实现可插拔组件

在本指南中,您将学习实现可插拔组件的原因和方法。要了解如何配置和注册可插拔组件,请参阅如何:注册可插拔组件

实现可插拔组件

要实现可插拔组件,需在组件中实现 gRPC 服务。实现 gRPC 服务需要三个步骤:

找到 proto 定义文件

每个支持的服务接口(如状态存储、发布订阅、绑定、密钥存储)都提供了 proto 定义。

目前支持以下组件 API:

  • 状态存储
  • 发布订阅
  • 绑定
  • 密钥存储
组件类型gRPC 定义内置参考实现文档
状态存储statestate.protoRedis概念, 如何, API 规范
发布订阅pubsubpubsub.protoRedis概念, 如何, API 规范
绑定bindingsbindings.protoKafka概念, 输入如何, 输出如何, API 规范
密钥存储secretstoressecretstore.protoHashicorp/Vault概念, 如何-secrets, API 规范

以下是可插拔组件状态存储的 gRPC 服务定义片段([state.proto]):

// StateStore 服务为状态存储组件提供 gRPC 接口。
service StateStore {
  // 使用给定的元数据初始化状态存储组件。
  rpc Init(InitRequest) returns (InitResponse) {}
  // 返回已实现的状态存储功能列表。
  rpc Features(FeaturesRequest) returns (FeaturesResponse) {}
  // Ping 状态存储。用于活跃性目的。
  rpc Ping(PingRequest) returns (PingResponse) {}
  
  // 从状态存储中删除指定的键。
  rpc Delete(DeleteRequest) returns (DeleteResponse) {}
  // 从给定的键获取数据。
  rpc Get(GetRequest) returns (GetResponse) {}
  // 设置指定键的值。
  rpc Set(SetRequest) returns (SetResponse) {}

  // 一次删除多个键。
  rpc BulkDelete(BulkDeleteRequest) returns (BulkDeleteResponse) {}
  // 一次检索多个键。
  rpc BulkGet(BulkGetRequest) returns (BulkGetResponse) {}
  // 一次设置多个键的值。
  rpc BulkSet(BulkSetRequest) returns (BulkSetResponse) {}
}

StateStore 服务接口总共公开了 9 个方法:

  • 2 个用于初始化和组件能力广告的方法(Init 和 Features)
  • 1 个用于健康或活跃性检查的方法(Ping)
  • 3 个用于 CRUD 的方法(Get、Set、Delete)
  • 3 个用于批量 CRUD 操作的方法(BulkGet、BulkSet、BulkDelete)

创建服务脚手架

使用 protocol buffers 和 gRPC 工具生成服务的脚手架。通过 gRPC 概念文档了解更多关于这些工具的信息。

这些工具生成针对任何 gRPC 支持的语言的代码。此代码作为您的服务器的基础,并提供:

  • 处理客户端调用的功能
  • 基础设施以:
    • 解码传入请求
    • 执行服务方法
    • 编码服务响应

生成的代码是不完整的。它缺少:

  • 您的目标服务定义的方法的具体实现(您可插拔组件的核心)。
  • 关于如何处理 Unix Socket Domain 集成的代码,这是 Dapr 特有的。
  • 处理与下游服务集成的代码。

在下一步中了解更多关于填补这些空白的信息。

定义服务

提供所需服务的具体实现。每个组件都有一个 gRPC 服务定义,用于其核心功能,与核心组件接口相同。例如:

  • 状态存储

    可插拔状态存储必须提供 StateStore 服务接口的实现。

    除了这个核心功能外,一些组件可能还会在其他可选服务下公开功能。例如,您可以通过定义 QueriableStateStore 服务和 TransactionalStateStore 服务的实现来添加额外功能。

  • 发布订阅

    可插拔发布订阅组件只有一个核心服务接口定义 pubsub.proto。它们没有可选的服务接口。

  • 绑定

    可插拔输入和输出绑定在 bindings.proto 上有一个核心服务定义。它们没有可选的服务接口。

  • 密钥存储

    可插拔密钥存储在 secretstore.proto 上有一个核心服务定义。它们没有可选的服务接口。

在使用 gRPC 和 protocol buffers 工具生成上述状态存储示例的服务脚手架代码后,您可以为 service StateStore 下定义的 9 个方法定义具体实现,以及初始化和与依赖项通信的代码。

这个具体实现和辅助代码是您可插拔组件的核心。它们定义了您的组件在处理来自 Dapr 的 gRPC 请求时的行为。

返回语义错误

返回语义错误也是可插拔组件协议的一部分。组件必须返回对用户应用程序具有语义意义的特定 gRPC 代码,这些错误用于从并发要求到仅信息的各种情况。

错误gRPC 错误代码源组件描述
ETag 不匹配codes.FailedPrecondition状态存储错误映射以满足并发要求
ETag 无效codes.InvalidArgument状态存储
批量删除行不匹配codes.Internal状态存储

状态管理概述中了解更多关于并发要求的信息。

以下示例演示了如何在您自己的可插拔组件中返回错误,并根据需要更改消息。

重要提示: 为了使用 .NET 进行错误映射,首先安装 Google.Api.CommonProtos NuGet 包

ETag 不匹配

var badRequest = new BadRequest();
var des = "提供的 ETag 字段与存储中的不匹配";
badRequest.FieldViolations.Add(    
   new Google.Rpc.BadRequest.Types.FieldViolation
       {        
         Field = "etag",
         Description = des
       });

var baseStatusCode = Grpc.Core.StatusCode.FailedPrecondition;
var status = new Google.Rpc.Status{    
   Code = (int)baseStatusCode
};

status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(badRequest));

var metadata = new Metadata();
metadata.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

ETag 无效

var badRequest = new BadRequest();
var des = "ETag 字段只能包含字母数字字符";
badRequest.FieldViolations.Add(
   new Google.Rpc.BadRequest.Types.FieldViolation
   {
      Field = "etag",
      Description = des
   });

var baseStatusCode = Grpc.Core.StatusCode.InvalidArgument;
var status = new Google.Rpc.Status
{
   Code = (int)baseStatusCode
};

status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(badRequest));

var metadata = new Metadata();
metadata.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

批量删除行不匹配

var errorInfo = new Google.Rpc.ErrorInfo();

errorInfo.Metadata.Add("expected", "100");
errorInfo.Metadata.Add("affected", "99");

var baseStatusCode = Grpc.Core.StatusCode.Internal;
var status = new Google.Rpc.Status{
    Code = (int)baseStatusCode
};

status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(errorInfo));

var metadata = new Metadata();
metadata.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

就像 Dapr Java SDK 一样,Java 可插拔组件 SDK 使用 Project Reactor,它为 Java 提供了异步 API。

错误可以通过以下方式直接返回:

  1. 在您的方法返回的 MonoFlux 中调用 .error() 方法
  2. 提供适当的异常作为参数。

您也可以引发异常,只要它被捕获并反馈到您结果的 MonoFlux 中。

ETag 不匹配

final Status status = Status.newBuilder()
    .setCode(io.grpc.Status.Code.FAILED_PRECONDITION.value())
    .setMessage("fake-err-msg-for-etag-mismatch")
    .addDetails(Any.pack(BadRequest.FieldViolation.newBuilder()
        .setField("etag")
        .setDescription("提供的 ETag 字段与存储中的不匹配")
        .build()))
    .build();
return Mono.error(StatusProto.toStatusException(status));

ETag 无效

final Status status = Status.newBuilder()
    .setCode(io.grpc.Status.Code.INVALID_ARGUMENT.value())
    .setMessage("fake-err-msg-for-invalid-etag")
    .addDetails(Any.pack(BadRequest.FieldViolation.newBuilder()
        .setField("etag")
        .setDescription("ETag 字段只能包含字母数字字符")
        .build()))
    .build();
return Mono.error(StatusProto.toStatusException(status));

批量删除行不匹配

final Status status = Status.newBuilder()
    .setCode(io.grpc.Status.Code.INTERNAL.value())
    .setMessage("fake-err-msg-for-bulk-delete-row-mismatch")
    .addDetails(Any.pack(ErrorInfo.newBuilder()
        .putAllMetadata(Map.ofEntries(
            Map.entry("affected", "99"),
            Map.entry("expected", "100")
        ))
        .build()))
    .build();
return Mono.error(StatusProto.toStatusException(status));

ETag 不匹配

st := status.New(codes.FailedPrecondition, "fake-err-msg")
desc := "提供的 ETag 字段与存储中的不匹配"
v := &errdetails.BadRequest_FieldViolation{
	Field:       etagField,
	Description: desc,
}
br := &errdetails.BadRequest{}
br.FieldViolations = append(br.FieldViolations, v)
st, err := st.WithDetails(br)

ETag 无效

st := status.New(codes.InvalidArgument, "fake-err-msg")
desc := "ETag 字段只能包含字母数字字符"
v := &errdetails.BadRequest_FieldViolation{
	Field:       etagField,
	Description: desc,
}
br := &errdetails.BadRequest{}
br.FieldViolations = append(br.FieldViolations, v)
st, err := st.WithDetails(br)

批量删除行不匹配

st := status.New(codes.Internal, "fake-err-msg")
br := &errdetails.ErrorInfo{}
br.Metadata = map[string]string{
	affected: "99",
	expected: "100",
}
st, err := st.WithDetails(br)

下一步

3 - 可插拔组件的SDK

使用您喜欢的语言开发可插拔组件

Dapr SDK 是帮助您轻松创建可插拔组件的最佳工具。选择您喜欢的编程语言,几分钟内即可开始开发组件。

可插拔组件的SDK

语言进度
Go正在开发
.NET正在开发

3.1 - 开始使用 Dapr 可插拔组件 .NET SDK

如何使用 Dapr 可插拔组件 .NET SDK 快速上手

Dapr 提供了用于开发 .NET 可插拔组件的 NuGet 包。

前提条件

创建项目

要创建一个可插拔组件,首先从一个空的 ASP.NET 项目开始。

dotnet new web --name <project name>

添加 NuGet 包

添加 Dapr .NET 可插拔组件的 NuGet 包。

dotnet add package Dapr.PluggableComponents.AspNetCore

创建应用程序和服务

创建 Dapr 可插拔组件应用程序类似于创建 ASP.NET 应用程序。在 Program.cs 中,将 WebApplication 相关代码替换为 Dapr DaprPluggableComponentsApplication 的等效代码。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        // 使用此服务注册一个或多个组件。
    });

app.Run();

这将创建一个包含单个服务的应用程序。每个服务:

  • 对应一个 Unix 域套接字
  • 可以托管一个或多个组件类型

实现和注册组件

本地测试组件

可插拔组件可以通过在命令行启动应用程序并配置一个 Dapr sidecar 来进行测试。

要启动组件,在应用程序目录中:

dotnet run

要配置 Dapr 使用该组件,在资源路径目录中:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: <component name>
spec:
  type: state.<socket name>
  version: v1
  metadata:
  - name: key1
    value: value1
  - name: key2
    value: value2

任何 metadata 属性将在组件实例化时通过其 IPluggableComponent.InitAsync() 方法传递给组件。

要启动 Dapr(以及可选地使用该服务的服务):

dapr run --app-id <app id> --resources-path <resources path> ...

此时,Dapr sidecar 将已启动并通过 Unix 域套接字连接到组件。然后您可以通过以下方式与组件交互:

  • 通过使用该组件的服务(如果已启动),或
  • 直接使用 Dapr HTTP 或 gRPC API

创建容器

有几种方法可以为您的组件创建容器以便最终部署。

使用 .NET SDK

.NET 7 及更高版本的 SDK 允许您为应用程序创建基于 .NET 的容器 无需 Dockerfile,即使是针对早期版本的 .NET SDK。这可能是目前为您的组件生成容器的最简单方法。

Microsoft.NET.Build.Containers NuGet 包添加到组件项目中。

dotnet add package Microsoft.NET.Build.Containers

将应用程序发布为容器:

dotnet publish --os linux --arch x64 /t:PublishContainer -c Release

有关更多配置选项,例如控制容器名称、标签和基础镜像,请参阅 .NET 作为容器发布指南

使用 Dockerfile

虽然有工具可以为 .NET 应用程序生成 Dockerfile,但 .NET SDK 本身并不提供。一个典型的 Dockerfile 可能如下所示:

FROM mcr.microsoft.com/dotnet/aspnet:<runtime> AS base
WORKDIR /app

# 创建一个具有显式 UID 的非 root 用户,并添加访问 /app 文件夹的权限
# 更多信息,请参阅 https://aka.ms/vscode-docker-dotnet-configure-containers
RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /app
USER appuser

FROM mcr.microsoft.com/dotnet/sdk:<runtime> AS build
WORKDIR /src
COPY ["<application>.csproj", "<application folder>/"]
RUN dotnet restore "<application folder>/<application>.csproj"
COPY . .
WORKDIR "/src/<application folder>"
RUN dotnet build "<application>.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "<application>.csproj" -c Release -o /app/publish /p:UseAppHost=false

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "<application>.dll"]

构建镜像:

docker build -f Dockerfile -t <image name>:<tag> .

3.1.1 - 实现 .NET 输入/输出绑定组件

如何使用 Dapr 可插拔组件 .NET SDK 创建输入/输出绑定

创建绑定组件只需几个基本步骤。

添加绑定相关的命名空间

为绑定相关的命名空间添加 using 语句。

using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.Bindings;

输入绑定:实现 IInputBinding

创建一个实现 IInputBinding 接口的类。

internal sealed class MyBinding : IInputBinding
{
    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // 使用配置的元数据初始化组件...
    }

    public async Task ReadAsync(MessageDeliveryHandler<InputBindingReadRequest, InputBindingReadResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        // 在取消之前,检查底层存储中的消息并将其传递给 Dapr 运行时...
    }
}

ReadAsync() 方法的调用是“长时间运行”的,因为在取消之前不期望返回(例如,通过 cancellationToken)。当从组件的底层存储中读取消息时,它们通过 deliveryHandler 回调传递给 Dapr 运行时。这样,组件可以在应用程序(由 Dapr 运行时服务)确认消息处理时接收通知。

    public async Task ReadAsync(MessageDeliveryHandler<InputBindingReadRequest, InputBindingReadResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        TimeSpan pollInterval = // 轮询间隔(例如,从初始化元数据中获取)...

        // 在取消之前轮询底层存储...
        while (!cancellationToken.IsCancellationRequested)
        {
            var messages = // 从底层存储中轮询消息...

            foreach (var message in messages)
            {
                // 将消息传递给 Dapr 运行时...
                await deliveryHandler(
                    new InputBindingReadResponse
                    {
                        // 设置消息内容...
                    },
                    // 当应用程序确认消息时调用的回调...
                    async request =>
                    {
                        // 处理响应数据或错误消息...
                    });
            }

            // 等待下次轮询(或取消)...
            await Task.Delay(pollInterval, cancellationToken);
        }
    }

输出绑定:实现 IOutputBinding

创建一个实现 IOutputBinding 接口的类。

internal sealed class MyBinding : IOutputBinding
{
    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // 使用配置的元数据初始化组件...
    }

    public Task<OutputBindingInvokeResponse> InvokeAsync(OutputBindingInvokeRequest request, CancellationToken cancellationToken = default)
    {
        // 执行特定操作...
    }

    public Task<string[]> ListOperationsAsync(CancellationToken cancellationToken = default)
    {
        // 列出可以调用的操作。
    }
}

输入和输出绑定组件

一个组件可以同时是输入和输出绑定,只需实现这两个接口即可。

internal sealed class MyBinding : IInputBinding, IOutputBinding
{
    // IInputBinding 实现...

    // IOutputBinding 实现...
}

注册绑定组件

在主程序文件中(例如,Program.cs),在应用程序服务中注册绑定组件。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterBinding<MyBinding>();
    });

app.Run();

3.1.2 - 实现 .NET 发布/订阅组件

如何使用 Dapr 可插拔组件 .NET SDK 创建发布/订阅

创建发布/订阅组件只需几个基本步骤。

添加发布/订阅命名空间

添加与发布/订阅相关的命名空间的 using 语句。

using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.PubSub;

实现 IPubSub

创建一个实现 IPubSub 接口的类。

internal sealed class MyPubSub : IPubSub
{
    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // 使用配置的元数据初始化组件...
    }

    public Task PublishAsync(PubSubPublishRequest request, CancellationToken cancellationToken = default)
    {
        // 将消息发送到指定的“主题”...
    }

    public Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        // 持续检查主题中的消息并将其传递给 Dapr 运行时,直到取消为止...
    }
}

PullMessagesAsync() 方法是一个“长时间运行”的调用,因为在取消之前不期望返回(例如,通过 cancellationToken)。需要从中提取消息的“主题”通过 topic 参数传递,而传递到 Dapr 运行时是通过 deliveryHandler 回调执行的。传递机制允许组件在应用程序(由 Dapr 运行时服务)确认消息处理时接收通知。

    public async Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        TimeSpan pollInterval = // 轮询间隔(可以从初始化元数据中获取)...

        // 持续轮询主题直到取消...
        while (!cancellationToken.IsCancellationRequested)
        {
            var messages = // 从主题中轮询获取消息...

            foreach (var message in messages)
            {
                // 将消息传递给 Dapr 运行时...
                await deliveryHandler(
                    new PubSubPullMessagesResponse(topicName)
                    {
                        // 设置消息内容...
                    },
                    // 当应用程序确认消息时调用的回调...
                    async errorMessage =>
                    {
                        // 空消息表示应用程序成功处理了消息...
                        if (String.IsNullOrEmpty(errorMessage))
                        {
                            // 从主题中删除消息...
                        }
                    });
            }

            // 等待下一个轮询(或取消)...
            await Task.Delay(pollInterval, cancellationToken);
        }
    }

注册发布/订阅组件

在主程序文件中(例如,Program.cs),使用应用程序服务注册发布/订阅组件。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterPubSub<MyPubSub>();
    });

app.Run();

3.1.3 - 实现 .NET 状态存储组件

如何使用 Dapr 可插拔组件 .NET SDK 创建状态存储

创建状态存储组件只需几个基本步骤。

添加状态存储命名空间

为状态存储相关的命名空间添加 using 语句。

using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.StateStore;

实现 IStateStore

创建一个实现 IStateStore 接口的类。

internal sealed class MyStateStore : IStateStore
{
    public Task DeleteAsync(StateStoreDeleteRequest request, CancellationToken cancellationToken = default)
    {
        // 从状态存储中删除请求的键...
    }

    public Task<StateStoreGetResponse?> GetAsync(StateStoreGetRequest request, CancellationToken cancellationToken = default)
    {
        // 从状态存储中获取请求的键值,否则返回 null...
    }

    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // 使用配置的元数据初始化组件...
    }

    public Task SetAsync(StateStoreSetRequest request, CancellationToken cancellationToken = default)
    {
        // 在状态存储中将请求的键设置为指定的值...
    }
}

注册状态存储组件

在主程序文件中(如 Program.cs),通过应用服务注册状态存储。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyStateStore>();
    });

app.Run();

支持批量操作的状态存储

如果状态存储打算支持批量操作,应实现可选的 IBulkStateStore 接口。其方法与基础 IStateStore 接口的方法相似,但包含多个请求的值。

internal sealed class MyStateStore : IStateStore, IBulkStateStore
{
    // ...

    public Task BulkDeleteAsync(StateStoreDeleteRequest[] requests, CancellationToken cancellationToken = default)
    {
        // 从状态存储中删除所有请求的值...
    }

    public Task<StateStoreBulkStateItem[]> BulkGetAsync(StateStoreGetRequest[] requests, CancellationToken cancellationToken = default)
    {
        // 返回状态存储中所有请求的值...
    }

    public Task BulkSetAsync(StateStoreSetRequest[] requests, CancellationToken cancellationToken = default)
    {
        // 在状态存储中设置所有请求键的值...
    }
}

事务性状态存储

如果状态存储打算支持事务,应实现可选的 ITransactionalStateStore 接口。其 TransactAsync() 方法接收一个请求,其中包含要在事务中执行的删除和/或设置操作序列。状态存储应遍历这些操作,并调用每个操作的 Visit() 方法,传递相应的回调以处理每种操作类型。

internal sealed class MyStateStore : IStateStore, ITransactionalStateStore
{
    // ...

    public async Task TransactAsync(StateStoreTransactRequest request, CancellationToken cancellationToken = default)
    {
        // 开始事务...

        try
        {
            foreach (var operation in request.Operations)
            {
                await operation.Visit(
                    async deleteRequest =>
                    {
                        // 处理删除请求...

                    },
                    async setRequest =>
                    {
                        // 处理设置请求...
                    });
            }
        }
        catch
        {
            // 回滚事务...

            throw;
        }

        // 提交事务...
    }
}

可查询状态存储

如果状态存储打算支持查询,应实现可选的 IQueryableStateStore 接口。其 QueryAsync() 方法接收有关查询的详细信息,例如过滤器、结果限制和分页,以及结果的排序顺序。状态存储应使用这些详细信息生成一组值并作为响应的一部分返回。

internal sealed class MyStateStore : IStateStore, IQueryableStateStore
{
    // ...

    public Task<StateStoreQueryResponse> QueryAsync(StateStoreQueryRequest request, CancellationToken cancellationToken = default)
    {
        // 生成并返回结果...
    }
}

ETag 和其他语义错误处理

Dapr 运行时对某些状态存储操作导致的特定错误条件有额外的处理。状态存储可以通过从其操作逻辑中抛出特定异常来指示这些条件:

异常适用操作描述
ETagInvalidException删除、设置、批量删除、批量设置当 ETag 无效时
ETagMismatchException删除、设置、批量删除、批量设置当 ETag 与预期值不匹配时
BulkDeleteRowMismatchException批量删除当受影响的行数与预期行数不匹配时

3.1.4 - .NET SDK 的 Dapr 可插拔组件高级用法

如何在 Dapr 可插拔组件 .NET SDK 中使用高级技术

尽管大多数情况下不需要,但这些指南提供了配置 .NET 可插拔组件的高级方法。

3.1.4.1 - 在 .NET Dapr 可插拔组件中使用多个服务

如何从 .NET 可插拔组件中暴露多个服务

一个可插拔组件可以托管多种类型的组件。您可能会这样做:

  • 以减少集群中运行的sidecar数量
  • 以便将可能共享库和实现的相关组件进行分组,例如:
    • 一个既作为通用状态存储又作为
    • 允许更具体操作的输出绑定。

每个Unix域套接字可以管理对每种类型的一个组件的调用。要托管多个相同类型的组件,您可以将这些类型分布在多个套接字上。SDK将每个套接字绑定到一个“服务”,每个服务由一个或多个组件类型组成。

注册多个服务

每次调用RegisterService()都会将一个套接字绑定到一组注册的组件,其中每种类型的组件每个服务可以注册一个。

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "service-a",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyDatabaseStateStore>();
        serviceBuilder.RegisterBinding<MyDatabaseOutputBinding>();
    });

app.RegisterService(
    "service-b",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<AnotherStateStore>();
    });

app.Run();

class MyDatabaseStateStore : IStateStore
{
    // ...
}

class MyDatabaseOutputBinding : IOutputBinding
{
    // ...
}

class AnotherStateStore : IStateStore
{
    // ...
}

配置多个组件

配置Dapr以使用托管组件与任何单个组件相同 - 组件YAML引用关联的套接字。

#
# 此组件使用与套接字 `state-store-a` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-a
spec:
  type: state.service-a
  version: v1
  metadata: []
#
# 此组件使用与套接字 `state-store-b` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-b
spec:
  type: state.service-b
  version: v1
  metadata: []

3.1.4.2 - .NET Dapr 插件组件的应用环境配置

如何配置 .NET 插件组件的环境

.NET Dapr 插件组件应用可以配置依赖注入、日志记录和配置值,类似于 ASP.NET 应用。DaprPluggableComponentsApplication 提供了一组与 WebApplicationBuilder 类似的配置属性。

依赖注入

注册到服务的组件可以参与依赖注入。组件构造函数中的参数会在创建时被注入,前提是这些类型已在应用中注册。你可以通过 DaprPluggableComponentsApplication 提供的 IServiceCollection 来注册它们。

var app = DaprPluggableComponentsApplication.Create();

// 将 MyService 注册为 IService 的单例实现。
app.Services.AddSingleton<IService, MyService>();

app.RegisterService(
    "<service name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyStateStore>();
    });

app.Run();

interface IService
{
    // ...
}

class MyService : IService
{
    // ...
}

class MyStateStore : IStateStore
{
    // 在创建 state 存储时注入 IService。
    public MyStateStore(IService service)
    {
        // ...
    }

    // ...
}

日志记录

.NET Dapr 插件组件可以使用标准 .NET 日志机制DaprPluggableComponentsApplication 提供了一个 ILoggingBuilder,可以通过它进行配置。

var app = DaprPluggableComponentsApplication.Create();

// 清除默认日志记录器并添加新的。
app.Logging.ClearProviders();
app.Logging.AddConsole();

app.RegisterService(
    "<service name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyStateStore>();
    });

app.Run();

class MyStateStore : IStateStore
{
    // 在创建 state 存储时注入日志记录器。
    public MyStateStore(ILogger<MyStateStore> logger)
    {
        // ...
    }

    // ...
}

配置值

由于 .NET 插件组件是基于 ASP.NET 构建的,它们可以使用其标准配置机制,并默认使用相同的一组预注册提供者DaprPluggableComponentsApplication 提供了一个 IConfigurationManager,可以通过它进行配置。

var app = DaprPluggableComponentsApplication.Create();

// 清除默认配置提供者并添加新的。
((IConfigurationBuilder)app.Configuration).Sources.Clear();
app.Configuration.AddEnvironmentVariables();

// 在启动时获取配置值。
const value = app.Configuration["<name>"];

app.RegisterService(
    "<service name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyStateStore>();
    });

app.Run();

class MyStateStore : IStateStore
{
    // 在创建 state 存储时注入配置。
    public MyStateStore(IConfiguration configuration)
    {
        // ...
    }

    // ...
}

3.1.4.3 - .NET Dapr 可插拔组件的生命周期

如何控制 .NET 可插拔组件的生命周期

在 .NET Dapr 中,注册组件有两种方式:

  • 组件作为单例运行,其生命周期由 SDK 管理
  • 组件的生命周期由可插拔组件决定,可以是多实例或单例,视需要而定

单例组件

按类型注册的组件将作为单例运行:一个实例将为与该 socket 关联的所有配置组件提供服务。当仅存在一个该类型的组件并在 Dapr 应用程序之间共享时,这种方法是最佳选择。

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "service-a",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<SingletonStateStore>();
    });

app.Run();

class SingletonStateStore : IStateStore
{
    // ...
}

多实例组件

可以通过传递“工厂方法”来注册组件。对于与该 socket 关联的每个配置组件,该方法将被调用。该方法返回要与该组件关联的实例(无论是否共享)。当多个相同类型的组件可能配置有不同的元数据集时,或者当组件操作需要彼此隔离时,这种方法是最佳选择。

工厂方法会接收上下文信息,例如配置的 Dapr 组件的 ID,这些信息可用于区分不同的组件实例。

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "service-a",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore(
            context =>
            {
                return new MultiStateStore(context.InstanceId);
            });
    });

app.Run();

class MultiStateStore : IStateStore
{
    private readonly string instanceId;

    public MultiStateStore(string instanceId)
    {
        this.instanceId = instanceId;
    }

    // ...
}

3.2 - 使用 Dapr 可插拔组件 Go SDK 入门

如何使用 Dapr 可插拔组件 Go SDK 快速上手

Dapr 提供了一些工具包,帮助开发者创建 Go 可插拔组件。

前置条件

创建应用程序

要创建一个可插拔组件,首先需要一个空的 Go 应用程序。

mkdir example
cd component
go mod init example

导入 Dapr 包

导入 Dapr 可插拔组件 SDK 包。

go get github.com/dapr-sandbox/components-go-sdk@v0.1.0

创建主包

main.go 中,导入 Dapr 可插拔组件包并运行应用程序。

package main

import (
	dapr "github.com/dapr-sandbox/components-go-sdk"
)

func main() {
	dapr.MustRun()
}

这会创建一个没有组件的应用程序,您需要实现并注册一个或多个组件。

实现和注册组件

本地测试组件

创建 Dapr 组件套接字目录

Dapr 通过 Unix 域套接字文件在一个公共目录中与可插拔组件通信。默认情况下,Dapr 和可插拔组件都使用 /tmp/dapr-components-sockets 目录。如果该目录尚不存在,您应该创建它。

mkdir /tmp/dapr-components-sockets

启动可插拔组件

可以通过在命令行启动应用程序来测试可插拔组件。

要启动组件,在应用程序目录中:

go run main.go

配置 Dapr 使用可插拔组件

要配置 Dapr 使用组件,请在资源目录中创建一个组件 YAML 文件。例如,对于一个状态存储组件:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: <component name>
spec:
  type: state.<socket name>
  version: v1
  metadata:
  - name: key1
    value: value1
  - name: key2
    value: value2

任何 metadata 属性将在组件实例化时通过其 Store.Init(metadata state.Metadata) 方法传递给组件。

启动 Dapr

要启动 Dapr(以及可选的使用该服务的服务):

dapr run --app-id <app id> --resources-path <resources path> ...

此时,Dapr sidecar 将已启动并通过 Unix 域套接字连接到组件。然后,您可以通过以下方式与组件交互:

  • 通过使用该组件的服务(如果已启动),或
  • 直接使用 Dapr HTTP 或 gRPC API

创建容器

可插拔组件作为容器部署,作为应用程序的 sidecar 运行(如同 Dapr 本身)。一个典型的用于创建 Go 应用程序 Docker 镜像的 Dockerfile 可能如下所示:

FROM golang:1.20-alpine AS builder

WORKDIR /usr/src/app

# 下载依赖
COPY go.mod go.sum ./
RUN go mod download && go mod verify

# 构建应用程序
COPY . .
RUN go build -v -o /usr/src/bin/app .

FROM alpine:latest

# 设置非 root 用户和权限
RUN addgroup -S app && adduser -S app -G app
RUN mkdir /tmp/dapr-components-sockets && chown app /tmp/dapr-components-sockets

# 将应用程序复制到运行时镜像
COPY --from=builder --chown=app /usr/src/bin/app /app

USER app

CMD ["/app"]

构建镜像:

docker build -f Dockerfile -t <image name>:<tag> .

下一步

3.2.1 - 实现一个 Go pub/sub 组件

如何使用 Dapr 可插拔组件 Go SDK 创建一个 pub/sub 组件

创建一个 pub/sub 组件只需几个基本步骤。

导入 pub/sub 包

创建文件 components/pubsub.go 并添加 import 语句以导入与 pub/sub 相关的包。

package components

import (
	"context"
	"github.com/dapr/components-contrib/pubsub"
)

实现 PubSub 接口

创建一个实现 PubSub 接口的类型。

type MyPubSubComponent struct {
}

func (component *MyPubSubComponent) Init(metadata pubsub.Metadata) error {
	// 使用配置的元数据初始化组件...
}

func (component *MyPubSubComponent) Close() error {
    // 不用于可插拔组件...
	return nil
}

func (component *MyPubSubComponent) Features() []pubsub.Feature {
	// 返回组件支持的功能列表...
}

func (component *MyPubSubComponent) Publish(req *pubsub.PublishRequest) error {
	// 将消息发送到 "topic"...
}

func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
	// 设置一个长时间运行的机制来检索消息,直到取消为止,并将其传递给 Dapr 运行时...
}

调用 Subscribe() 方法时,需要设置一个长时间运行的机制来检索消息,并立即返回 nil(如果无法设置该机制,则返回错误)。该机制应在取消时结束(例如,通过 ctx.Done()ctx.Err() != nil)。消息的 “topic” 是通过 req 参数传递的,而传递给 Dapr 运行时的消息则通过 handler 回调来处理。回调在应用程序(由 Dapr 运行时服务)确认处理消息之前不会返回。

func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
	go func() {
		for {
			if ctx.Err() != nil {
				return
			}
	
			messages := // 轮询消息...

            for _, message := range messages {
                handler(ctx, &pubsub.NewMessage{
                    // 设置消息内容...
                })
            }

			select {
				case <-ctx.Done():
				case <-time.After(5 * time.Second):
			} 
		}
	}()

	return nil
}

注册 pub/sub 组件

在主应用程序文件中(例如,main.go),注册 pub/sub 组件。

package main

import (
	"example/components"
	dapr "github.com/dapr-sandbox/components-go-sdk"
	"github.com/dapr-sandbox/components-go-sdk/pubsub/v1"
)

func main() {
	dapr.Register("<socket name>", dapr.WithPubSub(func() pubsub.PubSub {
		return &components.MyPubSubComponent{}
	}))

	dapr.MustRun()
}

下一步

3.2.2 - 实现一个Go输入/输出绑定组件

如何使用Dapr可插拔组件Go SDK创建一个输入/输出绑定

创建绑定组件只需几个基本步骤。

导入绑定包

创建文件 components/inputbinding.go 并添加与状态存储相关的包的 import 语句。

package components

import (
	"context"
	"github.com/dapr/components-contrib/bindings"
)

输入绑定:实现 InputBinding 接口

创建一个实现 InputBinding 接口的类型。

type MyInputBindingComponent struct {
}

func (component *MyInputBindingComponent) Init(meta bindings.Metadata) error {
	// 用于初始化组件的配置元数据...
}

func (component *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
	// 设置一个长期机制来检索消息,直到取消为止...
}

调用 Read() 方法时,应该设置一个长期运行的机制来检索消息,并立即返回 nil(如果无法设置该机制,则返回错误)。当取消时(例如,通过 ctx.Done()ctx.Err() != nil),该机制应停止。当从组件的底层存储读取消息时,它们通过 handler 回调传递给Dapr运行时,直到应用程序(由Dapr运行时服务)确认消息处理后才返回。

func (b *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
	go func() {
		for {
			if ctx.Err() != nil {
				return
			}
	
			messages := // 轮询消息...

            for _, message := range messages {
                handler(ctx, &bindings.ReadResponse{
                    // 设置消息内容...
                })
            }

			select {
				case <-ctx.Done():
				case <-time.After(5 * time.Second):
			} 
		}
	}()

	return nil
}

输出绑定:实现 OutputBinding 接口

创建一个实现 OutputBinding 接口的类型。

type MyOutputBindingComponent struct {
}

func (component *MyOutputBindingComponent) Init(meta bindings.Metadata) error {
	// 用于初始化组件的配置元数据...
}

func (component *MyOutputBindingComponent) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
	// 调用特定操作时执行...
}

func (component *MyOutputBindingComponent) Operations() []bindings.OperationKind {
	// 列出可以调用的操作。
}

输入和输出绑定组件

一个组件可以同时作为输入和输出绑定。只需实现两个接口,并将组件注册为两种绑定类型即可。

注册绑定组件

在主应用程序文件中(例如,main.go),将绑定组件注册到应用程序中。

package main

import (
	"example/components"
	dapr "github.com/dapr-sandbox/components-go-sdk"
	"github.com/dapr-sandbox/components-go-sdk/bindings/v1"
)

func main() {
	// 注册一个输入绑定...
	dapr.Register("my-inputbinding", dapr.WithInputBinding(func() bindings.InputBinding {
		return &components.MyInputBindingComponent{}
	}))

	// 注册一个输出绑定...
	dapr.Register("my-outputbinding", dapr.WithOutputBinding(func() bindings.OutputBinding {
		return &components.MyOutputBindingComponent{}
	}))

	dapr.MustRun()
}

下一步

3.2.3 - 实现一个 Go 状态存储组件

如何使用 Dapr 可插拔组件 Go SDK 创建一个状态存储

创建状态存储组件只需几个基本步骤。

导入状态存储包

创建文件 components/statestore.go 并添加与状态存储相关的包的 import 语句。

package components

import (
	"context"
	"github.com/dapr/components-contrib/state"
)

实现 Store 接口

创建一个实现 Store 接口的类型。

type MyStateStore struct {
}

func (store *MyStateStore) Init(metadata state.Metadata) error {
	// 使用配置的元数据初始化组件...
}

func (store *MyStateStore) GetComponentMetadata() map[string]string {
    // 不用于可插拔组件...
	return map[string]string{}
}

func (store *MyStateStore) Features() []state.Feature {
	// 返回状态存储支持的功能列表...
}

func (store *MyStateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
	// 从状态存储中删除请求的键...
}

func (store *MyStateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
	// 从状态存储中获取请求的键值,否则返回空响应...
}

func (store *MyStateStore) Set(ctx context.Context, req *state.SetRequest) error {
	// 在状态存储中将请求的键设置为指定的值...
}

func (store *MyStateStore) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
	// 从状态存储中获取请求的键值...
}

func (store *MyStateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
	// 从状态存储中删除请求的键...
}

func (store *MyStateStore) BulkSet(ctx context.Context, req []state.SetRequest) error {
	// 在状态存储中将请求的键设置为其指定的值...
}

注册状态存储组件

在主应用程序文件(例如,main.go)中,将状态存储注册到应用程序服务中。

package main

import (
	"example/components"
	dapr "github.com/dapr-sandbox/components-go-sdk"
	"github.com/dapr-sandbox/components-go-sdk/state/v1"
)

func main() {
	dapr.Register("<socket name>", dapr.WithStateStore(func() state.Store {
		return &components.MyStateStoreComponent{}
	}))

	dapr.MustRun()
}

批量操作的状态存储

虽然状态存储需要支持批量操作,但它们的实现会顺序委托给各个操作方法。

事务性状态存储

如果状态存储计划支持事务,则应实现可选的 TransactionalStore 接口。其 Multi() 方法接收一个包含一系列 delete 和/或 set 操作的请求,以在事务中执行。状态存储应遍历序列并应用每个操作。

func (store *MyStateStoreComponent) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
    // 开始事务...

    for _, operation := range request.Operations {
		switch operation.Operation {
		case state.Delete:
			deleteRequest := operation.Request.(state.DeleteRequest)
			// 处理删除请求...
		case state.Upsert:
			setRequest := operation.Request.(state.SetRequest)
			// 处理设置请求...
		}
	}

    // 结束(或回滚)事务...

	return nil
}

可查询的状态存储

如果状态存储计划支持查询,则应实现可选的 Querier 接口。其 Query() 方法传递有关查询的详细信息,例如过滤器、结果限制、分页和结果的排序顺序。状态存储使用这些详细信息生成一组值作为响应的一部分返回。

func (store *MyStateStoreComponent) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
	// 生成并返回结果...
}

ETag 和其他错误处理

Dapr 运行时对某些状态存储操作导致的特定错误条件有额外的处理。状态存储可以通过从其操作逻辑中返回特定错误来指示这些条件:

错误适用操作描述
NewETagError(state.ETagInvalid, ...)Delete, Set, Bulk Delete, Bulk Set当 ETag 无效时
NewETagError(state.ETagMismatch, ...)Delete, Set, Bulk Delete, Bulk Set当 ETag 与预期值不匹配时
NewBulkDeleteRowMismatchError(...)Bulk Delete当受影响的行数与预期行数不匹配时

下一步

3.2.4 - Dapr 可插拔组件 Go SDK 的高级用法

如何使用 Dapr 可插拔组件 Go SDK 的高级技术

尽管大多数情况下不需要使用这些高级配置方法,但本指南将展示如何在 Go 中配置 Dapr 的可插拔组件。

组件生命周期

可插拔组件通过传递一个“工厂方法”进行注册,该方法会在每个与 socket 关联的 Dapr 组件配置中被调用。这个方法返回与该 Dapr 组件相关联的实例(无论是否共享)。这使得多个相同类型的 Dapr 组件可以使用不同的元数据集进行配置,尤其是在需要组件操作相互隔离的情况下。

注册多个服务

每次调用 Register() 都会将一个 socket 绑定到一个注册的可插拔组件。每种组件类型(输入/输出绑定、pub/sub 和状态存储)可以在每个 socket 上注册一个。

func main() {
	dapr.Register("service-a", dapr.WithStateStore(func() state.Store {
		return &components.MyDatabaseStoreComponent{}
	}))

	dapr.Register("service-a", dapr.WithOutputBinding(func() bindings.OutputBinding {
		return &components.MyDatabaseOutputBindingComponent{}
	}))

	dapr.Register("service-b", dapr.WithStateStore(func() state.Store {
		return &components.MyDatabaseStoreComponent{}
	}))

	dapr.MustRun()
}

在上面的示例中,状态存储和输出绑定被注册到 socket service-a,而另一个状态存储被注册到 socket service-b

配置多个组件

配置 Dapr 使用托管组件与配置单个组件的方式相同 - 组件的 YAML 文件中需要指明关联的 socket。例如,要为上面注册的两个组件(分别注册到 socket service-aservice-b)配置 Dapr 状态存储,您需要创建两个配置文件,每个文件引用其各自的 socket。

#
# 此组件使用与 socket `service-a` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-a
spec:
  type: state.service-a
  version: v1
  metadata: []
#
# 此组件使用与 socket `service-b` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-b
spec:
  type: state.service-b
  version: v1
  metadata: []

下一步