1 - 发布-订阅模式概述

pubsub API 的基本构建块概述

发布-订阅模式(pubsub)使微服务能够通过消息进行事件驱动的架构通信。

  • 生产者或发布者将消息写入输入通道并发送到主题,而不关心哪个应用程序会接收它们。
  • 消费者或订阅者订阅主题并从输出通道接收消息,而不关心哪个服务生成了这些消息。

消息代理会将每条消息从发布者的输入通道复制到所有对该消息感兴趣的订阅者的输出通道。这种模式在需要将微服务解耦时特别有用。



pubsub API

在 Dapr 中,pubsub API:

  • 提供一个平台无关的 API 来发送和接收消息。
  • 确保消息至少被传递一次。
  • 与多种消息代理和队列系统集成。

您可以在运行时配置 Dapr pubsub 组件来使用特定的消息代理,这种可插拔性使您的服务更具可移植性和灵活性。

在 Dapr 中使用 pubsub 时:

  1. 您的服务通过网络调用 Dapr pubsub 构建块 API。
  2. pubsub 构建块调用封装特定消息代理的 Dapr pubsub 组件。
  3. 为了接收主题上的消息,Dapr 代表您的服务订阅 pubsub 组件,并在消息到达时将其传递到您的服务的端点。

以下概述视频和演示展示了 Dapr pubsub 的工作原理。

在下图中,“shipping”服务和“email”服务都已订阅由“cart”服务发布的主题。每个服务加载指向相同 pubsub 消息代理组件的 pubsub 组件配置文件;例如:Redis Streams、NATS Streaming、Azure Service Bus 或 GCP pubsub。

在下图中,Dapr API 将“cart”服务的“order”主题发布到“shipping”和“email”订阅服务的“order”端点。

查看 Dapr 支持的 pubsub 组件的完整列表

特性

pubsub API 构建块为您的应用程序带来了多个特性。

使用 Cloud Events 发送消息

为了启用消息路由并为服务之间的每条消息提供额外的上下文,Dapr 使用 CloudEvents 1.0 规范 作为其消息格式。任何应用程序通过 Dapr 发送到主题的消息都会自动包装在 Cloud Events 信封中,使用 Content-Type 头值 作为 datacontenttype 属性。

有关更多信息,请阅读 使用 CloudEvents 进行消息传递,或 发送不带 CloudEvents 的原始消息

与不使用 Dapr 和 CloudEvents 的应用程序通信

如果您的一个应用程序使用 Dapr 而另一个不使用,您可以为发布者或订阅者禁用 CloudEvent 包装。这允许在无法一次性采用 Dapr 的应用程序中部分采用 Dapr pubsub。

有关更多信息,请阅读 如何在没有 CloudEvents 的情况下使用 pubsub

设置消息内容类型

发布消息时,指定发送数据的内容类型很重要。除非指定,否则 Dapr 将假定为 text/plain

  • HTTP 客户端:可以在 Content-Type 头中设置内容类型
  • gRPC 客户端和 SDK:有一个专用的内容类型参数

消息传递

原则上,Dapr 认为消息一旦被订阅者处理并以非错误响应进行响应,就已成功传递。为了更细粒度的控制,Dapr 的 pubsub API 还提供了明确的状态,定义在响应负载中,订阅者可以用这些状态向 Dapr 指示特定的处理指令(例如,RETRYDROP)。

使用主题订阅接收消息

Dapr 应用程序可以通过支持相同功能的三种订阅类型订阅已发布的主题:声明式、流式和编程式。

订阅类型 描述
声明式 订阅在外部文件中定义。声明式方法消除了代码中的 Dapr 依赖性,并允许现有应用程序订阅主题,而无需更改代码。
流式 订阅在用户代码中定义。流式订阅是动态的,意味着它们允许在运行时添加或删除订阅。它们不需要应用程序中的订阅端点(这是编程式和声明式订阅所需的),使其易于在代码中配置。流式订阅也不需要应用程序配置 sidecar 来接收消息。由于消息被发送到消息处理程序代码,因此流式订阅中没有路由或批量订阅的概念。
编程式 订阅在用户代码中定义。编程式方法实现了静态订阅,并需要在代码中有一个端点。

有关更多信息,请阅读 关于订阅类型的订阅

重新加载主题订阅

要重新加载以编程方式或声明式定义的主题订阅,需要重新启动 Dapr sidecar。 通过启用 HotReload 功能门,可以使 Dapr sidecar 动态重新加载更改的声明式主题订阅,而无需重新启动。 主题订阅的热重载目前是一个预览功能。 重新加载订阅时,正在传输的消息不受影响。

消息路由

Dapr 提供 基于内容的路由 模式。Pubsub 路由 是此模式的实现,允许开发人员使用表达式根据其内容将 CloudEvents 路由到应用程序中的不同 URI/路径和事件处理程序。如果没有路由匹配,则使用可选的默认路由。随着您的应用程序扩展以支持多个事件版本或特殊情况,这很有用。

此功能适用于声明式和编程式订阅方法。

有关消息路由的更多信息,请阅读 Dapr pubsub API 参考

使用死信主题处理失败的消息

有时,由于各种可能的问题,例如生产者或消费者应用程序中的错误条件或导致应用程序代码出现问题的意外状态更改,消息无法被处理。Dapr 允许开发人员设置死信主题来处理无法传递到应用程序的消息。此功能适用于所有 pubsub 组件,并防止消费者应用程序无休止地重试失败的消息。有关更多信息,请阅读 死信主题

启用外发模式

Dapr 使开发人员能够使用外发模式在事务性状态存储和任何消息代理之间实现单一事务。有关更多信息,请阅读 如何启用事务性外发消息

命名空间消费者组

Dapr 通过 命名空间消费者组 解决大规模多租户问题。只需在组件元数据中包含 "{namespace}" 值,即可允许具有相同 app-id 的多个命名空间的应用程序发布和订阅相同的消息代理。

至少一次保证

Dapr 保证消息传递的至少一次语义。当应用程序使用 pubsub API 向主题发布消息时,Dapr 确保消息至少一次传递给每个订阅者。

即使消息传递失败,或者您的应用程序崩溃,Dapr 也会尝试重新传递消息,直到成功传递。

所有 Dapr pubsub 组件都支持至少一次保证。

消费者组和竞争消费者模式

Dapr 处理消费者组和竞争消费者模式的负担。在竞争消费者模式中,使用单个消费者组的多个应用程序实例竞争消息。当副本使用相同的 app-id 而没有显式消费者组覆盖时,Dapr 强制执行竞争消费者模式。

当同一应用程序的多个实例(具有相同的 app-id)订阅一个主题时,Dapr 将每条消息仅传递给该应用程序的一个实例。此概念在下图中进行了说明。



同样,如果两个不同的应用程序(具有不同的 app-id)订阅同一主题,Dapr 将每条消息仅传递给每个应用程序的一个实例

并非所有 Dapr pubsub 组件都支持竞争消费者模式。目前,以下(非详尽)pubsub 组件支持此功能:

为增强安全性设置主题范围

默认情况下,与 pubsub 组件实例关联的所有主题消息对配置了该组件的每个应用程序都是可用的。您可以使用 Dapr 主题范围限制哪个应用程序可以发布或订阅主题。有关更多信息,请阅读:pubsub 主题范围

消息生存时间(TTL)

Dapr 可以在每条消息的基础上设置超时消息,这意味着如果消息未从 pubsub 组件中读取,则消息将被丢弃。此超时消息可防止未读消息的积累。如果消息在队列中的时间超过配置的 TTL,则标记为死信。有关更多信息,请阅读 pubsub 消息 TTL

发布和订阅批量消息

Dapr 支持在单个请求中发送和接收多条消息。当编写需要发送或接收大量消息的应用程序时,使用批量操作可以通过减少请求总数来实现高吞吐量。有关更多信息,请阅读 pubsub 批量消息

使用 StatefulSets 扩展订阅者

在 Kubernetes 上运行时,使用 StatefulSets 结合 {podName} 标记,订阅者可以为每个实例拥有一个粘性 consumerID。请参阅 如何使用 StatefulSets 水平扩展订阅者

试用 pubsub

快速入门和教程

想要测试 Dapr pubsub API 吗?通过以下快速入门和教程来查看 pubsub 的实际应用:

快速入门/教程 描述
Pubsub 快速入门 使用发布和订阅 API 发送和接收消息。
Pubsub 教程 演示如何使用 Dapr 启用 pubsub 应用程序。使用 Redis 作为 pubsub 组件。

直接在您的应用中开始使用 pubsub

想要跳过快速入门?没问题。您可以直接在应用程序中试用 pubsub 构建块来发布消息并订阅主题。在 安装 Dapr 后,您可以从 pubsub 如何指南 开始使用 pubsub API。

下一步

2 - 如何:发布消息并订阅主题

学习如何使用一个服务向主题发送消息,并在另一个服务中订阅该主题

既然您已经了解了Dapr pubsub构建块的功能,接下来我们来看看如何在您的服务中应用它。下面的代码示例描述了一个使用两个服务处理订单的应用程序,每个服务都有Dapr sidecar:

  • 一个结账服务,使用Dapr订阅消息队列中的主题。
  • 一个订单处理服务,使用Dapr向RabbitMQ发布消息。
示例服务的状态管理图

Dapr会自动将用户的负载封装在符合CloudEvents v1.0的格式中,并使用Content-Type头的值作为datacontenttype属性。了解更多关于CloudEvents的消息。

