This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

开始使用 Dapr 可插拔组件 .NET SDK

如何使用 Dapr 可插拔组件 .NET SDK 快速上手

Dapr 提供了用于开发 .NET 可插拔组件的 NuGet 包。

前提条件

创建项目

要创建一个可插拔组件,首先从一个空的 ASP.NET 项目开始。

dotnet new web --name <project name>

添加 NuGet 包

添加 Dapr .NET 可插拔组件的 NuGet 包。

dotnet add package Dapr.PluggableComponents.AspNetCore

创建应用程序和服务

创建 Dapr 可插拔组件应用程序类似于创建 ASP.NET 应用程序。在 Program.cs 中,将 WebApplication 相关代码替换为 Dapr DaprPluggableComponentsApplication 的等效代码。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        // 使用此服务注册一个或多个组件。
    });

app.Run();

这将创建一个包含单个服务的应用程序。每个服务:

  • 对应一个 Unix 域套接字
  • 可以托管一个或多个组件类型

实现和注册组件

本地测试组件

可插拔组件可以通过在命令行启动应用程序并配置一个 Dapr sidecar 来进行测试。

要启动组件,在应用程序目录中:

dotnet run

要配置 Dapr 使用该组件,在资源路径目录中:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: <component name>
spec:
  type: state.<socket name>
  version: v1
  metadata:
  - name: key1
    value: value1
  - name: key2
    value: value2

任何 metadata 属性将在组件实例化时通过其 IPluggableComponent.InitAsync() 方法传递给组件。

要启动 Dapr(以及可选地使用该服务的服务):

dapr run --app-id <app id> --resources-path <resources path> ...

此时,Dapr sidecar 将已启动并通过 Unix 域套接字连接到组件。然后您可以通过以下方式与组件交互:

  • 通过使用该组件的服务(如果已启动),或
  • 直接使用 Dapr HTTP 或 gRPC API

创建容器

有几种方法可以为您的组件创建容器以便最终部署。

使用 .NET SDK

.NET 7 及更高版本的 SDK 允许您为应用程序创建基于 .NET 的容器 无需 Dockerfile,即使是针对早期版本的 .NET SDK。这可能是目前为您的组件生成容器的最简单方法。

Microsoft.NET.Build.Containers NuGet 包添加到组件项目中。

dotnet add package Microsoft.NET.Build.Containers

将应用程序发布为容器:

dotnet publish --os linux --arch x64 /t:PublishContainer -c Release

有关更多配置选项,例如控制容器名称、标签和基础镜像,请参阅 .NET 作为容器发布指南

使用 Dockerfile

虽然有工具可以为 .NET 应用程序生成 Dockerfile,但 .NET SDK 本身并不提供。一个典型的 Dockerfile 可能如下所示:

FROM mcr.microsoft.com/dotnet/aspnet:<runtime> AS base
WORKDIR /app

# 创建一个具有显式 UID 的非 root 用户,并添加访问 /app 文件夹的权限
# 更多信息,请参阅 https://aka.ms/vscode-docker-dotnet-configure-containers
RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /app
USER appuser

FROM mcr.microsoft.com/dotnet/sdk:<runtime> AS build
WORKDIR /src
COPY ["<application>.csproj", "<application folder>/"]
RUN dotnet restore "<application folder>/<application>.csproj"
COPY . .
WORKDIR "/src/<application folder>"
RUN dotnet build "<application>.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "<application>.csproj" -c Release -o /app/publish /p:UseAppHost=false

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "<application>.dll"]

构建镜像:

docker build -f Dockerfile -t <image name>:<tag> .

1 - 实现 .NET 输入/输出绑定组件

如何使用 Dapr 可插拔组件 .NET SDK 创建输入/输出绑定

创建绑定组件只需几个基本步骤。

添加绑定相关的命名空间

为绑定相关的命名空间添加 using 语句。

using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.Bindings;

输入绑定:实现 IInputBinding

创建一个实现 IInputBinding 接口的类。

internal sealed class MyBinding : IInputBinding
{
    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // 使用配置的元数据初始化组件...
    }

    public async Task ReadAsync(MessageDeliveryHandler<InputBindingReadRequest, InputBindingReadResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        // 在取消之前,检查底层存储中的消息并将其传递给 Dapr 运行时...
    }
}

