1 - 状态管理概述

状态管理API模块概述

您的应用程序可以利用Dapr的状态管理API在支持的状态存储中保存、读取和查询键/值对。通过状态存储组件,您可以构建有状态且长时间运行的应用程序,例如购物车或游戏的会话状态。如下图所示:

  • 使用HTTP POST来保存或查询键/值对。
  • 使用HTTP GET来读取特定键并返回其值。

以下视频和演示概述了Dapr状态管理的工作原理。

功能

通过状态管理API模块,您的应用程序可以利用一些通常复杂且容易出错的功能,包括:

  • 设置并发控制和数据一致性的选项。
  • 执行批量更新操作CRUD,包括多个事务操作。
  • 查询和过滤键/值数据。

以下是状态管理API的一些功能:

可插拔的状态存储

Dapr的数据存储被设计为组件,可以在不更改服务代码的情况下进行替换。查看支持的状态存储以获取更多信息。

可配置的状态存储行为

使用Dapr,您可以在状态操作请求中附加元数据,描述您期望请求如何被处理。您可以附加:

  • 并发性要求
  • 一致性要求

默认情况下,您的应用程序应假设数据存储是最终一致的,并使用最后写入胜出并发模式

并非所有存储都是平等的。为了确保您的应用程序的可移植性,您可以查询存储的元数据能力,并使您的代码适应不同的存储能力。

并发

Dapr支持使用ETags的乐观并发控制(OCC)。当请求状态值时,Dapr总是将ETag属性附加到返回的状态中。当用户代码:

  • 更新状态时,期望通过请求体附加ETag。
  • 删除状态时,期望通过If-Match头附加ETag。

当提供的ETag与状态存储中的ETag匹配时,写入操作成功。

为什么Dapr选择乐观并发控制(OCC)

在许多应用程序中,数据更新冲突很少见,因为客户端通常根据业务上下文分区以操作不同的数据。然而,如果您的应用程序选择使用ETags,不匹配的ETags可能导致请求被拒绝。建议您在代码中使用重试策略,以在使用ETags时补偿冲突。

如果您的应用程序在写入请求中省略ETags,Dapr在处理请求时会跳过ETag检查。这使得最后写入胜出模式成为可能,与使用ETags的首次写入胜出模式相比。

阅读API参考以了解如何设置并发选项。

一致性

Dapr支持强一致性最终一致性,最终一致性是默认行为。

  • 强一致性:Dapr在确认写入请求之前等待所有副本(或指定的法定人数)确认。
  • 最终一致性:Dapr在底层数据存储接受写入请求后立即返回,即使这只是一个副本。

阅读API参考以了解如何设置一致性选项。

设置内容类型

状态存储组件可能会根据内容类型不同地维护和操作数据。Dapr支持在状态管理API中作为请求元数据的一部分传递内容类型。

设置内容类型是_可选的_,组件决定是否使用它。Dapr仅提供将此信息传递给组件的手段。

  • 使用HTTP API:通过URL查询参数metadata.contentType设置内容类型。例如,http://localhost:3500/v1.0/state/store?metadata.contentType=application/json
  • 使用gRPC API:通过在请求元数据中添加键/值对"contentType" : <content type>来设置内容类型。

多重操作

Dapr支持两种类型的多读或多写操作:批量事务性。阅读API参考以了解如何使用批量和多选项。

批量读取操作

您可以将多个读取请求分组为批量(或批次)操作。在批量操作中,Dapr将读取请求作为单独的请求提交到底层数据存储,并将它们作为单个结果返回。

事务性操作

您可以将写入、更新和删除操作分组为一个请求,然后作为一个原子事务处理。请求将作为一组事务性操作成功或失败。

actor状态

事务性状态存储可用于存储actor状态。要指定用于actor的状态存储,请在状态存储组件的元数据部分中将属性actorStateStore的值指定为true。actor状态以特定方案存储在事务性状态存储中,允许进行一致的查询。所有actor只能使用一个状态存储组件作为状态存储。阅读state API参考actors API参考以了解有关actor状态存储的更多信息。

actor状态的生存时间(TTL)

在保存actor状态时,您应始终设置TTL元数据字段(ttlInSeconds)或在您选择的SDK中使用等效的API调用,以确保状态最终被移除。阅读actors概述以获取更多信息。

状态加密

Dapr支持应用程序状态的自动客户端加密,并支持密钥轮换。这在所有Dapr状态存储上都支持。有关更多信息,请阅读如何:加密应用程序状态主题。

应用程序之间的共享状态

不同应用程序在共享状态时的需求各不相同。在一种情况下,您可能希望将所有状态封装在给定应用程序中,并让Dapr为您管理访问。在另一种情况下,您可能希望两个应用程序在同一状态上工作,以获取和保存相同的键。

Dapr使状态能够:

  • 隔离到一个应用程序。
  • 在应用程序之间的状态存储中共享。
  • 在不同状态存储之间的多个应用程序之间共享。

有关更多详细信息,请阅读如何:在应用程序之间共享状态

启用外发模式

Dapr使开发人员能够使用外发模式在事务性状态存储和任何消息代理之间实现单一事务。有关更多信息,请阅读如何启用事务性外发消息

查询状态

有两种方法可以查询状态:

  • 使用Dapr运行时提供的状态管理查询API。
  • 使用存储的原生SDK直接查询状态存储。

查询API

使用_可选的_状态管理查询API,您可以查询状态存储中保存的键/值数据,无论底层数据库或存储技术如何。使用状态管理查询API,您可以过滤、排序和分页键/值数据。有关更多详细信息,请阅读如何:查询状态

直接查询状态存储

Dapr在不进行任何转换的情况下保存和检索状态值。您可以直接从底层状态存储查询和聚合状态。例如,要获取与应用程序ID “myApp” 相关的所有状态键,请在Redis中使用:

KEYS "myApp*"
查询actor状态

如果数据存储支持SQL查询,您可以使用SQL查询actor的状态。例如:

SELECT * FROM StateTable WHERE Id='<app-id>||<actor-type>||<actor-id>||<key>'

您还可以通过对actor实例执行聚合查询来避免actor框架的常见轮次并发限制。例如,要计算所有温度计actor的平均温度,请使用:

SELECT AVG(value) FROM StateTable WHERE Id LIKE '<app-id>||<thermometer>||*||temperature'

状态生存时间(TTL)

Dapr支持每个状态设置请求的生存时间(TTL)。这意味着应用程序可以为每个存储的状态设置生存时间,这些状态在过期后无法检索。

状态管理API

状态管理API可以在状态管理API参考中找到,该参考描述了如何通过提供键来检索、保存、删除和查询状态值。

试用状态管理

快速入门和教程

想要测试Dapr状态管理API吗?通过以下快速入门和教程,看看状态管理的实际应用:

快速入门/教程描述
状态管理快速入门使用状态管理API创建有状态的应用程序。
Hello World推荐
演示如何在本地运行Dapr。突出显示服务调用和状态管理。
Hello World Kubernetes推荐
演示如何在Kubernetes中运行Dapr。突出显示服务调用和_状态管理_。

直接在您的应用中开始使用状态管理

想要跳过快速入门?没问题。您可以直接在您的应用程序中试用状态管理模块。在Dapr安装后,您可以从状态管理如何指南开始使用状态管理API。

下一步

2 - 操作指南:保存和获取状态

使用键值对持久化状态

状态管理是新应用程序、遗留应用程序、单体应用程序或微服务应用程序的常见需求之一。处理和测试不同的数据库库,以及处理重试和故障,可能既困难又耗时。

在本指南中,您将学习如何使用键/值状态API来保存、获取和删除应用程序的状态。

下面的代码示例描述了一个处理订单的应用程序,该应用程序使用Dapr sidecar。订单处理服务通过Dapr将状态存储在Redis状态存储中。

示例服务的状态管理图示

设置状态存储

状态存储组件是Dapr用于与数据库通信的资源。

在本指南中,我们将使用Redis状态存储,但您也可以选择支持列表中的其他状态存储。

当您在selfhost模式下运行dapr init时,Dapr会在您的本地机器上创建一个默认的Redis statestore.yaml并运行一个Redis状态存储,位置如下:

  • 在Windows上,位于%UserProfile%\.dapr\components\statestore.yaml
  • 在Linux/MacOS上,位于~/.dapr/components/statestore.yaml

通过使用statestore.yaml组件,您可以在不更改应用程序代码的情况下轻松更换底层组件。

要将其部署到Kubernetes集群中,请在下面的YAML中填写您的状态存储组件metadata连接详细信息,保存为statestore.yaml,然后运行kubectl apply -f statestore.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

请参阅如何在Kubernetes上设置不同的状态存储以获取更多信息。

保存和检索单个状态

以下示例展示了如何使用Dapr状态管理API保存和检索单个键/值对。


// 依赖项
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using System.Threading;
using System.Text.Json;

// 代码
namespace EventService
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string DAPR_STORE_NAME = "statestore";
            while(true) {
                System.Threading.Thread.Sleep(5000);
                using var client = new DaprClientBuilder().Build();
                Random random = new Random();
                int orderId = random.Next(1,1000);
                // 使用Dapr SDK保存和获取状态
                await client.SaveStateAsync(DAPR_STORE_NAME, "order_1", orderId.ToString());
                await client.SaveStateAsync(DAPR_STORE_NAME, "order_2", orderId.ToString());
                var result = await client.GetStateAsync<string>(DAPR_STORE_NAME, "order_1");
                Console.WriteLine("获取后的结果: " + result);
            }
        }
    }
}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
// 依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import io.dapr.client.domain.TransactionalStateOperation;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.Random;
import java.util.concurrent.TimeUnit;