以下示例展示了如何在您的应用程序中发布和订阅名为orders的主题。

设置Pub/Sub组件

第一步是设置pubsub组件:

当您运行dapr init时,Dapr会创建一个默认的Redis pubsub.yaml并在您的本地机器上运行一个Redis容器,位置如下:

  • 在Windows上,位于%UserProfile%\.dapr\components\pubsub.yaml
  • 在Linux/MacOS上,位于~/.dapr/components/pubsub.yaml

使用pubsub.yaml组件,您可以轻松地更换底层组件而无需更改应用程序代码。在此示例中,使用RabbitMQ。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: order-pub-sub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://localhost:5672"
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: reconnectWait
    value: "0"
  - name: concurrency
    value: parallel
scopes:
  - orderprocessing
  - checkout

您可以通过创建一个包含该文件的组件目录(在此示例中为myComponents)并使用dapr run CLI命令的--resources-path标志来覆盖此文件。

dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
dapr run --app-id myapp --resources-path ./myComponents -- npm start

要将其部署到Kubernetes集群中,请填写以下YAML中的pub/sub组件metadata连接详细信息,保存为pubsub.yaml,然后运行kubectl apply -f pubsub.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: order-pub-sub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: connectionString
    value: "amqp://localhost:5672"
  - name: protocol
    value: amqp  
  - name: hostname
    value: localhost 
  - name: username
    value: username
  - name: password
    value: password 
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: reconnectWait
    value: "0"
  - name: concurrency
    value: parallel
scopes:
  - orderprocessing
  - checkout

订阅主题

Dapr提供了三种方法来订阅主题:

  • 声明式,在外部文件中定义订阅。
  • 流式,在用户代码中定义订阅。
  • 编程式,在用户代码中定义订阅。

声明式、流式和编程式订阅文档中了解更多信息。此示例演示了声明式订阅。

创建一个名为subscription.yaml的文件并粘贴以下内容:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes: 
    default: /checkout
  pubsubname: order-pub-sub
scopes:
- orderprocessing
- checkout

上面的示例显示了对主题orders的事件订阅,针对pubsub组件order-pub-sub

  • route字段指示Dapr将所有主题消息发送到应用程序中的/checkout端点。
  • scopes字段指定此订阅适用于ID为orderprocessingcheckout的应用程序。

subscription.yaml放在与您的pubsub.yaml组件相同的目录中。当Dapr启动时,它会加载订阅和组件。

以下是利用Dapr SDK订阅您在subscription.yaml中定义的主题的代码示例。

//依赖项 
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.Mvc;
using Dapr;
using Dapr.Client;

//代码
namespace CheckoutService.controller
{
    [ApiController]
    public class CheckoutServiceController : Controller
    {
         //订阅一个主题 
        [Topic("order-pub-sub", "orders")]
        [HttpPost("checkout")]
        public void getCheckout([FromBody] int orderId)
        {
            Console.WriteLine("订阅者接收到 : " + orderId);
        }
    }
}

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-protocol https dotnet run
//依赖项
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

//代码
@RestController
public class CheckoutServiceController {

    private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
     //订阅一个主题
    @Topic(name = "orders", pubsubName = "order-pub-sub")
    @PostMapping(path = "/checkout")
    public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
        return Mono.fromRunnable(() -> {
            try {
                log.info("订阅者接收到: " + cloudEvent.getData());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 mvn spring-boot:run
#依赖项
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
import logging
import json

#代码
app = App()
logging.basicConfig(level = logging.INFO)
#订阅一个主题 
@app.subscribe(pubsub_name='order-pub-sub', topic='orders')
def mytopic(event: v1.Event) -> None:
    data = json.loads(event.Data())
    logging.info('订阅者接收到: ' + str(data))

app.run(6002)

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --app-protocol grpc -- python3 CheckoutService.py
//依赖项
import (
	"log"
	"net/http"
	"context"

	"github.com/dapr/go-sdk/service/common"
	daprd "github.com/dapr/go-sdk/service/http"
)

//代码
var sub = &common.Subscription{
	PubsubName: "order-pub-sub",
	Topic:      "orders",
	Route:      "/checkout",
}

func main() {
	s := daprd.NewService(":6002")
   //订阅一个主题
	if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
		log.Fatalf("添加主题订阅时出错: %v", err)
	}
	if err := s.Start(); err != nil && err != http.ErrServerClosed {
		log.Fatalf("监听时出错: %v", err)
	}
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
	log.Printf("订阅者接收到: %s", e.Data)
	return false, nil
}

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 go run CheckoutService.go
//依赖项
import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr'; 

//代码
const daprHost = "127.0.0.1"; 
const serverHost = "127.0.0.1";
const serverPort = "6002"; 

start().catch((e) => {
    console.error(e);
    process.exit(1);
});

async function start(orderId) {
    const server = new DaprServer({
        serverHost,
        serverPort,
        communicationProtocol: CommunicationProtocolEnum.HTTP,
        clientOptions: {
          daprHost,
          daprPort: process.env.DAPR_HTTP_PORT,
        },
    });
    //订阅一个主题
    await server.pubsub.subscribe("order-pub-sub", "orders", async (orderId) => {
        console.log(`订阅者接收到: ${JSON.stringify(orderId)}`)
    });
    await server.start();
}

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 npm start

发布消息

启动一个名为orderprocessing的Dapr实例:

dapr run --app-id orderprocessing --dapr-http-port 3601

然后向orders主题发布消息:

dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{"orderId": "100"}'
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}'
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'

以下是利用Dapr SDK发布主题的代码示例。

//依赖项
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using System.Threading;

//代码
namespace EventService
{
    class Program
    {
        static async Task Main(string[] args)
        {
           string PUBSUB_NAME = "order-pub-sub";
           string TOPIC_NAME = "orders";
           while(true) {
                System.Threading.Thread.Sleep(5000);
                Random random = new Random();
                int orderId = random.Next(1,1000);
                CancellationTokenSource source = new CancellationTokenSource();
                CancellationToken cancellationToken = source.Token;
                using var client = new DaprClientBuilder().Build();
                //使用Dapr SDK发布主题
                await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
                Console.WriteLine("发布的数据: " + orderId);
		        }
        }
    }
}

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-protocol https dotnet run
//依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Metadata;
import static java.util.Collections.singletonMap;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;

//代码
@SpringBootApplication
public class OrderProcessingServiceApplication {

	private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

	public static void main(String[] args) throws InterruptedException{
		String MESSAGE_TTL_IN_SECONDS = "1000";
		String TOPIC_NAME = "orders";
		String PUBSUB_NAME = "order-pub-sub";

		while(true) {
			TimeUnit.MILLISECONDS.sleep(5000);
			Random random = new Random();
			int orderId = random.nextInt(1000-1) + 1;
			DaprClient client = new DaprClientBuilder().build();
      //使用Dapr SDK发布主题
			client.publishEvent(
					PUBSUB_NAME,
					TOPIC_NAME,
					orderId,
					singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
			log.info("发布的数据:" + orderId);
		}
	}
}

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
#依赖项  
import random
from time import sleep    
import requests
import logging
import json
from dapr.clients import DaprClient

#代码
logging.basicConfig(level = logging.INFO)
while True:
    sleep(random.randrange(50, 5000) / 1000)
    orderId = random.randint(1, 1000)
    PUBSUB_NAME = 'order-pub-sub'
    TOPIC_NAME = 'orders'
    with DaprClient() as client:
        #使用Dapr SDK发布主题
        result = client.publish_event(
            pubsub_name=PUBSUB_NAME,
            topic_name=TOPIC_NAME,
            data=json.dumps(orderId),
            data_content_type='application/json',
        )
    logging.info('发布的数据: ' + str(orderId))

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc python3 OrderProcessingService.py
//依赖项
import (
	"context"
	"log"
	"math/rand"
	"time"
	"strconv"
	dapr "github.com/dapr/go-sdk/client"
)

//代码
var (
	PUBSUB_NAME = "order-pub-sub"
	TOPIC_NAME  = "orders"
)

func main() {
	for i := 0; i < 10; i++ {
		time.Sleep(5000)
		orderId := rand.Intn(1000-1) + 1
		client, err := dapr.NewClient()
		if err != nil {
			panic(err)
		}
		defer client.Close()
		ctx := context.Background()
    //使用Dapr SDK发布主题
		if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId))); 
		err != nil {
			panic(err)
		}

		log.Println("发布的数据: " + strconv.Itoa(orderId))
	}
}

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
//依赖项
import { DaprServer, DaprClient, CommunicationProtocolEnum } from '@dapr/dapr'; 

const daprHost = "127.0.0.1"; 

var main = function() {
    for(var i=0;i<10;i++) {
        sleep(5000);
        var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
        start(orderId).catch((e) => {
            console.error(e);
            process.exit(1);
        });
    }
}

async function start(orderId) {
    const PUBSUB_NAME = "order-pub-sub"
    const TOPIC_NAME  = "orders"
    const client = new DaprClient({
        daprHost,
        daprPort: process.env.DAPR_HTTP_PORT, 
        communicationProtocol: CommunicationProtocolEnum.HTTP
    });
    console.log("发布的数据:" + orderId)
    //使用Dapr SDK发布主题
    await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId);
}

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

main();

导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

消息确认和重试

为了告诉Dapr消息已成功处理,返回200 OK响应。如果Dapr收到的返回状态码不是200,或者您的应用程序崩溃,Dapr将尝试根据至少一次语义重新传递消息。

演示视频

