This is the multi-page printable view of this section. Click here to print.
Actors
- 1: Actors 概述
- 2: Actor 的运行时特性
- 3: actor 运行时配置参数
- 4: 命名空间中的actor
- 5: actor 定时器和提醒
- 6: 如何启用actor提醒分区
- 7: 操作指南:使用脚本与虚拟actor交互
- 8: 如何:在Dapr中启用和使用actor重入
1 - Actors 概述
Actor 模型将 actor 描述为“计算的基本单位”。换句话说,您可以将代码编写在一个自包含的单元中(称为 actor),该单元接收消息并一次处理一条消息,而无需任何形式的并发或线程。
当您的代码处理一条消息时,它可以向其他 actor 发送一条或多条消息,或创建新的 actor。底层运行时管理每个 actor 的运行方式、时间和位置,并在 actor 之间路由消息。
大量的 actor 可以同时执行,并且 actor 彼此独立执行。
Dapr 中的 Actor 模型
Dapr 包含一个专门实现虚拟 Actor 模型的运行时。通过 Dapr 的实现,您可以根据 Actor 模型编写 Dapr actor,Dapr 利用底层平台提供的可扩展性和可靠性保证。
每个 actor 都被定义为 actor 类型的一个实例,与对象是类的一个实例相同。例如,可能有一个实现计算器功能的 actor 类型,并且可能有许多此类型的 actor 分布在集群的各个节点上。每个这样的 actor 都由一个 actor ID 唯一标识。

以下概述视频和演示展示了 Dapr 中的 actor 如何工作。
Dapr Actors 与 Dapr Workflow
Dapr actors 基于状态管理和服务调用 API 创建具有身份的有状态、长时间运行的对象。Dapr Workflow 和 Dapr Actors 是相关的,workflow 基于 actor 提供更高层次的抽象来编排一组 actor,实现常见的 workflow 模式并代表您管理 actor 的生命周期。
Dapr actors 旨在提供一种在分布式系统中封装状态和行为的方法。actor 可以按需由客户端应用程序激活。当 actor 被激活时,它被分配一个唯一的身份,这使得它能够在多次调用中保持其状态。这使得 actor 在构建有状态、可扩展和容错的分布式应用程序时非常有用。
另一方面,Dapr Workflow 提供了一种定义和编排涉及多个服务和组件的复杂 workflow 的方法。workflow 允许您定义需要按特定顺序执行的一系列步骤或任务,并可用于实现业务流程、事件驱动的 workflow 和其他类似场景。
如上所述,Dapr Workflow 基于 Dapr Actors 管理其激活和生命周期。
何时使用 Dapr Actors
与任何其他技术决策一样,您应该根据要解决的问题来决定是否使用 actor。例如,如果您正在构建一个聊天应用程序,您可能会使用 Dapr actors 来实现聊天室和用户之间的单个聊天会话,因为每个聊天会话都需要维护自己的状态并且具有可扩展性和容错性。
一般来说,如果您的问题空间涉及大量(数千个或更多)小型、独立和隔离的状态和逻辑单元,可以考虑使用 actor 模式来建模您的问题或场景。
- 您的问题空间涉及大量(数千个或更多)小型、独立和隔离的状态和逻辑单元。
- 您希望使用单线程对象,这些对象不需要外部组件的显著交互,包括跨一组 actor 查询状态。
- 您的 actor 实例不会通过发出 I/O 操作来阻塞调用者,导致不可预测的延迟。
何时使用 Dapr Workflow
当您需要定义和编排涉及多个服务和组件的复杂 workflow 时,您可以使用 Dapr Workflow。例如,使用前面提到的聊天应用程序示例,您可能会使用 Dapr Workflow 来定义应用程序的整体 workflow,例如如何注册新用户、如何发送和接收消息以及应用程序如何处理错误和异常。
了解有关 Dapr Workflow 的更多信息以及如何在应用程序中使用 workflow。
Actor 类型和 Actor ID
actor 被唯一定义为 actor 类型的一个实例,类似于对象是类的一个实例。例如,您可能有一个实现计算器功能的 actor 类型。可能有许多此类型的 actor 分布在集群的各个节点上。
每个 actor 都由一个 actor ID 唯一标识。actor ID 可以是您选择的任何字符串值。如果您不提供 actor ID,Dapr 会为您生成一个随机字符串作为 ID。
功能
命名空间化的 Actors
Dapr 支持命名空间化的 actor。actor 类型可以部署到不同的命名空间中。您可以在同一命名空间中调用这些 actor 的实例。
Actor 生命周期
由于 Dapr actors 是虚拟的,因此不需要显式创建或销毁。Dapr actor 运行时:
- 一旦收到该 actor ID 的初始请求,就会自动激活 actor。
- 垃圾收集未使用的 actor 的内存对象。
- 维护 actor 的存在信息,以防它稍后被重新激活。
actor 的状态超出了对象的生命周期,因为状态存储在为 Dapr 运行时配置的状态提供者中。
分布和故障转移
为了提供可扩展性和可靠性,actor 实例在整个集群中分布,Dapr 在整个集群中分布 actor 实例,并自动将它们迁移到健康的节点。
Actor 通信
您可以通过 HTTP 调用 actor 方法,如下面的通用示例所示。