// 代码
@SpringBootApplication
public class OrderProcessingServiceApplication {

	private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

	private static final String STATE_STORE_NAME = "statestore";

	public static void main(String[] args) throws InterruptedException{
		while(true) {
			TimeUnit.MILLISECONDS.sleep(5000);
			Random random = new Random();
			int orderId = random.nextInt(1000-1) + 1;
			DaprClient client = new DaprClientBuilder().build();
            // 使用Dapr SDK保存和获取状态
			client.saveState(STATE_STORE_NAME, "order_1", Integer.toString(orderId)).block();
			client.saveState(STATE_STORE_NAME, "order_2", Integer.toString(orderId)).block();
			Mono<State<String>> result = client.getState(STATE_STORE_NAME, "order_1", String.class);
			log.info("获取后的结果" + result);
		}
	}

}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
# 依赖项
import random
from time import sleep    
import requests
import logging
from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType

# 代码
logging.basicConfig(level = logging.INFO)
DAPR_STORE_NAME = "statestore"
while True:
    sleep(random.randrange(50, 5000) / 1000)
    orderId = random.randint(1, 1000)
    with DaprClient() as client:
        # 使用Dapr SDK保存和获取状态
        client.save_state(DAPR_STORE_NAME, "order_1", str(orderId)) 
        result = client.get_state(DAPR_STORE_NAME, "order_1")
        logging.info('获取后的结果: ' + result.data.decode('utf-8'))

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖项
import (
	"context"
	"log"
	"math/rand"
	"strconv"
	"time"

	dapr "github.com/dapr/go-sdk/client"
)

// 代码
func main() {
	const STATE_STORE_NAME = "statestore"
	rand.Seed(time.Now().UnixMicro())
	for i := 0; i < 10; i++ {
		orderId := rand.Intn(1000-1) + 1
		client, err := dapr.NewClient()
		if err != nil {
			panic(err)
		}
		defer client.Close()
		ctx := context.Background()
		err = client.SaveState(ctx, STATE_STORE_NAME, "order_1", []byte(strconv.Itoa(orderId)), nil)
		if err != nil {
			panic(err)
		}
		result, err := client.GetState(ctx, STATE_STORE_NAME, "order_1", nil)
		if err != nil {
			panic(err)
		}
		log.Println("获取后的结果:", string(result.Value))
		time.Sleep(2 * time.Second)
	}
}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
// 依赖项
import { DaprClient, HttpMethod, CommunicationProtocolEnum } from '@dapr/dapr'; 

// 代码
const daprHost = "127.0.0.1"; 
var main = function() {
    for(var i=0;i<10;i++) {
        sleep(5000);
        var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
        start(orderId).catch((e) => {
            console.error(e);
            process.exit(1);
        });
    }
}

async function start(orderId) {
    const client = new DaprClient({
        daprHost,
        daprPort: process.env.DAPR_HTTP_PORT,
        communicationProtocol: CommunicationProtocolEnum.HTTP,
    });
    const STATE_STORE_NAME = "statestore";
    // 使用Dapr SDK保存和获取状态
    await client.state.save(STATE_STORE_NAME, [
        {
            key: "order_1",
            value: orderId.toString()
        },
        {
            key: "order_2",
            value: orderId.toString()
        }
    ]);
    var result = await client.state.get(STATE_STORE_NAME, "order_1");
    console.log("获取后的结果: " + result);
}

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

main();

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

启动一个Dapr sidecar:

dapr run --app-id orderprocessing --dapr-http-port 3601

在一个单独的终端中,将一个键/值对保存到您的状态存储中:

curl -X POST -H "Content-Type: application/json" -d '[{ "key": "order_1", "value": "250"}]' http://localhost:3601/v1.0/state/statestore

现在获取您刚刚保存的状态:

curl http://localhost:3601/v1.0/state/statestore/order_1

重新启动您的sidecar并尝试再次检索状态,以观察状态与应用程序分开持久化。

启动一个Dapr sidecar:

dapr --app-id orderprocessing --dapr-http-port 3601 run

在一个单独的终端中,将一个键/值对保存到您的状态存储中:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{"key": "order_1", "value": "250"}]' -Uri 'http://localhost:3601/v1.0/state/statestore'

现在获取您刚刚保存的状态:

Invoke-RestMethod -Uri 'http://localhost:3601/v1.0/state/statestore/order_1'

重新启动您的sidecar并尝试再次检索状态,以观察状态与应用程序分开持久化。

删除状态

以下是利用Dapr SDK删除状态的代码示例。

// 依赖项
using Dapr.Client;

// 代码
namespace EventService
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string DAPR_STORE_NAME = "statestore";
            // 使用Dapr SDK删除状态
            using var client = new DaprClientBuilder().Build();
            await client.DeleteStateAsync(DAPR_STORE_NAME, "order_1", cancellationToken: cancellationToken);
        }
    }
}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
// 依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// 代码
@SpringBootApplication
public class OrderProcessingServiceApplication {
	public static void main(String[] args) throws InterruptedException{
        String STATE_STORE_NAME = "statestore";

        // 使用Dapr SDK删除状态
        DaprClient client = new DaprClientBuilder().build();
        String storedEtag = client.getState(STATE_STORE_NAME, "order_1", String.class).block().getEtag();
        client.deleteState(STATE_STORE_NAME, "order_1", storedEtag, null).block();
	}
}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
# 依赖项
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType

# 代码
logging.basicConfig(level = logging.INFO)
DAPR_STORE_NAME = "statestore"

# 使用Dapr SDK删除状态
with DaprClient() as client:
    client.delete_state(store_name=DAPR_STORE_NAME, key="order_1")

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖项
import (
	"context"
	dapr "github.com/dapr/go-sdk/client"

)

// 代码
func main() {
    STATE_STORE_NAME := "statestore"
    // 使用Dapr SDK删除状态
    client, err := dapr.NewClient()
    if err != nil {
        panic(err)
    }
    defer client.Close()
    ctx := context.Background()

    if err := client.DeleteState(ctx, STATE_STORE_NAME, "order_1"); err != nil {
        panic(err)
    }
}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
// 依赖项
import { DaprClient, HttpMethod, CommunicationProtocolEnum } from '@dapr/dapr'; 

// 代码
const daprHost = "127.0.0.1"; 
var main = function() {
    const STATE_STORE_NAME = "statestore";
    // 使用Dapr SDK保存和获取状态
    const client = new DaprClient({
        daprHost,
        daprPort: process.env.DAPR_HTTP_PORT,
        communicationProtocol: CommunicationProtocolEnum.HTTP,
    });
    
    await client.state.delete(STATE_STORE_NAME, "order_1"); 
}

main();

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

使用上面相同的Dapr实例运行:

curl -X DELETE 'http://localhost:3601/v1.0/state/statestore/order_1'

尝试再次获取状态。注意没有返回值。

使用上面相同的Dapr实例运行:

Invoke-RestMethod -Method Delete -Uri 'http://localhost:3601/v1.0/state/statestore/order_1'

尝试再次获取状态。注意没有返回值。

保存和检索多个状态

以下是利用Dapr SDK保存和检索多个状态的代码示例。

// 依赖项
using Dapr.Client;
// 代码
namespace EventService
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string DAPR_STORE_NAME = "statestore";
            // 使用Dapr SDK检索多个状态
            using var client = new DaprClientBuilder().Build();
            IReadOnlyList<BulkStateItem> multipleStateResult = await client.GetBulkStateAsync(DAPR_STORE_NAME, new List<string> { "order_1", "order_2" }, parallelism: 1);
        }
    }
}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run

上述示例返回一个BulkStateItem,其中包含您保存到状态的值的序列化格式。如果您希望SDK在每个批量响应项中反序列化值,您可以使用以下代码:

// 依赖项
using Dapr.Client;
// 代码
namespace EventService
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string DAPR_STORE_NAME = "statestore";
            // 使用Dapr SDK检索多个状态
            using var client = new DaprClientBuilder().Build();
            IReadOnlyList<BulkStateItem<Widget>> mulitpleStateResult = await client.GetBulkStateAsync<Widget>(DAPR_STORE_NAME, new List<string> { "widget_1", "widget_2" }, parallelism: 1);
        }
    }

    class Widget
    {
        string Size { get; set; }
        string Color { get; set; }        
    }
}
// 依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import java.util.Arrays;

// 代码
@SpringBootApplication
public class OrderProcessingServiceApplication {

	private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

	public static void main(String[] args) throws InterruptedException{
        String STATE_STORE_NAME = "statestore";
        // 使用Dapr SDK检索多个状态
        DaprClient client = new DaprClientBuilder().build();
        Mono<List<State<String>>> resultBulk = client.getBulkState(STATE_STORE_NAME,
        Arrays.asList("order_1", "order_2"), String.class);
	}
}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
# 依赖项
from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem

# 代码
logging.basicConfig(level = logging.INFO)
DAPR_STORE_NAME = "statestore"
orderId = 100
# 使用Dapr SDK保存和检索多个状态
with DaprClient() as client:
    client.save_bulk_state(store_name=DAPR_STORE_NAME, states=[StateItem(key="order_2", value=str(orderId))])
    result = client.get_bulk_state(store_name=DAPR_STORE_NAME, keys=["order_1", "order_2"], states_metadata={"metakey": "metavalue"}).items
    logging.info('批量获取后的结果: ' + str(result)) 

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖项
import (
	"context"
	"log"
	"math/rand"
	"strconv"
	"time"

	dapr "github.com/dapr/go-sdk/client"
)

// 代码
func main() {
	const STATE_STORE_NAME = "statestore"
	rand.Seed(time.Now().UnixMicro())
	for i := 0; i < 10; i++ {
		orderId := rand.Intn(1000-1) + 1
		client, err := dapr.NewClient()
		if err != nil {
			panic(err)
		}
		defer client.Close()
		ctx := context.Background()
		err = client.SaveState(ctx, STATE_STORE_NAME, "order_1", []byte(strconv.Itoa(orderId)), nil)
		if err != nil {
			panic(err)
		}
		keys := []string{"key1", "key2", "key3"}
        items, err := client.GetBulkState(ctx, STATE_STORE_NAME, keys, nil, 100)
		if err != nil {
			panic(err)
		}
		for _, item := range items {
			log.Println("从GetBulkState获取的项:", string(item.Value))
		}
	}
} 

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
// 依赖项
import { DaprClient, HttpMethod, CommunicationProtocolEnum } from '@dapr/dapr'; 

// 代码
const daprHost = "127.0.0.1"; 
var main = function() {
    const STATE_STORE_NAME = "statestore";
    var orderId = 100;
    // 使用Dapr SDK保存和检索多个状态
    const client = new DaprClient({
        daprHost,
        daprPort: process.env.DAPR_HTTP_PORT,
        communicationProtocol: CommunicationProtocolEnum.HTTP,
    });

    await client.state.save(STATE_STORE_NAME, [
        {
            key: "order_1",
            value: orderId.toString()
        },
        {
            key: "order_2",
            value: orderId.toString()
        }
    ]);
    result = await client.state.getBulk(STATE_STORE_NAME, ["order_1", "order_2"]);
}

main();

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

使用上面相同的Dapr实例,将两个键/值对保存到您的状态存储中:

curl -X POST -H "Content-Type: application/json" -d '[{ "key": "order_1", "value": "250"}, { "key": "order_2", "value": "550"}]' http://localhost:3601/v1.0/state/statestore

现在获取您刚刚保存的状态:

curl -X POST -H "Content-Type: application/json" -d '{"keys":["order_1", "order_2"]}' http://localhost:3601/v1.0/state/statestore/bulk

使用上面相同的Dapr实例,将两个键/值对保存到您的状态存储中:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{ "key": "order_1", "value": "250"}, { "key": "order_2", "value": "550"}]' -Uri 'http://localhost:3601/v1.0/state/statestore'

现在获取您刚刚保存的状态:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["order_1", "order_2"]}' -Uri 'http://localhost:3601/v1.0/state/statestore/bulk'

执行状态事务

以下是利用Dapr SDK执行状态事务的代码示例。

// 依赖项
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using System.Threading;
using System.Text.Json;

// 代码
namespace EventService
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string DAPR_STORE_NAME = "statestore";
            while(true) {
                System.Threading.Thread.Sleep(5000);
                Random random = new Random();
                int orderId = random.Next(1,1000);
                using var client = new DaprClientBuilder().Build();
                var requests = new List<StateTransactionRequest>()
                {
                    new StateTransactionRequest("order_3", JsonSerializer.SerializeToUtf8Bytes(orderId.ToString()), StateOperationType.Upsert),
                    new StateTransactionRequest("order_2", null, StateOperationType.Delete)
                };
                CancellationTokenSource source = new CancellationTokenSource();
                CancellationToken cancellationToken = source.Token;
                // 使用Dapr SDK执行状态事务
                await client.ExecuteStateTransactionAsync(DAPR_STORE_NAME, requests, cancellationToken: cancellationToken);
                Console.WriteLine("订单请求: " + orderId);
                Console.WriteLine("结果: " + result);
            }
        }
    }
}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
// 依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import io.dapr.client.domain.TransactionalStateOperation;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

// 代码
@SpringBootApplication
public class OrderProcessingServiceApplication {

	private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

	private static final String STATE_STORE_NAME = "statestore";

	public static void main(String[] args) throws InterruptedException{
		while(true) {
			TimeUnit.MILLISECONDS.sleep(5000);
			Random random = new Random();
			int orderId = random.nextInt(1000-1) + 1;
			DaprClient client = new DaprClientBuilder().build();
			List<TransactionalStateOperation<?>> operationList = new ArrayList<>();
			operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.UPSERT,
					new State<>("order_3", Integer.toString(orderId), "")));
			operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.DELETE,
					new State<>("order_2")));
            // 使用Dapr SDK执行状态事务
			client.executeStateTransaction(STATE_STORE_NAME, operationList).block();
			log.info("订单请求: " + orderId);
		}
	}

}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
# 依赖项
import random
from time import sleep    
import requests
import logging
from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType

# 代码
logging.basicConfig(level = logging.INFO)    
DAPR_STORE_NAME = "statestore"
while True:
    sleep(random.randrange(50, 5000) / 1000)
    orderId = random.randint(1, 1000)
    with DaprClient() as client:
        # 使用Dapr SDK执行状态事务
        client.execute_state_transaction(store_name=DAPR_STORE_NAME, operations=[
            TransactionalStateOperation(
                operation_type=TransactionOperationType.upsert,
                key="order_3",
                data=str(orderId)),
            TransactionalStateOperation(key="order_3", data=str(orderId)),
            TransactionalStateOperation(
                operation_type=TransactionOperationType.delete,
                key="order_2",
                data=str(orderId)),
            TransactionalStateOperation(key="order_2", data=str(orderId))
        ])

    client.delete_state(store_name=DAPR_STORE_NAME, key="order_1")
    logging.basicConfig(level = logging.INFO)
    logging.info('订单请求: ' + str(orderId))
    logging.info('结果: ' + str(result))

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖项
package main

import (
	"context"
	"log"
	"math/rand"
	"strconv"
	"time"

	dapr "github.com/dapr/go-sdk/client"
)

// 代码
func main() {
	const STATE_STORE_NAME = "statestore"
	rand.Seed(time.Now().UnixMicro())
	for i := 0; i < 10; i++ {
		orderId := rand.Intn(1000-1) + 1
		client, err := dapr.NewClient()
		if err != nil {
			panic(err)
		}
		defer client.Close()
		ctx := context.Background()
		err = client.SaveState(ctx, STATE_STORE_NAME, "order_1", []byte(strconv.Itoa(orderId)), nil)
		if err != nil {
			panic(err)
		}
		result, err := client.GetState(ctx, STATE_STORE_NAME, "order_1", nil)
		if err != nil {
			panic(err)
		}

        ops := make([]*dapr.StateOperation, 0)
        data1 := "data1"
        data2 := "data2"

        op1 := &dapr.StateOperation{
            Type: dapr.StateOperationTypeUpsert,
            Item: &dapr.SetStateItem{
                Key:   "key1",
                Value: []byte(data1),
            },
        }
        op2 := &dapr.StateOperation{
            Type: dapr.StateOperationTypeDelete,
            Item: &dapr.SetStateItem{
                Key:   "key2",
                Value: []byte(data2),
            },
        }
        ops = append(ops, op1, op2)
        meta := map[string]string{}
        err = client.ExecuteStateTransaction(ctx, STATE_STORE_NAME, meta, ops)

		log.Println("获取后的结果:", string(result.Value))
		time.Sleep(2 * time.Second)
	}
}

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
// 依赖项
import { DaprClient, HttpMethod, CommunicationProtocolEnum } from '@dapr/dapr'; 

// 代码
const daprHost = "127.0.0.1"; 
var main = function() {
    for(var i=0;i<10;i++) {
        sleep(5000);
        var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
        start(orderId).catch((e) => {
            console.error(e);
            process.exit(1);
        });
    }
}

async function start(orderId) {
    const client = new DaprClient({
        daprHost,
        daprPort: process.env.DAPR_HTTP_PORT,
        communicationProtocol: CommunicationProtocolEnum.HTTP,
    });

    const STATE_STORE_NAME = "statestore";
    // 使用Dapr SDK保存和检索多个状态
    await client.state.transaction(STATE_STORE_NAME, [
        {
        operation: "upsert",
        request: {
            key: "order_3",
            value: orderId.toString()
        }
        },
        {
        operation: "delete",
        request: {
            key: "order_2"
        }
        }
    ]);
}

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

main();

要为上述示例应用程序启动一个Dapr sidecar,运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

使用上面相同的Dapr实例,执行两个状态事务:

curl -X POST -H "Content-Type: application/json" -d '{"operations": [{"operation":"upsert", "request": {"key": "order_1", "value": "250"}}, {"operation":"delete", "request": {"key": "order_2"}}]}' http://localhost:3601/v1.0/state/statestore/transaction

现在查看您的状态事务的结果:

curl -X POST -H "Content-Type: application/json" -d '{"keys":["order_1", "order_2"]}' http://localhost:3601/v1.0/state/statestore/bulk

使用上面相同的Dapr实例,将两个键/值对保存到您的状态存储中:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"operations": [{"operation":"upsert", "request": {"key": "order_1", "value": "250"}}, {"operation":"delete", "request": {"key": "order_2"}}]}' -Uri 'http://localhost:3601/v1.0/state/statestore/transaction'

现在查看您的状态事务的结果:

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["order_1", "order_2"]}' -Uri 'http://localhost:3601/v1.0/state/statestore/bulk'

下一步

3 - 操作指南:查询状态

使用查询API查询状态存储

通过状态查询API,您可以从状态存储组件中检索、过滤和排序键/值数据。查询API并不是完整查询语言的替代品。

尽管状态存储是键/值存储,value可能是一个包含自身层次结构、键和值的JSON文档。查询API允许您使用这些键/值来检索相应的文档。

查询状态

您可以通过HTTP POST/PUT或gRPC提交查询请求。请求的主体是一个包含以下三个部分的JSON对象:

  • filter
  • sort
  • page

filter

filter用于指定查询条件,结构类似于树形,每个节点表示一个操作,可能是单一或多个操作数。

支持以下操作:

操作符操作数描述
EQkey:valuekey 等于 value
NEQkey:valuekey 不等于 value
GTkey:valuekey 大于 value
GTEkey:valuekey 大于等于 value
LTkey:valuekey 小于 value
LTEkey:valuekey 小于等于 value
INkey:[]valuekey 等于 value[0] 或 value[1] 或 … 或 value[n]
AND[]operationoperation[0] 且 operation[1] 且 … 且 operation[n]
OR[]operationoperation[0] 或 operation[1] 或 … 或 operation[n]

操作数中的key类似于JSONPath表示法。键中的每个点表示嵌套的JSON结构。例如,考虑以下结构:

{
  "shape": {
    "name": "rectangle",
    "dimensions": {
      "height": 24,
      "width": 10
    },
    "color": {
      "name": "red",
      "code": "#FF0000"
    }
  }
}

要比较颜色代码的值,键将是shape.color.code

如果省略filter部分,查询将返回所有条目。

sort

sort是一个有序的key:order对数组,其中:

  • key是状态存储中的一个键
  • order是一个可选字符串,指示排序顺序:
    • "ASC"表示升序
    • "DESC"表示降序
      如果省略order,默认是升序。

page

page包含limittoken参数。

  • limit设置每页返回的记录数。
  • token是组件返回的分页令牌,用于获取后续查询的结果。

在后台,此查询请求被转换为本地查询语言并由状态存储组件执行。

示例数据和查询

让我们来看一些从简单到复杂的真实示例。

作为数据集,考虑一个包含员工ID、组织、州和城市的员工记录集合。注意,这个数据集是一个键/值对数组,其中:

  • key是唯一ID
  • value是包含员工记录的JSON对象。

为了更好地说明功能,组织名称(org)和员工ID(id)是一个嵌套的JSON person对象。

首先创建一个MongoDB实例,作为您的状态存储。

docker run -d --rm -p 27017:27017 --name mongodb mongo:5

接下来,启动一个Dapr应用程序。参考组件配置文件,该文件指示Dapr使用MongoDB作为其状态存储。

dapr run --app-id demo --dapr-http-port 3500 --resources-path query-api-examples/components/mongodb

用员工数据集填充状态存储,以便您可以稍后查询它。

curl -X POST -H "Content-Type: application/json" -d @query-api-examples/dataset.json http://localhost:3500/v1.0/state/statestore

填充后,您可以检查状态存储中的数据。下图中,MongoDB UI的一部分显示了员工记录。

示例数据集

每个条目都有一个_id成员作为连接的对象键,以及一个包含JSON记录的value成员。

查询API允许您从这个JSON结构中选择记录。

现在您可以运行示例查询。

示例1

首先,查找加利福尼亚州的所有员工,并按其员工ID降序排序。

这是查询

{
    "filter": {
        "EQ": { "state": "CA" }
    },
    "sort": [
        {
            "key": "person.id",
            "order": "DESC"
        }
    ]
}

此查询在SQL中的等价形式是:

SELECT * FROM c WHERE
  state = "CA"
ORDER BY
  person.id DESC

使用以下命令执行查询:

curl -s -X POST -H "Content-Type: application/json" -d @query-api-examples/query1.json http://localhost:3500/v1.0-alpha1/state/statestore/query | jq .
Invoke-RestMethod -Method Post -ContentType 'application/json' -InFile query-api-examples/query1.json -Uri 'http://localhost:3500/v1.0-alpha1/state/statestore/query'

查询结果是一个按请求顺序排列的匹配键/值对数组:

{
  "results": [
    {
      "key": "3",
      "data": {
        "person": {
          "org": "Finance",
          "id": 1071
        },
        "city": "Sacramento",
        "state": "CA"
      },
      "etag": "44723d41-deb1-4c23-940e-3e6896c3b6f7"
    },
    {
      "key": "7",
      "data": {
        "city": "San Francisco",
        "state": "CA",
        "person": {
          "id": 1015,
          "org": "Dev Ops"
        }
      },
      "etag": "0e69e69f-3dbc-423a-9db8-26767fcd2220"
    },
    {
      "key": "5",
      "data": {
        "state": "CA",
        "person": {
          "org": "Hardware",
          "id": 1007
        },
        "city": "Los Angeles"
      },
      "etag": "f87478fa-e5c5-4be0-afa5-f9f9d75713d8"
    },
    {
      "key": "9",
      "data": {
        "person": {
          "org": "Finance",
          "id": 1002
        },
        "city": "San Diego",
        "state": "CA"
      },
      "etag": "f5cf05cd-fb43-4154-a2ec-445c66d5f2f8"
    }
  ]
}

示例2

现在,查找来自"Dev Ops"和"Hardware"组织的所有员工。

这是查询

{
    "filter": {
        "IN": { "person.org": [ "Dev Ops", "Hardware" ] }
    }
}

此查询在SQL中的等价形式是:

SELECT * FROM c WHERE
  person.org IN ("Dev Ops", "Hardware")

使用以下命令执行查询:

curl -s -X POST -H "Content-Type: application/json" -d @query-api-examples/query2.json http://localhost:3500/v1.0-alpha1/state/statestore/query | jq .
Invoke-RestMethod -Method Post -ContentType 'application/json' -InFile query-api-examples/query2.json -Uri 'http://localhost:3500/v1.0-alpha1/state/statestore/query'

与前一个示例类似,结果是一个匹配键/值对的数组。

示例3

在此示例中,查找:

  • 来自"Dev Ops"部门的所有员工。
  • 来自"Finance"部门并居住在华盛顿州和加利福尼亚州的员工。

此外,首先按州按字母降序排序,然后按员工ID升序排序。让我们一次处理最多3条记录。

这是查询

{
    "filter": {
        "OR": [
            {
                "EQ": { "person.org": "Dev Ops" }
            },
            {
                "AND": [
                    {
                        "EQ": { "person.org": "Finance" }
                    },
                    {
                        "IN": { "state": [ "CA", "WA" ] }
                    }
                ]
            }
        ]
    },
    "sort": [
        {
            "key": "state",
            "order": "DESC"
        },
        {
            "key": "person.id"
        }
    ],
    "page": {
        "limit": 3
    }
}

此查询在SQL中的等价形式是:

SELECT * FROM c WHERE
  person.org = "Dev Ops" OR
  (person.org = "Finance" AND state IN ("CA", "WA"))
ORDER BY
  state DESC,
  person.id ASC
LIMIT 3

使用以下命令执行查询:

curl -s -X POST -H "Content-Type: application/json" -d @query-api-examples/query3.json http://localhost:3500/v1.0-alpha1/state/statestore/query | jq .
Invoke-RestMethod -Method Post -ContentType 'application/json' -InFile query-api-examples/query3.json -Uri 'http://localhost:3500/v1.0-alpha1/state/statestore/query'

成功执行后,状态存储返回一个包含匹配记录列表和分页令牌的JSON对象:

{
  "results": [
    {
      "key": "1",
      "data": {
        "person": {
          "org": "Dev Ops",
          "id": 1036
        },
        "city": "Seattle",
        "state": "WA"
      },
      "etag": "6f54ad94-dfb9-46f0-a371-e42d550adb7d"
    },
    {
      "key": "4",
      "data": {
        "person": {
          "org": "Dev Ops",
          "id": 1042
        },
        "city": "Spokane",
        "state": "WA"
      },
      "etag": "7415707b-82ce-44d0-bf15-6dc6305af3b1"
    },
    {
      "key": "10",
      "data": {
        "person": {
          "org": "Dev Ops",
          "id": 1054
        },
        "city": "New York",
        "state": "NY"
      },
      "etag": "26bbba88-9461-48d1-8a35-db07c374e5aa"
    }
  ],
  "token": "3"
}

分页令牌在后续查询中“按原样”使用,以获取下一批记录:

{
    "filter": {
        "OR": [
            {
                "EQ": { "person.org": "Dev Ops" }
            },
            {
                "AND": [
                    {
                        "EQ": { "person.org": "Finance" }
                    },
                    {
                        "IN": { "state": [ "CA", "WA" ] }
                    }
                ]
            }
        ]
    },
    "sort": [
        {
            "key": "state",
            "order": "DESC"
        },
        {
            "key": "person.id"
        }
    ],
    "page": {
        "limit": 3,
        "token": "3"
    }
}
curl -s -X POST -H "Content-Type: application/json" -d @query-api-examples/query3-token.json http://localhost:3500/v1.0-alpha1/state/statestore/query | jq .
Invoke-RestMethod -Method Post -ContentType 'application/json' -InFile query-api-examples/query3-token.json -Uri 'http://localhost:3500/v1.0-alpha1/state/statestore/query'

此查询的结果是:

{
  "results": [
    {
      "key": "9",
      "data": {
        "person": {
          "org": "Finance",
          "id": 1002
        },
        "city": "San Diego",
        "state": "CA"
      },
      "etag": "f5cf05cd-fb43-4154-a2ec-445c66d5f2f8"
    },
    {
      "key": "7",
      "data": {
        "city": "San Francisco",
        "state": "CA",
        "person": {
          "id": 1015,
          "org": "Dev Ops"
        }
      },
      "etag": "0e69e69f-3dbc-423a-9db8-26767fcd2220"
    },
    {
      "key": "3",
      "data": {
        "person": {
          "org": "Finance",
          "id": 1071
        },
        "city": "Sacramento",
        "state": "CA"
      },
      "etag": "44723d41-deb1-4c23-940e-3e6896c3b6f7"
    }
  ],
  "token": "6"
}

这样,您可以在查询中更新分页令牌,并迭代结果,直到不再返回记录。

限制

状态查询API有以下限制:

  • 要查询存储在状态存储中的actor状态,您需要使用特定数据库的查询API。请参阅查询actor状态
  • 该API不适用于Dapr加密状态存储功能。由于加密是由Dapr运行时完成并存储为加密数据,因此这实际上阻止了服务器端查询。

您可以在相关链接部分找到更多信息。

相关链接

4 - 操作指南:构建有状态服务

通过状态管理构建可扩展、可复制的服务

在本文中,您将学习如何创建一个可以水平扩展的有状态服务,选择性使用并发和一致性模型。状态管理API可以帮助开发者简化状态协调、冲突解决和故障处理的复杂性。

设置状态存储

状态存储组件是Dapr用来与数据库通信的资源。在本指南中,我们将使用默认的Redis状态存储。

使用Dapr CLI

当您在本地模式下运行dapr init时,Dapr会创建一个默认的Redis statestore.yaml并在您的本地机器上运行一个Redis状态存储,位置如下:

  • 在Windows上,位于%UserProfile%\.dapr\components\statestore.yaml
  • 在Linux/MacOS上,位于~/.dapr/components/statestore.yaml

通过statestore.yaml组件,您可以轻松替换底层组件而无需更改应用程序代码。

查看支持的状态存储列表

Kubernetes

查看如何在Kubernetes上设置不同的状态存储

强一致性和最终一致性

在强一致性模式下,Dapr确保底层状态存储:

  • 在数据写入所有副本后才返回响应。
  • 在写入或删除状态之前从法定人数接收确认。

对于读取请求,Dapr确保在副本之间一致地返回最新的数据。默认情况下是最终一致性,除非在请求状态API时另有指定。

以下示例展示了如何使用强一致性保存、获取和删除状态。示例用Python编写,但适用于任何编程语言。

保存状态

import requests
import json

store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
stateReq = '[{ "key": "k1", "value": "Some Data", "options": { "consistency": "strong" }}]'
response = requests.post(dapr_state_url, json=stateReq)

获取状态

import requests
import json

store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
response = requests.get(dapr_state_url + "/key1", headers={"consistency":"strong"})
print(response.headers['ETag'])

删除状态

import requests
import json

store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
response = requests.delete(dapr_state_url + "/key1", headers={"consistency":"strong"})

如果没有指定concurrency选项,默认是后写胜出并发模式。

先写胜出和后写胜出

Dapr允许开发者在使用数据存储时选择两种常见的并发模式:

  • 先写胜出:在应用程序的多个实例同时写入同一个键的情况下很有用。
  • 后写胜出:Dapr的默认模式。

Dapr使用版本号来确定特定键是否已更新。您可以:

  1. 在读取键的数据时保留版本号。
  2. 在更新(如写入和删除)时使用版本号。

如果自从检索版本号以来版本信息已更改,将抛出错误,要求您执行另一次读取以获取最新的版本信息和状态。

Dapr利用ETags来确定状态的版本号。ETags从状态请求中以ETag头返回。使用ETags,您的应用程序知道自上次检查以来资源已更新,因为在ETag不匹配时会出错。

以下示例展示了如何:

  • 获取ETag。
  • 使用ETag保存状态。
  • 删除状态。

以下示例用Python编写,但适用于任何编程语言。

import requests
import json

store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
response = requests.get(dapr_state_url + "/key1", headers={"concurrency":"first-write"})
etag = response.headers['ETag']
newState = '[{ "key": "k1", "value": "New Data", "etag": {}, "options": { "concurrency": "first-write" }}]'.format(etag)

requests.post(dapr_state_url, json=newState)
response = requests.delete(dapr_state_url + "/key1", headers={"If-Match": "{}".format(etag)})

处理版本不匹配失败

在以下示例中,您将看到如何在版本已更改时重试保存状态操作:

import requests
import json

# 此方法保存状态,如果保存状态失败则返回false
def save_state(data):
    try:
        store_name = "redis-store" # 在状态存储组件yaml文件中指定的状态存储名称
        dapr_state_url = "http://localhost:3500/v1.0/state/{}".format(store_name)
        response = requests.post(dapr_state_url, json=data)
        if response.status_code == 200:
            return True
    except:
        return False
    return False

# 此方法获取状态并返回响应,ETag在头中 -->
def get_state(key):
    response = requests.get("http://localhost:3500/v1.0/state/<state_store_name>/{}".format(key), headers={"concurrency":"first-write"})
    return response

# 当保存状态成功时退出。如果存在ETag不匹配,success将为False -->
success = False
while success != True:
    response = get_state("key1")
    etag = response.headers['ETag']
    newState = '[{ "key": "key1", "value": "New Data", "etag": {}, "options": { "concurrency": "first-write" }}]'.format(etag)

    success = save_state(newState)

5 - 操作指南:启用事务性 Outbox 模式

在状态存储和发布/订阅消息代理之间提交单个事务

事务性 Outbox 模式是一种广为人知的设计模式,用于发送应用程序状态变化的通知。它通过一个跨越数据库和消息代理的单一事务来传递通知。

开发人员在尝试自行实现此模式时会遇到许多技术难题,通常需要编写复杂且容易出错的中央协调管理器,这些管理器最多支持一种或两种数据库和消息代理的组合。

例如,您可以使用 Outbox 模式来:

  1. 向账户数据库写入新的用户记录。
  2. 发送账户成功创建的通知消息。

通过 Dapr 的 Outbox 支持,您可以在调用 Dapr 的事务 API时通知订阅者应用程序的状态何时被创建或更新。

下图概述了 Outbox 功能的工作原理:

  1. 服务 A 使用事务将状态保存/更新到状态存储。
  2. 在同一事务下将消息写入消息代理。当消息成功传递到消息代理时,事务完成,确保状态和消息一起被事务化。
  3. 消息代理将消息主题传递给任何订阅者 - 在此情况下为服务 B。
显示 Outbox 模式步骤的图示

要求

Outbox 功能可以与 Dapr 支持的任何事务性状态存储一起使用。所有发布/订阅代理都支持 Outbox 功能。

了解更多关于您可以使用的事务方法。

启用 Outbox 模式

要启用 Outbox 功能,请在状态存储组件上添加以下必需和可选字段:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mysql-outbox
spec:
  type: state.mysql
  version: v1
  metadata:
  - name: connectionString
    value: "<CONNECTION STRING>"
  - name: outboxPublishPubsub # 必需
    value: "mypubsub"
  - name: outboxPublishTopic # 必需
    value: "newOrder"
  - name: outboxPubsub # 可选
    value: "myOutboxPubsub"
  - name: outboxDiscardWhenMissingState # 可选,默认为 false
    value: false

元数据字段

名称必需默认值描述
outboxPublishPubsubN/A设置发布状态更改时传递通知的发布/订阅组件的名称
outboxPublishTopicN/A设置接收在配置了 outboxPublishPubsub 的发布/订阅上的状态更改的主题。消息体将是 insertupdate 操作的状态事务项
outboxPubsuboutboxPublishPubsub设置 Dapr 用于协调状态和发布/订阅事务的发布/订阅组件。如果未设置,则使用配置了 outboxPublishPubsub 的发布/订阅组件。如果您希望将用于发送通知状态更改的发布/订阅组件与用于协调事务的组件分开,这将很有用
outboxDiscardWhenMissingStatefalse通过将 outboxDiscardWhenMissingState 设置为 true,如果 Dapr 无法在数据库中找到状态且不重试,则 Dapr 将丢弃事务。如果在 Dapr 能够传递消息之前,状态存储数据因任何原因被删除,并且您希望 Dapr 从发布/订阅中删除项目并停止重试获取状态,此设置可能会很有用

其他配置

在同一状态存储上组合 Outbox 和非 Outbox 消息