ReadAsync() 方法的调用是“长时间运行”的,因为在取消之前不期望返回(例如,通过 cancellationToken)。当从组件的底层存储中读取消息时,它们通过 deliveryHandler 回调传递给 Dapr 运行时。这样,组件可以在应用程序(由 Dapr 运行时服务)确认消息处理时接收通知。

    public async Task ReadAsync(MessageDeliveryHandler<InputBindingReadRequest, InputBindingReadResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        TimeSpan pollInterval = // 轮询间隔(例如,从初始化元数据中获取)...

        // 在取消之前轮询底层存储...
        while (!cancellationToken.IsCancellationRequested)
        {
            var messages = // 从底层存储中轮询消息...

            foreach (var message in messages)
            {
                // 将消息传递给 Dapr 运行时...
                await deliveryHandler(
                    new InputBindingReadResponse
                    {
                        // 设置消息内容...
                    },
                    // 当应用程序确认消息时调用的回调...
                    async request =>
                    {
                        // 处理响应数据或错误消息...
                    });
            }

            // 等待下次轮询(或取消)...
            await Task.Delay(pollInterval, cancellationToken);
        }
    }

输出绑定:实现 IOutputBinding

创建一个实现 IOutputBinding 接口的类。

internal sealed class MyBinding : IOutputBinding
{
    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // 使用配置的元数据初始化组件...
    }

    public Task<OutputBindingInvokeResponse> InvokeAsync(OutputBindingInvokeRequest request, CancellationToken cancellationToken = default)
    {
        // 执行特定操作...
    }

    public Task<string[]> ListOperationsAsync(CancellationToken cancellationToken = default)
    {
        // 列出可以调用的操作。
    }
}

输入和输出绑定组件

一个组件可以同时是输入和输出绑定,只需实现这两个接口即可。

internal sealed class MyBinding : IInputBinding, IOutputBinding
{
    // IInputBinding 实现...

    // IOutputBinding 实现...
}

注册绑定组件

在主程序文件中(例如,Program.cs),在应用程序服务中注册绑定组件。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterBinding<MyBinding>();
    });

app.Run();

2 - 实现 .NET 发布/订阅组件

如何使用 Dapr 可插拔组件 .NET SDK 创建发布/订阅

创建发布/订阅组件只需几个基本步骤。

添加发布/订阅命名空间

添加与发布/订阅相关的命名空间的 using 语句。

using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.PubSub;

实现 IPubSub

创建一个实现 IPubSub 接口的类。

internal sealed class MyPubSub : IPubSub
{
    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // 使用配置的元数据初始化组件...
    }

    public Task PublishAsync(PubSubPublishRequest request, CancellationToken cancellationToken = default)
    {
        // 将消息发送到指定的“主题”...
    }

    public Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        // 持续检查主题中的消息并将其传递给 Dapr 运行时,直到取消为止...
    }
}

PullMessagesAsync() 方法是一个“长时间运行”的调用,因为在取消之前不期望返回(例如,通过 cancellationToken)。需要从中提取消息的“主题”通过 topic 参数传递,而传递到 Dapr 运行时是通过 deliveryHandler 回调执行的。传递机制允许组件在应用程序(由 Dapr 运行时服务)确认消息处理时接收通知。

    public async Task PullMessagesAsync(PubSubPullMessagesTopic topic, MessageDeliveryHandler<string?, PubSubPullMessagesResponse> deliveryHandler, CancellationToken cancellationToken = default)
    {
        TimeSpan pollInterval = // 轮询间隔(可以从初始化元数据中获取)...

        // 持续轮询主题直到取消...
        while (!cancellationToken.IsCancellationRequested)
        {
            var messages = // 从主题中轮询获取消息...

            foreach (var message in messages)
            {
                // 将消息传递给 Dapr 运行时...
                await deliveryHandler(
                    new PubSubPullMessagesResponse(topicName)
                    {
                        // 设置消息内容...
                    },
                    // 当应用程序确认消息时调用的回调...
                    async errorMessage =>
                    {
                        // 空消息表示应用程序成功处理了消息...
                        if (String.IsNullOrEmpty(errorMessage))
                        {
                            // 从主题中删除消息...
                        }
                    });
            }

            // 等待下一个轮询(或取消)...
            await Task.Delay(pollInterval, cancellationToken);
        }
    }

注册发布/订阅组件

在主程序文件中(例如,Program.cs),使用应用程序服务注册发布/订阅组件。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterPubSub<MyPubSub>();
    });

app.Run();

3 - 实现 .NET 状态存储组件

如何使用 Dapr 可插拔组件 .NET SDK 创建状态存储

创建状态存储组件只需几个基本步骤。

添加状态存储命名空间

为状态存储相关的命名空间添加 using 语句。

using Dapr.PluggableComponents.Components;
using Dapr.PluggableComponents.Components.StateStore;

实现 IStateStore

创建一个实现 IStateStore 接口的类。

internal sealed class MyStateStore : IStateStore
{
    public Task DeleteAsync(StateStoreDeleteRequest request, CancellationToken cancellationToken = default)
    {
        // 从状态存储中删除请求的键...
    }