- 服务调用 sidecar 上的 actor API。
- 使用来自放置服务的缓存分区信息,sidecar 确定哪个 actor 服务实例将托管 actor ID 3。调用被转发到适当的 sidecar。
- pod 2 中的 sidecar 实例调用服务实例以调用 actor 并执行 actor 方法。
并发
Dapr actor 运行时为访问 actor 方法提供了一个简单的轮流访问模型。轮流访问极大地简化了并发系统,因为不需要同步机制来进行数据访问。
状态
事务性状态存储可以用于存储 actor 状态。无论您是否打算在 actor 中存储任何状态,您都必须在状态存储组件的元数据部分中将属性 actorStateStore
的值指定为 true
。actor 状态以特定方案存储在事务性状态存储中,允许进行一致的查询。所有 actor 只能使用单个状态存储组件作为状态存储。阅读状态 API 参考和actors API 参考以了解有关 actor 状态存储的更多信息。
Actor 定时器和提醒
actor 可以通过注册定时器或提醒来安排定期工作。
定时器和提醒的功能非常相似。主要区别在于 Dapr actor 运行时在停用后不保留有关定时器的任何信息,而是使用 Dapr actor 状态提供者持久化有关提醒的信息。
这种区别允许用户在轻量级但无状态的定时器与更耗资源但有状态的提醒之间进行权衡。
以下概述视频和演示展示了 actor 定时器和提醒如何工作。
下一步
Actors 功能和概念 >>相关链接
2 - Actor 的运行时特性
在您已经从高层次上了解了 Actor 构建块之后,让我们深入探讨 Dapr 中 Actor 的特性和概念。
Actor 的生命周期
Dapr 中的 Actor 是虚拟的,这意味着它们的生命周期与内存中的表示无关。因此,不需要显式地创建或销毁它们。Dapr 的 Actor 运行时会在首次收到某个 Actor ID 的请求时自动激活该 Actor。如果某个 Actor 在一段时间内未被使用,Dapr 的 Actor 运行时会对其进行垃圾回收,但会保留其存在的信息,以便在需要时重新激活。
调用 Actor 方法、定时器和提醒会重置 Actor 的空闲时间。例如,提醒的触发会保持 Actor 的活跃状态。
- Actor 的提醒会在无论其活跃与否的情况下触发。如果提醒触发了一个不活跃的 Actor,它会先激活该 Actor。
- Actor 的定时器触发会重置空闲时间;然而,定时器仅在 Actor 活跃时触发。
Dapr 运行时用于判断 Actor 是否可以被垃圾回收的空闲超时和扫描间隔是可配置的。当 Dapr 运行时调用 Actor 服务以获取支持的 Actor 类型时,可以传递此信息。
这种虚拟 Actor 生命周期的抽象带来了一些注意事项,尽管 Dapr 的 Actor 实现有时会偏离这种模型。
Actor 在首次向其 Actor ID 发送消息时会自动激活(即构建 Actor 对象)。经过一段时间后,Actor 对象会被垃圾回收。将来再次使用该 Actor ID 会导致构建新的 Actor 对象。Actor 的状态超越对象的生命周期,因为状态存储在为 Dapr 运行时配置的状态提供者中。
分布和故障转移
为了提供可扩展性和可靠性,Actor 实例分布在整个集群中,Dapr 会根据需要自动将它们从故障节点迁移到健康节点。
Actor 分布在 Actor 服务的实例中,这些实例分布在集群中的节点上。每个服务实例包含给定 Actor 类型的一组 Actor。
Actor 放置服务
Dapr 的 Actor 运行时通过 Actor Placement
服务为您管理分布方案和键范围设置。当创建服务的新实例时:
- Sidecar 调用 Actor 服务以检索注册的 Actor 类型和配置设置。
- 相应的 Dapr 运行时注册它可以创建的 Actor 类型。
Placement
服务计算给定 Actor 类型的所有实例的分区。
每个 Actor 类型的分区数据表在环境中运行的每个 Dapr 实例中更新和存储,并且可以随着 Actor 服务的新实例的创建和销毁而动态变化。

当客户端调用具有特定 ID 的 Actor(例如,Actor ID 123)时,客户端的 Dapr 实例对 Actor 类型和 ID 进行哈希,并使用信息调用可以为该特定 Actor ID 提供请求的相应 Dapr 实例。因此,对于任何给定的 Actor ID,总是调用相同的分区(或服务实例)。这在下图中显示。