如果您希望使用相同的状态存储来发送 Outbox 和非 Outbox 消息,只需定义两个连接到相同状态存储的状态存储组件,其中一个具有 Outbox 功能,另一个没有。

没有 Outbox 的 MySQL 状态存储

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mysql
spec:
  type: state.mysql
  version: v1
  metadata:
  - name: connectionString
    value: "<CONNECTION STRING>"

具有 Outbox 的 MySQL 状态存储

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mysql-outbox
spec:
  type: state.mysql
  version: v1
  metadata:
  - name: connectionString
    value: "<CONNECTION STRING>"
  - name: outboxPublishPubsub # 必需
    value: "mypubsub"
  - name: outboxPublishTopic # 必需
    value: "newOrder"

形状 Outbox 模式消息

您可以通过设置另一个不保存到数据库并明确提及为投影的事务来覆盖发布到发布/订阅代理的 Outbox 模式消息。此事务添加了一个名为 outbox.projection 的元数据键,值设置为 true。当添加到事务中保存的状态数组时,此负载在写入状态时被忽略,数据用作发送到上游订阅者的负载。

要正确使用,key 值必须在状态存储上的操作和消息投影之间匹配。如果键不匹配,则整个事务失败。

如果您为同一键启用了两个或多个 outbox.projection 状态项,则使用第一个定义的项,其他项将被忽略。

了解更多关于默认和自定义 CloudEvent 消息。

在以下 Python SDK 的状态事务示例中,值 "2" 被保存到数据库,但值 "3" 被发布到最终用户主题。

DAPR_STORE_NAME = "statestore"

async def main():
    client = DaprClient()

    # 定义第一个状态操作以保存值 "2"
    op1 = StateItem(
        key="key1",
        value=b"2"
    )

    # 定义第二个状态操作以带有元数据发布值 "3"
    op2 = StateItem(
        key="key1",
        value=b"3",
        options=StateOptions(
            metadata={
                "outbox.projection": "true"
            }
        )
    )

    # 创建状态操作列表
    ops = [op1, op2]

    # 执行状态事务
    await client.state.transaction(DAPR_STORE_NAME, operations=ops)
    print("状态事务已执行。")

通过将元数据项 "outbox.projection" 设置为 "true" 并确保 key 值匹配(key1):

  • 第一个操作被写入状态存储,消息未写入消息代理。
  • 第二个操作值被发布到配置的发布/订阅主题。

在以下 JavaScript SDK 的状态事务示例中,值 "2" 被保存到数据库,但值 "3" 被发布到最终用户主题。

const { DaprClient, StateOperationType } = require('@dapr/dapr');

const DAPR_STORE_NAME = "statestore";

async function main() {
  const client = new DaprClient();

  // 定义第一个状态操作以保存值 "2"
  const op1 = {
    operation: StateOperationType.UPSERT,
    request: {
      key: "key1",
      value: "2"
    }
  };

  // 定义第二个状态操作以带有元数据发布值 "3"
  const op2 = {
    operation: StateOperationType.UPSERT,
    request: {
      key: "key1",
      value: "3",
      metadata: {
        "outbox.projection": "true"
      }
    }
  };

  // 创建状态操作列表
  const ops = [op1, op2];

  // 执行状态事务
  await client.state.transaction(DAPR_STORE_NAME, ops);
  console.log("状态事务已执行。");
}

main().catch(err => {
  console.error(err);
});

通过将元数据项 "outbox.projection" 设置为 "true" 并确保 key 值匹配(key1):

  • 第一个操作被写入状态存储,消息未写入消息代理。
  • 第二个操作值被发布到配置的发布/订阅主题。

在以下 .NET SDK 的状态事务示例中,值 "2" 被保存到数据库,但值 "3" 被发布到最终用户主题。

public class Program
{
    private const string DAPR_STORE_NAME = "statestore";

    public static async Task Main(string[] args)
    {
        var client = new DaprClientBuilder().Build();

        // 定义第一个状态操作以保存值 "2"
        var op1 = new StateTransactionRequest(
            key: "key1",
            value: Encoding.UTF8.GetBytes("2"),
            operationType: StateOperationType.Upsert
        );

        // 定义第二个状态操作以带有元数据发布值 "3"
        var metadata = new Dictionary<string, string>
        {
            { "outbox.projection", "true" }
        };
        var op2 = new StateTransactionRequest(
            key: "key1",
            value: Encoding.UTF8.GetBytes("3"),
            operationType: StateOperationType.Upsert,
            metadata: metadata
        );

        // 创建状态操作列表
        var ops = new List<StateTransactionRequest> { op1, op2 };

        // 执行状态事务
        await client.ExecuteStateTransactionAsync(DAPR_STORE_NAME, ops);
        Console.WriteLine("状态事务已执行。");
    }
}

通过将元数据项 "outbox.projection" 设置为 "true" 并确保 key 值匹配(key1):

  • 第一个操作被写入状态存储,消息未写入消息代理。
  • 第二个操作值被发布到配置的发布/订阅主题。

在以下 Java SDK 的状态事务示例中,值 "2" 被保存到数据库,但值 "3" 被发布到最终用户主题。

public class Main {
    private static final String DAPR_STORE_NAME = "statestore";