观看此演示视频以了解更多关于Dapr的pubsub消息传递。

下一步

3 - 使用 CloudEvents 进行消息传递

了解 Dapr 使用 CloudEvents 的原因,它们在 Dapr 发布订阅中的工作原理,以及如何创建 CloudEvents。

为了实现消息路由并为每条消息提供额外的上下文,Dapr 采用 CloudEvents 1.0 规范 作为其消息格式。通过 Dapr 发送到主题的任何消息都会自动被包装在 CloudEvents 信封中,使用 Content-Type 头部值 作为 datacontenttype 属性。

Dapr 使用 CloudEvents 为事件负载提供额外的上下文,从而实现以下功能:

  • 跟踪
  • 事件数据的正确反序列化的内容类型
  • 发送应用程序的验证

您可以选择以下三种方法之一通过发布订阅发布 CloudEvent:

  1. 发送一个发布订阅事件,然后由 Dapr 包装在 CloudEvent 信封中。
  2. 通过覆盖标准 CloudEvent 属性来替换 Dapr 提供的特定 CloudEvents 属性。
  3. 将您自己的 CloudEvent 信封作为发布订阅事件的一部分编写。

Dapr 生成的 CloudEvents 示例

向 Dapr 发送发布操作会自动将其包装在一个包含以下字段的 CloudEvent 信封中:

  • id
  • source
  • specversion
  • type
  • traceparent
  • traceid
  • tracestate
  • topic
  • pubsubname
  • time
  • datacontenttype (可选)

以下示例演示了 Dapr 为发布到 orders 主题的操作生成的 CloudEvent,其中包括:

  • 一个 W3C traceid,唯一标识消息
  • data 和 CloudEvent 的字段,其中数据内容被序列化为 JSON
{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data": {
    "orderId": 1
  },
  "id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
  "specversion": "1.0",
  "datacontenttype": "application/json; charset=utf-8",
  "source": "checkout",
  "type": "com.dapr.event.sent",
  "time": "2020-09-23T06:23:21Z",
  "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}

作为另一个 v1.0 CloudEvent 的示例,以下显示了在 CloudEvent 消息中以 JSON 序列化的 XML 内容的数据:

{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data" : "<note><to></to><from>user2</from><message>Order</message></note>",
  "id" : "id-1234-5678-9101",
  "specversion" : "1.0",
  "datacontenttype" : "text/xml",
  "subject" : "Test XML Message",
  "source" : "https://example.com/message",
  "type" : "xml.message",
   "time" : "2020-09-23T06:23:21Z"
}

替换 Dapr 生成的 CloudEvents 值

Dapr 自动生成多个 CloudEvent 属性。您可以通过提供以下可选元数据键/值来替换这些生成的 CloudEvent 属性:

  • cloudevent.id: 覆盖 id
  • cloudevent.source: 覆盖 source
  • cloudevent.type: 覆盖 type
  • cloudevent.traceid: 覆盖 traceid
  • cloudevent.tracestate: 覆盖 tracestate
  • cloudevent.traceparent: 覆盖 traceparent

使用这些元数据属性替换 CloudEvents 属性的能力适用于所有发布订阅组件。

示例

例如,要替换代码中上述 CloudEvent 示例中的 sourceid 值:

with DaprClient() as client:
    order = {'orderId': i}
    # 使用 Dapr 发布订阅发布事件/消息
    result = client.publish_event(
        pubsub_name='order_pub_sub',
        topic_name='orders',
        publish_metadata={'cloudevent.id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317', 'cloudevent.source': 'payment'}
    )
var order = new Order(i);
using var client = new DaprClientBuilder().Build();

// 覆盖 cloudevent 元数据
var metadata = new Dictionary<string,string>() {
    { "cloudevent.source", "payment" },
    { "cloudevent.id", "d99b228f-6c73-4e78-8c4d-3f80a043d317" }
}

// 使用 Dapr 发布订阅发布事件/消息
await client.PublishEventAsync("order_pub_sub", "orders", order, metadata);
Console.WriteLine("Published data: " + order);

await Task.Delay(TimeSpan.FromSeconds(1));

然后 JSON 负载反映新的 sourceid 值:

{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data": {
    "orderId": 1
  },
  "id": "d99b228f-6c73-4e78-8c4d-3f80a043d317",
  "specversion": "1.0",
  "datacontenttype": "application/json; charset=utf-8",
  "source": "payment",
  "type": "com.dapr.event.sent",
  "time": "2020-09-23T06:23:21Z",
  "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}

发布您自己的 CloudEvent

如果您想使用自己的 CloudEvent,请确保将 datacontenttype 指定为 application/cloudevents+json

如果应用程序编写的 CloudEvent 不包含 CloudEvent 规范中最低要求的字段,则消息将被拒绝。如果缺少,Dapr 会将以下字段添加到 CloudEvent 中:

  • time
  • traceid
  • traceparent
  • tracestate
  • topic
  • pubsubname
  • source
  • type
  • specversion

您可以向自定义 CloudEvent 添加不属于官方 CloudEvent 规范的其他字段。Dapr 将按原样传递这些字段。

示例

发布一个 CloudEvent 到 orders 主题:

dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{\"orderId\": \"100\"}'

发布一个 CloudEvent 到 orders 主题:

curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'

发布一个 CloudEvent 到 orders 主题:

Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'

事件去重

使用 Dapr 创建的 CloudEvents 时,信封中包含一个 id 字段,应用程序可以使用该字段执行消息去重。Dapr 不会自动进行去重处理。Dapr 支持使用本身具备消息去重功能的消息代理。

下一步

4 - 发布和订阅非CloudEvents消息

了解何时可能不使用CloudEvents以及如何禁用它们。

在将Dapr集成到您的应用程序时,由于兼容性原因或某些应用程序不使用Dapr,某些服务可能仍需要通过不封装在CloudEvents中的pub/sub消息进行通信。这些消息被称为“原始”pub/sub消息。Dapr允许应用程序发布和订阅原始事件,这些事件未封装在CloudEvent中以实现兼容性。

发布原始消息

Dapr应用程序可以将原始事件发布到pub/sub主题中,而不需要CloudEvent封装,以便与非Dapr应用程序兼容。

显示当订阅者不使用Dapr或CloudEvent时如何使用Dapr发布的图示

要禁用CloudEvent封装,请在发布请求中将rawPayload元数据设置为true。这样,订阅者可以接收这些消息而无需解析CloudEvent架构。

curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"order-number": "345"}'
from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {
        'order-number': '345'
    }
    # 创建一个带有内容类型和主体的类型化消息
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic_name='TOPIC_A',
        data=json.dumps(req_data),
        publish_metadata={'rawPayload': 'true'}
    )
    # 打印请求
    print(req_data, flush=True)
<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory) {
    $publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
    $publisher->topic('TOPIC_A')->publish('data', ['rawPayload' => 'true']);
});

订阅原始消息

Dapr应用程序还可以订阅来自不使用CloudEvent封装的现有pub/sub主题的原始事件。

显示当发布者不使用Dapr或CloudEvent时如何使用Dapr订阅的图示

以编程方式订阅原始事件

在以编程方式订阅时,添加rawPayload的额外元数据条目,以便Dapr sidecar自动将负载封装到与当前Dapr SDK兼容的CloudEvent中。

import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys

app = flask.Flask(__name__)
CORS(app)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'deathStarStatus',
                      'route': 'dsstatus',
                      'metadata': {
                          'rawPayload': 'true',
                      } }]
    return jsonify(subscriptions)

@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}

app.run()
<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
    new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus', metadata: [ 'rawPayload' => 'true'] ),
]]));

$app->post('/dsstatus', function(
    #[\Dapr\Attributes\FromBody]
    \Dapr\PubSub\CloudEvent $cloudEvent,
    \Psr\Log\LoggerInterface $logger
    ) {
        $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
        return ['status' => 'SUCCESS'];
    }
);

$app->start();

声明式订阅原始事件

同样,您可以通过在订阅规范中添加rawPayload元数据条目来声明式地订阅原始事件。

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: deathStarStatus
  routes: 
    default: /dsstatus
  pubsubname: pubsub
  metadata:
    rawPayload: "true"
scopes:
- app1
- app2

下一步

5 - 操作指南:将消息路由到不同的事件处理程序

学习如何根据 CloudEvent 字段将主题中的消息路由到不同的事件处理程序

pubsub 路由实现了基于内容的路由,这是一种使用 DSL 而不是命令式应用程序代码的消息模式。通过 pubsub 路由,您可以根据 CloudEvents 的内容,将消息路由到应用程序中的不同 URI/路径和事件处理程序。如果没有匹配的路由,则可以使用可选的默认路由。随着您的应用程序扩展以支持多个事件版本或特殊情况,这种方法将非常有用。

虽然可以通过代码实现路由,但将路由规则与应用程序分离可以提高可移植性。

此功能适用于声明式和编程式订阅方法,但不适用于流式订阅。

声明式订阅

对于声明式订阅,使用 dapr.io/v2alpha1 作为 apiVersion。以下是使用路由的 subscriptions.yaml 示例:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  pubsubname: pubsub
  topic: inventory
  routes:
    rules:
      - match: event.type == "widget"
        path: /widgets
      - match: event.type == "gadget"
        path: /gadgets
    default: /products
scopes:
  - app1
  - app2

编程式订阅

在编程方法中,返回的是 routes 结构而不是 route。JSON 结构与声明式 YAML 匹配:

import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys

app = flask.Flask(__name__)
CORS(app)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [
      {
        'pubsubname': 'pubsub',
        'topic': 'inventory',
        'routes': {
          'rules': [
            {
              'match': 'event.type == "widget"',
              'path': '/widgets'
            },
            {
              'match': 'event.type == "gadget"',
              'path': '/gadgets'
            },
          ],
          'default': '/products'
        }
      }]
    return jsonify(subscriptions)

@app.route('/products', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

app.get('/dapr/subscribe', (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "inventory",
      routes: {
        rules: [
          {
            match: 'event.type == "widget"',
            path: '/widgets'
          },
          {
            match: 'event.type == "gadget"',
            path: '/gadgets'
          },
        ],
        default: '/products'
      }
    }
  ]);
})

app.post('/products', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
        [Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)]
        [HttpPost("widgets")]
        public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient)
        {
            // Logic
            return stock;
        }

        [Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)]
        [HttpPost("gadgets")]
        public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient)
        {
            // Logic
            return stock;
        }

        [Topic("pubsub", "inventory")]
        [HttpPost("products")]
        public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient)
        {
            // Logic
            return stock;
        }
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

const appPort = 3000

type subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Routes     routes            `json:"routes"`
}

type routes struct {
	Rules   []rule `json:"rules,omitempty"`
	Default string `json:"default,omitempty"`
}

type rule struct {
	Match string `json:"match"`
	Path  string `json:"path"`
}

// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "inventory",
			Routes: routes{
				Rules: []rule{
					{
						Match: `event.type == "widget"`,
						Path:  "/widgets",
					},
					{
						Match: `event.type == "gadget"`,
						Path:  "/gadgets",
					},
				},
				Default: "/products",
			},
		},
	}

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(t)
}

func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}
<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
    new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'inventory', routes: (
      rules: => [
        ('match': 'event.type == "widget"', path: '/widgets'),
        ('match': 'event.type == "gadget"', path: '/gadgets'),
      ]
      default: '/products')),
]]));
$app->post('/products', function(
    #[\Dapr\Attributes\FromBody]
    \Dapr\PubSub\CloudEvent $cloudEvent,
    \Psr\Log\LoggerInterface $logger
    ) {
        $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
        return ['status' => 'SUCCESS'];
    }
);
$app->start();

通用表达式语言 (CEL)

在这些示例中,根据 event.type,应用程序将被调用于:

  • /widgets
  • /gadgets
  • /products

表达式是用通用表达式语言 (CEL)编写的,其中 event 代表云事件。表达式中可以引用 CloudEvents 核心规范中的任何属性。

示例表达式

匹配“重要”消息:

has(event.data.important) && event.data.important == true

匹配大于 $10,000 的存款:

event.type == "deposit" && int(event.data.amount) > 10000

匹配消息的多个版本:

event.type == "mymessage.v1"
event.type == "mymessage.v2"

CloudEvent 属性

作为参考,以下属性来自 CloudEvents 规范。

事件数据

data

根据术语 data 的定义,CloudEvents 可能 包含有关事件发生的领域特定信息。当存在时,此信息将被封装在 data 中。

  • 描述: 事件负载。此规范对信息类型没有限制。它被编码为一种媒体格式,由 datacontenttype 属性指定(例如 application/json),并在这些相应属性存在时遵循 dataschema 格式。
  • 约束:
    • 可选

必需属性

以下属性在所有 CloudEvents 中是必需的:

id

  • 类型: String
  • 描述: 标识事件。生产者 必须 确保 source + id 对于每个不同的事件都是唯一的。如果由于网络错误而重新发送重复事件,则它可能具有相同的 id。消费者可以假设具有相同 sourceid 的事件是重复的。
  • 约束:
    • 必需
    • 必须是非空字符串
    • 必须在生产者范围内唯一
  • 示例:
    • 由生产者维护的事件计数器
    • UUID

source

  • 类型: URI-reference

  • 描述: 标识事件发生的上下文。通常包括以下信息:

    • 事件源的类型
    • 发布事件的组织
    • 产生事件的过程

    URI 中编码的数据的确切语法和语义由事件生产者定义。

    生产者 必须 确保 source + id 对于每个不同的事件都是唯一的。

    应用程序可以:

    • 为每个不同的生产者分配一个唯一的 source,以便更容易生成唯一的 ID,并防止其他生产者具有相同的 source
    • 使用 UUID、URN、DNS 权威或应用程序特定方案创建唯一的 source 标识符。

    一个 source 可能包含多个生产者。在这种情况下,生产者 必须 合作以确保 source + id 对于每个不同的事件都是唯一的。

  • 约束:

    • 必需
    • 必须是非空 URI-reference
    • 推荐使用绝对 URI
  • 示例:

    • 具有 DNS 权威的互联网范围唯一 URI:
    • 具有 UUID 的全球唯一 URN:
      • urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66
    • 应用程序特定标识符:
      • /cloudevents/spec/pull/123
      • /sensors/tn-1234567/alerts
      • 1-555-123-4567

specversion

  • 类型: String

  • 描述: 事件使用的 CloudEvents 规范版本。这使得上下文的解释成为可能。合规的事件生产者 必须 在引用此版本的规范时使用 1.0 值。

    目前,此属性仅包含“主要”和“次要”版本号。这允许在不更改此属性值的情况下对规范进行补丁更改。

    注意:对于“候选发布”版本,可能会使用后缀进行测试。

  • 约束:

    • 必需
    • 必须是非空字符串

type

  • 类型: String
  • 描述: 包含描述与原始事件相关的事件类型的值。通常,此属性用于路由、可观察性、策略执行等。格式由生产者定义,可能包括 type 的版本信息。有关更多信息,请参阅CloudEvents 的版本控制
  • 约束:
    • 必需
    • 必须是非空字符串
    • 应该以反向 DNS 名称为前缀。前缀域决定了定义此事件类型语义的组织。
  • 示例:
    • com.github.pull_request.opened
    • com.example.object.deleted.v2

可选属性

以下属性在 CloudEvents 中是可选的。有关可选定义的更多信息,请参阅符号约定部分。

datacontenttype

  • 类型: String 根据 RFC 2046

  • 描述: data 值的内容类型。此属性使 data 能够携带任何类型的内容,其中格式和编码可能与所选事件格式不同。

    例如,使用 JSON 信封格式呈现的事件可能在 data 中携带 XML 负载。此属性被设置为 "application/xml",通知消费者。

    不同 datacontenttype 值的数据内容呈现规则在事件格式规范中定义。例如,JSON 事件格式在第 3.1 节中定义了关系。

    对于某些二进制模式协议绑定,此字段直接映射到相应协议的内容类型元数据属性。您可以在相应协议中找到二进制模式和内容类型元数据映射的规范规则。

    在某些事件格式中,您可以省略 datacontenttype 属性。例如,如果 JSON 格式事件没有 datacontenttype 属性,则意味着 data 是符合 "application/json" 媒体类型的 JSON 值。换句话说:没有 datacontenttype 的 JSON 格式事件与 datacontenttype="application/json" 的事件完全等效。

    当将没有 datacontenttype 属性的事件消息转换为不同格式或协议绑定时,目标 datacontenttype 应明确设置为源的隐含 datacontenttype

  • 约束:

    • 可选
    • 如果存在,必须符合 RFC 2046 中指定的格式
  • 有关媒体类型示例,请参阅 IANA 媒体类型

dataschema

  • 类型: URI
  • 描述: 标识 data 遵循的模式。与模式不兼容的更改应通过不同的 URI 反映。有关更多信息,请参阅CloudEvents 的版本控制
  • 约束:
    • 可选
    • 如果存在,必须是非空 URI

subject

  • 类型: String

  • 描述: 这描述了事件生产者(由 source 标识)上下文中的事件主题。在发布-订阅场景中,订阅者通常会订阅由 source 发出的事件。如果 source 上下文具有内部子结构,则仅 source 标识符可能不足以作为任何特定事件的限定符。

    在上下文元数据中(而不是仅在 data 负载中)识别事件的主题在通用订阅过滤场景中很有帮助,其中中间件无法解释 data 内容。在上述示例中,订阅者可能只对名称以 ‘.jpg’ 或 ‘.jpeg’ 结尾的 blob 感兴趣。使用 subject 属性,您可以为该事件子集构建简单而高效的字符串后缀过滤器。

  • 约束:

    • 可选
    • 如果存在,必须是非空字符串
  • 示例:
    订阅者可能会注册对在 blob 存储容器中创建新 blob 时的兴趣。在这种情况下:

    • 事件 source 标识订阅范围(存储容器)
    • 事件 type 标识“blob 创建”事件
    • 事件 id 唯一标识事件实例,以区分同名 blob 的单独创建事件。

    新创建的 blob 的名称在 subject 中传递:

time

  • 类型: Timestamp
  • 描述: 事件发生的时间戳。如果无法确定事件发生的时间,则此属性可以由 CloudEvents 生产者设置为其他时间(例如当前时间)。然而,所有相同 source 的生产者 必须 在这方面保持一致。换句话说,要么他们都使用事件发生的实际时间,要么他们都使用相同的算法来确定使用的值。
  • 约束:
    • 可选
    • 如果存在,必须符合 RFC 3339 中指定的格式

社区电话演示

观看此视频以了解如何使用 pubsub 进行消息路由:

下一步

6 - 声明式、流式和编程式订阅类型

了解更多关于允许您订阅消息主题的订阅类型。

发布/订阅 API 订阅类型