这简化了一些选择,但也带来了一些考虑:
- 默认情况下,Actor 随机放置到 Pod 中,导致均匀分布。
- 由于 Actor 是随机放置的,因此应预期 Actor 操作总是需要网络通信,包括方法调用数据的序列化和反序列化,从而产生延迟和开销。
注意
注意:Dapr 的 Actor Placement 服务仅用于 Actor 放置,因此如果您的服务不使用 Dapr Actor,则不需要。Placement 服务可以在所有 托管环境 中运行,包括 selfhost 和 Kubernetes。Actor 的通信
您可以通过调用 HTTP 端点与 Dapr 交互以调用 Actor 方法。
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/<method/state/timers/reminders>
您可以在请求体中为 Actor 方法提供任何数据,请求的响应将在响应体中,这是来自 Actor 调用的数据。
另一种可能更方便的与 Actor 交互的方式是通过 SDK。Dapr 目前支持 .NET、Java 和 Python 的 Actor SDK。
有关更多详细信息,请参阅 Dapr Actor 特性。
并发
Dapr 的 Actor 运行时为访问 Actor 方法提供了简单的轮转访问模型。这意味着在任何时候,Actor 对象的代码中最多只能有一个线程处于活动状态。轮转访问极大地简化了并发系统,因为不需要同步机制来进行数据访问。这也意味着系统必须针对每个 Actor 实例的单线程访问特性进行特殊设计。
单个 Actor 实例不能同时处理多个请求。如果期望 Actor 实例处理并发请求,它可能会导致吞吐量瓶颈。
如果在两个 Actor 之间存在循环请求,同时对其中一个 Actor 发出外部请求,Actor 可能会相互死锁。Dapr 的 Actor 运行时会自动在 Actor 调用上超时,并向调用者抛出异常以中断可能的死锁情况。

重入
要允许 Actor “重入” 并调用自身的方法,请参阅 Actor 重入。
轮转访问
轮转包括响应其他 Actor 或客户端请求的 Actor 方法的完整执行,或定时器/提醒回调的完整执行。即使这些方法和回调是异步的,Dapr 的 Actor 运行时也不会交错它们。一个轮转必须完全完成后,才允许新的轮转。换句话说,当前正在执行的 Actor 方法或定时器/提醒回调必须完全完成后,才允许对方法或回调的新调用。方法或回调被认为已完成,如果执行已从方法或回调返回,并且方法或回调返回的任务已完成。值得强调的是,即使在不同的方法、定时器和回调之间,也要尊重轮转并发性。
Dapr 的 Actor 运行时通过在轮转开始时获取每个 Actor 锁,并在轮转结束时释放锁来强制执行轮转并发性。因此,轮转并发性是在每个 Actor 的基础上强制执行的,而不是跨 Actor。Actor 方法和定时器/提醒回调可以代表不同的 Actor 同时执行。
以下示例说明了上述概念。考虑一个实现了两个异步方法(例如,Method1 和 Method2)、一个定时器和一个提醒的 Actor 类型。下图显示了代表属于此 Actor 类型的两个 Actor(ActorId1 和 ActorId2)的方法和回调执行时间线的示例。