    public Task<StateStoreGetResponse?> GetAsync(StateStoreGetRequest request, CancellationToken cancellationToken = default)
    {
        // 从状态存储中获取请求的键值,否则返回 null...
    }

    public Task InitAsync(MetadataRequest request, CancellationToken cancellationToken = default)
    {
        // 使用配置的元数据初始化组件...
    }

    public Task SetAsync(StateStoreSetRequest request, CancellationToken cancellationToken = default)
    {
        // 在状态存储中将请求的键设置为指定的值...
    }
}

注册状态存储组件

在主程序文件中(如 Program.cs),通过应用服务注册状态存储。

using Dapr.PluggableComponents;

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "<socket name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyStateStore>();
    });

app.Run();

支持批量操作的状态存储

如果状态存储打算支持批量操作,应实现可选的 IBulkStateStore 接口。其方法与基础 IStateStore 接口的方法相似,但包含多个请求的值。

internal sealed class MyStateStore : IStateStore, IBulkStateStore
{
    // ...

    public Task BulkDeleteAsync(StateStoreDeleteRequest[] requests, CancellationToken cancellationToken = default)
    {
        // 从状态存储中删除所有请求的值...
    }

    public Task<StateStoreBulkStateItem[]> BulkGetAsync(StateStoreGetRequest[] requests, CancellationToken cancellationToken = default)
    {
        // 返回状态存储中所有请求的值...
    }

    public Task BulkSetAsync(StateStoreSetRequest[] requests, CancellationToken cancellationToken = default)
    {
        // 在状态存储中设置所有请求键的值...
    }
}

事务性状态存储

如果状态存储打算支持事务,应实现可选的 ITransactionalStateStore 接口。其 TransactAsync() 方法接收一个请求,其中包含要在事务中执行的删除和/或设置操作序列。状态存储应遍历这些操作,并调用每个操作的 Visit() 方法,传递相应的回调以处理每种操作类型。

internal sealed class MyStateStore : IStateStore, ITransactionalStateStore
{
    // ...

    public async Task TransactAsync(StateStoreTransactRequest request, CancellationToken cancellationToken = default)
    {
        // 开始事务...

        try
        {
            foreach (var operation in request.Operations)
            {
                await operation.Visit(
                    async deleteRequest =>
                    {
                        // 处理删除请求...

                    },
                    async setRequest =>
                    {
                        // 处理设置请求...
                    });
            }
        }
        catch
        {
            // 回滚事务...

            throw;
        }

        // 提交事务...
    }
}

可查询状态存储

如果状态存储打算支持查询,应实现可选的 IQueryableStateStore 接口。其 QueryAsync() 方法接收有关查询的详细信息,例如过滤器、结果限制和分页,以及结果的排序顺序。状态存储应使用这些详细信息生成一组值并作为响应的一部分返回。

internal sealed class MyStateStore : IStateStore, IQueryableStateStore
{
    // ...

    public Task<StateStoreQueryResponse> QueryAsync(StateStoreQueryRequest request, CancellationToken cancellationToken = default)
    {
        // 生成并返回结果...
    }
}

ETag 和其他语义错误处理

Dapr 运行时对某些状态存储操作导致的特定错误条件有额外的处理。状态存储可以通过从其操作逻辑中抛出特定异常来指示这些条件:

异常适用操作描述
ETagInvalidException删除、设置、批量删除、批量设置当 ETag 无效时
ETagMismatchException删除、设置、批量删除、批量设置当 ETag 与预期值不匹配时
BulkDeleteRowMismatchException批量删除当受影响的行数与预期行数不匹配时

4 - .NET SDK 的 Dapr 可插拔组件高级用法

如何在 Dapr 可插拔组件 .NET SDK 中使用高级技术

尽管大多数情况下不需要,但这些指南提供了配置 .NET 可插拔组件的高级方法。

4.1 - 在 .NET Dapr 可插拔组件中使用多个服务

如何从 .NET 可插拔组件中暴露多个服务

一个可插拔组件可以托管多种类型的组件。您可能会这样做:

  • 以减少集群中运行的sidecar数量
  • 以便将可能共享库和实现的相关组件进行分组,例如:
    • 一个既作为通用状态存储又作为
    • 允许更具体操作的输出绑定。

每个Unix域套接字可以管理对每种类型的一个组件的调用。要托管多个相同类型的组件,您可以将这些类型分布在多个套接字上。SDK将每个套接字绑定到一个“服务”,每个服务由一个或多个组件类型组成。

注册多个服务

每次调用RegisterService()都会将一个套接字绑定到一组注册的组件,其中每种类型的组件每个服务可以注册一个。

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "service-a",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyDatabaseStateStore>();
        serviceBuilder.RegisterBinding<MyDatabaseOutputBinding>();
    });

app.RegisterService(
    "service-b",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<AnotherStateStore>();
    });

