Dapr SDK 是帮助您轻松创建可插拔组件的最佳工具。选择您喜欢的编程语言,几分钟内即可开始开发组件。
可插拔组件的SDK
语言 | 进度 |
---|---|
Go | 正在开发 |
.NET | 正在开发 |
This is the multi-page printable view of this section. Click here to print.
Dapr 提供了用于开发 .NET 可插拔组件的 NuGet 包。
要创建一个可插拔组件,首先从一个空的 ASP.NET 项目开始。
dotnet new web --name <project name>
添加 Dapr .NET 可插拔组件的 NuGet 包。
dotnet add package Dapr.PluggableComponents.AspNetCore
创建 Dapr 可插拔组件应用程序类似于创建 ASP.NET 应用程序。在 Program.cs
中,将 WebApplication
相关代码替换为 Dapr DaprPluggableComponentsApplication
的等效代码。
using Dapr.PluggableComponents;
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"<socket name>",
serviceBuilder =>
{
// 使用此服务注册一个或多个组件。
});
app.Run();
这将创建一个包含单个服务的应用程序。每个服务:
可插拔组件可以通过在命令行启动应用程序并配置一个 Dapr sidecar 来进行测试。
要启动组件,在应用程序目录中:
dotnet run
要配置 Dapr 使用该组件,在资源路径目录中:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: <component name>
spec:
type: state.<socket name>
version: v1
metadata:
- name: key1
value: value1
- name: key2
value: value2
任何 metadata
属性将在组件实例化时通过其 IPluggableComponent.InitAsync()
方法传递给组件。
要启动 Dapr(以及可选地使用该服务的服务):
dapr run --app-id <app id> --resources-path <resources path> ...
此时,Dapr sidecar 将已启动并通过 Unix 域套接字连接到组件。然后您可以通过以下方式与组件交互:
有几种方法可以为您的组件创建容器以便最终部署。
.NET 7 及更高版本的 SDK 允许您为应用程序创建基于 .NET 的容器 无需 Dockerfile
,即使是针对早期版本的 .NET SDK。这可能是目前为您的组件生成容器的最简单方法。
目前,.NET 7 SDK 需要本地机器上的 Docker Desktop、一个特殊的 NuGet 包,以及本地机器上的 Docker Desktop 来构建容器。未来版本的 .NET SDK 计划消除这些要求。
可以在本地机器上同时安装多个版本的 .NET SDK。
将 Microsoft.NET.Build.Containers
NuGet 包添加到组件项目中。
dotnet add package Microsoft.NET.Build.Containers
将应用程序发布为容器:
dotnet publish --os linux --arch x64 /t:PublishContainer -c Release
--arch x64
与组件的最终部署目标匹配。默认情况下,生成的容器的架构与本地机器的架构匹配。例如,如果本地机器是基于 ARM64 的(例如,M1 或 M2 Mac)并且省略了参数,则将生成一个 ARM64 容器,这可能与期望 AMD64 容器的部署目标不兼容。有关更多配置选项,例如控制容器名称、标签和基础镜像,请参阅 .NET 作为容器发布指南。
虽然有工具可以为 .NET 应用程序生成 Dockerfile
,但 .NET SDK 本身并不提供。一个典型的 Dockerfile
可能如下所示:
FROM mcr.microsoft.com/dotnet/aspnet:<runtime> AS base
WORKDIR /app
# 创建一个具有显式 UID 的非 root 用户,并添加访问 /app 文件夹的权限
# 更多信息,请参阅 https://aka.ms/vscode-docker-dotnet-configure-containers
RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /app
USER appuser
FROM mcr.microsoft.com/dotnet/sdk:<runtime> AS build
WORKDIR /src
COPY ["<application>.csproj", "<application folder>/"]
RUN dotnet restore "<application folder>/<application>.csproj"
COPY . .
WORKDIR "/src/<application folder>"
RUN dotnet build "<application>.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "<application>.csproj" -c Release -o /app/publish /p:UseAppHost=false
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "<application>.dll"]
构建镜像:
docker build -f Dockerfile -t <image name>:<tag> .
Dockerfile
中 COPY
操作的路径是相对于构建镜像时传递的 Docker 上下文的,而 Docker 上下文本身会根据所构建项目的需求而有所不同(例如,如果它有引用的项目)。在上面的示例中,假设 Docker 上下文是组件项目目录。创建绑定组件只需几个基本步骤。
为绑定相关的命名空间添加 using
语句。
using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.Bindings;
IInputBinding
创建一个实现 IInputBinding
接口的类。
internal sealed class MyBinding : IInputBinding
{
public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
{
// 使用配置的元数据初始化组件...
}
public async Task ReadAsync(MessageDeliveryHandler<InputBindingReadRequest, InputBindingReadResponse> deliveryHandler, CancellationToken cancellationToken = default)
{
// 在取消之前,检查底层存储中的消息并将其传递给 Dapr 运行时...
}
}
ReadAsync()
方法的调用是“长时间运行”的,因为在取消之前不期望返回(例如,通过 cancellationToken
)。当从组件的底层存储中读取消息时,它们通过 deliveryHandler
回调传递给 Dapr 运行时。这样,组件可以在应用程序(由 Dapr 运行时服务)确认消息处理时接收通知。
public async Task ReadAsync(MessageDeliveryHandler<InputBindingReadRequest, InputBindingReadResponse> deliveryHandler, CancellationToken cancellationToken = default)
{
TimeSpan pollInterval = // 轮询间隔(例如,从初始化元数据中获取)...
// 在取消之前轮询底层存储...
while (!cancellationToken.IsCancellationRequested)
{
var messages = // 从底层存储中轮询消息...
foreach (var message in messages)
{
// 将消息传递给 Dapr 运行时...
await deliveryHandler(
new InputBindingReadResponse
{
// 设置消息内容...
},
// 当应用程序确认消息时调用的回调...
async request =>
{
// 处理响应数据或错误消息...
});
}
// 等待下次轮询(或取消)...
await Task.Delay(pollInterval, cancellationToken);
}
}
IOutputBinding
创建一个实现 IOutputBinding
接口的类。
internal sealed class MyBinding : IOutputBinding
{
public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
{
// 使用配置的元数据初始化组件...
}
public Task<OutputBindingInvokeResponse> InvokeAsync(OutputBindingInvokeRequest request, CancellationToken cancellationToken = default)
{
// 执行特定操作...
}
public Task<string[]> ListOperationsAsync(CancellationToken cancellationToken = default)
{
// 列出可以调用的操作。
}
}
一个组件可以同时是输入和输出绑定,只需实现这两个接口即可。
internal sealed class MyBinding : IInputBinding, IOutputBinding
{
// IInputBinding 实现...
// IOutputBinding 实现...
}
在主程序文件中(例如,Program.cs
),在应用程序服务中注册绑定组件。
using Dapr.PluggableComponents;
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"<socket name>",
serviceBuilder =>
{
serviceBuilder.RegisterBinding<MyBinding>();
});
app.Run();
IInputBinding
和 IOutputBinding
的组件将被注册为输入和输出绑定。创建发布/订阅组件只需几个基本步骤。
添加与发布/订阅相关的命名空间的 using
语句。
using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.PubSub;
IPubSub
创建一个实现 IPubSub
接口的类。
internal sealed class MyPubSub : IPubSub
{
public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
{
// 使用配置的元数据初始化组件...
}
public Task PublishAsync(PubSubPublishRequest request, CancellationToken cancellationToken = default)
{
// 将消息发送到指定的“主题”...
}
public Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
{
// 持续检查主题中的消息并将其传递给 Dapr 运行时,直到取消为止...
}
}
PullMessagesAsync()
方法是一个“长时间运行”的调用,因为在取消之前不期望返回(例如,通过 cancellationToken
)。需要从中提取消息的“主题”通过 topic
参数传递,而传递到 Dapr 运行时是通过 deliveryHandler
回调执行的。传递机制允许组件在应用程序(由 Dapr 运行时服务)确认消息处理时接收通知。
public async Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
{
TimeSpan pollInterval = // 轮询间隔(可以从初始化元数据中获取)...
// 持续轮询主题直到取消...
while (!cancellationToken.IsCancellationRequested)
{
var messages = // 从主题中轮询获取消息...
foreach (var message in messages)
{
// 将消息传递给 Dapr 运行时...
await deliveryHandler(
new PubSubPullMessagesResponse(topicName)
{
// 设置消息内容...
},
// 当应用程序确认消息时调用的回调...
async errorMessage =>
{
// 空消息表示应用程序成功处理了消息...
if (String.IsNullOrEmpty(errorMessage))
{
// 从主题中删除消息...
}
});
}
// 等待下一个轮询(或取消)...
await Task.Delay(pollInterval, cancellationToken);
}
}
在主程序文件中(例如,Program.cs
),使用应用程序服务注册发布/订阅组件。
using Dapr.PluggableComponents;
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"<socket name>",
serviceBuilder =>
{
serviceBuilder.RegisterPubSub<MyPubSub>();
});
app.Run();
创建状态存储组件只需几个基本步骤。
为状态存储相关的命名空间添加 using
语句。
using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.StateStore;
IStateStore
创建一个实现 IStateStore
接口的类。
internal sealed class MyStateStore : IStateStore
{
public Task DeleteAsync(StateStoreDeleteRequest request, CancellationToken cancellationToken = default)
{
// 从状态存储中删除请求的键...
}
public Task<StateStoreGetResponse?> GetAsync(StateStoreGetRequest request, CancellationToken cancellationToken = default)
{
// 从状态存储中获取请求的键值,否则返回 null...
}
public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
{
// 使用配置的元数据初始化组件...
}
public Task SetAsync(StateStoreSetRequest request, CancellationToken cancellationToken = default)
{
// 在状态存储中将请求的键设置为指定的值...
}
}
在主程序文件中(如 Program.cs
),通过应用服务注册状态存储。
using Dapr.PluggableComponents;
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"<socket name>",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyStateStore>();
});
app.Run();
如果状态存储打算支持批量操作,应实现可选的 IBulkStateStore
接口。其方法与基础 IStateStore
接口的方法相似,但包含多个请求的值。
IBulkStateStore
的状态存储,Dapr 运行时将通过单独调用其操作来模拟批量状态存储操作。internal sealed class MyStateStore : IStateStore, IBulkStateStore
{
// ...
public Task BulkDeleteAsync(StateStoreDeleteRequest[] requests, CancellationToken cancellationToken = default)
{
// 从状态存储中删除所有请求的值...
}
public Task<StateStoreBulkStateItem[]> BulkGetAsync(StateStoreGetRequest[] requests, CancellationToken cancellationToken = default)
{
// 返回状态存储中所有请求的值...
}
public Task BulkSetAsync(StateStoreSetRequest[] requests, CancellationToken cancellationToken = default)
{
// 在状态存储中设置所有请求键的值...
}
}
如果状态存储打算支持事务,应实现可选的 ITransactionalStateStore
接口。其 TransactAsync()
方法接收一个请求,其中包含要在事务中执行的删除和/或设置操作序列。状态存储应遍历这些操作,并调用每个操作的 Visit()
方法,传递相应的回调以处理每种操作类型。
internal sealed class MyStateStore : IStateStore, ITransactionalStateStore
{
// ...
public async Task TransactAsync(StateStoreTransactRequest request, CancellationToken cancellationToken = default)
{
// 开始事务...
try
{
foreach (var operation in request.Operations)
{
await operation.Visit(
async deleteRequest =>
{
// 处理删除请求...
},
async setRequest =>
{
// 处理设置请求...
});
}
}
catch
{
// 回滚事务...
throw;
}
// 提交事务...
}
}
如果状态存储打算支持查询,应实现可选的 IQueryableStateStore
接口。其 QueryAsync()
方法接收有关查询的详细信息,例如过滤器、结果限制和分页,以及结果的排序顺序。状态存储应使用这些详细信息生成一组值并作为响应的一部分返回。
internal sealed class MyStateStore : IStateStore, IQueryableStateStore
{
// ...
public Task<StateStoreQueryResponse> QueryAsync(StateStoreQueryRequest request, CancellationToken cancellationToken = default)
{
// 生成并返回结果...
}
}
Dapr 运行时对某些状态存储操作导致的特定错误条件有额外的处理。状态存储可以通过从其操作逻辑中抛出特定异常来指示这些条件:
异常 | 适用操作 | 描述 |
---|---|---|
ETagInvalidException | 删除、设置、批量删除、批量设置 | 当 ETag 无效时 |
ETagMismatchException | 删除、设置、批量删除、批量设置 | 当 ETag 与预期值不匹配时 |
BulkDeleteRowMismatchException | 批量删除 | 当受影响的行数与预期行数不匹配时 |
尽管大多数情况下不需要,但这些指南提供了配置 .NET 可插拔组件的高级方法。
一个可插拔组件可以托管多种类型的组件。您可能会这样做:
每个Unix域套接字可以管理对每种类型的一个组件的调用。要托管多个相同类型的组件,您可以将这些类型分布在多个套接字上。SDK将每个套接字绑定到一个“服务”,每个服务由一个或多个组件类型组成。
每次调用RegisterService()
都会将一个套接字绑定到一组注册的组件,其中每种类型的组件每个服务可以注册一个。
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"service-a",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyDatabaseStateStore>();
serviceBuilder.RegisterBinding<MyDatabaseOutputBinding>();
});
app.RegisterService(
"service-b",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<AnotherStateStore>();
});
app.Run();
class MyDatabaseStateStore : IStateStore
{
// ...
}
class MyDatabaseOutputBinding : IOutputBinding
{
// ...
}
class AnotherStateStore : IStateStore
{
// ...
}
配置Dapr以使用托管组件与任何单个组件相同 - 组件YAML引用关联的套接字。
#
# 此组件使用与套接字 `state-store-a` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: state-store-a
spec:
type: state.service-a
version: v1
metadata: []
#
# 此组件使用与套接字 `state-store-b` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: state-store-b
spec:
type: state.service-b
version: v1
metadata: []
.NET Dapr 插件组件应用可以配置依赖注入、日志记录和配置值,类似于 ASP.NET 应用。DaprPluggableComponentsApplication
提供了一组与 WebApplicationBuilder
类似的配置属性。
注册到服务的组件可以参与依赖注入。组件构造函数中的参数会在创建时被注入,前提是这些类型已在应用中注册。你可以通过 DaprPluggableComponentsApplication
提供的 IServiceCollection
来注册它们。
var app = DaprPluggableComponentsApplication.Create();
// 将 MyService 注册为 IService 的单例实现。
app.Services.AddSingleton<IService, MyService>();
app.RegisterService(
"<service name>",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyStateStore>();
});
app.Run();
interface IService
{
// ...
}
class MyService : IService
{
// ...
}
class MyStateStore : IStateStore
{
// 在创建 state 存储时注入 IService。
public MyStateStore(IService service)
{
// ...
}
// ...
}
IServiceCollection.AddScoped()
。因为此类实例的生命周期仅限于单个 gRPC 方法调用,这与组件实例的生命周期不一致。.NET Dapr 插件组件可以使用标准 .NET 日志机制。DaprPluggableComponentsApplication
提供了一个 ILoggingBuilder
,可以通过它进行配置。
ILogger<T>
)已预先注册。var app = DaprPluggableComponentsApplication.Create();
// 清除默认日志记录器并添加新的。
app.Logging.ClearProviders();
app.Logging.AddConsole();
app.RegisterService(
"<service name>",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyStateStore>();
});
app.Run();
class MyStateStore : IStateStore
{
// 在创建 state 存储时注入日志记录器。
public MyStateStore(ILogger<MyStateStore> logger)
{
// ...
}
// ...
}
由于 .NET 插件组件是基于 ASP.NET 构建的,它们可以使用其标准配置机制,并默认使用相同的一组预注册提供者。DaprPluggableComponentsApplication
提供了一个 IConfigurationManager
,可以通过它进行配置。
var app = DaprPluggableComponentsApplication.Create();
// 清除默认配置提供者并添加新的。
((IConfigurationBuilder)app.Configuration).Sources.Clear();
app.Configuration.AddEnvironmentVariables();
// 在启动时获取配置值。
const value = app.Configuration["<name>"];
app.RegisterService(
"<service name>",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<MyStateStore>();
});
app.Run();
class MyStateStore : IStateStore
{
// 在创建 state 存储时注入配置。
public MyStateStore(IConfiguration configuration)
{
// ...
}
// ...
}
在 .NET Dapr 中,注册组件有两种方式:
按类型注册的组件将作为单例运行:一个实例将为与该 socket 关联的所有配置组件提供服务。当仅存在一个该类型的组件并在 Dapr 应用程序之间共享时,这种方法是最佳选择。
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"service-a",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore<SingletonStateStore>();
});
app.Run();
class SingletonStateStore : IStateStore
{
// ...
}
可以通过传递“工厂方法”来注册组件。对于与该 socket 关联的每个配置组件,该方法将被调用。该方法返回要与该组件关联的实例(无论是否共享)。当多个相同类型的组件可能配置有不同的元数据集时,或者当组件操作需要彼此隔离时,这种方法是最佳选择。
工厂方法会接收上下文信息,例如配置的 Dapr 组件的 ID,这些信息可用于区分不同的组件实例。
var app = DaprPluggableComponentsApplication.Create();
app.RegisterService(
"service-a",
serviceBuilder =>
{
serviceBuilder.RegisterStateStore(
context =>
{
return new MultiStateStore(context.InstanceId);
});
});
app.Run();
class MultiStateStore : IStateStore
{
private readonly string instanceId;
public MultiStateStore(string instanceId)
{
this.instanceId = instanceId;
}
// ...
}
Dapr 提供了一些工具包,帮助开发者创建 Go 可插拔组件。
要创建一个可插拔组件,首先需要一个空的 Go 应用程序。
mkdir example
cd component
go mod init example
导入 Dapr 可插拔组件 SDK 包。
go get github.com/dapr-sandbox/components-go-sdk@v0.1.0
在 main.go
中,导入 Dapr 可插拔组件包并运行应用程序。
package main
import (
dapr "github.com/dapr-sandbox/components-go-sdk"
)
func main() {
dapr.MustRun()
}
这会创建一个没有组件的应用程序,您需要实现并注册一个或多个组件。
Dapr 通过 Unix 域套接字文件在一个公共目录中与可插拔组件通信。默认情况下,Dapr 和可插拔组件都使用 /tmp/dapr-components-sockets
目录。如果该目录尚不存在,您应该创建它。
mkdir /tmp/dapr-components-sockets
可以通过在命令行启动应用程序来测试可插拔组件。
要启动组件,在应用程序目录中:
go run main.go
要配置 Dapr 使用组件,请在资源目录中创建一个组件 YAML 文件。例如,对于一个状态存储组件:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: <component name>
spec:
type: state.<socket name>
version: v1
metadata:
- name: key1
value: value1
- name: key2
value: value2
任何 metadata
属性将在组件实例化时通过其 Store.Init(metadata state.Metadata)
方法传递给组件。
要启动 Dapr(以及可选的使用该服务的服务):
dapr run --app-id <app id> --resources-path <resources path> ...
此时,Dapr sidecar 将已启动并通过 Unix 域套接字连接到组件。然后,您可以通过以下方式与组件交互:
可插拔组件作为容器部署,作为应用程序的 sidecar 运行(如同 Dapr 本身)。一个典型的用于创建 Go 应用程序 Docker 镜像的 Dockerfile
可能如下所示:
FROM golang:1.20-alpine AS builder
WORKDIR /usr/src/app
# 下载依赖
COPY go.mod go.sum ./
RUN go mod download && go mod verify
# 构建应用程序
COPY . .
RUN go build -v -o /usr/src/bin/app .
FROM alpine:latest
# 设置非 root 用户和权限
RUN addgroup -S app && adduser -S app -G app
RUN mkdir /tmp/dapr-components-sockets && chown app /tmp/dapr-components-sockets
# 将应用程序复制到运行时镜像
COPY --from=builder --chown=app /usr/src/bin/app /app
USER app
CMD ["/app"]
构建镜像:
docker build -f Dockerfile -t <image name>:<tag> .
Dockerfile
中 COPY
操作的路径是相对于构建镜像时传递的 Docker 上下文的,而 Docker 上下文本身会根据所构建应用程序的需求而有所不同。在上面的示例中,假设 Docker 上下文是组件应用程序目录。创建一个 pub/sub 组件只需几个基本步骤。
创建文件 components/pubsub.go
并添加 import
语句以导入与 pub/sub 相关的包。
package components
import (
"context"
"github.com/dapr/components-contrib/pubsub"
)
PubSub
接口创建一个实现 PubSub
接口的类型。
type MyPubSubComponent struct {
}
func (component *MyPubSubComponent) Init(metadata pubsub.Metadata) error {
// 使用配置的元数据初始化组件...
}
func (component *MyPubSubComponent) Close() error {
// 不用于可插拔组件...
return nil
}
func (component *MyPubSubComponent) Features() []pubsub.Feature {
// 返回组件支持的功能列表...
}
func (component *MyPubSubComponent) Publish(req *pubsub.PublishRequest) error {
// 将消息发送到 "topic"...
}
func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
// 设置一个长时间运行的机制来检索消息,直到取消为止,并将其传递给 Dapr 运行时...
}
调用 Subscribe()
方法时,需要设置一个长时间运行的机制来检索消息,并立即返回 nil
(如果无法设置该机制,则返回错误)。该机制应在取消时结束(例如,通过 ctx.Done()
或 ctx.Err() != nil
)。消息的 “topic” 是通过 req
参数传递的,而传递给 Dapr 运行时的消息则通过 handler
回调来处理。回调在应用程序(由 Dapr 运行时服务)确认处理消息之前不会返回。
func (component *MyPubSubComponent) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
go func() {
for {
if ctx.Err() != nil {
return
}
messages := // 轮询消息...
for _, message := range messages {
handler(ctx, &pubsub.NewMessage{
// 设置消息内容...
})
}
select {
case <-ctx.Done():
case <-time.After(5 * time.Second):
}
}
}()
return nil
}
在主应用程序文件中(例如,main.go
),注册 pub/sub 组件。
package main
import (
"example/components"
dapr "github.com/dapr-sandbox/components-go-sdk"
"github.com/dapr-sandbox/components-go-sdk/pubsub/v1"
)
func main() {
dapr.Register("<socket name>", dapr.WithPubSub(func() pubsub.PubSub {
return &components.MyPubSubComponent{}
}))
dapr.MustRun()
}
创建绑定组件只需几个基本步骤。
创建文件 components/inputbinding.go
并添加与状态存储相关的包的 import
语句。
package components
import (
"context"
"github.com/dapr/components-contrib/bindings"
)
InputBinding
接口创建一个实现 InputBinding
接口的类型。
type MyInputBindingComponent struct {
}
func (component *MyInputBindingComponent) Init(meta bindings.Metadata) error {
// 用于初始化组件的配置元数据...
}
func (component *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
// 设置一个长期机制来检索消息,直到取消为止...
}
调用 Read()
方法时,应该设置一个长期运行的机制来检索消息,并立即返回 nil
(如果无法设置该机制,则返回错误)。当取消时(例如,通过 ctx.Done()
或 ctx.Err() != nil
),该机制应停止。当从组件的底层存储读取消息时,它们通过 handler
回调传递给Dapr运行时,直到应用程序(由Dapr运行时服务)确认消息处理后才返回。
func (b *MyInputBindingComponent) Read(ctx context.Context, handler bindings.Handler) error {
go func() {
for {
if ctx.Err() != nil {
return
}
messages := // 轮询消息...
for _, message := range messages {
handler(ctx, &bindings.ReadResponse{
// 设置消息内容...
})
}
select {
case <-ctx.Done():
case <-time.After(5 * time.Second):
}
}
}()
return nil
}
OutputBinding
接口创建一个实现 OutputBinding
接口的类型。
type MyOutputBindingComponent struct {
}
func (component *MyOutputBindingComponent) Init(meta bindings.Metadata) error {
// 用于初始化组件的配置元数据...
}
func (component *MyOutputBindingComponent) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
// 调用特定操作时执行...
}
func (component *MyOutputBindingComponent) Operations() []bindings.OperationKind {
// 列出可以调用的操作。
}
一个组件可以同时作为输入和输出绑定。只需实现两个接口,并将组件注册为两种绑定类型即可。
在主应用程序文件中(例如,main.go
),将绑定组件注册到应用程序中。
package main
import (
"example/components"
dapr "github.com/dapr-sandbox/components-go-sdk"
"github.com/dapr-sandbox/components-go-sdk/bindings/v1"
)
func main() {
// 注册一个输入绑定...
dapr.Register("my-inputbinding", dapr.WithInputBinding(func() bindings.InputBinding {
return &components.MyInputBindingComponent{}
}))
// 注册一个输出绑定...
dapr.Register("my-outputbinding", dapr.WithOutputBinding(func() bindings.OutputBinding {
return &components.MyOutputBindingComponent{}
}))
dapr.MustRun()
}
创建状态存储组件只需几个基本步骤。
创建文件 components/statestore.go
并添加与状态存储相关的包的 import
语句。
package components
import (
"context"
"github.com/dapr/components-contrib/state"
)
Store
接口创建一个实现 Store
接口的类型。
type MyStateStore struct {
}
func (store *MyStateStore) Init(metadata state.Metadata) error {
// 使用配置的元数据初始化组件...
}
func (store *MyStateStore) GetComponentMetadata() map[string]string {
// 不用于可插拔组件...
return map[string]string{}
}
func (store *MyStateStore) Features() []state.Feature {
// 返回状态存储支持的功能列表...
}
func (store *MyStateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
// 从状态存储中删除请求的键...
}
func (store *MyStateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
// 从状态存储中获取请求的键值,否则返回空响应...
}
func (store *MyStateStore) Set(ctx context.Context, req *state.SetRequest) error {
// 在状态存储中将请求的键设置为指定的值...
}
func (store *MyStateStore) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
// 从状态存储中获取请求的键值...
}
func (store *MyStateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
// 从状态存储中删除请求的键...
}
func (store *MyStateStore) BulkSet(ctx context.Context, req []state.SetRequest) error {
// 在状态存储中将请求的键设置为其指定的值...
}
在主应用程序文件(例如,main.go
)中,将状态存储注册到应用程序服务中。
package main
import (
"example/components"
dapr "github.com/dapr-sandbox/components-go-sdk"
"github.com/dapr-sandbox/components-go-sdk/state/v1"
)
func main() {
dapr.Register("<socket name>", dapr.WithStateStore(func() state.Store {
return &components.MyStateStoreComponent{}
}))
dapr.MustRun()
}
虽然状态存储需要支持批量操作,但它们的实现会顺序委托给各个操作方法。
如果状态存储计划支持事务,则应实现可选的 TransactionalStore
接口。其 Multi()
方法接收一个包含一系列 delete
和/或 set
操作的请求,以在事务中执行。状态存储应遍历序列并应用每个操作。
func (store *MyStateStoreComponent) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
// 开始事务...
for _, operation := range request.Operations {
switch operation.Operation {
case state.Delete:
deleteRequest := operation.Request.(state.DeleteRequest)
// 处理删除请求...
case state.Upsert:
setRequest := operation.Request.(state.SetRequest)
// 处理设置请求...
}
}
// 结束(或回滚)事务...
return nil
}
如果状态存储计划支持查询,则应实现可选的 Querier
接口。其 Query()
方法传递有关查询的详细信息,例如过滤器、结果限制、分页和结果的排序顺序。状态存储使用这些详细信息生成一组值作为响应的一部分返回。
func (store *MyStateStoreComponent) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
// 生成并返回结果...
}
Dapr 运行时对某些状态存储操作导致的特定错误条件有额外的处理。状态存储可以通过从其操作逻辑中返回特定错误来指示这些条件:
错误 | 适用操作 | 描述 |
---|---|---|
NewETagError(state.ETagInvalid, ...) | Delete, Set, Bulk Delete, Bulk Set | 当 ETag 无效时 |
NewETagError(state.ETagMismatch, ...) | Delete, Set, Bulk Delete, Bulk Set | 当 ETag 与预期值不匹配时 |
NewBulkDeleteRowMismatchError(...) | Bulk Delete | 当受影响的行数与预期行数不匹配时 |
尽管大多数情况下不需要使用这些高级配置方法,但本指南将展示如何在 Go 中配置 Dapr 的可插拔组件。
可插拔组件通过传递一个“工厂方法”进行注册,该方法会在每个与 socket 关联的 Dapr 组件配置中被调用。这个方法返回与该 Dapr 组件相关联的实例(无论是否共享)。这使得多个相同类型的 Dapr 组件可以使用不同的元数据集进行配置,尤其是在需要组件操作相互隔离的情况下。
每次调用 Register()
都会将一个 socket 绑定到一个注册的可插拔组件。每种组件类型(输入/输出绑定、pub/sub 和状态存储)可以在每个 socket 上注册一个。
func main() {
dapr.Register("service-a", dapr.WithStateStore(func() state.Store {
return &components.MyDatabaseStoreComponent{}
}))
dapr.Register("service-a", dapr.WithOutputBinding(func() bindings.OutputBinding {
return &components.MyDatabaseOutputBindingComponent{}
}))
dapr.Register("service-b", dapr.WithStateStore(func() state.Store {
return &components.MyDatabaseStoreComponent{}
}))
dapr.MustRun()
}
在上面的示例中,状态存储和输出绑定被注册到 socket service-a
,而另一个状态存储被注册到 socket service-b
。
配置 Dapr 使用托管组件与配置单个组件的方式相同 - 组件的 YAML 文件中需要指明关联的 socket。例如,要为上面注册的两个组件(分别注册到 socket service-a
和 service-b
)配置 Dapr 状态存储,您需要创建两个配置文件,每个文件引用其各自的 socket。
#
# 此组件使用与 socket `service-a` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: state-store-a
spec:
type: state.service-a
version: v1
metadata: []
#
# 此组件使用与 socket `service-b` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: state-store-b
spec:
type: state.service-b
version: v1
metadata: []