更多关于 Dapr 状态管理的内容
了解如何使用 Dapr 进行状态管理:
- 试试 状态管理快速入门。
- 使用任意 Dapr SDKs 探索状态管理。
- 查看 状态管理 API 参考文档。
- 浏览支持的 状态管理组件规格。
This is the multi-page printable view of this section. Click here to print.
了解如何使用 Dapr 进行状态管理:
您的应用程序可以利用Dapr的状态管理API在支持的状态存储中保存、读取和查询键/值对。通过状态存储组件,您可以构建有状态且长时间运行的应用程序,例如购物车或游戏的会话状态。如下图所示:
以下视频和演示概述了Dapr状态管理的工作原理。
通过状态管理API模块,您的应用程序可以利用一些通常复杂且容易出错的功能,包括:
以下是状态管理API的一些功能:
Dapr的数据存储被设计为组件,可以在不更改服务代码的情况下进行替换。查看支持的状态存储以获取更多信息。
使用Dapr,您可以在状态操作请求中附加元数据,描述您期望请求如何被处理。您可以附加:
默认情况下,您的应用程序应假设数据存储是最终一致的,并使用最后写入胜出并发模式。
并非所有存储都是平等的。为了确保您的应用程序的可移植性,您可以查询存储的元数据能力,并使您的代码适应不同的存储能力。
Dapr支持使用ETags的乐观并发控制(OCC)。当请求状态值时,Dapr总是将ETag属性附加到返回的状态中。当用户代码:
If-Match
头附加ETag。当提供的ETag与状态存储中的ETag匹配时,写入
操作成功。
在许多应用程序中,数据更新冲突很少见,因为客户端通常根据业务上下文分区以操作不同的数据。然而,如果您的应用程序选择使用ETags,不匹配的ETags可能导致请求被拒绝。建议您在代码中使用重试策略,以在使用ETags时补偿冲突。
如果您的应用程序在写入请求中省略ETags,Dapr在处理请求时会跳过ETag检查。这使得最后写入胜出模式成为可能,与使用ETags的首次写入胜出模式相比。
阅读API参考以了解如何设置并发选项。
Dapr支持强一致性和最终一致性,最终一致性是默认行为。
阅读API参考以了解如何设置一致性选项。
状态存储组件可能会根据内容类型不同地维护和操作数据。Dapr支持在状态管理API中作为请求元数据的一部分传递内容类型。
设置内容类型是_可选的_,组件决定是否使用它。Dapr仅提供将此信息传递给组件的手段。
metadata.contentType
设置内容类型。例如,http://localhost:3500/v1.0/state/store?metadata.contentType=application/json
。"contentType" : <content type>
来设置内容类型。Dapr支持两种类型的多读或多写操作:批量或事务性。阅读API参考以了解如何使用批量和多选项。
您可以将多个读取请求分组为批量(或批次)操作。在批量操作中,Dapr将读取请求作为单独的请求提交到底层数据存储,并将它们作为单个结果返回。
您可以将写入、更新和删除操作分组为一个请求,然后作为一个原子事务处理。请求将作为一组事务性操作成功或失败。
事务性状态存储可用于存储actor状态。要指定用于actor的状态存储,请在状态存储组件的元数据部分中将属性actorStateStore
的值指定为true
。actor状态以特定方案存储在事务性状态存储中,允许进行一致的查询。所有actor只能使用一个状态存储组件作为状态存储。阅读state API参考和actors API参考以了解有关actor状态存储的更多信息。
在保存actor状态时,您应始终设置TTL元数据字段(ttlInSeconds
)或在您选择的SDK中使用等效的API调用,以确保状态最终被移除。阅读actors概述以获取更多信息。
Dapr支持应用程序状态的自动客户端加密,并支持密钥轮换。这在所有Dapr状态存储上都支持。有关更多信息,请阅读如何:加密应用程序状态主题。
不同应用程序在共享状态时的需求各不相同。在一种情况下,您可能希望将所有状态封装在给定应用程序中,并让Dapr为您管理访问。在另一种情况下,您可能希望两个应用程序在同一状态上工作,以获取和保存相同的键。
Dapr使状态能够:
有关更多详细信息,请阅读如何:在应用程序之间共享状态。
Dapr使开发人员能够使用外发模式在事务性状态存储和任何消息代理之间实现单一事务。有关更多信息,请阅读如何启用事务性外发消息。
有两种方法可以查询状态:
使用_可选的_状态管理查询API,您可以查询状态存储中保存的键/值数据,无论底层数据库或存储技术如何。使用状态管理查询API,您可以过滤、排序和分页键/值数据。有关更多详细信息,请阅读如何:查询状态。
Dapr在不进行任何转换的情况下保存和检索状态值。您可以直接从底层状态存储查询和聚合状态。例如,要获取与应用程序ID “myApp” 相关的所有状态键,请在Redis中使用:
KEYS "myApp*"
如果数据存储支持SQL查询,您可以使用SQL查询actor的状态。例如:
SELECT * FROM StateTable WHERE Id='<app-id>||<actor-type>||<actor-id>||<key>'
您还可以通过对actor实例执行聚合查询来避免actor框架的常见轮次并发限制。例如,要计算所有温度计actor的平均温度,请使用:
SELECT AVG(value) FROM StateTable WHERE Id LIKE '<app-id>||<thermometer>||*||temperature'
Dapr支持每个状态设置请求的生存时间(TTL)。这意味着应用程序可以为每个存储的状态设置生存时间,这些状态在过期后无法检索。
状态管理API可以在状态管理API参考中找到,该参考描述了如何通过提供键来检索、保存、删除和查询状态值。
想要测试Dapr状态管理API吗?通过以下快速入门和教程,看看状态管理的实际应用:
快速入门/教程 | 描述 |
---|---|
状态管理快速入门 | 使用状态管理API创建有状态的应用程序。 |
Hello World | 推荐 演示如何在本地运行Dapr。突出显示服务调用和状态管理。 |
Hello World Kubernetes | 推荐 演示如何在Kubernetes中运行Dapr。突出显示服务调用和_状态管理_。 |
想要跳过快速入门?没问题。您可以直接在您的应用程序中试用状态管理模块。在Dapr安装后,您可以从状态管理如何指南开始使用状态管理API。
状态管理是新应用程序、遗留应用程序、单体应用程序或微服务应用程序的常见需求之一。处理和测试不同的数据库库,以及处理重试和故障,可能既困难又耗时。
在本指南中,您将学习如何使用键/值状态API来保存、获取和删除应用程序的状态。
下面的代码示例描述了一个处理订单的应用程序,该应用程序使用Dapr sidecar。订单处理服务通过Dapr将状态存储在Redis状态存储中。
状态存储组件是Dapr用于与数据库通信的资源。
在本指南中,我们将使用Redis状态存储,但您也可以选择支持列表中的其他状态存储。
当您在selfhost模式下运行dapr init
时,Dapr会在您的本地机器上创建一个默认的Redis statestore.yaml
并运行一个Redis状态存储,位置如下:
%UserProfile%\.dapr\components\statestore.yaml
~/.dapr/components/statestore.yaml
通过使用statestore.yaml
组件,您可以在不更改应用程序代码的情况下轻松更换底层组件。
要将其部署到Kubernetes集群中,请在下面的YAML中填写您的状态存储组件的metadata
连接详细信息,保存为statestore.yaml
,然后运行kubectl apply -f statestore.yaml
。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
请参阅如何在Kubernetes上设置不同的状态存储以获取更多信息。
app-id
,因为状态键会以此值为前缀。如果您不设置app-id
,系统会在运行时为您生成一个。下次运行命令时,会生成一个新的app-id
,您将无法再访问之前保存的状态。以下示例展示了如何使用Dapr状态管理API保存和检索单个键/值对。
// 依赖项
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using System.Threading;
using System.Text.Json;
// 代码
namespace EventService
{
class Program
{
static async Task Main(string[] args)
{
string DAPR_STORE_NAME = "statestore";
while(true) {
System.Threading.Thread.Sleep(5000);
using var client = new DaprClientBuilder().Build();
Random random = new Random();
int orderId = random.Next(1,1000);
// 使用Dapr SDK保存和获取状态
await client.SaveStateAsync(DAPR_STORE_NAME, "order_1", orderId.ToString());
await client.SaveStateAsync(DAPR_STORE_NAME, "order_2", orderId.ToString());
var result = await client.GetStateAsync<string>(DAPR_STORE_NAME, "order_1");
Console.WriteLine("获取后的结果: " + result);
}
}
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
// 依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import io.dapr.client.domain.TransactionalStateOperation;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.Random;
import java.util.concurrent.TimeUnit;
// 代码
@SpringBootApplication
public class OrderProcessingServiceApplication {
private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);
private static final String STATE_STORE_NAME = "statestore";
public static void main(String[] args) throws InterruptedException{
while(true) {
TimeUnit.MILLISECONDS.sleep(5000);
Random random = new Random();
int orderId = random.nextInt(1000-1) + 1;
DaprClient client = new DaprClientBuilder().build();
// 使用Dapr SDK保存和获取状态
client.saveState(STATE_STORE_NAME, "order_1", Integer.toString(orderId)).block();
client.saveState(STATE_STORE_NAME, "order_2", Integer.toString(orderId)).block();
Mono<State<String>> result = client.getState(STATE_STORE_NAME, "order_1", String.class);
log.info("获取后的结果" + result);
}
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
# 依赖项
import random
from time import sleep
import requests
import logging
from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
# 代码
logging.basicConfig(level = logging.INFO)
DAPR_STORE_NAME = "statestore"
while True:
sleep(random.randrange(50, 5000) / 1000)
orderId = random.randint(1, 1000)
with DaprClient() as client:
# 使用Dapr SDK保存和获取状态
client.save_state(DAPR_STORE_NAME, "order_1", str(orderId))
result = client.get_state(DAPR_STORE_NAME, "order_1")
logging.info('获取后的结果: ' + result.data.decode('utf-8'))
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖项
import (
"context"
"log"
"math/rand"
"strconv"
"time"
dapr "github.com/dapr/go-sdk/client"
)
// 代码
func main() {
const STATE_STORE_NAME = "statestore"
rand.Seed(time.Now().UnixMicro())
for i := 0; i < 10; i++ {
orderId := rand.Intn(1000-1) + 1
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
err = client.SaveState(ctx, STATE_STORE_NAME, "order_1", []byte(strconv.Itoa(orderId)), nil)
if err != nil {
panic(err)
}
result, err := client.GetState(ctx, STATE_STORE_NAME, "order_1", nil)
if err != nil {
panic(err)
}
log.Println("获取后的结果:", string(result.Value))
time.Sleep(2 * time.Second)
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
// 依赖项
import { DaprClient, HttpMethod, CommunicationProtocolEnum } from '@dapr/dapr';
// 代码
const daprHost = "127.0.0.1";
var main = function() {
for(var i=0;i<10;i++) {
sleep(5000);
var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
start(orderId).catch((e) => {
console.error(e);
process.exit(1);
});
}
}
async function start(orderId) {
const client = new DaprClient({
daprHost,
daprPort: process.env.DAPR_HTTP_PORT,
communicationProtocol: CommunicationProtocolEnum.HTTP,
});
const STATE_STORE_NAME = "statestore";
// 使用Dapr SDK保存和获取状态
await client.state.save(STATE_STORE_NAME, [
{
key: "order_1",
value: orderId.toString()
},
{
key: "order_2",
value: orderId.toString()
}
]);
var result = await client.state.get(STATE_STORE_NAME, "order_1");
console.log("获取后的结果: " + result);
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
main();
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start
启动一个Dapr sidecar:
dapr run --app-id orderprocessing --dapr-http-port 3601
在一个单独的终端中,将一个键/值对保存到您的状态存储中:
curl -X POST -H "Content-Type: application/json" -d '[{ "key": "order_1", "value": "250"}]' http://localhost:3601/v1.0/state/statestore
现在获取您刚刚保存的状态:
curl http://localhost:3601/v1.0/state/statestore/order_1
重新启动您的sidecar并尝试再次检索状态,以观察状态与应用程序分开持久化。
启动一个Dapr sidecar:
dapr --app-id orderprocessing --dapr-http-port 3601 run
在一个单独的终端中,将一个键/值对保存到您的状态存储中:
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{"key": "order_1", "value": "250"}]' -Uri 'http://localhost:3601/v1.0/state/statestore'
现在获取您刚刚保存的状态:
Invoke-RestMethod -Uri 'http://localhost:3601/v1.0/state/statestore/order_1'
重新启动您的sidecar并尝试再次检索状态,以观察状态与应用程序分开持久化。
以下是利用Dapr SDK删除状态的代码示例。
// 依赖项
using Dapr.Client;
// 代码
namespace EventService
{
class Program
{
static async Task Main(string[] args)
{
string DAPR_STORE_NAME = "statestore";
// 使用Dapr SDK删除状态
using var client = new DaprClientBuilder().Build();
await client.DeleteStateAsync(DAPR_STORE_NAME, "order_1", cancellationToken: cancellationToken);
}
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
// 依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import org.springframework.boot.autoconfigure.SpringBootApplication;
// 代码
@SpringBootApplication
public class OrderProcessingServiceApplication {
public static void main(String[] args) throws InterruptedException{
String STATE_STORE_NAME = "statestore";
// 使用Dapr SDK删除状态
DaprClient client = new DaprClientBuilder().build();
String storedEtag = client.getState(STATE_STORE_NAME, "order_1", String.class).block().getEtag();
client.deleteState(STATE_STORE_NAME, "order_1", storedEtag, null).block();
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
# 依赖项
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
# 代码
logging.basicConfig(level = logging.INFO)
DAPR_STORE_NAME = "statestore"
# 使用Dapr SDK删除状态
with DaprClient() as client:
client.delete_state(store_name=DAPR_STORE_NAME, key="order_1")
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖项
import (
"context"
dapr "github.com/dapr/go-sdk/client"
)
// 代码
func main() {
STATE_STORE_NAME := "statestore"
// 使用Dapr SDK删除状态
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
if err := client.DeleteState(ctx, STATE_STORE_NAME, "order_1"); err != nil {
panic(err)
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
// 依赖项
import { DaprClient, HttpMethod, CommunicationProtocolEnum } from '@dapr/dapr';
// 代码
const daprHost = "127.0.0.1";
var main = function() {
const STATE_STORE_NAME = "statestore";
// 使用Dapr SDK保存和获取状态
const client = new DaprClient({
daprHost,
daprPort: process.env.DAPR_HTTP_PORT,
communicationProtocol: CommunicationProtocolEnum.HTTP,
});
await client.state.delete(STATE_STORE_NAME, "order_1");
}
main();
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start
使用上面相同的Dapr实例运行:
curl -X DELETE 'http://localhost:3601/v1.0/state/statestore/order_1'
尝试再次获取状态。注意没有返回值。
使用上面相同的Dapr实例运行:
Invoke-RestMethod -Method Delete -Uri 'http://localhost:3601/v1.0/state/statestore/order_1'
尝试再次获取状态。注意没有返回值。
以下是利用Dapr SDK保存和检索多个状态的代码示例。
// 依赖项
using Dapr.Client;
// 代码
namespace EventService
{
class Program
{
static async Task Main(string[] args)
{
string DAPR_STORE_NAME = "statestore";
// 使用Dapr SDK检索多个状态
using var client = new DaprClientBuilder().Build();
IReadOnlyList<BulkStateItem> multipleStateResult = await client.GetBulkStateAsync(DAPR_STORE_NAME, new List<string> { "order_1", "order_2" }, parallelism: 1);
}
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
上述示例返回一个BulkStateItem
,其中包含您保存到状态的值的序列化格式。如果您希望SDK在每个批量响应项中反序列化值,您可以使用以下代码:
// 依赖项
using Dapr.Client;
// 代码
namespace EventService
{
class Program
{
static async Task Main(string[] args)
{
string DAPR_STORE_NAME = "statestore";
// 使用Dapr SDK检索多个状态
using var client = new DaprClientBuilder().Build();
IReadOnlyList<BulkStateItem<Widget>> mulitpleStateResult = await client.GetBulkStateAsync<Widget>(DAPR_STORE_NAME, new List<string> { "widget_1", "widget_2" }, parallelism: 1);
}
}
class Widget
{
string Size { get; set; }
string Color { get; set; }
}
}
// 依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import java.util.Arrays;
// 代码
@SpringBootApplication
public class OrderProcessingServiceApplication {
private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);
public static void main(String[] args) throws InterruptedException{
String STATE_STORE_NAME = "statestore";
// 使用Dapr SDK检索多个状态
DaprClient client = new DaprClientBuilder().build();
Mono<List<State<String>>> resultBulk = client.getBulkState(STATE_STORE_NAME,
Arrays.asList("order_1", "order_2"), String.class);
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
# 依赖项
from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem
# 代码
logging.basicConfig(level = logging.INFO)
DAPR_STORE_NAME = "statestore"
orderId = 100
# 使用Dapr SDK保存和检索多个状态
with DaprClient() as client:
client.save_bulk_state(store_name=DAPR_STORE_NAME, states=[StateItem(key="order_2", value=str(orderId))])
result = client.get_bulk_state(store_name=DAPR_STORE_NAME, keys=["order_1", "order_2"], states_metadata={"metakey": "metavalue"}).items
logging.info('批量获取后的结果: ' + str(result))
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖项
import (
"context"
"log"
"math/rand"
"strconv"
"time"
dapr "github.com/dapr/go-sdk/client"
)
// 代码
func main() {
const STATE_STORE_NAME = "statestore"
rand.Seed(time.Now().UnixMicro())
for i := 0; i < 10; i++ {
orderId := rand.Intn(1000-1) + 1
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
err = client.SaveState(ctx, STATE_STORE_NAME, "order_1", []byte(strconv.Itoa(orderId)), nil)
if err != nil {
panic(err)
}
keys := []string{"key1", "key2", "key3"}
items, err := client.GetBulkState(ctx, STATE_STORE_NAME, keys, nil, 100)
if err != nil {
panic(err)
}
for _, item := range items {
log.Println("从GetBulkState获取的项:", string(item.Value))
}
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
// 依赖项
import { DaprClient, HttpMethod, CommunicationProtocolEnum } from '@dapr/dapr';
// 代码
const daprHost = "127.0.0.1";
var main = function() {
const STATE_STORE_NAME = "statestore";
var orderId = 100;
// 使用Dapr SDK保存和检索多个状态
const client = new DaprClient({
daprHost,
daprPort: process.env.DAPR_HTTP_PORT,
communicationProtocol: CommunicationProtocolEnum.HTTP,
});
await client.state.save(STATE_STORE_NAME, [
{
key: "order_1",
value: orderId.toString()
},
{
key: "order_2",
value: orderId.toString()
}
]);
result = await client.state.getBulk(STATE_STORE_NAME, ["order_1", "order_2"]);
}
main();
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start
使用上面相同的Dapr实例,将两个键/值对保存到您的状态存储中:
curl -X POST -H "Content-Type: application/json" -d '[{ "key": "order_1", "value": "250"}, { "key": "order_2", "value": "550"}]' http://localhost:3601/v1.0/state/statestore
现在获取您刚刚保存的状态:
curl -X POST -H "Content-Type: application/json" -d '{"keys":["order_1", "order_2"]}' http://localhost:3601/v1.0/state/statestore/bulk
使用上面相同的Dapr实例,将两个键/值对保存到您的状态存储中:
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{ "key": "order_1", "value": "250"}, { "key": "order_2", "value": "550"}]' -Uri 'http://localhost:3601/v1.0/state/statestore'
现在获取您刚刚保存的状态:
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["order_1", "order_2"]}' -Uri 'http://localhost:3601/v1.0/state/statestore/bulk'
以下是利用Dapr SDK执行状态事务的代码示例。
// 依赖项
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using System.Threading;
using System.Text.Json;
// 代码
namespace EventService
{
class Program
{
static async Task Main(string[] args)
{
string DAPR_STORE_NAME = "statestore";
while(true) {
System.Threading.Thread.Sleep(5000);
Random random = new Random();
int orderId = random.Next(1,1000);
using var client = new DaprClientBuilder().Build();
var requests = new List<StateTransactionRequest>()
{
new StateTransactionRequest("order_3", JsonSerializer.SerializeToUtf8Bytes(orderId.ToString()), StateOperationType.Upsert),
new StateTransactionRequest("order_2", null, StateOperationType.Delete)
};
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
// 使用Dapr SDK执行状态事务
await client.ExecuteStateTransactionAsync(DAPR_STORE_NAME, requests, cancellationToken: cancellationToken);
Console.WriteLine("订单请求: " + orderId);
Console.WriteLine("结果: " + result);
}
}
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
// 依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import io.dapr.client.domain.TransactionalStateOperation;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
// 代码
@SpringBootApplication
public class OrderProcessingServiceApplication {
private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);
private static final String STATE_STORE_NAME = "statestore";
public static void main(String[] args) throws InterruptedException{
while(true) {
TimeUnit.MILLISECONDS.sleep(5000);
Random random = new Random();
int orderId = random.nextInt(1000-1) + 1;
DaprClient client = new DaprClientBuilder().build();
List<TransactionalStateOperation<?>> operationList = new ArrayList<>();
operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.UPSERT,
new State<>("order_3", Integer.toString(orderId), "")));
operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.DELETE,
new State<>("order_2")));
// 使用Dapr SDK执行状态事务
client.executeStateTransaction(STATE_STORE_NAME, operationList).block();
log.info("订单请求: " + orderId);
}
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
# 依赖项
import random
from time import sleep
import requests
import logging
from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
# 代码
logging.basicConfig(level = logging.INFO)
DAPR_STORE_NAME = "statestore"
while True:
sleep(random.randrange(50, 5000) / 1000)
orderId = random.randint(1, 1000)
with DaprClient() as client:
# 使用Dapr SDK执行状态事务
client.execute_state_transaction(store_name=DAPR_STORE_NAME, operations=[
TransactionalStateOperation(
operation_type=TransactionOperationType.upsert,
key="order_3",
data=str(orderId)),
TransactionalStateOperation(key="order_3", data=str(orderId)),
TransactionalStateOperation(
operation_type=TransactionOperationType.delete,
key="order_2",
data=str(orderId)),
TransactionalStateOperation(key="order_2", data=str(orderId))
])
client.delete_state(store_name=DAPR_STORE_NAME, key="order_1")
logging.basicConfig(level = logging.INFO)
logging.info('订单请求: ' + str(orderId))
logging.info('结果: ' + str(result))
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖项
package main
import (
"context"
"log"
"math/rand"
"strconv"
"time"
dapr "github.com/dapr/go-sdk/client"
)
// 代码
func main() {
const STATE_STORE_NAME = "statestore"
rand.Seed(time.Now().UnixMicro())
for i := 0; i < 10; i++ {
orderId := rand.Intn(1000-1) + 1
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
err = client.SaveState(ctx, STATE_STORE_NAME, "order_1", []byte(strconv.Itoa(orderId)), nil)
if err != nil {
panic(err)
}
result, err := client.GetState(ctx, STATE_STORE_NAME, "order_1", nil)
if err != nil {
panic(err)
}
ops := make([]*dapr.StateOperation, 0)
data1 := "data1"
data2 := "data2"
op1 := &dapr.StateOperation{
Type: dapr.StateOperationTypeUpsert,
Item: &dapr.SetStateItem{
Key: "key1",
Value: []byte(data1),
},
}
op2 := &dapr.StateOperation{
Type: dapr.StateOperationTypeDelete,
Item: &dapr.SetStateItem{
Key: "key2",
Value: []byte(data2),
},
}
ops = append(ops, op1, op2)
meta := map[string]string{}
err = client.ExecuteStateTransaction(ctx, STATE_STORE_NAME, meta, ops)
log.Println("获取后的结果:", string(result.Value))
time.Sleep(2 * time.Second)
}
}
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
// 依赖项
import { DaprClient, HttpMethod, CommunicationProtocolEnum } from '@dapr/dapr';
// 代码
const daprHost = "127.0.0.1";
var main = function() {
for(var i=0;i<10;i++) {
sleep(5000);
var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
start(orderId).catch((e) => {
console.error(e);
process.exit(1);
});
}
}
async function start(orderId) {
const client = new DaprClient({
daprHost,
daprPort: process.env.DAPR_HTTP_PORT,
communicationProtocol: CommunicationProtocolEnum.HTTP,
});
const STATE_STORE_NAME = "statestore";
// 使用Dapr SDK保存和检索多个状态
await client.state.transaction(STATE_STORE_NAME, [
{
operation: "upsert",
request: {
key: "order_3",
value: orderId.toString()
}
},
{
operation: "delete",
request: {
key: "order_2"
}
}
]);
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
main();
要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start
使用上面相同的Dapr实例,执行两个状态事务:
curl -X POST -H "Content-Type: application/json" -d '{"operations": [{"operation":"upsert", "request": {"key": "order_1", "value": "250"}}, {"operation":"delete", "request": {"key": "order_2"}}]}' http://localhost:3601/v1.0/state/statestore/transaction
现在查看您的状态事务的结果:
curl -X POST -H "Content-Type: application/json" -d '{"keys":["order_1", "order_2"]}' http://localhost:3601/v1.0/state/statestore/bulk
使用上面相同的Dapr实例,将两个键/值对保存到您的状态存储中:
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"operations": [{"operation":"upsert", "request": {"key": "order_1", "value": "250"}}, {"operation":"delete", "request": {"key": "order_2"}}]}' -Uri 'http://localhost:3601/v1.0/state/statestore/transaction'
现在查看您的状态事务的结果:
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["order_1", "order_2"]}' -Uri 'http://localhost:3601/v1.0/state/statestore/bulk'
通过状态查询API,您可以从状态存储组件中检索、过滤和排序键/值数据。查询API并不是完整查询语言的替代品。
尽管状态存储是键/值存储,value
可能是一个包含自身层次结构、键和值的JSON文档。查询API允许您使用这些键/值来检索相应的文档。
您可以通过HTTP POST/PUT或gRPC提交查询请求。请求的主体是一个包含以下三个部分的JSON对象:
filter
sort
page
filter
filter
用于指定查询条件,结构类似于树形,每个节点表示一个操作,可能是单一或多个操作数。
支持以下操作:
操作符 | 操作数 | 描述 |
---|---|---|
EQ | key:value | key 等于 value |
NEQ | key:value | key 不等于 value |
GT | key:value | key 大于 value |
GTE | key:value | key 大于等于 value |
LT | key:value | key 小于 value |
LTE | key:value | key 小于等于 value |
IN | key:[]value | key 等于 value[0] 或 value[1] 或 … 或 value[n] |
AND | []operation | operation[0] 且 operation[1] 且 … 且 operation[n] |
OR | []operation | operation[0] 或 operation[1] 或 … 或 operation[n] |
操作数中的key
类似于JSONPath表示法。键中的每个点表示嵌套的JSON结构。例如,考虑以下结构:
{
"shape": {
"name": "rectangle",
"dimensions": {
"height": 24,
"width": 10
},
"color": {
"name": "red",
"code": "#FF0000"
}
}
}
要比较颜色代码的值,键将是shape.color.code
。
如果省略filter
部分,查询将返回所有条目。
sort
sort
是一个有序的key:order
对数组,其中:
key
是状态存储中的一个键order
是一个可选字符串,指示排序顺序:"ASC"
表示升序"DESC"
表示降序order
,默认是升序。page
page
包含limit
和token
参数。
limit
设置每页返回的记录数。token
是组件返回的分页令牌,用于获取后续查询的结果。在后台,此查询请求被转换为本地查询语言并由状态存储组件执行。
让我们来看一些从简单到复杂的真实示例。
作为数据集,考虑一个包含员工ID、组织、州和城市的员工记录集合。注意,这个数据集是一个键/值对数组,其中:
key
是唯一IDvalue
是包含员工记录的JSON对象。为了更好地说明功能,组织名称(org)和员工ID(id)是一个嵌套的JSON person对象。
首先创建一个MongoDB实例,作为您的状态存储。
docker run -d --rm -p 27017:27017 --name mongodb mongo:5
接下来,启动一个Dapr应用程序。参考组件配置文件,该文件指示Dapr使用MongoDB作为其状态存储。
dapr run --app-id demo --dapr-http-port 3500 --resources-path query-api-examples/components/mongodb
用员工数据集填充状态存储,以便您可以稍后查询它。
curl -X POST -H "Content-Type: application/json" -d @query-api-examples/dataset.json http://localhost:3500/v1.0/state/statestore
填充后,您可以检查状态存储中的数据。下图中,MongoDB UI的一部分显示了员工记录。
每个条目都有一个_id
成员作为连接的对象键,以及一个包含JSON记录的value
成员。
查询API允许您从这个JSON结构中选择记录。
现在您可以运行示例查询。
首先,查找加利福尼亚州的所有员工,并按其员工ID降序排序。
这是查询:
{
"filter": {
"EQ": { "state": "CA" }
},
"sort": [
{
"key": "person.id",
"order": "DESC"
}
]
}
此查询在SQL中的等价形式是:
SELECT * FROM c WHERE
state = "CA"
ORDER BY
person.id DESC
使用以下命令执行查询:
curl -s -X POST -H "Content-Type: application/json" -d @query-api-examples/query1.json http://localhost:3500/v1.0-alpha1/state/statestore/query | jq .
Invoke-RestMethod -Method Post -ContentType 'application/json' -InFile query-api-examples/query1.json -Uri 'http://localhost:3500/v1.0-alpha1/state/statestore/query'
查询结果是一个按请求顺序排列的匹配键/值对数组:
{
"results": [
{
"key": "3",
"data": {
"person": {
"org": "Finance",
"id": 1071
},
"city": "Sacramento",
"state": "CA"
},
"etag": "44723d41-deb1-4c23-940e-3e6896c3b6f7"
},
{
"key": "7",
"data": {
"city": "San Francisco",
"state": "CA",
"person": {
"id": 1015,
"org": "Dev Ops"
}
},
"etag": "0e69e69f-3dbc-423a-9db8-26767fcd2220"
},
{
"key": "5",
"data": {
"state": "CA",
"person": {
"org": "Hardware",
"id": 1007
},
"city": "Los Angeles"
},
"etag": "f87478fa-e5c5-4be0-afa5-f9f9d75713d8"
},
{
"key": "9",
"data": {
"person": {
"org": "Finance",
"id": 1002
},
"city": "San Diego",
"state": "CA"
},
"etag": "f5cf05cd-fb43-4154-a2ec-445c66d5f2f8"
}
]
}
现在,查找来自"Dev Ops"和"Hardware"组织的所有员工。
这是查询:
{
"filter": {
"IN": { "person.org": [ "Dev Ops", "Hardware" ] }
}
}
此查询在SQL中的等价形式是:
SELECT * FROM c WHERE
person.org IN ("Dev Ops", "Hardware")
使用以下命令执行查询:
curl -s -X POST -H "Content-Type: application/json" -d @query-api-examples/query2.json http://localhost:3500/v1.0-alpha1/state/statestore/query | jq .
Invoke-RestMethod -Method Post -ContentType 'application/json' -InFile query-api-examples/query2.json -Uri 'http://localhost:3500/v1.0-alpha1/state/statestore/query'
与前一个示例类似,结果是一个匹配键/值对的数组。
在此示例中,查找:
此外,首先按州按字母降序排序,然后按员工ID升序排序。让我们一次处理最多3条记录。
这是查询:
{
"filter": {
"OR": [
{
"EQ": { "person.org": "Dev Ops" }
},
{
"AND": [
{
"EQ": { "person.org": "Finance" }
},
{
"IN": { "state": [ "CA", "WA" ] }
}
]
}
]
},
"sort": [
{
"key": "state",
"order": "DESC"
},
{
"key": "person.id"
}
],
"page": {
"limit": 3
}
}
此查询在SQL中的等价形式是:
SELECT * FROM c WHERE
person.org = "Dev Ops" OR
(person.org = "Finance" AND state IN ("CA", "WA"))
ORDER BY
state DESC,
person.id ASC
LIMIT 3
使用以下命令执行查询:
curl -s -X POST -H "Content-Type: application/json" -d @query-api-examples/query3.json http://localhost:3500/v1.0-alpha1/state/statestore/query | jq .
Invoke-RestMethod -Method Post -ContentType 'application/json' -InFile query-api-examples/query3.json -Uri 'http://localhost:3500/v1.0-alpha1/state/statestore/query'
成功执行后,状态存储返回一个包含匹配记录列表和分页令牌的JSON对象:
{
"results": [
{
"key": "1",
"data": {
"person": {
"org": "Dev Ops",
"id": 1036
},
"city": "Seattle",
"state": "WA"
},
"etag": "6f54ad94-dfb9-46f0-a371-e42d550adb7d"
},
{
"key": "4",
"data": {
"person": {
"org": "Dev Ops",
"id": 1042
},
"city": "Spokane",
"state": "WA"
},
"etag": "7415707b-82ce-44d0-bf15-6dc6305af3b1"
},
{
"key": "10",
"data": {
"person": {
"org": "Dev Ops",
"id": 1054
},
"city": "New York",
"state": "NY"
},
"etag": "26bbba88-9461-48d1-8a35-db07c374e5aa"
}
],
"token": "3"
}
分页令牌在后续查询中“按原样”使用,以获取下一批记录:
{
"filter": {
"OR": [
{
"EQ": { "person.org": "Dev Ops" }
},
{
"AND": [
{
"EQ": { "person.org": "Finance" }
},
{
"IN": { "state": [ "CA", "WA" ] }
}
]
}
]
},
"sort": [
{
"key": "state",
"order": "DESC"
},
{
"key": "person.id"
}
],
"page": {
"limit": 3,
"token": "3"
}
}
curl -s -X POST -H "Content-Type: application/json" -d @query-api-examples/query3-token.json http://localhost:3500/v1.0-alpha1/state/statestore/query | jq .
Invoke-RestMethod -Method Post -ContentType 'application/json' -InFile query-api-examples/query3-token.json -Uri 'http://localhost:3500/v1.0-alpha1/state/statestore/query'
此查询的结果是:
{
"results": [
{
"key": "9",
"data": {
"person": {
"org": "Finance",
"id": 1002
},
"city": "San Diego",
"state": "CA"
},
"etag": "f5cf05cd-fb43-4154-a2ec-445c66d5f2f8"
},
{
"key": "7",
"data": {
"city": "San Francisco",
"state": "CA",
"person": {
"id": 1015,
"org": "Dev Ops"
}
},
"etag": "0e69e69f-3dbc-423a-9db8-26767fcd2220"
},
{
"key": "3",
"data": {
"person": {
"org": "Finance",
"id": 1071
},
"city": "Sacramento",
"state": "CA"
},
"etag": "44723d41-deb1-4c23-940e-3e6896c3b6f7"
}
],
"token": "6"
}
这样,您可以在查询中更新分页令牌,并迭代结果,直到不再返回记录。
状态查询API有以下限制:
您可以在相关链接部分找到更多信息。
在本文中,您将学习如何创建一个可以水平扩展的有状态服务,选择性使用并发和一致性模型。状态管理API可以帮助开发者简化状态协调、冲突解决和故障处理的复杂性。
状态存储组件是Dapr用来与数据库通信的资源。在本指南中,我们将使用默认的Redis状态存储。
当您在本地模式下运行dapr init
时,Dapr会创建一个默认的Redis statestore.yaml
并在您的本地机器上运行一个Redis状态存储,位置如下:
%UserProfile%\.dapr\components\statestore.yaml
~/.dapr/components/statestore.yaml
通过statestore.yaml
组件,您可以轻松替换底层组件而无需更改应用程序代码。
查看支持的状态存储列表。
在强一致性模式下,Dapr确保底层状态存储:
对于读取请求,Dapr确保在副本之间一致地返回最新的数据。默认情况下是最终一致性,除非在请求状态API时另有指定。
以下示例展示了如何使用强一致性保存、获取和删除状态。示例用Python编写,但适用于任何编程语言。
import requests
import json
store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
stateReq = '[{ "key": "k1", "value": "Some Data", "options": { "consistency": "strong" }}]'
response = requests.post(dapr_state_url, json=stateReq)
import requests
import json
store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
response = requests.get(dapr_state_url + "/key1", headers={"consistency":"strong"})
print(response.headers['ETag'])
import requests
import json
store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
response = requests.delete(dapr_state_url + "/key1", headers={"consistency":"strong"})
如果没有指定concurrency
选项,默认是后写胜出并发模式。
Dapr允许开发者在使用数据存储时选择两种常见的并发模式:
Dapr使用版本号来确定特定键是否已更新。您可以:
如果自从检索版本号以来版本信息已更改,将抛出错误,要求您执行另一次读取以获取最新的版本信息和状态。
Dapr利用ETags来确定状态的版本号。ETags从状态请求中以ETag
头返回。使用ETags,您的应用程序知道自上次检查以来资源已更新,因为在ETag不匹配时会出错。
以下示例展示了如何:
以下示例用Python编写,但适用于任何编程语言。
import requests
import json
store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
response = requests.get(dapr_state_url + "/key1", headers={"concurrency":"first-write"})
etag = response.headers['ETag']
newState = '[{ "key": "k1", "value": "New Data", "etag": {}, "options": { "concurrency": "first-write" }}]'.format(etag)
requests.post(dapr_state_url, json=newState)
response = requests.delete(dapr_state_url + "/key1", headers={"If-Match": "{}".format(etag)})
在以下示例中,您将看到如何在版本已更改时重试保存状态操作:
import requests
import json
# 此方法保存状态,如果保存状态失败则返回false
def save_state(data):
try:
store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
response = requests.post(dapr_state_url, json=data)
if response.status_code == 200:
return True
except:
return False
return False
# 此方法获取状态并返回响应,ETag在头中 -->
def get_state(key):
response = requests.get("http://localhost:3500/v1.0/state/<state_store_name>/{}".format(key), headers={"concurrency":"first-write"})
return response
# 当保存状态成功时退出。如果存在ETag不匹配,success将为False -->
success = False
while success != True:
response = get_state("key1")
etag = response.headers['ETag']
newState = '[{ "key": "key1", "value": "New Data", "etag": {}, "options": { "concurrency": "first-write" }}]'.format(etag)
success = save_state(newState)
事务性 Outbox 模式是一种广为人知的设计模式,用于发送应用程序状态变化的通知。它通过一个跨越数据库和消息代理的单一事务来传递通知。
开发人员在尝试自行实现此模式时会遇到许多技术难题,通常需要编写复杂且容易出错的中央协调管理器,这些管理器最多支持一种或两种数据库和消息代理的组合。
例如,您可以使用 Outbox 模式来:
通过 Dapr 的 Outbox 支持,您可以在调用 Dapr 的事务 API时通知订阅者应用程序的状态何时被创建或更新。
下图概述了 Outbox 功能的工作原理:
Outbox 功能可以与 Dapr 支持的任何事务性状态存储一起使用。所有发布/订阅代理都支持 Outbox 功能。
要启用 Outbox 功能,请在状态存储组件上添加以下必需和可选字段:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mysql-outbox
spec:
type: state.mysql
version: v1
metadata:
- name: connectionString
value: "<CONNECTION STRING>"
- name: outboxPublishPubsub # 必需
value: "mypubsub"
- name: outboxPublishTopic # 必需
value: "newOrder"
- name: outboxPubsub # 可选
value: "myOutboxPubsub"
- name: outboxDiscardWhenMissingState # 可选,默认为 false
value: false
名称 | 必需 | 默认值 | 描述 |
---|---|---|---|
outboxPublishPubsub | 是 | N/A | 设置发布状态更改时传递通知的发布/订阅组件的名称 |
outboxPublishTopic | 是 | N/A | 设置接收在配置了 outboxPublishPubsub 的发布/订阅上的状态更改的主题。消息体将是 insert 或 update 操作的状态事务项 |
outboxPubsub | 否 | outboxPublishPubsub | 设置 Dapr 用于协调状态和发布/订阅事务的发布/订阅组件。如果未设置,则使用配置了 outboxPublishPubsub 的发布/订阅组件。如果您希望将用于发送通知状态更改的发布/订阅组件与用于协调事务的组件分开,这将很有用 |
outboxDiscardWhenMissingState | 否 | false | 通过将 outboxDiscardWhenMissingState 设置为 true ,如果 Dapr 无法在数据库中找到状态且不重试,则 Dapr 将丢弃事务。如果在 Dapr 能够传递消息之前,状态存储数据因任何原因被删除,并且您希望 Dapr 从发布/订阅中删除项目并停止重试获取状态,此设置可能会很有用 |
如果您希望使用相同的状态存储来发送 Outbox 和非 Outbox 消息,只需定义两个连接到相同状态存储的状态存储组件,其中一个具有 Outbox 功能,另一个没有。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mysql
spec:
type: state.mysql
version: v1
metadata:
- name: connectionString
value: "<CONNECTION STRING>"
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mysql-outbox
spec:
type: state.mysql
version: v1
metadata:
- name: connectionString
value: "<CONNECTION STRING>"
- name: outboxPublishPubsub # 必需
value: "mypubsub"
- name: outboxPublishTopic # 必需
value: "newOrder"
您可以通过设置另一个不保存到数据库并明确提及为投影的事务来覆盖发布到发布/订阅代理的 Outbox 模式消息。此事务添加了一个名为 outbox.projection
的元数据键,值设置为 true
。当添加到事务中保存的状态数组时,此负载在写入状态时被忽略,数据用作发送到上游订阅者的负载。
要正确使用,key
值必须在状态存储上的操作和消息投影之间匹配。如果键不匹配,则整个事务失败。
如果您为同一键启用了两个或多个 outbox.projection
状态项,则使用第一个定义的项,其他项将被忽略。
在以下 Python SDK 的状态事务示例中,值 "2"
被保存到数据库,但值 "3"
被发布到最终用户主题。
DAPR_STORE_NAME = "statestore"
async def main():
client = DaprClient()
# 定义第一个状态操作以保存值 "2"
op1 = StateItem(
key="key1",
value=b"2"
)
# 定义第二个状态操作以带有元数据发布值 "3"
op2 = StateItem(
key="key1",
value=b"3",
options=StateOptions(
metadata={
"outbox.projection": "true"
}
)
)
# 创建状态操作列表
ops = [op1, op2]
# 执行状态事务
await client.state.transaction(DAPR_STORE_NAME, operations=ops)
print("状态事务已执行。")
通过将元数据项 "outbox.projection"
设置为 "true"
并确保 key
值匹配(key1
):
在以下 JavaScript SDK 的状态事务示例中,值 "2"
被保存到数据库,但值 "3"
被发布到最终用户主题。
const { DaprClient, StateOperationType } = require('@dapr/dapr');
const DAPR_STORE_NAME = "statestore";
async function main() {
const client = new DaprClient();
// 定义第一个状态操作以保存值 "2"
const op1 = {
operation: StateOperationType.UPSERT,
request: {
key: "key1",
value: "2"
}
};
// 定义第二个状态操作以带有元数据发布值 "3"
const op2 = {
operation: StateOperationType.UPSERT,
request: {
key: "key1",
value: "3",
metadata: {
"outbox.projection": "true"
}
}
};
// 创建状态操作列表
const ops = [op1, op2];
// 执行状态事务
await client.state.transaction(DAPR_STORE_NAME, ops);
console.log("状态事务已执行。");
}
main().catch(err => {
console.error(err);
});
通过将元数据项 "outbox.projection"
设置为 "true"
并确保 key
值匹配(key1
):
在以下 .NET SDK 的状态事务示例中,值 "2"
被保存到数据库,但值 "3"
被发布到最终用户主题。
public class Program
{
private const string DAPR_STORE_NAME = "statestore";
public static async Task Main(string[] args)
{
var client = new DaprClientBuilder().Build();
// 定义第一个状态操作以保存值 "2"
var op1 = new StateTransactionRequest(
key: "key1",
value: Encoding.UTF8.GetBytes("2"),
operationType: StateOperationType.Upsert
);
// 定义第二个状态操作以带有元数据发布值 "3"
var metadata = new Dictionary<string, string>
{
{ "outbox.projection", "true" }
};
var op2 = new StateTransactionRequest(
key: "key1",
value: Encoding.UTF8.GetBytes("3"),
operationType: StateOperationType.Upsert,
metadata: metadata
);
// 创建状态操作列表
var ops = new List<StateTransactionRequest> { op1, op2 };
// 执行状态事务
await client.ExecuteStateTransactionAsync(DAPR_STORE_NAME, ops);
Console.WriteLine("状态事务已执行。");
}
}
通过将元数据项 "outbox.projection"
设置为 "true"
并确保 key
值匹配(key1
):
在以下 Java SDK 的状态事务示例中,值 "2"
被保存到数据库,但值 "3"
被发布到最终用户主题。
public class Main {
private static final String DAPR_STORE_NAME = "statestore";
public static void main(String[] args) {
try (DaprClient client = new DaprClientBuilder().build()) {
// 定义第一个状态操作以保存值 "2"
StateOperation<String> op1 = new StateOperation<>(
StateOperationType.UPSERT,
"key1",
"2"
);
// 定义第二个状态操作以带有元数据发布值 "3"
Map<String, String> metadata = new HashMap<>();
metadata.put("outbox.projection", "true");
StateOperation<String> op2 = new StateOperation<>(
StateOperationType.UPSERT,
"key1",
"3",
metadata
);
// 创建状态操作列表
List<StateOperation<?>> ops = new ArrayList<>();
ops.add(op1);
ops.add(op2);
// 执行状态事务
client.executeStateTransaction(DAPR_STORE_NAME, ops).block();
System.out.println("状态事务已执行。");
} catch (Exception e) {
e.printStackTrace();
}
}
}
通过将元数据项 "outbox.projection"
设置为 "true"
并确保 key
值匹配(key1
):
在以下 Go SDK 的状态事务示例中,值 "2"
被保存到数据库,但值 "3"
被发布到最终用户主题。
ops := make([]*dapr.StateOperation, 0)
op1 := &dapr.StateOperation{
Type: dapr.StateOperationTypeUpsert,
Item: &dapr.SetStateItem{
Key: "key1",
Value: []byte("2"),
},
}
op2 := &dapr.StateOperation{
Type: dapr.StateOperationTypeUpsert,
Item: &dapr.SetStateItem{
Key: "key1",
Value: []byte("3"),
// 覆盖保存到数据库的数据负载
Metadata: map[string]string{
"outbox.projection": "true",
},
},
}
ops = append(ops, op1, op2)
meta := map[string]string{}
err := testClient.ExecuteStateTransaction(ctx, store, meta, ops)
通过将元数据项 "outbox.projection"
设置为 "true"
并确保 key
值匹配(key1
):
您可以使用以下 HTTP 请求传递消息覆盖:
curl -X POST http://localhost:3500/v1.0/state/starwars/transaction \
-H "Content-Type: application/json" \
-d '{
"operations": [
{
"operation": "upsert",
"request": {
"key": "order1",
"value": {
"orderId": "7hf8374s",
"type": "book",
"name": "The name of the wind"
}
}
},
{
"operation": "upsert",
"request": {
"key": "order1",
"value": {
"orderId": "7hf8374s"
},
"metadata": {
"outbox.projection": "true"
},
"contentType": "application/json"
}
}
]
}'
通过将元数据项 "outbox.projection"
设置为 "true"
并确保 key
值匹配(key1
):
您可以使用自定义 CloudEvent 元数据覆盖发布的 Outbox 事件上的Dapr 生成的 CloudEvent 字段。
async def execute_state_transaction():
async with DaprClient() as client:
# 定义状态操作
ops = []
op1 = {
'operation': 'upsert',
'request': {
'key': 'key1',
'value': b'2', # 将字符串转换为字节数组
'metadata': {
'cloudevent.id': 'unique-business-process-id',
'cloudevent.source': 'CustomersApp',
'cloudevent.type': 'CustomerCreated',
'cloudevent.subject': '123',
'my-custom-ce-field': 'abc'
}
}
}
ops.append(op1)
# 执行状态事务
store_name = 'your-state-store-name'
try:
await client.execute_state_transaction(store_name, ops)
print('状态事务已执行。')
except Exception as e:
print('执行状态事务时出错:', e)
# 运行异步函数
if __name__ == "__main__":
asyncio.run(execute_state_transaction())
const { DaprClient } = require('dapr-client');
async function executeStateTransaction() {
// 初始化 Dapr 客户端
const daprClient = new DaprClient();
// 定义状态操作
const ops = [];
const op1 = {
operationType: 'upsert',
request: {
key: 'key1',
value: Buffer.from('2'),
metadata: {
'id': 'unique-business-process-id',
'source': 'CustomersApp',
'type': 'CustomerCreated',
'subject': '123',
'my-custom-ce-field': 'abc'
}
}
};
ops.push(op1);
// 执行状态事务
const storeName = 'your-state-store-name';
const metadata = {};
}
executeStateTransaction();
public class StateOperationExample
{
public async Task ExecuteStateTransactionAsync()
{
var daprClient = new DaprClientBuilder().Build();
// 将值 "2" 定义为字符串并序列化为字节数组
var value = "2";
var valueBytes = JsonSerializer.SerializeToUtf8Bytes(value);
// 定义第一个状态操作以保存值 "2" 并带有元数据
// 覆盖 Cloudevent 元数据
var metadata = new Dictionary<string, string>
{
{ "cloudevent.id", "unique-business-process-id" },
{ "cloudevent.source", "CustomersApp" },
{ "cloudevent.type", "CustomerCreated" },
{ "cloudevent.subject", "123" },
{ "my-custom-ce-field", "abc" }
};
var op1 = new StateTransactionRequest(
key: "key1",
value: valueBytes,
operationType: StateOperationType.Upsert,
metadata: metadata
);
// 创建状态操作列表
var ops = new List<StateTransactionRequest> { op1 };
// 执行状态事务
var storeName = "your-state-store-name";
await daprClient.ExecuteStateTransactionAsync(storeName, ops);
Console.WriteLine("状态事务已执行。");
}
public static async Task Main(string[] args)
{
var example = new StateOperationExample();
await example.ExecuteStateTransactionAsync();
}
}
public class StateOperationExample {
public static void main(String[] args) {
executeStateTransaction();
}
public static void executeStateTransaction() {
// 构建 Dapr 客户端
try (DaprClient daprClient = new DaprClientBuilder().build()) {
// 定义值 "2"
String value = "2";
// 覆盖 CloudEvent 元数据
Map<String, String> metadata = new HashMap<>();
metadata.put("cloudevent.id", "unique-business-process-id");
metadata.put("cloudevent.source", "CustomersApp");
metadata.put("cloudevent.type", "CustomerCreated");
metadata.put("cloudevent.subject", "123");
metadata.put("my-custom-ce-field", "abc");
// 定义状态操作
List<StateOperation<?>> ops = new ArrayList<>();
StateOperation<String> op1 = new StateOperation<>(
StateOperationType.UPSERT,
"key1",
value,
metadata
);
ops.add(op1);
// 执行状态事务
String storeName = "your-state-store-name";
daprClient.executeStateTransaction(storeName, ops).block();
System.out.println("状态事务已执行。");
} catch (Exception e) {
e.printStackTrace();
}
}
}
func main() {
// 创建 Dapr 客户端
client, err := dapr.NewClient()
if err != nil {
log.Fatalf("创建 Dapr 客户端失败: %v", err)
}
defer client.Close()
ctx := context.Background()
store := "your-state-store-name"
// 定义状态操作
ops := make([]*dapr.StateOperation, 0)
op1 := &dapr.StateOperation{
Type: dapr.StateOperationTypeUpsert,
Item: &dapr.SetStateItem{
Key: "key1",
Value: []byte("2"),
// 覆盖 Cloudevent 元数据
Metadata: map[string]string{
"cloudevent.id": "unique-business-process-id",
"cloudevent.source": "CustomersApp",
"cloudevent.type": "CustomerCreated",
"cloudevent.subject": "123",
"my-custom-ce-field": "abc",
},
},
}
ops = append(ops, op1)
// 事务的元数据(如果有)
meta := map[string]string{}
// 执行状态事务
err = client.ExecuteStateTransaction(ctx, store, meta, ops)
if err != nil {
log.Fatalf("执行状态事务失败: %v", err)
}
log.Println("状态事务已执行。")
}
curl -X POST http://localhost:3500/v1.0/state/starwars/transaction \
-H "Content-Type: application/json" \
-d '{
"operations": [
{
"operation": "upsert",
"request": {
"key": "key1",
"value": "2"
}
},
],
"metadata": {
"id": "unique-business-process-id",
"source": "CustomersApp",
"type": "CustomerCreated",
"subject": "123",
"my-custom-ce-field": "abc",
}
}'
data
CloudEvent 字段仅供 Dapr 使用,且不可自定义。Dapr 提供了多种在应用程序之间共享状态的方法。
不同的架构在共享状态时可能有不同的需求。在某些情况下,您可能会希望:
在其他情况下,您可能需要两个应用程序在同一状态上进行操作,以便获取和保存相同的键。
为了实现状态共享,Dapr 支持以下键前缀策略:
键前缀 | 描述 |
---|---|
appid | 默认策略,允许您仅通过指定 appid 的应用程序管理状态。所有状态键将以 appid 为前缀,并限定于该应用程序。 |
name | 使用状态存储组件的名称作为前缀。多个应用程序可以共享同一状态存储中的相同状态。 |
namespace | 如果设置了命名空间,此策略会将 appid 键前缀替换为配置的命名空间,生成一个限定于该命名空间的键。这允许在不同命名空间中具有相同 appid 的应用程序重用相同的状态存储。如果未配置命名空间,则会回退到 appid 策略。有关 Dapr 中命名空间的更多信息,请参见 操作指南:将组件限定到一个或多个应用程序 |
none | 不使用任何前缀。多个应用程序可以在不同的状态存储中共享状态,而不受特定前缀的限制。 |
要指定前缀策略,请在状态组件上添加名为 keyPrefix
的元数据键:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: production
spec:
type: state.redis
version: v1
metadata:
- name: keyPrefix
value: <key-prefix-strategy>
以下示例演示了使用每种支持的前缀策略进行状态检索的情况。
appid
(默认)在下面的示例中,具有应用程序 ID myApp
的 Dapr 应用程序正在将状态保存到名为 redis
的状态存储中:
curl -X POST http://localhost:3500/v1.0/state/redis \
-H "Content-Type: application/json"
-d '[
{
"key": "darth",
"value": "nihilus"
}
]'
键将被保存为 myApp||darth
。
namespace
在命名空间 production
中运行的具有应用程序 ID myApp
的 Dapr 应用程序正在将状态保存到名为 redis
的状态存储中:
curl -X POST http://localhost:3500/v1.0/state/redis \
-H "Content-Type: application/json"
-d '[
{
"key": "darth",
"value": "nihilus"
}
]'
键将被保存为 production.myApp||darth
。
name
在下面的示例中,具有应用程序 ID myApp
的 Dapr 应用程序正在将状态保存到名为 redis
的状态存储中:
curl -X POST http://localhost:3500/v1.0/state/redis \
-H "Content-Type: application/json"
-d '[
{
"key": "darth",
"value": "nihilus"
}
]'
键将被保存为 redis||darth
。
none
在下面的示例中,具有应用程序 ID myApp
的 Dapr 应用程序正在将状态保存到名为 redis
的状态存储中:
curl -X POST http://localhost:3500/v1.0/state/redis \
-H "Content-Type: application/json"
-d '[
{
"key": "darth",
"value": "nihilus"
}
]'
键将被保存为 darth
。
对静态应用程序状态进行加密,以在企业工作负载或受监管环境中提供更强的安全性。Dapr 提供基于 AES 的自动客户端加密,采用 Galois/Counter Mode (GCM),支持 128、192 和 256 位的密钥。
除了自动加密,Dapr 还支持主加密密钥和次加密密钥,使开发人员和运维团队更容易启用密钥轮换策略。此功能由所有 Dapr 状态存储支持。
加密密钥始终从 secret 中获取,不能在 metadata
部分中以明文形式提供。
将以下 metadata
部分添加到任何 Dapr 支持的状态存储中:
metadata:
- name: primaryEncryptionKey
secretKeyRef:
name: mysecret
key: mykey # key 是可选的。
例如,这是一个 Redis 加密状态存储的完整 YAML:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: primaryEncryptionKey
secretKeyRef:
name: mysecret
key: mykey
现在,您已配置了一个 Dapr 状态存储,以从名为 mysecret
的 secret 中获取加密密钥,其中包含名为 mykey
的实际加密密钥。
实际的加密密钥必须是有效的、十六进制编码的加密密钥。虽然支持 192 位和 256 位密钥,但建议使用 128 位加密密钥。如果加密密钥无效,Dapr 会报错并退出。
例如,您可以使用以下命令生成一个随机的、十六进制编码的 128 位(16 字节)密钥:
openssl rand 16 | hexdump -v -e '/1 "%02x"'
# 结果将类似于 "cb321007ad11a9d23f963bff600d58e0"
注意,secret 存储不必支持密钥。
为了支持密钥轮换,Dapr 提供了一种指定次加密密钥的方法:
metadata:
- name: primaryEncryptionKey
secretKeyRef:
name: mysecret
key: mykey
- name: secondaryEncryptionKey
secretKeyRef:
name: mysecret2
key: mykey2
当 Dapr 启动时,它会获取 metadata
部分中列出的包含加密密钥的 secrets。Dapr 自动识别哪个状态项是用哪个密钥加密的,因为它会将 secretKeyRef.name
字段附加到实际状态密钥的末尾。
要轮换密钥,
primaryEncryptionKey
以指向包含新密钥的 secret。secondaryEncryptionKey
。新数据将使用新密钥加密,任何检索到的旧数据将使用次密钥解密。
使用旧密钥加密的数据项的任何更新都将使用新密钥重新加密。
Dapr 在保存和检索状态时不对状态值进行转换。Dapr 要求所有状态存储实现遵循特定的键格式规范(参见状态管理规范)。您可以直接与底层存储交互以操作状态数据,例如:
要连接到您的 Cosmos DB 实例,您可以:
要获取与应用程序 “myapp” 关联的所有状态键,请使用查询:
SELECT * FROM states WHERE CONTAINS(states.id, 'myapp||')
上述查询返回所有 id 包含 “myapp||” 的文档,这是状态键的前缀。
要通过键 “balance” 获取应用程序 “myapp” 的状态数据,请使用查询:
SELECT * FROM states WHERE states.id = 'myapp||balance'
读取返回文档的 value 字段。要获取状态版本/ETag,请使用命令:
SELECT states._etag FROM states WHERE states.id = 'myapp||balance'
要获取与实例 ID 为 “leroy” 的 actor 类型 “cat” 关联的所有状态键,该 actor 属于 ID 为 “mypets” 的应用程序,请使用命令:
SELECT * FROM states WHERE CONTAINS(states.id, 'mypets||cat||leroy||')
要获取特定的 actor 状态,例如 “food”,请使用命令:
SELECT * FROM states WHERE states.id = 'mypets||cat||leroy||food'
Dapr 在保存和检索状态时不对状态值进行转换。Dapr 要求所有状态存储实现遵循特定的键格式规范(参见状态管理规范)。您可以直接与底层存储交互以操作状态数据,例如:
您可以使用官方的 redis-cli 或任何其他兼容 Redis 的工具连接到 Redis 状态存储以直接查询 Dapr 状态。如果您在容器中运行 Redis,最简单的使用 redis-cli 的方法是通过容器:
docker run --rm -it --link <Redis 容器的名称> redis redis-cli -h <Redis 容器的名称>
要获取与应用程序 “myapp” 关联的所有状态键,请使用命令:
KEYS myapp*
上述命令返回现有键的列表,例如:
1) "myapp||balance"
2) "myapp||amount"
Dapr 将状态值保存为哈希值。每个哈希值包含一个 “data” 字段,其中存储状态数据,以及一个 “version” 字段,作为 ETag,表示不断递增的版本。
例如,要通过键 “balance” 获取应用程序 “myapp” 的状态数据,请使用命令:
HGET myapp||balance data
要获取状态版本/ETag,请使用命令:
HGET myapp||balance version
要获取与应用程序 ID 为 “mypets” 的 actor 类型 “cat” 的实例 ID 为 “leroy” 关联的所有状态键,请使用命令:
KEYS mypets||cat||leroy*
要获取特定的 actor 状态,例如 “food”,请使用命令:
HGET mypets||cat||leroy||food value
Dapr 在保存和检索状态时不对状态值进行转换。Dapr 要求所有状态存储实现遵循特定的键格式(参见状态管理规范)。您可以直接与底层存储交互来操作状态数据,例如:
连接到 SQL Server 实例的最简单方法是使用:
要获取与应用程序 “myapp” 关联的所有状态键,请使用以下查询:
SELECT * FROM states WHERE [Key] LIKE 'myapp||%'
上述查询返回所有 ID 包含 “myapp||” 的行,这是状态键的前缀。
要通过键 “balance” 获取应用程序 “myapp” 的状态数据,请使用以下查询:
SELECT * FROM states WHERE [Key] = 'myapp||balance'
读取返回行的 Data 字段。要获取状态版本/ETag,请使用以下命令:
SELECT [RowVersion] FROM states WHERE [Key] = 'myapp||balance'
要获取 JSON 数据中值 “color” 等于 “blue” 的所有状态数据,请使用以下查询:
SELECT * FROM states WHERE JSON_VALUE([Data], '$.color') = 'blue'
要获取与 actor 类型 “cat” 的实例 ID “leroy” 关联的所有状态键,该 actor 属于 ID 为 “mypets” 的应用程序,请使用以下命令:
SELECT * FROM states WHERE [Key] LIKE 'mypets||cat||leroy||%'
要获取特定的 actor 状态,例如 “food”,请使用以下命令:
SELECT * FROM states WHERE [Key] = 'mypets||cat||leroy||food'
Dapr 允许为每个状态设置生存时间 (TTL)。这意味着应用程序可以为存储的每个状态指定一个生存时间,过期后将无法检索这些状态。
对于支持的状态存储,只需在发布消息时设置 ttlInSeconds
元数据。其他状态存储将忽略此值。对于某些状态存储,您可以为每个表或容器指定默认的过期时间。
当状态存储组件原生支持状态 TTL 时,Dapr 会直接传递 TTL 配置,而不添加额外的逻辑,从而保持行为的一致性。这在组件以不同方式处理过期状态时尤为有用。
如果未指定 TTL,将保留状态存储的默认行为。
对于允许为所有数据指定默认 TTL 的状态存储,持久化状态的方式包括:
如果未指定特定的 TTL,数据将在全局 TTL 时间段后过期,这不是由 Dapr 直接控制的。
此外,所有状态存储还支持显式持久化数据的选项。这意味着您可以忽略默认的数据库策略(可能是在 Dapr 之外或通过 Dapr 组件设置的),以无限期保留特定的数据库记录。您可以通过将 ttlInSeconds
设置为 -1
来实现,这表示忽略任何设置的 TTL 值。
请参阅状态存储组件指南中的 TTL 列。
您可以在状态存储请求的元数据中设置状态 TTL:
# 依赖
from dapr.clients import DaprClient
# 代码
DAPR_STORE_NAME = "statestore"
with DaprClient() as client:
client.save_state(DAPR_STORE_NAME, "order_1", str(orderId), state_metadata={
'ttlInSeconds': '120'
})
要启动 Dapr sidecar 并运行上述示例应用程序,您可以运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖
using Dapr.Client;
// 代码
await client.SaveStateAsync(storeName, stateKeyName, state, metadata: new Dictionary<string, string>() {
{
"ttlInSeconds", "120"
}
});
要启动 Dapr sidecar 并运行上述示例应用程序,您可以运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
// 依赖
import (
dapr "github.com/dapr/go-sdk/client"
)
// 代码
md := map[string]string{"ttlInSeconds": "120"}
if err := client.SaveState(ctx, store, "key1", []byte("hello world"), md); err != nil {
panic(err)
}
要启动 Dapr sidecar 并运行上述示例应用程序,您可以运行类似以下的命令:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run .
curl -X POST -H "Content-Type: application/json" -d '[{ "key": "order_1", "value": "250", "metadata": { "ttlInSeconds": "120" } }]' http://localhost:3601/v1.0/state/statestore
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{"key": "order_1", "value": "250", "metadata": {"ttlInSeconds": "120"}}]' -Uri 'http://localhost:3601/v1.0/state/statestore'