    public static void main(String[] args) {
        try (DaprClient client = new DaprClientBuilder().build()) {
            // 定义第一个状态操作以保存值 "2"
            StateOperation<String> op1 = new StateOperation<>(
                    StateOperationType.UPSERT,
                    "key1",
                    "2"
            );

            // 定义第二个状态操作以带有元数据发布值 "3"
            Map<String, String> metadata = new HashMap<>();
            metadata.put("outbox.projection", "true");

            StateOperation<String> op2 = new StateOperation<>(
                    StateOperationType.UPSERT,
                    "key1",
                    "3",
                    metadata
            );

            // 创建状态操作列表
            List<StateOperation<?>> ops = new ArrayList<>();
            ops.add(op1);
            ops.add(op2);

            // 执行状态事务
            client.executeStateTransaction(DAPR_STORE_NAME, ops).block();
            System.out.println("状态事务已执行。");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

通过将元数据项 "outbox.projection" 设置为 "true" 并确保 key 值匹配(key1):

  • 第一个操作被写入状态存储,消息未写入消息代理。
  • 第二个操作值被发布到配置的发布/订阅主题。

在以下 Go SDK 的状态事务示例中,值 "2" 被保存到数据库,但值 "3" 被发布到最终用户主题。

ops := make([]*dapr.StateOperation, 0)

op1 := &dapr.StateOperation{
    Type: dapr.StateOperationTypeUpsert,
    Item: &dapr.SetStateItem{
        Key:   "key1",
        Value: []byte("2"),
    },
}
op2 := &dapr.StateOperation{
    Type: dapr.StateOperationTypeUpsert,
    Item: &dapr.SetStateItem{
        Key:   "key1",
				Value: []byte("3"),
         // 覆盖保存到数据库的数据负载 
				Metadata: map[string]string{
					"outbox.projection": "true",
        },
    },
}
ops = append(ops, op1, op2)
meta := map[string]string{}
err := testClient.ExecuteStateTransaction(ctx, store, meta, ops)

通过将元数据项 "outbox.projection" 设置为 "true" 并确保 key 值匹配(key1):

  • 第一个操作被写入状态存储,消息未写入消息代理。
  • 第二个操作值被发布到配置的发布/订阅主题。

您可以使用以下 HTTP 请求传递消息覆盖:

curl -X POST http://localhost:3500/v1.0/state/starwars/transaction \
  -H "Content-Type: application/json" \
  -d '{
  "operations": [
    {
      "operation": "upsert",
      "request": {
        "key": "order1",
        "value": {
            "orderId": "7hf8374s",
            "type": "book",
            "name": "The name of the wind"
        }
      }
    },
    {
      "operation": "upsert",
      "request": {
        "key": "order1",
        "value": {
            "orderId": "7hf8374s"
        },
        "metadata": {
           "outbox.projection": "true"
        },
        "contentType": "application/json"
      }
    }
  ]
}'

通过将元数据项 "outbox.projection" 设置为 "true" 并确保 key 值匹配(key1):

  • 第一个操作被写入状态存储,消息未写入消息代理。
  • 第二个操作值被发布到配置的发布/订阅主题。

覆盖 Dapr 生成的 CloudEvent 字段

您可以使用自定义 CloudEvent 元数据覆盖发布的 Outbox 事件上的Dapr 生成的 CloudEvent 字段

async def execute_state_transaction():
    async with DaprClient() as client:
        # 定义状态操作
        ops = []

        op1 = {
            'operation': 'upsert',
            'request': {
                'key': 'key1',
                'value': b'2',  # 将字符串转换为字节数组
                'metadata': {
                    'cloudevent.id': 'unique-business-process-id',
                    'cloudevent.source': 'CustomersApp',
                    'cloudevent.type': 'CustomerCreated',
                    'cloudevent.subject': '123',
                    'my-custom-ce-field': 'abc'
                }
            }
        }

        ops.append(op1)

        # 执行状态事务
        store_name = 'your-state-store-name'
        try:
            await client.execute_state_transaction(store_name, ops)
            print('状态事务已执行。')
        except Exception as e:
            print('执行状态事务时出错:', e)

# 运行异步函数
if __name__ == "__main__":
    asyncio.run(execute_state_transaction())
const { DaprClient } = require('dapr-client');

async function executeStateTransaction() {
    // 初始化 Dapr 客户端
    const daprClient = new DaprClient();

    // 定义状态操作
    const ops = [];

    const op1 = {
        operationType: 'upsert',
        request: {
            key: 'key1',
            value: Buffer.from('2'),
            metadata: {
                'id': 'unique-business-process-id',
                'source': 'CustomersApp',
                'type': 'CustomerCreated',
                'subject': '123',
                'my-custom-ce-field': 'abc'
            }
        }
    };

    ops.push(op1);

    // 执行状态事务
    const storeName = 'your-state-store-name';
    const metadata = {};
}

executeStateTransaction();
public class StateOperationExample
{
    public async Task ExecuteStateTransactionAsync()
    {
        var daprClient = new DaprClientBuilder().Build();

        // 将值 "2" 定义为字符串并序列化为字节数组
        var value = "2";
        var valueBytes = JsonSerializer.SerializeToUtf8Bytes(value);

        // 定义第一个状态操作以保存值 "2" 并带有元数据
       // 覆盖 Cloudevent 元数据
        var metadata = new Dictionary<string, string>
        {
            { "cloudevent.id", "unique-business-process-id" },
            { "cloudevent.source", "CustomersApp" },
            { "cloudevent.type", "CustomerCreated" },
            { "cloudevent.subject", "123" },
            { "my-custom-ce-field", "abc" }
        };

        var op1 = new StateTransactionRequest(
            key: "key1",
            value: valueBytes,
            operationType: StateOperationType.Upsert,
            metadata: metadata
        );

        // 创建状态操作列表
        var ops = new List<StateTransactionRequest> { op1 };

        // 执行状态事务
        var storeName = "your-state-store-name";
        await daprClient.ExecuteStateTransactionAsync(storeName, ops);
        Console.WriteLine("状态事务已执行。");
    }

    public static async Task Main(string[] args)
    {
        var example = new StateOperationExample();
        await example.ExecuteStateTransactionAsync();
    }
}
public class StateOperationExample {

    public static void main(String[] args) {
        executeStateTransaction();
    }

    public static void executeStateTransaction() {
        // 构建 Dapr 客户端
        try (DaprClient daprClient = new DaprClientBuilder().build()) {

            // 定义值 "2"
            String value = "2";

            // 覆盖 CloudEvent 元数据
            Map<String, String> metadata = new HashMap<>();
            metadata.put("cloudevent.id", "unique-business-process-id");
            metadata.put("cloudevent.source", "CustomersApp");
            metadata.put("cloudevent.type", "CustomerCreated");
            metadata.put("cloudevent.subject", "123");
            metadata.put("my-custom-ce-field", "abc");

            // 定义状态操作
            List<StateOperation<?>> ops = new ArrayList<>();
            StateOperation<String> op1 = new StateOperation<>(
                    StateOperationType.UPSERT,
                    "key1",
                    value,
                    metadata
            );
            ops.add(op1);

            // 执行状态事务
            String storeName = "your-state-store-name";
            daprClient.executeStateTransaction(storeName, ops).block();
            System.out.println("状态事务已执行。");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
func main() {
	// 创建 Dapr 客户端
	client, err := dapr.NewClient()
	if err != nil {
		log.Fatalf("创建 Dapr 客户端失败: %v", err)
	}
	defer client.Close()

	ctx := context.Background()
	store := "your-state-store-name"

	// 定义状态操作
	ops := make([]*dapr.StateOperation, 0)
	op1 := &dapr.StateOperation{
		Type: dapr.StateOperationTypeUpsert,
		Item: &dapr.SetStateItem{
			Key:   "key1",
			Value: []byte("2"),
			// 覆盖 Cloudevent 元数据
			Metadata: map[string]string{
				"cloudevent.id":                "unique-business-process-id",
				"cloudevent.source":            "CustomersApp",
				"cloudevent.type":              "CustomerCreated",
				"cloudevent.subject":           "123",
				"my-custom-ce-field":           "abc",
			},
		},
	}
	ops = append(ops, op1)

	// 事务的元数据(如果有)
	meta := map[string]string{}

	// 执行状态事务
	err = client.ExecuteStateTransaction(ctx, store, meta, ops)
	if err != nil {
		log.Fatalf("执行状态事务失败: %v", err)
	}

	log.Println("状态事务已执行。")
}
curl -X POST http://localhost:3500/v1.0/state/starwars/transaction \
  -H "Content-Type: application/json" \
  -d '{
        "operations": [
          {
            "operation": "upsert",
            "request": {
              "key": "key1",
              "value": "2"
            }
          },
        ],
        "metadata": {
          "id": "unique-business-process-id",
          "source": "CustomersApp",
          "type": "CustomerCreated",
          "subject": "123",
          "my-custom-ce-field": "abc",
        }
      }'

演示

观看此视频以了解 Outbox 模式的概述

6 - 操作指南:在应用程序之间共享状态

了解在不同应用程序之间共享状态的策略

Dapr 提供了多种在应用程序之间共享状态的方法。

不同的架构在共享状态时可能有不同的需求。在某些情况下,您可能会希望:

  • 在特定应用程序中封装所有状态
  • 让 Dapr 为您管理状态访问

在其他情况下,您可能需要两个应用程序在同一状态上进行操作,以便获取和保存相同的键。

为了实现状态共享,Dapr 支持以下键前缀策略:

键前缀描述
appid默认策略,允许您仅通过指定 appid 的应用程序管理状态。所有状态键将以 appid 为前缀,并限定于该应用程序。
name使用状态存储组件的名称作为前缀。多个应用程序可以共享同一状态存储中的相同状态。
namespace如果设置了命名空间,此策略会将 appid 键前缀替换为配置的命名空间,生成一个限定于该命名空间的键。这允许在不同命名空间中具有相同 appid 的应用程序重用相同的状态存储。如果未配置命名空间,则会回退到 appid 策略。有关 Dapr 中命名空间的更多信息,请参见 操作指南:将组件限定到一个或多个应用程序
none不使用任何前缀。多个应用程序可以在不同的状态存储中共享状态,而不受特定前缀的限制。

指定状态前缀策略

要指定前缀策略,请在状态组件上添加名为 keyPrefix 的元数据键:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
  namespace: production
spec:
  type: state.redis
  version: v1
  metadata:
  - name: keyPrefix
    value: <key-prefix-strategy>

示例

以下示例演示了使用每种支持的前缀策略进行状态检索的情况。

appid(默认)

在下面的示例中,具有应用程序 ID myApp 的 Dapr 应用程序正在将状态保存到名为 redis 的状态存储中:

curl -X POST http://localhost:3500/v1.0/state/redis \
  -H "Content-Type: application/json"
  -d '[
        {
          "key": "darth",
          "value": "nihilus"
        }
      ]'

键将被保存为 myApp||darth

namespace

在命名空间 production 中运行的具有应用程序 ID myApp 的 Dapr 应用程序正在将状态保存到名为 redis 的状态存储中:

curl -X POST http://localhost:3500/v1.0/state/redis \
  -H "Content-Type: application/json"
  -d '[
        {
          "key": "darth",
          "value": "nihilus"
        }
      ]'

键将被保存为 production.myApp||darth

name

在下面的示例中,具有应用程序 ID myApp 的 Dapr 应用程序正在将状态保存到名为 redis 的状态存储中:

curl -X POST http://localhost:3500/v1.0/state/redis \
  -H "Content-Type: application/json"
  -d '[
        {
          "key": "darth",
          "value": "nihilus"
        }
      ]'

键将被保存为 redis||darth

none

在下面的示例中,具有应用程序 ID myApp 的 Dapr 应用程序正在将状态保存到名为 redis 的状态存储中:

curl -X POST http://localhost:3500/v1.0/state/redis \
  -H "Content-Type: application/json"
  -d '[
        {
          "key": "darth",
          "value": "nihilus"
        }
      ]'

键将被保存为 darth

7 - 操作指南:加密应用程序状态

自动加密应用程序状态并管理密钥轮换

对静态应用程序状态进行加密,以在企业工作负载或受监管环境中提供更强的安全性。Dapr 提供基于 AES 的自动客户端加密,采用 Galois/Counter Mode (GCM),支持 128、192 和 256 位的密钥。

除了自动加密,Dapr 还支持主加密密钥和次加密密钥,使开发人员和运维团队更容易启用密钥轮换策略。此功能由所有 Dapr 状态存储支持。

加密密钥始终从 secret 中获取,不能在 metadata 部分中以明文形式提供。

启用自动加密

将以下 metadata 部分添加到任何 Dapr 支持的状态存储中:

metadata:
- name: primaryEncryptionKey
  secretKeyRef:
    name: mysecret
    key: mykey # key 是可选的。

例如,这是一个 Redis 加密状态存储的完整 YAML:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: primaryEncryptionKey
    secretKeyRef:
      name: mysecret
      key: mykey

现在,您已配置了一个 Dapr 状态存储,以从名为 mysecret 的 secret 中获取加密密钥,其中包含名为 mykey 的实际加密密钥。

实际的加密密钥必须是有效的、十六进制编码的加密密钥。虽然支持 192 位和 256 位密钥,但建议使用 128 位加密密钥。如果加密密钥无效,Dapr 会报错并退出。