下一步
定时器和提醒 >>相关链接
3 - actor 运行时配置参数
您可以使用以下配置参数来调整 Dapr actor 的默认运行时行为。
参数 | 描述 | 默认值 |
---|---|---|
entities |
此主机支持的 actor 类型。 | N/A |
actorIdleTimeout |
空闲 actor 的停用超时时间。每隔 actorScanInterval 时间间隔检查一次。 |
60 分钟 |
actorScanInterval |
指定扫描空闲 actor 的时间间隔。超过 actorIdleTimeout 的 actor 将被停用。 |
30 秒 |
drainOngoingCallTimeout |
在重新平衡 actor 时,指定当前活动 actor 方法的完成超时时间。如果没有正在进行的方法调用,则忽略此项。 | 60 秒 |
drainRebalancedActors |
如果设置为 true,Dapr 将在 drainOngoingCallTimeout 时间内等待当前 actor 调用完成,然后再尝试停用 actor。 |
true |
reentrancy (ActorReentrancyConfig ) |
配置 actor 的重入行为。如果未提供,则重入功能被禁用。 | 禁用,false |
remindersStoragePartitions |
配置 actor 的提醒分区数量。如果未提供,所有提醒将作为 actor 状态存储中的单个记录保存。 | 0 |
entitiesConfig |
使用配置数组单独配置每个 actor 类型。任何在单个实体配置中指定的实体也必须在顶级 entities 字段中列出。 |
N/A |
示例
// 在 Startup.cs 中
public void ConfigureServices(IServiceCollection services)
{
// 使用 DI 注册 actor 运行时
services.AddActors(options =>
{
// 注册 actor 类型并配置 actor 设置
options.Actors.RegisterActor<MyActor>();
// 配置默认设置
options.ActorIdleTimeout = TimeSpan.FromMinutes(60);
options.ActorScanInterval = TimeSpan.FromSeconds(30);
options.DrainOngoingCallTimeout = TimeSpan.FromSeconds(60);
options.DrainRebalancedActors = true;
options.RemindersStoragePartitions = 7;
options.ReentrancyConfig = new() { Enabled = false };
// 为特定 actor 类型添加配置。
// 此 actor 类型必须在基础级别的 'entities' 字段中有匹配值。如果没有,配置将被忽略。
// 如果有匹配的实体,这里的值将用于覆盖根配置中指定的任何值。
// 在此示例中,`ReentrantActor` 启用了重入;然而,'MyActor' 将不启用重入。
options.Actors.RegisterActor<ReentrantActor>(typeOptions: new()
{
ReentrancyConfig = new()
{
Enabled = true,
}
});
});
// 注册用于 actor 的其他服务
services.AddSingleton<BankService>();
}
import { CommunicationProtocolEnum, DaprClient, DaprServer } from "@dapr/dapr";
// 使用 DaprClientOptions 配置 actor 运行时。
const clientOptions = {
actor: {
actorIdleTimeout: "1h",
actorScanInterval: "30s",
drainOngoingCallTimeout: "1m",
drainRebalancedActors: true,
reentrancy: {
enabled: true,
maxStackDepth: 32,
},
remindersStoragePartitions: 0,
},
};
// 在创建 DaprServer 和 DaprClient 时使用这些选项。
// 注意,DaprServer 内部创建了一个 DaprClient,需要使用 clientOptions 进行配置。
const server = new DaprServer(serverHost, serverPort, daprHost, daprPort, clientOptions);
const client = new DaprClient(daprHost, daprPort, CommunicationProtocolEnum.HTTP, clientOptions);
from datetime import timedelta
from dapr.actor.runtime.config import ActorRuntimeConfig, ActorReentrancyConfig
ActorRuntime.set_actor_config(
ActorRuntimeConfig(
actor_idle_timeout=timedelta(hours=1),
actor_scan_interval=timedelta(seconds=30),
drain_ongoing_call_timeout=timedelta(minutes=1),
drain_rebalanced_actors=True,
reentrancy=ActorReentrancyConfig(enabled=False),
remindersStoragePartitions=7
)
)
// import io.dapr.actors.runtime.ActorRuntime;
// import java.time.Duration;
ActorRuntime.getInstance().getConfig().setActorIdleTimeout(Duration.ofMinutes(60));
ActorRuntime.getInstance().getConfig().setActorScanInterval(Duration.ofSeconds(30));
ActorRuntime.getInstance().getConfig().setDrainOngoingCallTimeout(Duration.ofSeconds(60));
ActorRuntime.getInstance().getConfig().setDrainBalancedActors(true);
ActorRuntime.getInstance().getConfig().setActorReentrancyConfig(false, null);
ActorRuntime.getInstance().getConfig().setRemindersStoragePartitions(7);
const (
defaultActorType = "basicType"
reentrantActorType = "reentrantType"
)
type daprConfig struct {
Entities []string `json:"entities,omitempty"`
ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"`
ActorScanInterval string `json:"actorScanInterval,omitempty"`
DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"`
DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"`
Reentrancy config.ReentrancyConfig `json:"reentrancy,omitempty"`
EntitiesConfig []config.EntityConfig `json:"entitiesConfig,omitempty"`
}
var daprConfigResponse = daprConfig{
Entities: []string{defaultActorType, reentrantActorType},
ActorIdleTimeout: actorIdleTimeout,
ActorScanInterval: actorScanInterval,
DrainOngoingCallTimeout: drainOngoingCallTimeout,
DrainRebalancedActors: drainRebalancedActors,
Reentrancy: config.ReentrancyConfig{Enabled: false},
EntitiesConfig: []config.EntityConfig{
{
// 为特定 actor 类型添加配置。
// 此 actor 类型必须在基础级别的 'entities' 字段中有匹配值。如果没有,配置将被忽略。
// 如果有匹配的实体,这里的值将用于覆盖根配置中指定的任何值。
// 在此示例中,`reentrantActorType` 启用了重入;然而,'defaultActorType' 将不启用重入。
Entities: []string{reentrantActorType},
Reentrancy: config.ReentrancyConfig{
Enabled: true,
MaxStackDepth: &maxStackDepth,
},
},
},
}
func configHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(daprConfigResponse)
}
下一步
启用 actor reminder 分区 >>相关链接
4 - 命名空间中的actor
在Dapr中,命名空间用于提供隔离,从而支持多租户。通过为actor添加命名空间,相同的actor类型可以部署在不同的命名空间中。您可以在同一命名空间中使用这些actor的实例。
注意
每个命名空间中的actor部署必须使用独立的状态存储,特别是在相同的actor类型跨多个命名空间使用时。换句话说,actor记录中不包含任何命名空间信息,因此每个命名空间需要单独的状态存储。请参阅为命名空间配置actor状态存储部分以获取示例。创建和配置命名空间
您可以在自托管模式或Kubernetes上使用命名空间。
在自托管模式下,您可以通过设置NAMESPACE
环境变量为Dapr实例指定命名空间。
在Kubernetes上,您可以在部署actor应用程序时创建和配置命名空间。例如,使用以下kubectl
命令开始:
kubectl create namespace namespace-actorA
kubectl config set-context --current --namespace=namespace-actorA
然后,将您的actor应用程序部署到此命名空间中(在示例中为namespace-actorA
)。
为命名空间配置actor状态存储
每个命名空间中的actor部署必须使用独立的状态存储。虽然您可以为每个actor命名空间使用不同的物理数据库,但某些状态存储组件提供了一种通过表、前缀、集合等逻辑分隔数据的方法。这允许您在多个命名空间中使用相同的物理数据库,只要您在Dapr组件定义中提供逻辑分隔即可。
以下是一些示例。
示例1:通过etcd中的前缀
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.etcd
version: v2
metadata:
- name: endpoints
value: localhost:2379
- name: keyPrefixPath
value: namespace-actorA
- name: actorStateStore
value: "true"
示例2:通过SQLite中的表名
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.sqlite
version: v1
metadata:
- name: connectionString
value: "data.db"
- name: tableName
value: "namespace-actorA"
- name: actorStateStore
value: "true"
示例3:通过Redis中的逻辑数据库编号
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
- name: redisDB
value: "1"
- name: redisPassword
secretKeyRef:
name: redis-secret
key: redis-password
- name: actorStateStore
value: "true"
- name: redisDB
value: "1"
auth:
secretStore: <SECRET_STORE_NAME>
查看您的状态存储组件规格以了解其提供的功能。
注意
命名空间中的actor使用多租户Placement服务。在这个控制平面服务中,每个应用程序部署都有自己的命名空间,属于命名空间"ActorA"的应用程序的sidecar不会接收到命名空间"ActorB"的应用程序的placement信息。下一步
5 - actor 定时器和提醒
actor 可以通过注册定时器或提醒来安排周期性工作。
定时器和提醒的功能非常相似。主要区别在于 Dapr actor 运行时在停用后不会保留任何关于定时器的信息,而是使用 Dapr actor 状态提供程序持久化提醒的信息。
这种区别允许用户在轻量但无状态的定时器与更资源密集但有状态的提醒之间进行选择。
定时器和提醒的调度配置是相同的,概述如下:
dueTime
是一个可选参数,用于设置第一次调用回调的时间或时间间隔。如果省略 dueTime
,则在定时器/提醒注册后立即调用回调。
支持的格式:
- RFC3339 日期格式,例如
2020-10-02T15:00:00Z
- time.Duration 格式,例如
2h30m
- ISO 8601 持续时间 格式,例如
PT2H30M
period
是一个可选参数,用于设置两次连续回调调用之间的时间间隔。当以 ISO 8601-1 持续时间
格式指定时,您还可以配置重复次数以限制回调调用的总次数。
如果省略 period
,则回调只会被调用一次。
支持的格式:
- time.Duration 格式,例如
2h30m
- ISO 8601 持续时间 格式,例如
PT2H30M
,R5/PT1M30S
ttl
是一个可选参数,用于设置定时器/提醒将过期和删除的时间或时间间隔。如果省略 ttl
,则不应用任何限制。
支持的格式:
- RFC3339 日期格式,例如
2020-10-02T15:00:00Z
- time.Duration 格式,例如
2h30m
- ISO 8601 持续时间 格式。示例:
PT2H30M
actor 运行时验证调度配置的正确性,并在输入无效时返回错误。
当您同时指定 period
中的重复次数和 ttl
时,定时器/提醒将在任一条件满足时停止。
actor 定时器
您可以在 actor 上注册一个基于定时器执行的回调。
Dapr actor 运行时确保回调方法遵循基于轮次的并发保证。这意味着在此回调完成执行之前,不会有其他 actor 方法或定时器/提醒回调正在进行。
Dapr actor 运行时在回调完成时保存对 actor 状态所做的更改。如果在保存状态时发生错误,该 actor 对象将被停用,并激活一个新实例。
当 actor 作为垃圾回收的一部分被停用时,所有定时器都会停止。之后不会调用任何定时器回调。此外,Dapr actor 运行时不会保留关于停用前正在运行的定时器的任何信息。actor 需要在将来重新激活时注册所需的任何定时器。
您可以通过调用如下所示的 HTTP/gRPC 请求或通过 Dapr SDK 为 actor 创建定时器。
POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>
示例
定时器参数在请求体中指定。
以下请求体配置了一个 dueTime
为 9 秒和 period
为 3 秒的定时器。这意味着它将在 9 秒后首次触发,然后每隔 3 秒触发一次。
{
"dueTime":"0h0m9s0ms",
"period":"0h0m3s0ms"
}
以下请求体配置了一个 period
为 3 秒(ISO 8601 持续时间格式)的定时器。它还将调用次数限制为 10 次。这意味着它将触发 10 次:首先在注册后立即触发,然后每隔 3 秒触发一次。
{
"period":"R10/PT3S",
}
以下请求体配置了一个 period
为 3 秒(ISO 8601 持续时间格式)和 ttl
为 20 秒的定时器。这意味着它在注册后立即触发,然后每隔 3 秒触发一次,持续 20 秒。
{
"period":"PT3S",
"ttl":"20s"
}
以下请求体配置了一个 dueTime
为 10 秒、period
为 3 秒和 ttl
为 10 秒的定时器。它还将调用次数限制为 4 次。这意味着它将在 10 秒后首次触发,然后每隔 3 秒触发一次,持续 10 秒,但总共不超过 4 次。
{
"dueTime":"10s",
"period":"R4/PT3S",
"ttl":"10s"
}
您可以通过调用以下命令删除 actor 定时器
DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>
有关更多详细信息,请参阅 api 规范。
actor 提醒
注意
在 Dapr v1.15 中,actor 提醒默认存储在 Scheduler 服务中。提醒是一种在指定时间触发 actor 上持久回调的机制。它们的功能类似于定时器。但与定时器不同,提醒在所有情况下都会被触发,直到 actor 明确取消注册它们或 actor 被明确删除或调用次数耗尽。具体来说,提醒在 actor 停用和故障转移期间被触发,因为 Dapr actor 运行时使用 Dapr actor 状态提供程序持久化关于 actor 提醒的信息。
您可以通过调用如下所示的 HTTP/gRPC 请求或通过 Dapr SDK 为 actor 创建持久提醒。
POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>
提醒的请求结构与 actor 的相同。请参阅 actor 定时器示例。
检索 actor 提醒
您可以通过调用以下命令检索 actor 提醒
GET http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>
删除 actor 提醒
您可以通过调用以下命令删除 actor 提醒
DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>
如果 actor 提醒被触发且应用程序未向运行时返回 2** 代码(例如,由于连接问题),actor 提醒将重试最多三次,每次尝试之间的退避间隔为一秒。可能会根据任何可选应用的 actor 弹性策略进行额外的重试。
有关更多详细信息,请参阅 api 规范。
错误处理
当 actor 的方法成功完成时,运行时将继续按照指定的定时器或提醒计划调用该方法。然而,如果方法抛出异常,运行时会捕获它并在 Dapr sidecar 日志中记录错误消息,而不进行重试。
为了允许 actor 从故障中恢复并在崩溃或重启后重试,您可以通过配置状态存储(如 Redis 或 Azure Cosmos DB)来持久化 actor 的状态。
如果方法的调用失败,定时器不会被移除。定时器仅在以下情况下被移除:
- sidecar 崩溃
- 执行次数用尽
- 您明确删除它
提醒数据序列化格式
actor 提醒数据默认序列化为 JSON。从 Dapr v1.13 开始,支持通过 Placement 和 Scheduler 服务为工作流的内部提醒数据使用 protobuf 序列化格式。根据吞吐量和负载大小,这可以显著提高性能,为开发人员提供更高的吞吐量和更低的延迟。
另一个好处是将较小的数据存储在 actor 底层数据库中,这在使用某些云数据库时可以实现成本优化。使用 protobuf 序列化的限制是提醒数据不再可查询。
注意
protobuf 序列化将在 Dapr 1.14 中成为默认格式以 protobuf 格式保存的提醒数据无法在 Dapr 1.12.x 及更早版本中读取。建议在 Dapr v1.13 中测试此功能,并验证它在您的数据库中按预期工作,然后再投入生产。
注意
如果您在 Dapr v1.13 中使用 protobuf 序列化并需要降级到更早的 Dapr 版本,提醒数据将与 1.12.x 及更早版本不兼容。一旦您以 protobuf 格式保存提醒数据,就无法将其移回 JSON 格式。在 Kubernetes 上启用 protobuf 序列化
要在 Kubernetes 上为 actor 提醒使用 protobuf 序列化,请使用以下 Helm 值:
--set dapr_placement.maxActorApiLevel=20
在自托管环境中启用 protobuf 序列化
要在自托管环境中为 actor 提醒使用 protobuf 序列化,请使用以下 daprd
标志:
--max-api-level=20
下一步
配置 actor 运行时行为 >>相关链接
6 - 如何启用actor提醒分区
actor提醒在sidecar重启后仍然持久化并继续触发。注册了多个提醒的应用程序可能会遇到以下问题:
- 提醒注册和注销的吞吐量低
- 基于state存储单个记录大小限制的提醒注册数量有限
为了解决这些问题,应用程序可以通过在state存储中将数据分布在多个键中来启用actor提醒分区。
- 在
actors\|\|<actor type>\|\|metadata
中使用一个元数据记录来存储给定actor类型的持久化配置。 - 多个记录存储同一actor类型的提醒子集。
键 | 值 |
---|---|
actors||<actor type>||metadata |
{ "id": <actor metadata identifier>, "actorRemindersMetadata": { "partitionCount": <number of partitions for reminders> } } |
actors||<actor type>||<actor metadata identifier>||reminders||1 |
[ <reminder 1-1>, <reminder 1-2>, ... , <reminder 1-n> ] |
actors||<actor type>||<actor metadata identifier>||reminders||2 |
[ <reminder 1-1>, <reminder 1-2>, ... , <reminder 1-m> ] |
如果您需要更改分区数量,Dapr的sidecar将自动重新分配提醒集。
配置actor运行时以分区actor提醒
与其他actor配置元素类似,actor运行时通过actor的GET /dapr/config
端点提供适当的配置来分区actor提醒。选择您偏好的语言以获取actor运行时配置示例。
// 在Startup.cs中
public void ConfigureServices(IServiceCollection services)
{
// 使用DI注册actor运行时
services.AddActors(options =>
{
// 注册actor类型并配置actor设置
options.Actors.RegisterActor<MyActor>();
// 配置默认设置
options.ActorIdleTimeout = TimeSpan.FromMinutes(60);
options.ActorScanInterval = TimeSpan.FromSeconds(30);
options.RemindersStoragePartitions = 7;
});
// 注册用于actor的其他服务
services.AddSingleton<BankService>();
}
import { CommunicationProtocolEnum, DaprClient, DaprServer } from "@dapr/dapr";
// 使用DaprClientOptions配置actor运行时。
const clientOptions = {
actor: {
remindersStoragePartitions: 0,
},
};
const actor = builder.build(new ActorId("my-actor"));
// 注册一个提醒,它有一个默认回调:`receiveReminder`
await actor.registerActorReminder(
"reminder-id", // 提醒的唯一名称。
Temporal.Duration.from({ seconds: 2 }), // DueTime
Temporal.Duration.from({ seconds: 1 }), // Period
Temporal.Duration.from({ seconds: 1 }), // TTL
100, // 发送到提醒回调的状态。
);
// 删除提醒
await actor.unregisterActorReminder("reminder-id");
from datetime import timedelta
ActorRuntime.set_actor_config(
ActorRuntimeConfig(
actor_idle_timeout=timedelta(hours=1),
actor_scan_interval=timedelta(seconds=30),
remindersStoragePartitions=7
)
)
// import io.dapr.actors.runtime.ActorRuntime;
// import java.time.Duration;
ActorRuntime.getInstance().getConfig().setActorIdleTimeout(Duration.ofMinutes(60));
ActorRuntime.getInstance().getConfig().setActorScanInterval(Duration.ofSeconds(30));
ActorRuntime.getInstance().getConfig().setRemindersStoragePartitions(7);
type daprConfig struct {
Entities []string `json:"entities,omitempty"`
ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"`
ActorScanInterval string `json:"actorScanInterval,omitempty"`
DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"`
DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"`
RemindersStoragePartitions int `json:"remindersStoragePartitions,omitempty"`
}
var daprConfigResponse = daprConfig{
[]string{defaultActorType},
actorIdleTimeout,
actorScanInterval,
drainOngoingCallTimeout,
drainRebalancedActors,
7,
}
func configHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(daprConfigResponse)
}
以下是一个有效的提醒分区配置示例:
{
"entities": [ "MyActorType", "AnotherActorType" ],
"remindersStoragePartitions": 7
}
处理配置更改
为了配置actor提醒分区,Dapr将actor类型元数据持久化在actor的state存储中。这允许配置更改在全局范围内应用,而不仅仅是在单个sidecar实例中。
此外,您只能增加分区数量,不能减少。这允许Dapr在滚动重启时自动重新分配数据,其中一个或多个分区配置可能处于活动状态。
演示
7 - 操作指南:使用脚本与虚拟actor交互
了解如何通过HTTP/gRPC端点来使用虚拟actor。
调用actor方法
您可以通过调用HTTP/gRPC端点与Dapr交互,以调用actor方法。
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method>
在请求体中提供actor方法所需的数据。请求的响应,即actor方法调用返回的数据,将在响应体中。
有关更多详细信息,请参阅Actors API规范。
注意
您也可以使用Dapr SDKs来操作actors。使用actors保存状态
您可以通过HTTP/gRPC端点与Dapr交互,利用Dapr的actor状态管理功能来可靠地保存状态。
要使用actors,您的状态存储必须支持多项事务。这意味着您的状态存储组件需要实现TransactionalStore
接口。
查看支持事务/actors的组件列表。所有actors只能使用一个状态存储组件来保存状态。
下一步
actor重入 >>相关链接
8 - 如何:在Dapr中启用和使用actor重入
虚拟actor模式的一个核心原则是actor的单线程执行特性。没有重入时,Dapr运行时会锁定所有actor请求。第二个请求必须等到第一个请求完成后才能启动。这意味着actor不能调用自身,也不能被另一个actor调用,即使它们属于同一调用链。
重入通过允许同一链或上下文的请求重新进入已锁定的actor来解决这个问题。这在以下场景中非常有用:
- 一个actor想要调用自身的方法
- actor在工作流中用于执行任务,然后回调到协调actor。
重入允许的调用链示例如下:
Actor A -> Actor A
Actor A -> Actor B -> Actor A
通过重入,您可以执行更复杂的actor调用,而不影响虚拟actor的单线程特性。