app.Run();

class MyDatabaseStateStore : IStateStore
{
    // ...
}

class MyDatabaseOutputBinding : IOutputBinding
{
    // ...
}

class AnotherStateStore : IStateStore
{
    // ...
}

配置多个组件

配置Dapr以使用托管组件与任何单个组件相同 - 组件YAML引用关联的套接字。

#
# 此组件使用与套接字 `state-store-a` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-a
spec:
  type: state.service-a
  version: v1
  metadata: []
#
# 此组件使用与套接字 `state-store-b` 关联的状态存储
#
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: state-store-b
spec:
  type: state.service-b
  version: v1
  metadata: []

4.2 - .NET Dapr 插件组件的应用环境配置

如何配置 .NET 插件组件的环境

.NET Dapr 插件组件应用可以配置依赖注入、日志记录和配置值,类似于 ASP.NET 应用。DaprPluggableComponentsApplication 提供了一组与 WebApplicationBuilder 类似的配置属性。

依赖注入

注册到服务的组件可以参与依赖注入。组件构造函数中的参数会在创建时被注入,前提是这些类型已在应用中注册。你可以通过 DaprPluggableComponentsApplication 提供的 IServiceCollection 来注册它们。

var app = DaprPluggableComponentsApplication.Create();

// 将 MyService 注册为 IService 的单例实现。
app.Services.AddSingleton<IService, MyService>();

app.RegisterService(
    "<service name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyStateStore>();
    });

app.Run();

interface IService
{
    // ...
}

class MyService : IService
{
    // ...
}

class MyStateStore : IStateStore
{
    // 在创建 state 存储时注入 IService。
    public MyStateStore(IService service)
    {
        // ...
    }

    // ...
}

日志记录

.NET Dapr 插件组件可以使用标准 .NET 日志机制DaprPluggableComponentsApplication 提供了一个 ILoggingBuilder,可以通过它进行配置。

var app = DaprPluggableComponentsApplication.Create();

// 清除默认日志记录器并添加新的。
app.Logging.ClearProviders();
app.Logging.AddConsole();

app.RegisterService(
    "<service name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyStateStore>();
    });

app.Run();

class MyStateStore : IStateStore
{
    // 在创建 state 存储时注入日志记录器。
    public MyStateStore(ILogger<MyStateStore> logger)
    {
        // ...
    }

    // ...
}

配置值

由于 .NET 插件组件是基于 ASP.NET 构建的,它们可以使用其标准配置机制,并默认使用相同的一组预注册提供者DaprPluggableComponentsApplication 提供了一个 IConfigurationManager,可以通过它进行配置。

var app = DaprPluggableComponentsApplication.Create();

// 清除默认配置提供者并添加新的。
((IConfigurationBuilder)app.Configuration).Sources.Clear();
app.Configuration.AddEnvironmentVariables();

// 在启动时获取配置值。
const value = app.Configuration["<name>"];

app.RegisterService(
    "<service name>",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<MyStateStore>();
    });

app.Run();

class MyStateStore : IStateStore
{
    // 在创建 state 存储时注入配置。
    public MyStateStore(IConfiguration configuration)
    {
        // ...
    }

    // ...
}

4.3 - .NET Dapr 可插拔组件的生命周期

如何控制 .NET 可插拔组件的生命周期

在 .NET Dapr 中,注册组件有两种方式:

  • 组件作为单例运行,其生命周期由 SDK 管理
  • 组件的生命周期由可插拔组件决定,可以是多实例或单例,视需要而定

单例组件

按类型注册的组件将作为单例运行:一个实例将为与该 socket 关联的所有配置组件提供服务。当仅存在一个该类型的组件并在 Dapr 应用程序之间共享时,这种方法是最佳选择。

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "service-a",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore<SingletonStateStore>();
    });

app.Run();

class SingletonStateStore : IStateStore
{
    // ...
}

多实例组件

可以通过传递“工厂方法”来注册组件。对于与该 socket 关联的每个配置组件,该方法将被调用。该方法返回要与该组件关联的实例(无论是否共享)。当多个相同类型的组件可能配置有不同的元数据集时,或者当组件操作需要彼此隔离时,这种方法是最佳选择。

工厂方法会接收上下文信息,例如配置的 Dapr 组件的 ID,这些信息可用于区分不同的组件实例。

var app = DaprPluggableComponentsApplication.Create();

app.RegisterService(
    "service-a",
    serviceBuilder =>
    {
        serviceBuilder.RegisterStateStore(
            context =>
            {
                return new MultiStateStore(context.InstanceId);
            });
    });

app.Run();

class MultiStateStore : IStateStore
{
    private readonly string instanceId;

    public MultiStateStore(string instanceId)
    {
        this.instanceId = instanceId;
    }

    // ...
}