例如,您可以使用以下命令生成一个随机的、十六进制编码的 128 位(16 字节)密钥:

openssl rand 16 | hexdump -v -e '/1 "%02x"'
# 结果将类似于 "cb321007ad11a9d23f963bff600d58e0"

注意,secret 存储不必支持密钥。

密钥轮换

为了支持密钥轮换,Dapr 提供了一种指定次加密密钥的方法:

metadata:
- name: primaryEncryptionKey
    secretKeyRef:
      name: mysecret
      key: mykey
- name: secondaryEncryptionKey
    secretKeyRef:
      name: mysecret2
      key: mykey2

当 Dapr 启动时,它会获取 metadata 部分中列出的包含加密密钥的 secrets。Dapr 自动识别哪个状态项是用哪个密钥加密的,因为它会将 secretKeyRef.name 字段附加到实际状态密钥的末尾。

要轮换密钥,

  1. 更改 primaryEncryptionKey 以指向包含新密钥的 secret。
  2. 将旧的主加密密钥移至 secondaryEncryptionKey

新数据将使用新密钥加密,任何检索到的旧数据将使用次密钥解密。

使用旧密钥加密的数据项的任何更新都将使用新密钥重新加密。

相关链接

8 - 与后端状态存储交互

指导如何与特定的后端状态存储进行交互

请查看操作部分,了解支持的状态存储列表,并学习如何配置状态存储组件

8.1 - Azure Cosmos DB

使用 Azure Cosmos DB 作为状态存储

Dapr 在保存和检索状态时不对状态值进行转换。Dapr 要求所有状态存储实现遵循特定的键格式规范(参见状态管理规范)。您可以直接与底层存储交互以操作状态数据,例如:

  • 查询状态。
  • 创建聚合视图。
  • 进行备份。

连接到 Azure Cosmos DB

要连接到您的 Cosmos DB 实例,您可以:

按应用程序 ID 列出键

要获取与应用程序 “myapp” 关联的所有状态键,请使用查询:

SELECT * FROM states WHERE CONTAINS(states.id, 'myapp||')

上述查询返回所有 id 包含 “myapp||” 的文档,这是状态键的前缀。

获取特定状态数据

要通过键 “balance” 获取应用程序 “myapp” 的状态数据,请使用查询:

SELECT * FROM states WHERE states.id = 'myapp||balance'

读取返回文档的 value 字段。要获取状态版本/ETag,请使用命令:

SELECT states._etag FROM states WHERE states.id = 'myapp||balance'

读取 actor 状态

要获取与实例 ID 为 “leroy” 的 actor 类型 “cat” 关联的所有状态键,该 actor 属于 ID 为 “mypets” 的应用程序,请使用命令:

SELECT * FROM states WHERE CONTAINS(states.id, 'mypets||cat||leroy||')

要获取特定的 actor 状态,例如 “food”,请使用命令:

SELECT * FROM states WHERE states.id = 'mypets||cat||leroy||food'

8.2 - Redis

使用 Redis 作为状态存储

Dapr 在保存和检索状态时不对状态值进行转换。Dapr 要求所有状态存储实现遵循特定的键格式规范(参见状态管理规范)。您可以直接与底层存储交互以操作状态数据,例如:

  • 查询状态。
  • 创建聚合视图。
  • 进行备份。

连接到 Redis

您可以使用官方的 redis-cli 或任何其他兼容 Redis 的工具连接到 Redis 状态存储以直接查询 Dapr 状态。如果您在容器中运行 Redis,最简单的使用 redis-cli 的方法是通过容器:

docker run --rm -it --link <Redis 容器的名称> redis redis-cli -h <Redis 容器的名称>

按应用 ID 列出键

要获取与应用程序 “myapp” 关联的所有状态键,请使用命令:

KEYS myapp*

上述命令返回现有键的列表,例如:

1) "myapp||balance"
2) "myapp||amount"

获取特定状态数据

Dapr 将状态值保存为哈希值。每个哈希值包含一个 “data” 字段,其中存储状态数据,以及一个 “version” 字段,作为 ETag,表示不断递增的版本。

例如,要通过键 “balance” 获取应用程序 “myapp” 的状态数据,请使用命令:

HGET myapp||balance data

要获取状态版本/ETag,请使用命令:

HGET myapp||balance version

读取 actor 状态

要获取与应用程序 ID 为 “mypets” 的 actor 类型 “cat” 的实例 ID 为 “leroy” 关联的所有状态键,请使用命令:

KEYS mypets||cat||leroy*

要获取特定的 actor 状态,例如 “food”,请使用命令:

HGET mypets||cat||leroy||food value

8.3 - SQL server

使用 SQL server 作为后端状态存储

Dapr 在保存和检索状态时不对状态值进行转换。Dapr 要求所有状态存储实现遵循特定的键格式(参见状态管理规范)。您可以直接与底层存储交互来操作状态数据,例如:

  • 查询状态。
  • 创建聚合视图。
  • 进行备份。

连接到 SQL Server

连接到 SQL Server 实例的最简单方法是使用:

按应用程序 ID 列出键

要获取与应用程序 “myapp” 关联的所有状态键,请使用以下查询:

SELECT * FROM states WHERE [Key] LIKE 'myapp||%'

上述查询返回所有 ID 包含 “myapp||” 的行,这是状态键的前缀。

获取特定状态数据

要通过键 “balance” 获取应用程序 “myapp” 的状态数据,请使用以下查询:

SELECT * FROM states WHERE [Key] = 'myapp||balance'

读取返回行的 Data 字段。要获取状态版本/ETag,请使用以下命令:

SELECT [RowVersion] FROM states WHERE [Key] = 'myapp||balance'

获取过滤的状态数据

要获取 JSON 数据中值 “color” 等于 “blue” 的所有状态数据,请使用以下查询:

SELECT * FROM states WHERE JSON_VALUE([Data], '$.color') = 'blue'

读取 actor 状态

要获取与 actor 类型 “cat” 的实例 ID “leroy” 关联的所有状态键,该 actor 属于 ID 为 “mypets” 的应用程序,请使用以下命令:

SELECT * FROM states WHERE [Key] LIKE 'mypets||cat||leroy||%'

要获取特定的 actor 状态,例如 “food”,请使用以下命令:

SELECT * FROM states WHERE [Key] = 'mypets||cat||leroy||food'

9 - 状态生存时间 (TTL)

管理具有 TTL 的状态。

Dapr 允许为每个状态设置生存时间 (TTL)。这意味着应用程序可以为存储的每个状态指定一个生存时间,过期后将无法检索这些状态。

对于支持的状态存储,只需在发布消息时设置 ttlInSeconds 元数据。其他状态存储将忽略此值。对于某些状态存储,您可以为每个表或容器指定默认的过期时间。

原生状态 TTL 支持

当状态存储组件原生支持状态 TTL 时,Dapr 会直接传递 TTL 配置,而不添加额外的逻辑,从而保持行为的一致性。这在组件以不同方式处理过期状态时尤为有用。

如果未指定 TTL,将保留状态存储的默认行为。

显式持久化绕过全局定义的 TTL

对于允许为所有数据指定默认 TTL 的状态存储,持久化状态的方式包括:

  • 通过 Dapr 组件设置全局 TTL 值,或
  • 在 Dapr 之外创建状态存储并设置全局 TTL 值。

如果未指定特定的 TTL,数据将在全局 TTL 时间段后过期,这不是由 Dapr 直接控制的。

此外,所有状态存储还支持显式持久化数据的选项。这意味着您可以忽略默认的数据库策略(可能是在 Dapr 之外或通过 Dapr 组件设置的),以无限期保留特定的数据库记录。您可以通过将 ttlInSeconds 设置为 -1 来实现,这表示忽略任何设置的 TTL 值。

支持的组件

请参阅状态存储组件指南中的 TTL 列。

示例

您可以在状态存储请求的元数据中设置状态 TTL:

# 依赖

from dapr.clients import DaprClient

# 代码

DAPR_STORE_NAME = "statestore"

with DaprClient() as client:
        client.save_state(DAPR_STORE_NAME, "order_1", str(orderId), state_metadata={
            'ttlInSeconds': '120'
        }) 

要启动 Dapr sidecar 并运行上述示例应用程序,您可以运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
// 依赖

using Dapr.Client;

// 代码

await client.SaveStateAsync(storeName, stateKeyName, state, metadata: new Dictionary<string, string>() { 
    { 
        "ttlInSeconds", "120" 
    } 
});

要启动 Dapr sidecar 并运行上述示例应用程序,您可以运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
// 依赖

import (
	dapr "github.com/dapr/go-sdk/client"
)

// 代码

md := map[string]string{"ttlInSeconds": "120"}
if err := client.SaveState(ctx, store, "key1", []byte("hello world"), md); err != nil {
   panic(err)
}

要启动 Dapr sidecar 并运行上述示例应用程序,您可以运行类似以下的命令:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run .
curl -X POST -H "Content-Type: application/json" -d '[{ "key": "order_1", "value": "250", "metadata": { "ttlInSeconds": "120" } }]' http://localhost:3601/v1.0/state/statestore
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{"key": "order_1", "value": "250", "metadata": {"ttlInSeconds": "120"}}]' -Uri 'http://localhost:3601/v1.0/state/statestore'

相关链接