maxStackDepth
参数用于设置一个值,以控制对同一actor可以进行多少次重入调用。默认情况下,这个值为32,通常已经足够。
配置actor运行时以启用重入
要启用actor重入,必须提供适当的配置。这是通过actor的GET /dapr/config
端点完成的,类似于其他actor配置元素。
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<BankService>();
services.AddActors(options =>
{
options.Actors.RegisterActor<DemoActor>();
options.ReentrancyConfig = new Dapr.Actors.ActorReentrancyConfig()
{
Enabled = true,
MaxStackDepth = 32,
};
});
}
}
import { CommunicationProtocolEnum, DaprClient, DaprServer } from "@dapr/dapr";
// 使用DaprClientOptions配置actor运行时。
const clientOptions = {
actor: {
reentrancy: {
enabled: true,
maxStackDepth: 32,
},
},
};
from fastapi import FastAPI
from dapr.ext.fastapi import DaprActor
from dapr.actor.runtime.config import ActorRuntimeConfig, ActorReentrancyConfig
from dapr.actor.runtime.runtime import ActorRuntime
from demo_actor import DemoActor
reentrancyConfig = ActorReentrancyConfig(enabled=True)
config = ActorRuntimeConfig(reentrancy=reentrancyConfig)
ActorRuntime.set_actor_config(config)
app = FastAPI(title=f'{DemoActor.__name__}Service')
actor = DaprActor(app)
@app.on_event("startup")
async def startup_event():
# 注册DemoActor
await actor.register_actor(DemoActor)
@app.get("/MakeExampleReentrantCall")
def do_something_reentrant():
# 在这里调用另一个actor,重入将自动处理
return
以下是一个用Golang编写的actor代码片段,通过HTTP API提供重入配置。重入尚未包含在Go SDK中。
type daprConfig struct {
Entities []string `json:"entities,omitempty"`
ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"`
ActorScanInterval string `json:"actorScanInterval,omitempty"`
DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"`
DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"`
Reentrancy config.ReentrancyConfig `json:"reentrancy,omitempty"`
}
var daprConfigResponse = daprConfig{
[]string{defaultActorType},
actorIdleTimeout,
actorScanInterval,
drainOngoingCallTimeout,
drainRebalancedActors,
config.ReentrancyConfig{Enabled: true, MaxStackDepth: &maxStackDepth},
}
func configHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(daprConfigResponse)
}
处理重入请求
处理重入请求的关键在于Dapr-Reentrancy-Id
头。此头的值用于将请求与其调用链匹配,并允许它们绕过actor的锁。
此头由Dapr运行时为任何具有重入配置的actor请求生成。一旦生成,它用于锁定actor,并且必须传递给所有后续请求。以下是一个actor处理重入请求的示例:
func reentrantCallHandler(w http.ResponseWriter, r *http.Request) {
/*
* 省略。
*/
req, _ := http.NewRequest("PUT", url, bytes.NewReader(nextBody))
reentrancyID := r.Header.Get("Dapr-Reentrancy-Id")
req.Header.Add("Dapr-Reentrancy-Id", reentrancyID)
client := http.Client{}
resp, err := client.Do(req)
/*
* 省略。
*/
}
演示
观看此视频以了解如何使用actor重入。