Dapr 应用程序可以通过三种订阅类型来订阅已发布的主题,这三种类型支持相同的功能:声明式、流式和编程式。

订阅类型 描述
声明式 订阅在外部文件中定义。声明式方法将 Dapr 的依赖从代码中移除,允许现有应用程序无需更改代码即可订阅主题。
流式 订阅在应用程序代码中定义。流式订阅是动态的,允许在运行时添加或删除订阅。它们不需要在应用程序中设置订阅端点(这是编程式和声明式订阅所需的),使其在代码中易于配置。流式订阅也不需要应用程序配置 sidecar 来接收消息。
编程式 订阅在应用程序代码中定义。编程式方法实现了静态订阅,并需要在代码中设置一个端点。

下面的示例演示了通过 orders 主题在 checkout 应用程序和 orderprocessing 应用程序之间的发布/订阅消息。示例首先以声明式,然后以编程式演示了相同的 Dapr 发布/订阅组件。

声明式订阅

您可以使用外部组件文件声明性地订阅一个主题。此示例使用名为 subscription.yaml 的 YAML 组件文件:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order
spec:
  topic: orders
  routes:
    default: /orders
  pubsubname: pubsub
scopes:
- orderprocessing

这里的订阅名为 order

  • 使用名为 pubsub 的发布/订阅组件订阅名为 orders 的主题。
  • 设置 route 字段以将所有主题消息发送到应用程序中的 /orders 端点。
  • 设置 scopes 字段以将此订阅的访问范围仅限于 ID 为 orderprocessing 的应用程序。

运行 Dapr 时,设置 YAML 组件文件路径以指向 Dapr 的组件。

dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
dapr run --app-id myapp --resources-path ./myComponents -- npm start
dapr run --app-id myapp --resources-path ./myComponents -- go run app.go

在 Kubernetes 中,将组件应用到集群:

kubectl apply -f subscription.yaml

在您的应用程序代码中,订阅 Dapr 发布/订阅组件中指定的主题。

 //订阅一个主题 
[HttpPost("orders")]
public void getCheckout([FromBody] int orderId)
{
    Console.WriteLine("Subscriber received : " + orderId);
}
import io.dapr.client.domain.CloudEvent;

 //订阅一个主题
@PostMapping(path = "/orders")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
    return Mono.fromRunnable(() -> {
        try {
            log.info("Subscriber received: " + cloudEvent.getData());
        } 
    });
}
from cloudevents.sdk.event import v1

#订阅一个主题 
@app.route('/orders', methods=['POST'])
def checkout(event: v1.Event) -> None:
    data = json.loads(event.Data())
    logging.info('Subscriber received: ' + str(data))
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

// 监听声明式路由
app.post('/orders', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});
//订阅一个主题
var sub = &common.Subscription{
	PubsubName: "pubsub",
	Topic:      "orders",
	Route:      "/orders",
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
	log.Printf("Subscriber received: %s", e.Data)
	return false, nil
}

/orders 端点与订阅中定义的 route 匹配,这是 Dapr 发送所有主题消息的地方。

流式订阅

流式订阅是在应用程序代码中定义的订阅,可以在运行时动态停止和启动。 消息由应用程序从 Dapr 拉取。这意味着不需要端点来订阅主题,并且可以在没有任何应用程序配置在 sidecar 上的情况下进行订阅。 可以同时订阅任意数量的发布/订阅和主题。 由于消息被发送到给定的消息处理代码,因此没有路由或批量订阅的概念。

注意: 每个应用程序一次只能订阅一个发布/订阅/主题对。

下面的示例展示了不同的流式订阅主题的方法。

您可以使用 subscribe 方法,该方法返回一个 Subscription 对象,并允许您通过调用 next_message 方法从流中拉取消息。这在主线程中运行,并可能在等待消息时阻塞主线程。

import time

from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError

counter = 0


def process_message(message):
    global counter
    counter += 1
    # 在此处处理消息
    print(f'Processing message: {message.data()} from {message.topic()}...')
    return 'success'


def main():
    with DaprClient() as client:
        global counter

        subscription = client.subscribe(
            pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
        )

        try:
            while counter < 5:
                try:
                    message = subscription.next_message()

                except StreamInactiveError as e:
                    print('Stream is inactive. Retrying...')
                    time.sleep(1)
                    continue
                if message is None:
                    print('No message received within timeout period.')
                    continue

                # 处理消息
                response_status = process_message(message)

                if response_status == 'success':
                    subscription.respond_success(message)
                elif response_status == 'retry':
                    subscription.respond_retry(message)
                elif response_status == 'drop':
                    subscription.respond_drop(message)

        finally:
            print("Closing subscription...")
            subscription.close()


if __name__ == '__main__':
    main()

您还可以使用 subscribe_with_handler 方法,该方法接受一个回调函数,该函数为从流中接收到的每条消息执行。此方法在单独的线程中运行,因此不会阻塞主线程。

import time

from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse

counter = 0


def process_message(message):
    # 在此处处理消息
    global counter
    counter += 1
    print(f'Processing message: {message.data()} from {message.topic()}...')
    return TopicEventResponse('success')


def main():
    with (DaprClient() as client):
        # 这将启动一个新线程,该线程将监听消息
        # 并在 `process_message` 函数中处理它们
        close_fn = client.subscribe_with_handler(
            pubsub_name='pubsub', topic='orders', handler_fn=process_message,
            dead_letter_topic='orders_dead'
        )

        while counter < 5:
            time.sleep(1)

        print("Closing subscription...")
        close_fn()


if __name__ == '__main__':
    main()

了解更多关于使用 Python SDK 客户端的流式订阅。

package main

import (
	"context"
	"log"

	"github.com/dapr/go-sdk/client"
)

func main() {
	cl, err := client.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	sub, err := cl.Subscribe(context.Background(), client.SubscriptionOptions{
		PubsubName: "pubsub",
		Topic:      "orders",
	})
	if err != nil {
		panic(err)
	}
	// 必须始终调用 Close。
	defer sub.Close()

	for {
		msg, err := sub.Receive()
		if err != nil {
			panic(err)
		}

		// 处理事件

		// 我们 _必须_ 始终表示消息处理的结果,否则
		// 消息将不会被视为已处理,并将被重新传递或
		// 死信。
		// msg.Retry()
		// msg.Drop()
		if err := msg.Success(); err != nil {
			panic(err)
		}
	}
}

package main

import (
	"context"
	"log"

	"github.com/dapr/go-sdk/client"
	"github.com/dapr/go-sdk/service/common"
)

func main() {
	cl, err := client.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	stop, err := cl.SubscribeWithHandler(context.Background(),
		client.SubscriptionOptions{
			PubsubName: "pubsub",
			Topic:      "orders",
		},
		eventHandler,
	)
	if err != nil {
		panic(err)
	}

	// 必须始终调用 Stop。
	defer stop()

	<-make(chan struct{})
}

func eventHandler(e *common.TopicEvent) common.SubscriptionResponseStatus {
	// 在此处处理消息
    // common.SubscriptionResponseStatusRetry
    // common.SubscriptionResponseStatusDrop
			common.SubscriptionResponseStatusDrop, status)
	}

	return common.SubscriptionResponseStatusSuccess
}

演示

观看 此视频以了解流式订阅的概述

编程式订阅

动态编程式方法在代码中返回 routes JSON 结构,与声明式方法的 route YAML 结构不同。

注意: 编程式订阅仅在应用程序启动时读取一次。您不能 动态 添加新的编程式订阅,只能在编译时添加新的。

在下面的示例中,您在应用程序代码中定义了在上面的声明式 YAML 订阅中找到的值。

[Topic("pubsub", "orders")]
[HttpPost("/orders")]
public async Task<ActionResult<Order>>Checkout(Order order, [FromServices] DaprClient daprClient)
{
    // 逻辑
    return order;
}

// Dapr 订阅在 [Topic] 中将 orders 主题路由到此路由
app.MapPost("/orders", [Topic("pubsub", "orders")] (Order order) => {
    Console.WriteLine("Subscriber received : " + order);
    return Results.Ok(order);
});

上面定义的两个处理程序还需要映射以配置 dapr/subscribe 端点。这是在定义端点时在应用程序启动代码中完成的。

app.UseEndpoints(endpoints =>
{
    endpoints.MapSubscribeHandler();
});
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Topic(name = "orders", pubsubName = "pubsub")
@PostMapping(path = "/orders")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
  return Mono.fromRunnable(() -> {
    try {
      System.out.println("Subscriber received: " + cloudEvent.getData());
      System.out.println("Subscriber received: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  });
}
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [
      {
        'pubsubname': 'pubsub',
        'topic': 'orders',
        'routes': {
          'rules': [
            {
              'match': 'event.type == "order"',
              'path': '/orders'
            },
          ],
          'default': '/orders'
        }
      }]
    return jsonify(subscriptions)

@app.route('/orders', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

app.get('/dapr/subscribe', (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "orders",
      routes: {
        rules: [
          {
            match: 'event.type == "order"',
            path: '/orders'
          },
        ],
        default: '/products'
      }
    }
  ]);
})

app.post('/orders', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

const appPort = 3000

type subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Routes     routes            `json:"routes"`
}

type routes struct {
	Rules   []rule `json:"rules,omitempty"`
	Default string `json:"default,omitempty"`
}

type rule struct {
	Match string `json:"match"`
	Path  string `json:"path"`
}

// 处理 /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "orders",
			Routes: routes{
				Rules: []rule{
					{
						Match: `event.type == "order"`,
						Path:  "/orders",
					},
				},
				Default: "/orders",
			},
		},
	}

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(t)
}

func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}

下一步

7 - 死信主题

通过订阅死信主题来处理无法投递的消息

介绍

在某些情况下,应用程序可能由于各种原因无法处理消息。例如,可能会出现获取处理消息所需数据的临时问题,或者应用程序的业务逻辑失败并返回错误。死信主题用于处理这些无法投递的消息,并将其转发到订阅应用程序。这可以减轻应用程序处理失败消息的负担,使开发人员可以编写代码从死信主题中读取消息,修复后重新发送,或者选择放弃这些消息。

死信主题通常与重试策略和处理死信主题消息的订阅一起使用。

当配置了死信主题时,任何无法投递到应用程序的消息都会被放置在死信主题中,以便转发到处理这些消息的订阅。这可以是同一个应用程序或完全不同的应用程序。

即使底层系统不支持,Dapr 也为其所有的 pubsub 组件启用了死信主题。例如,AWS SNS 组件有一个死信队列,RabbitMQ有死信主题。您需要确保正确配置这些组件。

下图展示了死信主题的工作原理。首先,消息从 orders 主题的发布者发送。Dapr 代表订阅者应用程序接收消息,但 orders 主题的消息未能投递到应用程序的 /checkout 端点,即使经过重试也是如此。由于投递失败,消息被转发到 poisonMessages 主题,该主题将其投递到 /failedMessages 端点进行处理,在这种情况下是在同一个应用程序上。failedMessages 处理代码可以选择丢弃消息或重新发送新消息。

使用声明式订阅配置死信主题

以下 YAML 显示了如何为从 orders 主题消费的消息配置名为 poisonMessages 的死信主题。此订阅的范围限定为具有 checkout ID 的应用程序。

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order
spec:
  topic: orders
  routes: 
    default: /checkout
  pubsubname: pubsub
  deadLetterTopic: poisonMessages
scopes:
- checkout

使用流式订阅配置死信主题

	var deadLetterTopic = "poisonMessages"
	sub, err := cl.Subscribe(context.Background(), client.SubscriptionOptions{
		PubsubName:      "pubsub",
		Topic:           "orders",
		DeadLetterTopic: &deadLetterTopic,
	})

使用编程订阅配置死信主题

/subscribe 端点返回的 JSON 显示了如何为从 orders 主题消费的消息配置名为 poisonMessages 的死信主题。

app.get('/dapr/subscribe', (_req, res) => {
    res.json([
        {
            pubsubname: "pubsub",
            topic: "orders",
            route: "/checkout",
            deadLetterTopic: "poisonMessages"
        }
    ]);
});

重试和死信主题

默认情况下,当设置了死信主题时,任何失败的消息会立即进入死信主题。因此,建议在订阅中使用死信主题时始终设置重试策略。 要在将消息发送到死信主题之前启用消息重试,请对 pubsub 组件应用 重试策略

此示例显示了如何为 pubsub pubsub 组件设置名为 pubsubRetry 的常量重试策略,每 5 秒应用一次,最多尝试投递 10 次。

apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: myresiliency
spec:
  policies:
    retries:
      pubsubRetry:
        policy: constant
        duration: 5s
        maxRetries: 10
  targets:
    components:
      pubsub:
        inbound:
          retry: pubsubRetry

配置处理死信主题的订阅

请记得配置一个订阅来处理死信主题。例如,您可以创建另一个声明式订阅,在同一个或不同的应用程序上接收这些消息。下面的示例显示了 checkout 应用程序通过另一个订阅订阅 poisonMessages 主题,并将这些消息发送到 /failedmessages 端点进行处理。

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: deadlettertopics
spec:
  topic: poisonMessages
  routes:
    rules:
      - match:
        path: /failedMessages
  pubsubname: pubsub
scopes:
- checkout

演示

观看此视频以了解死信主题的概述:

下一步

8 - 如何:设置 pub/sub 命名空间消费者组

了解如何在组件中使用基于元数据的命名空间消费者组

您已经配置了 Dapr 的 pub/sub API 构建块,并且您的应用程序正在使用集中式消息代理顺利地发布和订阅主题。如果您想为应用程序执行简单的 A/B 测试、蓝/绿部署,甚至金丝雀部署,该怎么办?即使使用 Dapr,这也可能很困难。

Dapr 通过其 pub/sub 命名空间消费者组机制解决了大规模的多租户问题。

没有命名空间消费者组

假设您有一个 Kubernetes 集群,其中两个应用程序(App1 和 App2)部署在同一个命名空间(namespace-a)中。App2 发布到一个名为 order 的主题,而 App1 订阅名为 order 的主题。这将创建两个以您的应用程序命名的消费者组(App1 和 App2)。

显示基本 pubsub 过程的图示。

为了在使用集中式消息代理时进行简单的测试和部署,您创建了另一个命名空间,其中包含两个具有相同 app-id 的应用程序,App1 和 App2。

Dapr 使用单个应用程序的 app-id 创建消费者组,因此消费者组名称将保持为 App1 和 App2。

显示没有 Dapr 命名空间消费者组的多租户复杂性的图示。

为了避免这种情况,您需要在代码中“潜入”一些东西来更改 app-id,具体取决于您运行的命名空间。这种方法既麻烦又容易出错。

使用命名空间消费者组

Dapr 不仅允许您使用 UUID 和 pod 名称的 consumerID 更改消费者组的行为,还提供了一个存在于 pub/sub 组件元数据中的 命名空间机制。例如,使用 Redis 作为您的消息代理:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: consumerID
    value: "{namespace}"

通过将 consumerID 配置为 {namespace} 值,您可以在不同的命名空间中使用相同的 app-id 和相同的主题。

显示命名空间消费者组如何帮助多租户的图示。

在上图中,您有两个命名空间,每个命名空间都有相同 app-id 的应用程序,发布和订阅相同的集中式消息代理 orders。然而这次,Dapr 创建了以它们运行的命名空间为前缀的消费者组名称。

无需更改您的代码或 app-id,命名空间消费者组允许您:

  • 添加更多命名空间
  • 保持相同的主题
  • 在命名空间之间保持相同的 app-id
  • 保持整个部署管道完整

只需在您的组件元数据中包含 "{namespace}" 消费者组机制。您无需在元数据中手动编码命名空间。Dapr 会自动识别其运行的命名空间并为您填充命名空间值,就像由运行时注入的动态元数据值一样。

演示

观看 此视频以了解 pub/sub 多租户的概述

下一步

9 - 如何:使用StatefulSets水平扩展订阅者

学习如何使用StatefulSet进行订阅,并通过一致的消费者ID水平扩展

与在Deployments中Pod是临时的不同,StatefulSets通过为每个Pod保持固定的身份,使得在Kubernetes上可以部署有状态应用程序。

以下是一个使用Dapr的StatefulSet示例:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: python-subscriber
spec:
  selector:
    matchLabels:
      app: python-subscriber  # 必须匹配.spec.template.metadata.labels
  serviceName: "python-subscriber"
  replicas: 3
  template:
    metadata:
      labels:
        app: python-subscriber # 必须匹配.spec.selector.matchLabels
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "python-subscriber"
        dapr.io/app-port: "5001"
    spec:
      containers:
      - name: python-subscriber
        image: ghcr.io/dapr/samples/pubsub-python-subscriber:latest
        ports:
        - containerPort: 5001
        imagePullPolicy: Always

在通过Dapr订阅pubsub主题时,应用程序可以定义一个consumerID,这个ID决定了订阅者在队列或主题中的位置。利用StatefulSets中Pod的固定身份,您可以为每个Pod分配一个唯一的consumerID,从而实现订阅者应用程序的水平扩展。Dapr会跟踪每个Pod的名称,并可以在组件中使用{podName}标记来声明。

当扩展某个主题的订阅者数量时,每个Dapr组件都有特定的设置来决定其行为。通常,对于多个消费者有两种选择:

  • 广播:发布到主题的每条消息将被所有订阅者接收。
  • 共享:一条消息仅由一个订阅者接收(而不是所有订阅者)。

Kafka通过consumerID为每个订阅者分配独立的位置。当实例重新启动时,它会使用相同的consumerID继续从上次的位置处理消息,而不会遗漏任何消息。以下组件示例展示了如何让多个Pod使用Kafka组件:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
  - name: consumerID
    value: "{podName}"
  - name: authRequired
    value: "false"

MQTT3协议支持共享主题,允许多个订阅者“竞争”处理来自主题的消息,这意味着每条消息仅由其中一个订阅者处理。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: consumerID
      value: "{podName}"
    - name: cleanSession
      value: "true"
    - name: url
      value: "tcp://admin:public@localhost:1883"
    - name: qos
      value: 1
    - name: retain
      value: "false"

下一步

10 - 限制 Pub/sub 主题访问

通过范围控制将 pub/sub 主题限制为特定应用程序

介绍

命名空间或组件范围可以用来限制组件的访问权限,使其仅对特定应用程序可用。这些应用程序范围的设置确保只有具有特定 ID 的应用程序才能使用该组件。

除了这种通用的组件范围外,还可以对 pub/sub 组件进行以下限制:

  • 哪些主题可以被使用(发布或订阅)
  • 哪些应用程序被允许发布到特定主题
  • 哪些应用程序被允许订阅特定主题

这被称为 pub/sub 主题范围控制

为每个 pub/sub 组件定义 pub/sub 范围。您可能有一个名为 pubsub 的 pub/sub 组件,它有一组范围,另一个 pubsub2 则有不同的范围。

要使用此主题范围,可以为 pub/sub 组件设置三个元数据属性:

  • spec.metadata.publishingScopes
    • 一个以分号分隔的应用程序列表和以逗号分隔的主题列表,允许该应用程序发布到该主题列表
    • 如果在 publishingScopes 中未指定任何内容(默认行为),则所有应用程序都可以发布到所有主题
    • 要拒绝应用程序发布到任何主题的能力,请将主题列表留空(app1=;app2=topic2
    • 例如,app1=topic1;app2=topic2,topic3;app3= 将允许 app1 仅发布到 topic1,app2 仅发布到 topic2 和 topic3,app3 则不能发布到任何主题。
  • spec.metadata.subscriptionScopes
    • 一个以分号分隔的应用程序列表和以逗号分隔的主题列表,允许该应用程序订阅该主题列表
    • 如果在 subscriptionScopes 中未指定任何内容(默认行为),则所有应用程序都可以订阅所有主题
    • 例如,app1=topic1;app2=topic2,topic3 将允许 app1 仅订阅 topic1,app2 仅订阅 topic2 和 topic3
  • spec.metadata.allowedTopics
    • 一个为所有应用程序允许的主题的逗号分隔列表。
    • 如果未设置 allowedTopics(默认行为),则所有主题都是有效的。如果存在,subscriptionScopespublishingScopes 仍然生效。
    • publishingScopessubscriptionScopes 可以与 allowedTopics 结合使用以添加细粒度限制
  • spec.metadata.protectedTopics
    • 一个为所有应用程序保护的主题的逗号分隔列表。
    • 如果一个主题被标记为保护,则必须通过 publishingScopessubscriptionScopes 明确授予应用程序发布或订阅权限才能发布/订阅该主题。

这些元数据属性可用于所有 pub/sub 组件。以下示例使用 Redis 作为 pub/sub 组件。

示例 1:限制主题访问

在某些情况下,限制哪些应用程序可以发布/订阅主题是有用的,例如当您有包含敏感信息的主题时,只有一部分应用程序被允许发布或订阅这些主题。

它也可以用于所有主题,以始终拥有一个“真实来源”,以了解哪些应用程序作为发布者/订阅者使用哪些主题。

以下是三个应用程序和三个主题的示例:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: publishingScopes
    value: "app1=topic1;app2=topic2,topic3;app3="
  - name: subscriptionScopes
    value: "app2=;app3=topic1"

下表显示了哪些应用程序被允许发布到主题:

topic1 topic2 topic3
app1
app2
app3

下表显示了哪些应用程序被允许订阅主题:

topic1 topic2 topic3
app1
app2
app3

注意:如果应用程序未列出(例如 subscriptionScopes 中的 app1),则允许其订阅所有主题。因为未使用 allowedTopics,且 app1 没有任何订阅范围,它也可以使用上面未列出的其他主题。

示例 2:限制允许的主题

如果 Dapr 应用程序向其发送消息,则会创建一个主题。在某些情况下,这种主题创建应该受到管理。例如:

  • Dapr 应用程序在生成主题名称时的错误可能导致创建无限数量的主题
  • 精简主题名称和总数,防止主题无限增长

在这些情况下可以使用 allowedTopics

以下是三个允许的主题的示例:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: allowedTopics
    value: "topic1,topic2,topic3"

所有应用程序都可以使用这些主题,但仅限于这些主题,不允许其他主题。

示例 3:结合 allowedTopics 和范围

有时您希望结合两者范围,从而仅拥有一组固定的允许主题,并指定对某些应用程序的范围。

以下是三个应用程序和两个主题的示例:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: allowedTopics
    value: "A,B"
  - name: publishingScopes
    value: "app1=A"
  - name: subscriptionScopes
    value: "app1=;app2=A"

注意:第三个应用程序未列出,因为如果应用程序未在范围内指定,则允许其使用所有主题。

下表显示了哪个应用程序被允许发布到主题:

A B C
app1
app2
app3

下表显示了哪个应用程序被允许订阅主题:

A B C
app1
app2
app3

示例 4:将主题标记为保护

如果您的主题涉及敏感数据,则每个新应用程序必须在 publishingScopessubscriptionScopes 中明确列出,以确保其无法读取或写入该主题。或者,您可以将主题指定为“保护”(使用 protectedTopics),并仅授予真正需要的特定应用程序访问权限。

以下是三个应用程序和三个主题的示例,其中两个主题是保护的:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: protectedTopics
    value: "A,B"
  - name: publishingScopes
    value: "app1=A,B;app2=B"
  - name: subscriptionScopes
    value: "app1=A,B;app2=B"

在上面的示例中,主题 A 和 B 被标记为保护。因此,即使 app3 未列在 publishingScopessubscriptionScopes 中,它也无法与这些主题交互。

下表显示了哪个应用程序被允许发布到主题:

A B C
app1
app2
app3

下表显示了哪个应用程序被允许订阅主题:

A B C
app1
app2
app3

演示

下一步

11 - 消息生存时间 (TTL)

在发布/订阅消息中使用生存时间。

介绍

Dapr 支持为每条消息设置生存时间 (TTL)。这意味着应用程序可以为每条消息指定生存时间,过期后订阅者将不会收到这些消息。

所有 Dapr 发布/订阅组件 都兼容消息 TTL,因为 Dapr 在运行时内处理 TTL 逻辑。只需在发布消息时设置 ttlInSeconds 元数据即可。

在某些组件中,例如 Kafka,可以通过 retention.ms 在主题中配置生存时间,详见文档。使用 Dapr 的消息 TTL,使用 Kafka 的应用程序现在可以为每条消息设置生存时间,而不仅限于每个主题。

原生消息 TTL 支持

当发布/订阅组件原生支持消息生存时间时,Dapr 仅转发生存时间配置而不添加额外逻辑,保持行为的可预测性。这在组件以不同方式处理过期消息时非常有用。例如,在 Azure Service Bus 中,过期消息会被存储在死信队列中,而不是简单地删除。

支持的组件

Azure Service Bus

Azure Service Bus 支持实体级别的生存时间。这意味着消息有默认的生存时间,但也可以在发布时设置为更短的时间跨度。Dapr 传播消息的生存时间元数据,并让 Azure Service Bus 直接处理过期。

非 Dapr 订阅者

如果消息由不使用 Dapr 的订阅者消费,过期消息不会自动丢弃,因为过期是由 Dapr 运行时在 Dapr sidecar 接收到消息时处理的。然而,订阅者可以通过在云事件中添加逻辑来处理 expiration 属性,以编程方式丢弃过期消息,该属性遵循 RFC3339 格式。

当非 Dapr 订阅者使用诸如 Azure Service Bus 等原生处理消息 TTL 的组件时,他们不会收到过期消息。在这种情况下,不需要额外的逻辑。

示例

消息 TTL 可以在发布请求的元数据中设置:

curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.ttlInSeconds=120 -H "Content-Type: application/json" -d '{"order-number": "345"}'
from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {
        'order-number': '345'
    }
    # 创建一个带有内容类型和主体的类型化消息
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic='TOPIC_A',
        data=json.dumps(req_data),
        publish_metadata={'ttlInSeconds': '120'}
    )
    # 打印请求
    print(req_data, flush=True)
<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory) {
    $publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
    $publisher->topic('TOPIC_A')->publish('data', ['ttlInSeconds' => '120']);
});

请参阅本指南以获取发布/订阅 API 的参考。

下一步

12 - 批量发布和订阅消息

了解如何在Dapr中使用批量发布和订阅API。

通过批量发布和订阅API,您可以在单个请求中发布和订阅多个消息。在开发需要发送或接收大量消息的应用程序时,使用批量操作可以通过减少Dapr sidecar、应用程序和底层pubsub代理之间的请求总数来提高吞吐量。

批量发布消息

批量发布消息时的限制

批量发布API允许您通过单个请求将多个消息发布到一个主题。它是非事务性的,这意味着在一个批量请求中,某些消息可能会成功发布,而某些可能会失败。如果有消息发布失败,批量发布操作将返回失败消息的列表。

批量发布操作不保证消息的顺序。

示例

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;

class BulkPublisher {
  private static final String PUBSUB_NAME = "my-pubsub-name";
  private static final String TOPIC_NAME = "topic-a";

  public void publishMessages() {
    try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
      // 创建要发布的消息列表
      List<String> messages = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        String message = String.format("这是消息 #%d", i);
        messages.add(message);
      }

      // 使用批量发布API发布消息列表
      BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
    }
  }
}

import { DaprClient } from "@dapr/dapr";

const pubSubName = "my-pubsub-name";
const topic = "topic-a";

async function start() {
    const client = new DaprClient();

    // 向主题发布多个消息。
    await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);

    // 使用显式批量发布消息向主题发布多个消息。
    const bulkPublishMessages = [
    {
      entryID: "entry-1",
      contentType: "application/json",
      event: { hello: "foo message 1" },
    },
    {
      entryID: "entry-2",
      contentType: "application/cloudevents+json",
      event: {
        specversion: "1.0",
        source: "/some/source",
        type: "example",
        id: "1234",
        data: "foo message 2",
        datacontenttype: "text/plain"
      },
    },
    {
      entryID: "entry-3",
      contentType: "text/plain",
      event: "foo message 3",
    },
  ];
  await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
using System;
using System.Collections.Generic;
using Dapr.Client;

const string PubsubName = "my-pubsub-name";
const string TopicName = "topic-a";
IReadOnlyList<object> BulkPublishData = new List<object>() {
    new { Id = "17", Amount = 10m },
    new { Id = "18", Amount = 20m },
    new { Id = "19", Amount = 30m }
};

using var client = new DaprClientBuilder().Build();

var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData);
if (res == null) {
    throw new Exception("从dapr返回的响应为空");
}
if (res.FailedEntries.Count > 0)
{
    Console.WriteLine("某些事件发布失败!");
    foreach (var failedEntry in res.FailedEntries)
    {
        Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " 错误信息: " +
                          failedEntry.ErrorMessage);
    }
}
else
{
    Console.WriteLine("所有事件已发布!");
}
import requests
import json

base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}"
pubsub_name = "my-pubsub-name"
topic_name = "topic-a"
payload = [
  {
    "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
    "event": "first text message",
    "contentType": "text/plain"
  },
  {
    "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
    "event": {
      "message": "second JSON message"
    },
    "contentType": "application/json"
  }
]

response = requests.post(base_url.format(pubsub_name, topic_name), json=payload)
print(response.status_code)
package main

import (
  "fmt"
  "strings"
  "net/http"
  "io/ioutil"
)

const (
  pubsubName = "my-pubsub-name"
  topicName = "topic-a"
  baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s"
)

func main() {
  url := fmt.Sprintf(baseUrl, pubsubName, topicName)
  method := "POST"
  payload := strings.NewReader(`[
        {
            "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
            "event":  "first text message",
            "contentType": "text/plain"
        },
        {
            "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
            "event":  {
                "message": "second JSON message"
            },
            "contentType": "application/json"
        }
]`)

  client := &http.Client {}
  req, _ := http.NewRequest(method, url, payload)

  req.Header.Add("Content-Type", "application/json")
  res, err := client.Do(req)
  // ...
}
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \
  -H 'Content-Type: application/json' \
  -d '[
        {
            "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
            "event":  "first text message",
            "contentType": "text/plain"
        },
        {
            "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
            "event":  {
                "message": "second JSON message"
            },
            "contentType": "application/json"
        },
      ]'
Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a' `
-Body '[
        {
            "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
            "event":  "first text message",
            "contentType": "text/plain"
        },
        {
            "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
            "event":  {
                "message": "second JSON message"
            },
            "contentType": "application/json"
        },
      ]'

批量订阅消息

批量订阅API允许您在单个请求中从一个主题订阅多个消息。正如我们从如何:发布和订阅主题中所知,有三种方式可以订阅主题:

  • 声明式 - 订阅在外部文件中定义。
  • 编程式 - 订阅在代码中定义。
  • 流式 - 不支持批量订阅,因为消息被发送到处理程序代码。

要批量订阅主题,我们只需使用bulkSubscribe规范属性,如下所示:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes:
    default: /checkout
  pubsubname: order-pub-sub
  bulkSubscribe:
    enabled: true
    maxMessagesCount: 100
    maxAwaitDurationMs: 40
scopes:
- orderprocessing
- checkout

在上面的示例中,bulkSubscribe是_可选的_。如果您使用bulkSubscribe,那么:

  • enabled是必需的,用于启用或禁用此主题的批量订阅。
  • 您可以选择配置批量消息中传递的最大消息数(maxMessagesCount)。 对于不支持批量订阅的组件,maxMessagesCount的默认值为100,即应用程序和Dapr之间的默认批量事件。请参阅组件如何处理发布和订阅批量消息。 如果组件支持批量订阅,则该参数的默认值可以在该组件文档中找到。
  • 您可以选择提供在批量消息发送到应用程序之前的最大等待时间(maxAwaitDurationMs)。 对于不支持批量订阅的组件,maxAwaitDurationMs的默认值为1000,即应用程序和Dapr之间的默认批量事件。请参阅组件如何处理发布和订阅批量消息。 如果组件支持批量订阅,则该参数的默认值可以在该组件文档中找到。

应用程序接收与批量消息中的每个条目(单个消息)关联的EntryId。应用程序必须使用此EntryId来传达该特定条目的状态。如果应用程序未能通知EntryId状态,则被视为RETRY

需要发送一个带有每个条目处理状态的JSON编码的有效负载体:

{
  "statuses":
  [
    {
    "entryId": "<entryId1>",
    "status": "<status>"
    },
    {
    "entryId": "<entryId2>",
    "status": "<status>"
    }
  ]
}

可能的状态值:

状态 描述
SUCCESS 消息处理成功
RETRY 消息由Dapr重试
DROP 记录警告并丢弃消息

请参阅批量订阅的预期HTTP响应以获取更多见解。

示例

以下代码示例演示如何使用批量订阅。

import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import reactor.core.publisher.Mono;

class BulkSubscriber {
  @BulkSubscribe()
  // @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 40)
  @Topic(name = "topicbulk", pubsubName = "orderPubSub")
  @PostMapping(path = "/topicbulk")
  public Mono<BulkSubscribeAppResponse> handleBulkMessage(
          @RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
    return Mono.fromCallable(() -> {
      List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
      for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
        try {
          CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
          System.out.printf("批量订阅者收到: %s\n", cloudEvent.getData());
          entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
        } catch (Exception e) {
          e.printStackTrace();
          entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
        }
      }
      return new BulkSubscribeAppResponse(entries);
    });
  }
}

import { DaprServer } from "@dapr/dapr";

const pubSubName = "orderPubSub";
const topic = "topicbulk";

const daprHost = process.env.DAPR_HOST || "127.0.0.1";
const daprPort = process.env.DAPR_HTTP_PORT || "3502";
const serverHost = process.env.SERVER_HOST || "127.0.0.1";
const serverPort = process.env.APP_PORT || 5001;

async function start() {
    const server = new DaprServer({
        serverHost,
        serverPort,
        clientOptions: {
            daprHost,
            daprPort,
        },
    });

    // 使用默认配置向主题发布多个消息。
    await client.pubsub.bulkSubscribeWithDefaultConfig(pubSubName, topic, (data) => console.log("订阅者收到: " + JSON.stringify(data)));

    // 使用特定的maxMessagesCount和maxAwaitDurationMs向主题发布多个消息。
    await client.pubsub.bulkSubscribeWithConfig(pubSubName, topic, (data) => console.log("订阅者收到: " + JSON.stringify(data)), 100, 40);
}
using Microsoft.AspNetCore.Mvc;
using Dapr.AspNetCore;
using Dapr;

namespace DemoApp.Controllers;

[ApiController]
[Route("[controller]")]
public class BulkMessageController : ControllerBase
{
    private readonly ILogger<BulkMessageController> logger;

    public BulkMessageController(ILogger<BulkMessageController> logger)
    {
        this.logger = logger;
    }

    [BulkSubscribe("messages", 10, 10)]
    [Topic("pubsub", "messages")]
    public ActionResult<BulkSubscribeAppResponse> HandleBulkMessages([FromBody] BulkSubscribeMessage<BulkMessageModel<BulkMessageModel>> bulkMessages)
    {
        List<BulkSubscribeAppResponseEntry> responseEntries = new List<BulkSubscribeAppResponseEntry>();
        logger.LogInformation($"收到 {bulkMessages.Entries.Count()} 条消息");
        foreach (var message in bulkMessages.Entries)
        {
            try
            {
                logger.LogInformation($"收到一条数据为 '{message.Event.Data.MessageData}' 的消息");
                responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
            }
            catch (Exception e)
            {
                logger.LogError(e.Message);
                responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.RETRY));
            }
        }
        return new BulkSubscribeAppResponse(responseEntries);
    }
    public class BulkMessageModel
    {
        public string MessageData { get; set; }
    }
}

目前,您只能使用HTTP客户端在Python中进行批量订阅。

import json
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    # 定义批量订阅配置
    subscriptions = [{
        "pubsubname": "pubsub",
        "topic": "TOPIC_A",
        "route": "/checkout",
        "bulkSubscribe": {
            "enabled": True,
            "maxMessagesCount": 3,
            "maxAwaitDurationMs": 40
        }
    }]
    print('Dapr pub/sub已订阅: ' + json.dumps(subscriptions))
    return jsonify(subscriptions)


# 定义处理传入消息的端点
@app.route('/checkout', methods=['POST'])
def checkout():
    messages = request.json
    print(messages)
    for message in messages:
        print(f"收到消息: {message}")
    return json.dumps({'success': True}), 200, {'ContentType': 'application/json'}

if __name__ == '__main__':
    app.run(port=5000)

组件如何处理发布和订阅批量消息

对于事件发布/订阅,涉及两种网络传输。

  1. 从/到应用程序到/从Dapr
  2. 从/到Dapr到/从pubsub代理

这些是可以进行优化的机会。当优化时,进行批量请求,从而减少总体调用次数,从而提高吞吐量并提供更好的延迟。

启用批量发布和/或批量订阅时,应用程序和Dapr sidecar之间的通信(上面第1点)针对所有组件进行了优化。

从Dapr sidecar到pubsub代理的优化取决于许多因素,例如:

  • 代理必须本质上支持批量pubsub
  • Dapr组件必须更新以支持代理提供的批量API的使用

目前,以下组件已更新以支持此级别的优化:

组件 批量发布 批量订阅
Kafka
Azure Servicebus
Azure Eventhubs

演示

观看以下关于批量pubsub的演示和演讲。

KubeCon Europe 2023 演讲

Dapr社区电话#77 演讲

相关链接