Dapr 发布/订阅的更多信息
了解如何使用 Dapr 进行发布/订阅:
- 试试 发布/订阅快速入门。
- 通过支持的 Dapr SDKs 探索发布/订阅功能。
- 查看 发布/订阅 API 参考文档。
- 浏览支持的 发布/订阅组件说明。
This is the multi-page printable view of this section. Click here to print.
了解如何使用 Dapr 进行发布/订阅:
发布-订阅模式(pubsub)使微服务能够通过消息进行事件驱动的架构通信。
消息代理会将每条消息从发布者的输入通道复制到所有对该消息感兴趣的订阅者的输出通道。这种模式在需要将微服务解耦时特别有用。
在 Dapr 中,pubsub API:
您可以在运行时配置 Dapr pubsub 组件来使用特定的消息代理,这种可插拔性使您的服务更具可移植性和灵活性。
在 Dapr 中使用 pubsub 时:
以下概述视频和演示展示了 Dapr pubsub 的工作原理。
在下图中,“shipping”服务和“email”服务都已订阅由“cart”服务发布的主题。每个服务加载指向相同 pubsub 消息代理组件的 pubsub 组件配置文件;例如:Redis Streams、NATS Streaming、Azure Service Bus 或 GCP pubsub。
在下图中,Dapr API 将“cart”服务的“order”主题发布到“shipping”和“email”订阅服务的“order”端点。
pubsub API 构建块为您的应用程序带来了多个特性。
为了启用消息路由并为服务之间的每条消息提供额外的上下文,Dapr 使用 CloudEvents 1.0 规范 作为其消息格式。任何应用程序通过 Dapr 发送到主题的消息都会自动包装在 Cloud Events 信封中,使用 Content-Type
头值 作为 datacontenttype
属性。
有关更多信息,请阅读 使用 CloudEvents 进行消息传递,或 发送不带 CloudEvents 的原始消息。
如果您的一个应用程序使用 Dapr 而另一个不使用,您可以为发布者或订阅者禁用 CloudEvent 包装。这允许在无法一次性采用 Dapr 的应用程序中部分采用 Dapr pubsub。
有关更多信息,请阅读 如何在没有 CloudEvents 的情况下使用 pubsub。
发布消息时,指定发送数据的内容类型很重要。除非指定,否则 Dapr 将假定为 text/plain
。
Content-Type
头中设置内容类型原则上,Dapr 认为消息一旦被订阅者处理并以非错误响应进行响应,就已成功传递。为了更细粒度的控制,Dapr 的 pubsub API 还提供了明确的状态,定义在响应负载中,订阅者可以用这些状态向 Dapr 指示特定的处理指令(例如,RETRY
或 DROP
)。
Dapr 应用程序可以通过支持相同功能的三种订阅类型订阅已发布的主题:声明式、流式和编程式。
订阅类型 | 描述 |
---|---|
声明式 | 订阅在外部文件中定义。声明式方法消除了代码中的 Dapr 依赖性,并允许现有应用程序订阅主题,而无需更改代码。 |
流式 | 订阅在用户代码中定义。流式订阅是动态的,意味着它们允许在运行时添加或删除订阅。它们不需要应用程序中的订阅端点(这是编程式和声明式订阅所需的),使其易于在代码中配置。流式订阅也不需要应用程序配置 sidecar 来接收消息。由于消息被发送到消息处理程序代码,因此流式订阅中没有路由或批量订阅的概念。 |
编程式 | 订阅在用户代码中定义。编程式方法实现了静态订阅,并需要在代码中有一个端点。 |
有关更多信息,请阅读 关于订阅类型的订阅。
要重新加载以编程方式或声明式定义的主题订阅,需要重新启动 Dapr sidecar。
通过启用 HotReload
功能门,可以使 Dapr sidecar 动态重新加载更改的声明式主题订阅,而无需重新启动。
主题订阅的热重载目前是一个预览功能。
重新加载订阅时,正在传输的消息不受影响。
Dapr 提供 基于内容的路由 模式。Pubsub 路由 是此模式的实现,允许开发人员使用表达式根据其内容将 CloudEvents 路由到应用程序中的不同 URI/路径和事件处理程序。如果没有路由匹配,则使用可选的默认路由。随着您的应用程序扩展以支持多个事件版本或特殊情况,这很有用。
此功能适用于声明式和编程式订阅方法。
有关消息路由的更多信息,请阅读 Dapr pubsub API 参考
有时,由于各种可能的问题,例如生产者或消费者应用程序中的错误条件或导致应用程序代码出现问题的意外状态更改,消息无法被处理。Dapr 允许开发人员设置死信主题来处理无法传递到应用程序的消息。此功能适用于所有 pubsub 组件,并防止消费者应用程序无休止地重试失败的消息。有关更多信息,请阅读 死信主题
Dapr 使开发人员能够使用外发模式在事务性状态存储和任何消息代理之间实现单一事务。有关更多信息,请阅读 如何启用事务性外发消息
Dapr 通过 命名空间消费者组 解决大规模多租户问题。只需在组件元数据中包含 "{namespace}"
值,即可允许具有相同 app-id
的多个命名空间的应用程序发布和订阅相同的消息代理。
Dapr 保证消息传递的至少一次语义。当应用程序使用 pubsub API 向主题发布消息时,Dapr 确保消息至少一次传递给每个订阅者。
即使消息传递失败,或者您的应用程序崩溃,Dapr 也会尝试重新传递消息,直到成功传递。
所有 Dapr pubsub 组件都支持至少一次保证。
Dapr 处理消费者组和竞争消费者模式的负担。在竞争消费者模式中,使用单个消费者组的多个应用程序实例竞争消息。当副本使用相同的 app-id
而没有显式消费者组覆盖时,Dapr 强制执行竞争消费者模式。
当同一应用程序的多个实例(具有相同的 app-id
)订阅一个主题时,Dapr 将每条消息仅传递给该应用程序的一个实例。此概念在下图中进行了说明。
同样,如果两个不同的应用程序(具有不同的 app-id
)订阅同一主题,Dapr 将每条消息仅传递给每个应用程序的一个实例。
并非所有 Dapr pubsub 组件都支持竞争消费者模式。目前,以下(非详尽)pubsub 组件支持此功能:
默认情况下,与 pubsub 组件实例关联的所有主题消息对配置了该组件的每个应用程序都是可用的。您可以使用 Dapr 主题范围限制哪个应用程序可以发布或订阅主题。有关更多信息,请阅读:pubsub 主题范围。
Dapr 可以在每条消息的基础上设置超时消息,这意味着如果消息未从 pubsub 组件中读取,则消息将被丢弃。此超时消息可防止未读消息的积累。如果消息在队列中的时间超过配置的 TTL,则标记为死信。有关更多信息,请阅读 pubsub 消息 TTL。
Dapr 支持在单个请求中发送和接收多条消息。当编写需要发送或接收大量消息的应用程序时,使用批量操作可以通过减少请求总数来实现高吞吐量。有关更多信息,请阅读 pubsub 批量消息。
在 Kubernetes 上运行时,使用 StatefulSets 结合 {podName}
标记,订阅者可以为每个实例拥有一个粘性 consumerID
。请参阅 如何使用 StatefulSets 水平扩展订阅者。
想要测试 Dapr pubsub API 吗?通过以下快速入门和教程来查看 pubsub 的实际应用:
快速入门/教程 | 描述 |
---|---|
Pubsub 快速入门 | 使用发布和订阅 API 发送和接收消息。 |
Pubsub 教程 | 演示如何使用 Dapr 启用 pubsub 应用程序。使用 Redis 作为 pubsub 组件。 |
想要跳过快速入门?没问题。您可以直接在应用程序中试用 pubsub 构建块来发布消息并订阅主题。在 安装 Dapr 后,您可以从 pubsub 如何指南 开始使用 pubsub API。
既然您已经了解了Dapr pubsub构建块的功能,接下来我们来看看如何在您的服务中应用它。下面的代码示例描述了一个使用两个服务处理订单的应用程序,每个服务都有Dapr sidecar:
Dapr会自动将用户的负载封装在符合CloudEvents v1.0的格式中,并使用Content-Type
头的值作为datacontenttype
属性。了解更多关于CloudEvents的消息。
以下示例展示了如何在您的应用程序中发布和订阅名为orders
的主题。
第一步是设置pubsub组件:
当您运行dapr init
时,Dapr会创建一个默认的Redis pubsub.yaml
并在您的本地机器上运行一个Redis容器,位置如下:
%UserProfile%\.dapr\components\pubsub.yaml
~/.dapr/components/pubsub.yaml
使用pubsub.yaml
组件,您可以轻松地更换底层组件而无需更改应用程序代码。在此示例中,使用RabbitMQ。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: order-pub-sub
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://localhost:5672"
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: reconnectWait
value: "0"
- name: concurrency
value: parallel
scopes:
- orderprocessing
- checkout
您可以通过创建一个包含该文件的组件目录(在此示例中为myComponents
)并使用dapr run
CLI命令的--resources-path
标志来覆盖此文件。
dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
dapr run --app-id myapp --resources-path ./myComponents -- npm start
要将其部署到Kubernetes集群中,请填写以下YAML中的pub/sub组件的metadata
连接详细信息,保存为pubsub.yaml
,然后运行kubectl apply -f pubsub.yaml
。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: order-pub-sub
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: connectionString
value: "amqp://localhost:5672"
- name: protocol
value: amqp
- name: hostname
value: localhost
- name: username
value: username
- name: password
value: password
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: reconnectWait
value: "0"
- name: concurrency
value: parallel
scopes:
- orderprocessing
- checkout
Dapr提供了三种方法来订阅主题:
在声明式、流式和编程式订阅文档中了解更多信息。此示例演示了声明式订阅。
创建一个名为subscription.yaml
的文件并粘贴以下内容:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order-pub-sub
spec:
topic: orders
routes:
default: /checkout
pubsubname: order-pub-sub
scopes:
- orderprocessing
- checkout
上面的示例显示了对主题orders
的事件订阅,针对pubsub组件order-pub-sub
。
route
字段指示Dapr将所有主题消息发送到应用程序中的/checkout
端点。scopes
字段指定此订阅适用于ID为orderprocessing
和checkout
的应用程序。将subscription.yaml
放在与您的pubsub.yaml
组件相同的目录中。当Dapr启动时,它会加载订阅和组件。
HotReload
功能门启用。
为了防止重新处理或丢失未处理的消息,在Dapr和您的应用程序之间的飞行消息在热重载事件期间不受影响。
以下是利用Dapr SDK订阅您在subscription.yaml
中定义的主题的代码示例。
//依赖项
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.Mvc;
using Dapr;
using Dapr.Client;
//代码
namespace CheckoutService.controller
{
[ApiController]
public class CheckoutServiceController : Controller
{
//订阅一个主题
[Topic("order-pub-sub", "orders")]
[HttpPost("checkout")]
public void getCheckout([FromBody] int orderId)
{
Console.WriteLine("订阅者接收到 : " + orderId);
}
}
}
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-protocol https dotnet run
//依赖项
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
//代码
@RestController
public class CheckoutServiceController {
private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
//订阅一个主题
@Topic(name = "orders", pubsubName = "order-pub-sub")
@PostMapping(path = "/checkout")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
log.info("订阅者接收到: " + cloudEvent.getData());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 mvn spring-boot:run
#依赖项
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
import logging
import json
#代码
app = App()
logging.basicConfig(level = logging.INFO)
#订阅一个主题
@app.subscribe(pubsub_name='order-pub-sub', topic='orders')
def mytopic(event: v1.Event) -> None:
data = json.loads(event.Data())
logging.info('订阅者接收到: ' + str(data))
app.run(6002)
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --app-protocol grpc -- python3 CheckoutService.py
//依赖项
import (
"log"
"net/http"
"context"
"github.com/dapr/go-sdk/service/common"
daprd "github.com/dapr/go-sdk/service/http"
)
//代码
var sub = &common.Subscription{
PubsubName: "order-pub-sub",
Topic: "orders",
Route: "/checkout",
}
func main() {
s := daprd.NewService(":6002")
//订阅一个主题
if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
log.Fatalf("添加主题订阅时出错: %v", err)
}
if err := s.Start(); err != nil && err != http.ErrServerClosed {
log.Fatalf("监听时出错: %v", err)
}
}
func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("订阅者接收到: %s", e.Data)
return false, nil
}
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 go run CheckoutService.go
//依赖项
import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr';
//代码
const daprHost = "127.0.0.1";
const serverHost = "127.0.0.1";
const serverPort = "6002";
start().catch((e) => {
console.error(e);
process.exit(1);
});
async function start(orderId) {
const server = new DaprServer({
serverHost,
serverPort,
communicationProtocol: CommunicationProtocolEnum.HTTP,
clientOptions: {
daprHost,
daprPort: process.env.DAPR_HTTP_PORT,
},
});
//订阅一个主题
await server.pubsub.subscribe("order-pub-sub", "orders", async (orderId) => {
console.log(`订阅者接收到: ${JSON.stringify(orderId)}`)
});
await server.start();
}
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和订阅者应用程序:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 npm start
启动一个名为orderprocessing
的Dapr实例:
dapr run --app-id orderprocessing --dapr-http-port 3601
然后向orders
主题发布消息:
dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{"orderId": "100"}'
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}'
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'
以下是利用Dapr SDK发布主题的代码示例。
//依赖项
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using System.Threading;
//代码
namespace EventService
{
class Program
{
static async Task Main(string[] args)
{
string PUBSUB_NAME = "order-pub-sub";
string TOPIC_NAME = "orders";
while(true) {
System.Threading.Thread.Sleep(5000);
Random random = new Random();
int orderId = random.Next(1,1000);
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using var client = new DaprClientBuilder().Build();
//使用Dapr SDK发布主题
await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
Console.WriteLine("发布的数据: " + orderId);
}
}
}
}
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-protocol https dotnet run
//依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Metadata;
import static java.util.Collections.singletonMap;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;
//代码
@SpringBootApplication
public class OrderProcessingServiceApplication {
private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);
public static void main(String[] args) throws InterruptedException{
String MESSAGE_TTL_IN_SECONDS = "1000";
String TOPIC_NAME = "orders";
String PUBSUB_NAME = "order-pub-sub";
while(true) {
TimeUnit.MILLISECONDS.sleep(5000);
Random random = new Random();
int orderId = random.nextInt(1000-1) + 1;
DaprClient client = new DaprClientBuilder().build();
//使用Dapr SDK发布主题
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
orderId,
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
log.info("发布的数据:" + orderId);
}
}
}
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
#依赖项
import random
from time import sleep
import requests
import logging
import json
from dapr.clients import DaprClient
#代码
logging.basicConfig(level = logging.INFO)
while True:
sleep(random.randrange(50, 5000) / 1000)
orderId = random.randint(1, 1000)
PUBSUB_NAME = 'order-pub-sub'
TOPIC_NAME = 'orders'
with DaprClient() as client:
#使用Dapr SDK发布主题
result = client.publish_event(
pubsub_name=PUBSUB_NAME,
topic_name=TOPIC_NAME,
data=json.dumps(orderId),
data_content_type='application/json',
)
logging.info('发布的数据: ' + str(orderId))
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc python3 OrderProcessingService.py
//依赖项
import (
"context"
"log"
"math/rand"
"time"
"strconv"
dapr "github.com/dapr/go-sdk/client"
)
//代码
var (
PUBSUB_NAME = "order-pub-sub"
TOPIC_NAME = "orders"
)
func main() {
for i := 0; i < 10; i++ {
time.Sleep(5000)
orderId := rand.Intn(1000-1) + 1
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
//使用Dapr SDK发布主题
if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId)));
err != nil {
panic(err)
}
log.Println("发布的数据: " + strconv.Itoa(orderId))
}
}
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
//依赖项
import { DaprServer, DaprClient, CommunicationProtocolEnum } from '@dapr/dapr';
const daprHost = "127.0.0.1";
var main = function() {
for(var i=0;i<10;i++) {
sleep(5000);
var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
start(orderId).catch((e) => {
console.error(e);
process.exit(1);
});
}
}
async function start(orderId) {
const PUBSUB_NAME = "order-pub-sub"
const TOPIC_NAME = "orders"
const client = new DaprClient({
daprHost,
daprPort: process.env.DAPR_HTTP_PORT,
communicationProtocol: CommunicationProtocolEnum.HTTP
});
console.log("发布的数据:" + orderId)
//使用Dapr SDK发布主题
await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId);
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
main();
导航到包含上述代码的目录,然后运行以下命令以启动Dapr sidecar和发布者应用程序:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start
为了告诉Dapr消息已成功处理,返回200 OK
响应。如果Dapr收到的返回状态码不是200
,或者您的应用程序崩溃,Dapr将尝试根据至少一次语义重新传递消息。
观看此演示视频以了解更多关于Dapr的pubsub消息传递。
为了实现消息路由并为每条消息提供额外的上下文,Dapr 采用 CloudEvents 1.0 规范 作为其消息格式。通过 Dapr 发送到主题的任何消息都会自动被包装在 CloudEvents 信封中,使用 Content-Type
头部值 作为 datacontenttype
属性。
Dapr 使用 CloudEvents 为事件负载提供额外的上下文,从而实现以下功能:
您可以选择以下三种方法之一通过发布订阅发布 CloudEvent:
向 Dapr 发送发布操作会自动将其包装在一个包含以下字段的 CloudEvent 信封中:
id
source
specversion
type
traceparent
traceid
tracestate
topic
pubsubname
time
datacontenttype
(可选)以下示例演示了 Dapr 为发布到 orders
主题的操作生成的 CloudEvent,其中包括:
traceid
,唯一标识消息data
和 CloudEvent 的字段,其中数据内容被序列化为 JSON{
"topic": "orders",
"pubsubname": "order_pub_sub",
"traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
"tracestate": "",
"data": {
"orderId": 1
},
"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
"specversion": "1.0",
"datacontenttype": "application/json; charset=utf-8",
"source": "checkout",
"type": "com.dapr.event.sent",
"time": "2020-09-23T06:23:21Z",
"traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
作为另一个 v1.0 CloudEvent 的示例,以下显示了在 CloudEvent 消息中以 JSON 序列化的 XML 内容的数据:
{
"topic": "orders",
"pubsubname": "order_pub_sub",
"traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
"tracestate": "",
"data" : "<note><to></to><from>user2</from><message>Order</message></note>",
"id" : "id-1234-5678-9101",
"specversion" : "1.0",
"datacontenttype" : "text/xml",
"subject" : "Test XML Message",
"source" : "https://example.com/message",
"type" : "xml.message",
"time" : "2020-09-23T06:23:21Z"
}
Dapr 自动生成多个 CloudEvent 属性。您可以通过提供以下可选元数据键/值来替换这些生成的 CloudEvent 属性:
cloudevent.id
: 覆盖 id
cloudevent.source
: 覆盖 source
cloudevent.type
: 覆盖 type
cloudevent.traceid
: 覆盖 traceid
cloudevent.tracestate
: 覆盖 tracestate
cloudevent.traceparent
: 覆盖 traceparent
使用这些元数据属性替换 CloudEvents 属性的能力适用于所有发布订阅组件。
例如,要替换代码中上述 CloudEvent 示例中的 source
和 id
值:
with DaprClient() as client:
order = {'orderId': i}
# 使用 Dapr 发布订阅发布事件/消息
result = client.publish_event(
pubsub_name='order_pub_sub',
topic_name='orders',
publish_metadata={'cloudevent.id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317', 'cloudevent.source': 'payment'}
)
var order = new Order(i);
using var client = new DaprClientBuilder().Build();
// 覆盖 cloudevent 元数据
var metadata = new Dictionary<string,string>() {
{ "cloudevent.source", "payment" },
{ "cloudevent.id", "d99b228f-6c73-4e78-8c4d-3f80a043d317" }
}
// 使用 Dapr 发布订阅发布事件/消息
await client.PublishEventAsync("order_pub_sub", "orders", order, metadata);
Console.WriteLine("Published data: " + order);
await Task.Delay(TimeSpan.FromSeconds(1));
然后 JSON 负载反映新的 source
和 id
值:
{
"topic": "orders",
"pubsubname": "order_pub_sub",
"traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
"tracestate": "",
"data": {
"orderId": 1
},
"id": "d99b228f-6c73-4e78-8c4d-3f80a043d317",
"specversion": "1.0",
"datacontenttype": "application/json; charset=utf-8",
"source": "payment",
"type": "com.dapr.event.sent",
"time": "2020-09-23T06:23:21Z",
"traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
traceid
/traceparent
和 tracestate
,但这样做可能会干扰事件跟踪并在跟踪工具中报告不一致的结果。建议使用 Open Telemetry 进行分布式跟踪。了解更多关于分布式跟踪的信息。
如果您想使用自己的 CloudEvent,请确保将 datacontenttype
指定为 application/cloudevents+json
。
如果应用程序编写的 CloudEvent 不包含 CloudEvent 规范中最低要求的字段,则消息将被拒绝。如果缺少,Dapr 会将以下字段添加到 CloudEvent 中:
time
traceid
traceparent
tracestate
topic
pubsubname
source
type
specversion
您可以向自定义 CloudEvent 添加不属于官方 CloudEvent 规范的其他字段。Dapr 将按原样传递这些字段。
发布一个 CloudEvent 到 orders
主题:
dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{\"orderId\": \"100\"}'
发布一个 CloudEvent 到 orders
主题:
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'
发布一个 CloudEvent 到 orders
主题:
Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'
使用 Dapr 创建的 CloudEvents 时,信封中包含一个 id
字段,应用程序可以使用该字段执行消息去重。Dapr 不会自动进行去重处理。Dapr 支持使用本身具备消息去重功能的消息代理。
在将Dapr集成到您的应用程序时,由于兼容性原因或某些应用程序不使用Dapr,某些服务可能仍需要通过不封装在CloudEvents中的pub/sub消息进行通信。这些消息被称为“原始”pub/sub消息。Dapr允许应用程序发布和订阅原始事件,这些事件未封装在CloudEvent中以实现兼容性。
Dapr应用程序可以将原始事件发布到pub/sub主题中,而不需要CloudEvent封装,以便与非Dapr应用程序兼容。
要禁用CloudEvent封装,请在发布请求中将rawPayload
元数据设置为true
。这样,订阅者可以接收这些消息而无需解析CloudEvent架构。
curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"order-number": "345"}'
from dapr.clients import DaprClient
with DaprClient() as d:
req_data = {
'order-number': '345'
}
# 创建一个带有内容类型和主体的类型化消息
resp = d.publish_event(
pubsub_name='pubsub',
topic_name='TOPIC_A',
data=json.dumps(req_data),
publish_metadata={'rawPayload': 'true'}
)
# 打印请求
print(req_data, flush=True)
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory) {
$publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
$publisher->topic('TOPIC_A')->publish('data', ['rawPayload' => 'true']);
});
Dapr应用程序还可以订阅来自不使用CloudEvent封装的现有pub/sub主题的原始事件。
在以编程方式订阅时,添加rawPayload
的额外元数据条目,以便Dapr sidecar自动将负载封装到与当前Dapr SDK兼容的CloudEvent中。
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys
app = flask.Flask(__name__)
CORS(app)
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [{'pubsubname': 'pubsub',
'topic': 'deathStarStatus',
'route': 'dsstatus',
'metadata': {
'rawPayload': 'true',
} }]
return jsonify(subscriptions)
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus', metadata: [ 'rawPayload' => 'true'] ),
]]));
$app->post('/dsstatus', function(
#[\Dapr\Attributes\FromBody]
\Dapr\PubSub\CloudEvent $cloudEvent,
\Psr\Log\LoggerInterface $logger
) {
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
return ['status' => 'SUCCESS'];
}
);
$app->start();
同样,您可以通过在订阅规范中添加rawPayload
元数据条目来声明式地订阅原始事件。
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: deathStarStatus
routes:
default: /dsstatus
pubsubname: pubsub
metadata:
rawPayload: "true"
scopes:
- app1
- app2
pubsub 路由实现了基于内容的路由,这是一种使用 DSL 而不是命令式应用程序代码的消息模式。通过 pubsub 路由,您可以根据 CloudEvents 的内容,将消息路由到应用程序中的不同 URI/路径和事件处理程序。如果没有匹配的路由,则可以使用可选的默认路由。随着您的应用程序扩展以支持多个事件版本或特殊情况,这种方法将非常有用。
虽然可以通过代码实现路由,但将路由规则与应用程序分离可以提高可移植性。
此功能适用于声明式和编程式订阅方法,但不适用于流式订阅。
对于声明式订阅,使用 dapr.io/v2alpha1
作为 apiVersion
。以下是使用路由的 subscriptions.yaml
示例:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
pubsubname: pubsub
topic: inventory
routes:
rules:
- match: event.type == "widget"
path: /widgets
- match: event.type == "gadget"
path: /gadgets
default: /products
scopes:
- app1
- app2
在编程方法中,返回的是 routes
结构而不是 route
。JSON 结构与声明式 YAML 匹配:
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys
app = flask.Flask(__name__)
CORS(app)
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [
{
'pubsubname': 'pubsub',
'topic': 'inventory',
'routes': {
'rules': [
{
'match': 'event.type == "widget"',
'path': '/widgets'
},
{
'match': 'event.type == "gadget"',
'path': '/gadgets'
},
],
'default': '/products'
}
}]
return jsonify(subscriptions)
@app.route('/products', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
const port = 3000
app.get('/dapr/subscribe', (req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "inventory",
routes: {
rules: [
{
match: 'event.type == "widget"',
path: '/widgets'
},
{
match: 'event.type == "gadget"',
path: '/gadgets'
},
],
default: '/products'
}
}
]);
})
app.post('/products', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
[Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)]
[HttpPost("widgets")]
public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient)
{
// Logic
return stock;
}
[Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)]
[HttpPost("gadgets")]
public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient)
{
// Logic
return stock;
}
[Topic("pubsub", "inventory")]
[HttpPost("products")]
public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient)
{
// Logic
return stock;
}
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
)
const appPort = 3000
type subscription struct {
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata,omitempty"`
Routes routes `json:"routes"`
}
type routes struct {
Rules []rule `json:"rules,omitempty"`
Default string `json:"default,omitempty"`
}
type rule struct {
Match string `json:"match"`
Path string `json:"path"`
}
// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
t := []subscription{
{
PubsubName: "pubsub",
Topic: "inventory",
Routes: routes{
Rules: []rule{
{
Match: `event.type == "widget"`,
Path: "/widgets",
},
{
Match: `event.type == "gadget"`,
Path: "/gadgets",
},
},
Default: "/products",
},
},
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(t)
}
func main() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'inventory', routes: (
rules: => [
('match': 'event.type == "widget"', path: '/widgets'),
('match': 'event.type == "gadget"', path: '/gadgets'),
]
default: '/products')),
]]));
$app->post('/products', function(
#[\Dapr\Attributes\FromBody]
\Dapr\PubSub\CloudEvent $cloudEvent,
\Psr\Log\LoggerInterface $logger
) {
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
return ['status' => 'SUCCESS'];
}
);
$app->start();
在这些示例中,根据 event.type
,应用程序将被调用于:
/widgets
/gadgets
/products
表达式是用通用表达式语言 (CEL)编写的,其中 event
代表云事件。表达式中可以引用 CloudEvents 核心规范中的任何属性。
匹配“重要”消息:
has(event.data.important) && event.data.important == true
匹配大于 $10,000 的存款:
event.type == "deposit" && int(event.data.amount) > 10000
匹配消息的多个版本:
event.type == "mymessage.v1"
event.type == "mymessage.v2"
作为参考,以下属性来自 CloudEvents 规范。
根据术语 data 的定义,CloudEvents 可能 包含有关事件发生的领域特定信息。当存在时,此信息将被封装在 data
中。
datacontenttype
属性指定(例如 application/json),并在这些相应属性存在时遵循 dataschema
格式。以下属性在所有 CloudEvents 中是必需的:
String
source
+ id
对于每个不同的事件都是唯一的。如果由于网络错误而重新发送重复事件,则它可能具有相同的 id
。消费者可以假设具有相同 source
和 id
的事件是重复的。类型: URI-reference
描述: 标识事件发生的上下文。通常包括以下信息:
URI 中编码的数据的确切语法和语义由事件生产者定义。
生产者 必须 确保 source
+ id
对于每个不同的事件都是唯一的。
应用程序可以:
source
,以便更容易生成唯一的 ID,并防止其他生产者具有相同的 source
。source
标识符。一个 source 可能包含多个生产者。在这种情况下,生产者 必须 合作以确保 source
+ id
对于每个不同的事件都是唯一的。
约束:
示例:
类型: String
描述: 事件使用的 CloudEvents 规范版本。这使得上下文的解释成为可能。合规的事件生产者 必须 在引用此版本的规范时使用 1.0
值。
目前,此属性仅包含“主要”和“次要”版本号。这允许在不更改此属性值的情况下对规范进行补丁更改。
注意:对于“候选发布”版本,可能会使用后缀进行测试。
约束:
String
type
的版本信息。有关更多信息,请参阅CloudEvents 的版本控制。以下属性在 CloudEvents 中是可选的。有关可选定义的更多信息,请参阅符号约定部分。
类型: String
根据 RFC 2046
描述: data
值的内容类型。此属性使 data
能够携带任何类型的内容,其中格式和编码可能与所选事件格式不同。
例如,使用 JSON 信封格式呈现的事件可能在 data
中携带 XML 负载。此属性被设置为 "application/xml"
,通知消费者。
不同 datacontenttype
值的数据内容呈现规则在事件格式规范中定义。例如,JSON 事件格式在第 3.1 节中定义了关系。
对于某些二进制模式协议绑定,此字段直接映射到相应协议的内容类型元数据属性。您可以在相应协议中找到二进制模式和内容类型元数据映射的规范规则。
在某些事件格式中,您可以省略 datacontenttype
属性。例如,如果 JSON 格式事件没有 datacontenttype
属性,则意味着 data
是符合 "application/json"
媒体类型的 JSON 值。换句话说:没有 datacontenttype
的 JSON 格式事件与 datacontenttype="application/json"
的事件完全等效。
当将没有 datacontenttype
属性的事件消息转换为不同格式或协议绑定时,目标 datacontenttype
应明确设置为源的隐含 datacontenttype
。
约束:
有关媒体类型示例,请参阅 IANA 媒体类型
URI
data
遵循的模式。与模式不兼容的更改应通过不同的 URI 反映。有关更多信息,请参阅CloudEvents 的版本控制。类型: String
描述: 这描述了事件生产者(由 source
标识)上下文中的事件主题。在发布-订阅场景中,订阅者通常会订阅由 source
发出的事件。如果 source
上下文具有内部子结构,则仅 source
标识符可能不足以作为任何特定事件的限定符。
在上下文元数据中(而不是仅在 data
负载中)识别事件的主题在通用订阅过滤场景中很有帮助,其中中间件无法解释 data
内容。在上述示例中,订阅者可能只对名称以 ‘.jpg’ 或 ‘.jpeg’ 结尾的 blob 感兴趣。使用 subject
属性,您可以为该事件子集构建简单而高效的字符串后缀过滤器。
约束:
示例:
订阅者可能会注册对在 blob 存储容器中创建新 blob 时的兴趣。在这种情况下:
source
标识订阅范围(存储容器)type
标识“blob 创建”事件id
唯一标识事件实例,以区分同名 blob 的单独创建事件。新创建的 blob 的名称在 subject
中传递:
source
: https://example.com/storage/tenant/containersubject
: mynewfile.jpgTimestamp
source
的生产者 必须 在这方面保持一致。换句话说,要么他们都使用事件发生的实际时间,要么他们都使用相同的算法来确定使用的值。观看此视频以了解如何使用 pubsub 进行消息路由:
Dapr 应用程序可以通过三种订阅类型来订阅已发布的主题,这三种类型支持相同的功能:声明式、流式和编程式。
订阅类型 | 描述 |
---|---|
声明式 | 订阅在外部文件中定义。声明式方法将 Dapr 的依赖从代码中移除,允许现有应用程序无需更改代码即可订阅主题。 |
流式 | 订阅在应用程序代码中定义。流式订阅是动态的,允许在运行时添加或删除订阅。它们不需要在应用程序中设置订阅端点(这是编程式和声明式订阅所需的),使其在代码中易于配置。流式订阅也不需要应用程序配置 sidecar 来接收消息。 |
编程式 | 订阅在应用程序代码中定义。编程式方法实现了静态订阅,并需要在代码中设置一个端点。 |
下面的示例演示了通过 orders
主题在 checkout
应用程序和 orderprocessing
应用程序之间的发布/订阅消息。示例首先以声明式,然后以编程式演示了相同的 Dapr 发布/订阅组件。
HotReload
功能门控启用。
为了防止重新处理或丢失未处理的消息,在 Dapr 和您的应用程序之间的飞行消息在热重载事件期间不受影响。
您可以使用外部组件文件声明性地订阅一个主题。此示例使用名为 subscription.yaml
的 YAML 组件文件:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order
spec:
topic: orders
routes:
default: /orders
pubsubname: pubsub
scopes:
- orderprocessing
这里的订阅名为 order
:
pubsub
的发布/订阅组件订阅名为 orders
的主题。route
字段以将所有主题消息发送到应用程序中的 /orders
端点。scopes
字段以将此订阅的访问范围仅限于 ID 为 orderprocessing
的应用程序。运行 Dapr 时,设置 YAML 组件文件路径以指向 Dapr 的组件。
dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
dapr run --app-id myapp --resources-path ./myComponents -- npm start
dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
在 Kubernetes 中,将组件应用到集群:
kubectl apply -f subscription.yaml
在您的应用程序代码中,订阅 Dapr 发布/订阅组件中指定的主题。
//订阅一个主题
[HttpPost("orders")]
public void getCheckout([FromBody] int orderId)
{
Console.WriteLine("Subscriber received : " + orderId);
}
import io.dapr.client.domain.CloudEvent;
//订阅一个主题
@PostMapping(path = "/orders")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
log.info("Subscriber received: " + cloudEvent.getData());
}
});
}
from cloudevents.sdk.event import v1
#订阅一个主题
@app.route('/orders', methods=['POST'])
def checkout(event: v1.Event) -> None:
data = json.loads(event.Data())
logging.info('Subscriber received: ' + str(data))
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
// 监听声明式路由
app.post('/orders', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
//订阅一个主题
var sub = &common.Subscription{
PubsubName: "pubsub",
Topic: "orders",
Route: "/orders",
}
func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("Subscriber received: %s", e.Data)
return false, nil
}
/orders
端点与订阅中定义的 route
匹配,这是 Dapr 发送所有主题消息的地方。
流式订阅是在应用程序代码中定义的订阅,可以在运行时动态停止和启动。 消息由应用程序从 Dapr 拉取。这意味着不需要端点来订阅主题,并且可以在没有任何应用程序配置在 sidecar 上的情况下进行订阅。 可以同时订阅任意数量的发布/订阅和主题。 由于消息被发送到给定的消息处理代码,因此没有路由或批量订阅的概念。
注意: 每个应用程序一次只能订阅一个发布/订阅/主题对。
下面的示例展示了不同的流式订阅主题的方法。
您可以使用 subscribe
方法,该方法返回一个 Subscription
对象,并允许您通过调用 next_message
方法从流中拉取消息。这在主线程中运行,并可能在等待消息时阻塞主线程。
import time
from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError
counter = 0
def process_message(message):
global counter
counter += 1
# 在此处处理消息
print(f'Processing message: {message.data()} from {message.topic()}...')
return 'success'
def main():
with DaprClient() as client:
global counter
subscription = client.subscribe(
pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
)
try:
while counter < 5:
try:
message = subscription.next_message()
except StreamInactiveError as e:
print('Stream is inactive. Retrying...')
time.sleep(1)
continue
if message is None:
print('No message received within timeout period.')
continue
# 处理消息
response_status = process_message(message)
if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)
finally:
print("Closing subscription...")
subscription.close()
if __name__ == '__main__':
main()
您还可以使用 subscribe_with_handler
方法,该方法接受一个回调函数,该函数为从流中接收到的每条消息执行。此方法在单独的线程中运行,因此不会阻塞主线程。
import time
from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse
counter = 0
def process_message(message):
# 在此处处理消息
global counter
counter += 1
print(f'Processing message: {message.data()} from {message.topic()}...')
return TopicEventResponse('success')
def main():
with (DaprClient() as client):
# 这将启动一个新线程,该线程将监听消息
# 并在 `process_message` 函数中处理它们
close_fn = client.subscribe_with_handler(
pubsub_name='pubsub', topic='orders', handler_fn=process_message,
dead_letter_topic='orders_dead'
)
while counter < 5:
time.sleep(1)
print("Closing subscription...")
close_fn()
if __name__ == '__main__':
main()
package main
import (
"context"
"log"
"github.com/dapr/go-sdk/client"
)
func main() {
cl, err := client.NewClient()
if err != nil {
log.Fatal(err)
}
sub, err := cl.Subscribe(context.Background(), client.SubscriptionOptions{
PubsubName: "pubsub",
Topic: "orders",
})
if err != nil {
panic(err)
}
// 必须始终调用 Close。
defer sub.Close()
for {
msg, err := sub.Receive()
if err != nil {
panic(err)
}
// 处理事件
// 我们 _必须_ 始终表示消息处理的结果,否则
// 消息将不会被视为已处理,并将被重新传递或
// 死信。
// msg.Retry()
// msg.Drop()
if err := msg.Success(); err != nil {
panic(err)
}
}
}
或
package main
import (
"context"
"log"
"github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
)
func main() {
cl, err := client.NewClient()
if err != nil {
log.Fatal(err)
}
stop, err := cl.SubscribeWithHandler(context.Background(),
client.SubscriptionOptions{
PubsubName: "pubsub",
Topic: "orders",
},
eventHandler,
)
if err != nil {
panic(err)
}
// 必须始终调用 Stop。
defer stop()
<-make(chan struct{})
}
func eventHandler(e *common.TopicEvent) common.SubscriptionResponseStatus {
// 在此处处理消息
// common.SubscriptionResponseStatusRetry
// common.SubscriptionResponseStatusDrop
common.SubscriptionResponseStatusDrop, status)
}
return common.SubscriptionResponseStatusSuccess
}
观看 此视频以了解流式订阅的概述:
动态编程式方法在代码中返回 routes
JSON 结构,与声明式方法的 route
YAML 结构不同。
注意: 编程式订阅仅在应用程序启动时读取一次。您不能 动态 添加新的编程式订阅,只能在编译时添加新的。
在下面的示例中,您在应用程序代码中定义了在上面的声明式 YAML 订阅中找到的值。
[Topic("pubsub", "orders")]
[HttpPost("/orders")]
public async Task<ActionResult<Order>>Checkout(Order order, [FromServices] DaprClient daprClient)
{
// 逻辑
return order;
}
或
// Dapr 订阅在 [Topic] 中将 orders 主题路由到此路由
app.MapPost("/orders", [Topic("pubsub", "orders")] (Order order) => {
Console.WriteLine("Subscriber received : " + order);
return Results.Ok(order);
});
上面定义的两个处理程序还需要映射以配置 dapr/subscribe
端点。这是在定义端点时在应用程序启动代码中完成的。
app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
});
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Topic(name = "orders", pubsubName = "pubsub")
@PostMapping(path = "/orders")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber received: " + cloudEvent.getData());
System.out.println("Subscriber received: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [
{
'pubsubname': 'pubsub',
'topic': 'orders',
'routes': {
'rules': [
{
'match': 'event.type == "order"',
'path': '/orders'
},
],
'default': '/orders'
}
}]
return jsonify(subscriptions)
@app.route('/orders', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
const port = 3000
app.get('/dapr/subscribe', (req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "orders",
routes: {
rules: [
{
match: 'event.type == "order"',
path: '/orders'
},
],
default: '/products'
}
}
]);
})
app.post('/orders', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
)
const appPort = 3000
type subscription struct {
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata,omitempty"`
Routes routes `json:"routes"`
}
type routes struct {
Rules []rule `json:"rules,omitempty"`
Default string `json:"default,omitempty"`
}
type rule struct {
Match string `json:"match"`
Path string `json:"path"`
}
// 处理 /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
t := []subscription{
{
PubsubName: "pubsub",
Topic: "orders",
Routes: routes{
Rules: []rule{
{
Match: `event.type == "order"`,
Path: "/orders",
},
},
Default: "/orders",
},
},
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(t)
}
func main() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}
在某些情况下,应用程序可能由于各种原因无法处理消息。例如,可能会出现获取处理消息所需数据的临时问题,或者应用程序的业务逻辑失败并返回错误。死信主题用于处理这些无法投递的消息,并将其转发到订阅应用程序。这可以减轻应用程序处理失败消息的负担,使开发人员可以编写代码从死信主题中读取消息,修复后重新发送,或者选择放弃这些消息。
死信主题通常与重试策略和处理死信主题消息的订阅一起使用。
当配置了死信主题时,任何无法投递到应用程序的消息都会被放置在死信主题中,以便转发到处理这些消息的订阅。这可以是同一个应用程序或完全不同的应用程序。
即使底层系统不支持,Dapr 也为其所有的 pubsub 组件启用了死信主题。例如,AWS SNS 组件有一个死信队列,RabbitMQ有死信主题。您需要确保正确配置这些组件。
下图展示了死信主题的工作原理。首先,消息从 orders
主题的发布者发送。Dapr 代表订阅者应用程序接收消息,但 orders
主题的消息未能投递到应用程序的 /checkout
端点,即使经过重试也是如此。由于投递失败,消息被转发到 poisonMessages
主题,该主题将其投递到 /failedMessages
端点进行处理,在这种情况下是在同一个应用程序上。failedMessages
处理代码可以选择丢弃消息或重新发送新消息。
以下 YAML 显示了如何为从 orders
主题消费的消息配置名为 poisonMessages
的死信主题。此订阅的范围限定为具有 checkout
ID 的应用程序。
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order
spec:
topic: orders
routes:
default: /checkout
pubsubname: pubsub
deadLetterTopic: poisonMessages
scopes:
- checkout
var deadLetterTopic = "poisonMessages"
sub, err := cl.Subscribe(context.Background(), client.SubscriptionOptions{
PubsubName: "pubsub",
Topic: "orders",
DeadLetterTopic: &deadLetterTopic,
})
从 /subscribe
端点返回的 JSON 显示了如何为从 orders
主题消费的消息配置名为 poisonMessages
的死信主题。
app.get('/dapr/subscribe', (_req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "orders",
route: "/checkout",
deadLetterTopic: "poisonMessages"
}
]);
});
默认情况下,当设置了死信主题时,任何失败的消息会立即进入死信主题。因此,建议在订阅中使用死信主题时始终设置重试策略。 要在将消息发送到死信主题之前启用消息重试,请对 pubsub 组件应用 重试策略。
此示例显示了如何为 pubsub
pubsub 组件设置名为 pubsubRetry
的常量重试策略,每 5 秒应用一次,最多尝试投递 10 次。
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
name: myresiliency
spec:
policies:
retries:
pubsubRetry:
policy: constant
duration: 5s
maxRetries: 10
targets:
components:
pubsub:
inbound:
retry: pubsubRetry
请记得配置一个订阅来处理死信主题。例如,您可以创建另一个声明式订阅,在同一个或不同的应用程序上接收这些消息。下面的示例显示了 checkout 应用程序通过另一个订阅订阅 poisonMessages
主题,并将这些消息发送到 /failedmessages
端点进行处理。
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: deadlettertopics
spec:
topic: poisonMessages
routes:
rules:
- match:
path: /failedMessages
pubsubname: pubsub
scopes:
- checkout
您已经配置了 Dapr 的 pub/sub API 构建块,并且您的应用程序正在使用集中式消息代理顺利地发布和订阅主题。如果您想为应用程序执行简单的 A/B 测试、蓝/绿部署,甚至金丝雀部署,该怎么办?即使使用 Dapr,这也可能很困难。
Dapr 通过其 pub/sub 命名空间消费者组机制解决了大规模的多租户问题。
假设您有一个 Kubernetes 集群,其中两个应用程序(App1 和 App2)部署在同一个命名空间(namespace-a)中。App2 发布到一个名为 order
的主题,而 App1 订阅名为 order
的主题。这将创建两个以您的应用程序命名的消费者组(App1 和 App2)。
为了在使用集中式消息代理时进行简单的测试和部署,您创建了另一个命名空间,其中包含两个具有相同 app-id
的应用程序,App1 和 App2。
Dapr 使用单个应用程序的 app-id
创建消费者组,因此消费者组名称将保持为 App1 和 App2。
为了避免这种情况,您需要在代码中“潜入”一些东西来更改 app-id
,具体取决于您运行的命名空间。这种方法既麻烦又容易出错。
Dapr 不仅允许您使用 UUID 和 pod 名称的 consumerID 更改消费者组的行为,还提供了一个存在于 pub/sub 组件元数据中的 命名空间机制。例如,使用 Redis 作为您的消息代理:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: consumerID
value: "{namespace}"
通过将 consumerID
配置为 {namespace}
值,您可以在不同的命名空间中使用相同的 app-id
和相同的主题。
在上图中,您有两个命名空间,每个命名空间都有相同 app-id
的应用程序,发布和订阅相同的集中式消息代理 orders
。然而这次,Dapr 创建了以它们运行的命名空间为前缀的消费者组名称。
无需更改您的代码或 app-id
,命名空间消费者组允许您:
app-id
只需在您的组件元数据中包含 "{namespace}"
消费者组机制。您无需在元数据中手动编码命名空间。Dapr 会自动识别其运行的命名空间并为您填充命名空间值,就像由运行时注入的动态元数据值一样。
与在Deployments中Pod是临时的不同,StatefulSets通过为每个Pod保持固定的身份,使得在Kubernetes上可以部署有状态应用程序。
以下是一个使用Dapr的StatefulSet示例:
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: python-subscriber
spec:
selector:
matchLabels:
app: python-subscriber # 必须匹配.spec.template.metadata.labels
serviceName: "python-subscriber"
replicas: 3
template:
metadata:
labels:
app: python-subscriber # 必须匹配.spec.selector.matchLabels
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "python-subscriber"
dapr.io/app-port: "5001"
spec:
containers:
- name: python-subscriber
image: ghcr.io/dapr/samples/pubsub-python-subscriber:latest
ports:
- containerPort: 5001
imagePullPolicy: Always
在通过Dapr订阅pubsub主题时,应用程序可以定义一个consumerID
,这个ID决定了订阅者在队列或主题中的位置。利用StatefulSets中Pod的固定身份,您可以为每个Pod分配一个唯一的consumerID
,从而实现订阅者应用程序的水平扩展。Dapr会跟踪每个Pod的名称,并可以在组件中使用{podName}
标记来声明。
当扩展某个主题的订阅者数量时,每个Dapr组件都有特定的设置来决定其行为。通常,对于多个消费者有两种选择:
Kafka通过consumerID
为每个订阅者分配独立的位置。当实例重新启动时,它会使用相同的consumerID
继续从上次的位置处理消息,而不会遗漏任何消息。以下组件示例展示了如何让多个Pod使用Kafka组件:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
- name: consumerID
value: "{podName}"
- name: authRequired
value: "false"
MQTT3协议支持共享主题,允许多个订阅者“竞争”处理来自主题的消息,这意味着每条消息仅由其中一个订阅者处理。例如:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mqtt-pubsub
spec:
type: pubsub.mqtt3
version: v1
metadata:
- name: consumerID
value: "{podName}"
- name: cleanSession
value: "true"
- name: url
value: "tcp://admin:public@localhost:1883"
- name: qos
value: 1
- name: retain
value: "false"
命名空间或组件范围可以用来限制组件的访问权限,使其仅对特定应用程序可用。这些应用程序范围的设置确保只有具有特定 ID 的应用程序才能使用该组件。
除了这种通用的组件范围外,还可以对 pub/sub 组件进行以下限制:
这被称为 pub/sub 主题范围控制。
为每个 pub/sub 组件定义 pub/sub 范围。您可能有一个名为 pubsub
的 pub/sub 组件,它有一组范围,另一个 pubsub2
则有不同的范围。
要使用此主题范围,可以为 pub/sub 组件设置三个元数据属性:
spec.metadata.publishingScopes
publishingScopes
中未指定任何内容(默认行为),则所有应用程序都可以发布到所有主题app1=;app2=topic2
)app1=topic1;app2=topic2,topic3;app3=
将允许 app1 仅发布到 topic1,app2 仅发布到 topic2 和 topic3,app3 则不能发布到任何主题。spec.metadata.subscriptionScopes
subscriptionScopes
中未指定任何内容(默认行为),则所有应用程序都可以订阅所有主题app1=topic1;app2=topic2,topic3
将允许 app1 仅订阅 topic1,app2 仅订阅 topic2 和 topic3spec.metadata.allowedTopics
allowedTopics
(默认行为),则所有主题都是有效的。如果存在,subscriptionScopes
和 publishingScopes
仍然生效。publishingScopes
或 subscriptionScopes
可以与 allowedTopics
结合使用以添加细粒度限制spec.metadata.protectedTopics
publishingScopes
或 subscriptionScopes
明确授予应用程序发布或订阅权限才能发布/订阅该主题。这些元数据属性可用于所有 pub/sub 组件。以下示例使用 Redis 作为 pub/sub 组件。
在某些情况下,限制哪些应用程序可以发布/订阅主题是有用的,例如当您有包含敏感信息的主题时,只有一部分应用程序被允许发布或订阅这些主题。
它也可以用于所有主题,以始终拥有一个“真实来源”,以了解哪些应用程序作为发布者/订阅者使用哪些主题。
以下是三个应用程序和三个主题的示例:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: publishingScopes
value: "app1=topic1;app2=topic2,topic3;app3="
- name: subscriptionScopes
value: "app2=;app3=topic1"
下表显示了哪些应用程序被允许发布到主题:
topic1 | topic2 | topic3 | |
---|---|---|---|
app1 | ✅ | ||
app2 | ✅ | ✅ | |
app3 |
下表显示了哪些应用程序被允许订阅主题:
topic1 | topic2 | topic3 | |
---|---|---|---|
app1 | ✅ | ✅ | ✅ |
app2 | |||
app3 | ✅ |
注意:如果应用程序未列出(例如 subscriptionScopes 中的 app1),则允许其订阅所有主题。因为未使用
allowedTopics
,且 app1 没有任何订阅范围,它也可以使用上面未列出的其他主题。
如果 Dapr 应用程序向其发送消息,则会创建一个主题。在某些情况下,这种主题创建应该受到管理。例如:
在这些情况下可以使用 allowedTopics
。
以下是三个允许的主题的示例:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: allowedTopics
value: "topic1,topic2,topic3"
所有应用程序都可以使用这些主题,但仅限于这些主题,不允许其他主题。
allowedTopics
和范围有时您希望结合两者范围,从而仅拥有一组固定的允许主题,并指定对某些应用程序的范围。
以下是三个应用程序和两个主题的示例:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: allowedTopics
value: "A,B"
- name: publishingScopes
value: "app1=A"
- name: subscriptionScopes
value: "app1=;app2=A"
注意:第三个应用程序未列出,因为如果应用程序未在范围内指定,则允许其使用所有主题。
下表显示了哪个应用程序被允许发布到主题:
A | B | C | |
---|---|---|---|
app1 | ✅ | ||
app2 | ✅ | ✅ | |
app3 | ✅ | ✅ |
下表显示了哪个应用程序被允许订阅主题:
A | B | C | |
---|---|---|---|
app1 | |||
app2 | ✅ | ||
app3 | ✅ | ✅ |
如果您的主题涉及敏感数据,则每个新应用程序必须在 publishingScopes
和 subscriptionScopes
中明确列出,以确保其无法读取或写入该主题。或者,您可以将主题指定为“保护”(使用 protectedTopics
),并仅授予真正需要的特定应用程序访问权限。
以下是三个应用程序和三个主题的示例,其中两个主题是保护的:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: protectedTopics
value: "A,B"
- name: publishingScopes
value: "app1=A,B;app2=B"
- name: subscriptionScopes
value: "app1=A,B;app2=B"
在上面的示例中,主题 A 和 B 被标记为保护。因此,即使 app3
未列在 publishingScopes
或 subscriptionScopes
中,它也无法与这些主题交互。
下表显示了哪个应用程序被允许发布到主题:
A | B | C | |
---|---|---|---|
app1 | ✅ | ✅ | |
app2 | ✅ | ||
app3 | ✅ |
下表显示了哪个应用程序被允许订阅主题:
A | B | C | |
---|---|---|---|
app1 | ✅ | ✅ | |
app2 | ✅ | ||
app3 | ✅ |
Dapr 支持为每条消息设置生存时间 (TTL)。这意味着应用程序可以为每条消息指定生存时间,过期后订阅者将不会收到这些消息。
所有 Dapr 发布/订阅组件 都兼容消息 TTL,因为 Dapr 在运行时内处理 TTL 逻辑。只需在发布消息时设置 ttlInSeconds
元数据即可。
在某些组件中,例如 Kafka,可以通过 retention.ms
在主题中配置生存时间,详见文档。使用 Dapr 的消息 TTL,使用 Kafka 的应用程序现在可以为每条消息设置生存时间,而不仅限于每个主题。
当发布/订阅组件原生支持消息生存时间时,Dapr 仅转发生存时间配置而不添加额外逻辑,保持行为的可预测性。这在组件以不同方式处理过期消息时非常有用。例如,在 Azure Service Bus 中,过期消息会被存储在死信队列中,而不是简单地删除。
Azure Service Bus 支持实体级别的生存时间。这意味着消息有默认的生存时间,但也可以在发布时设置为更短的时间跨度。Dapr 传播消息的生存时间元数据,并让 Azure Service Bus 直接处理过期。
如果消息由不使用 Dapr 的订阅者消费,过期消息不会自动丢弃,因为过期是由 Dapr 运行时在 Dapr sidecar 接收到消息时处理的。然而,订阅者可以通过在云事件中添加逻辑来处理 expiration
属性,以编程方式丢弃过期消息,该属性遵循 RFC3339 格式。
当非 Dapr 订阅者使用诸如 Azure Service Bus 等原生处理消息 TTL 的组件时,他们不会收到过期消息。在这种情况下,不需要额外的逻辑。
消息 TTL 可以在发布请求的元数据中设置:
curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.ttlInSeconds=120 -H "Content-Type: application/json" -d '{"order-number": "345"}'
from dapr.clients import DaprClient
with DaprClient() as d:
req_data = {
'order-number': '345'
}
# 创建一个带有内容类型和主体的类型化消息
resp = d.publish_event(
pubsub_name='pubsub',
topic='TOPIC_A',
data=json.dumps(req_data),
publish_metadata={'ttlInSeconds': '120'}
)
# 打印请求
print(req_data, flush=True)
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory) {
$publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
$publisher->topic('TOPIC_A')->publish('data', ['ttlInSeconds' => '120']);
});
请参阅本指南以获取发布/订阅 API 的参考。
通过批量发布和订阅API,您可以在单个请求中发布和订阅多个消息。在开发需要发送或接收大量消息的应用程序时,使用批量操作可以通过减少Dapr sidecar、应用程序和底层pubsub代理之间的请求总数来提高吞吐量。
批量发布API允许您通过单个请求将多个消息发布到一个主题。它是非事务性的,这意味着在一个批量请求中,某些消息可能会成功发布,而某些可能会失败。如果有消息发布失败,批量发布操作将返回失败消息的列表。
批量发布操作不保证消息的顺序。
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class BulkPublisher {
private static final String PUBSUB_NAME = "my-pubsub-name";
private static final String TOPIC_NAME = "topic-a";
public void publishMessages() {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// 创建要发布的消息列表
List<String> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String message = String.format("这是消息 #%d", i);
messages.add(message);
}
// 使用批量发布API发布消息列表
BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
}
}
}
import { DaprClient } from "@dapr/dapr";
const pubSubName = "my-pubsub-name";
const topic = "topic-a";
async function start() {
const client = new DaprClient();
// 向主题发布多个消息。
await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);
// 使用显式批量发布消息向主题发布多个消息。
const bulkPublishMessages = [
{
entryID: "entry-1",
contentType: "application/json",
event: { hello: "foo message 1" },
},
{
entryID: "entry-2",
contentType: "application/cloudevents+json",
event: {
specversion: "1.0",
source: "/some/source",
type: "example",
id: "1234",
data: "foo message 2",
datacontenttype: "text/plain"
},
},
{
entryID: "entry-3",
contentType: "text/plain",
event: "foo message 3",
},
];
await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
using System;
using System.Collections.Generic;
using Dapr.Client;
const string PubsubName = "my-pubsub-name";
const string TopicName = "topic-a";
IReadOnlyList<object> BulkPublishData = new List<object>() {
new { Id = "17", Amount = 10m },
new { Id = "18", Amount = 20m },
new { Id = "19", Amount = 30m }
};
using var client = new DaprClientBuilder().Build();
var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData);
if (res == null) {
throw new Exception("从dapr返回的响应为空");
}
if (res.FailedEntries.Count > 0)
{
Console.WriteLine("某些事件发布失败!");
foreach (var failedEntry in res.FailedEntries)
{
Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " 错误信息: " +
failedEntry.ErrorMessage);
}
}
else
{
Console.WriteLine("所有事件已发布!");
}
import requests
import json
base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}"
pubsub_name = "my-pubsub-name"
topic_name = "topic-a"
payload = [
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
}
]
response = requests.post(base_url.format(pubsub_name, topic_name), json=payload)
print(response.status_code)
package main
import (
"fmt"
"strings"
"net/http"
"io/ioutil"
)
const (
pubsubName = "my-pubsub-name"
topicName = "topic-a"
baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s"
)
func main() {
url := fmt.Sprintf(baseUrl, pubsubName, topicName)
method := "POST"
payload := strings.NewReader(`[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
}
]`)
client := &http.Client {}
req, _ := http.NewRequest(method, url, payload)
req.Header.Add("Content-Type", "application/json")
res, err := client.Do(req)
// ...
}
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \
-H 'Content-Type: application/json' \
-d '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
},
]'
Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a' `
-Body '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
},
]'
批量订阅API允许您在单个请求中从一个主题订阅多个消息。正如我们从如何:发布和订阅主题中所知,有三种方式可以订阅主题:
要批量订阅主题,我们只需使用bulkSubscribe
规范属性,如下所示:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order-pub-sub
spec:
topic: orders
routes:
default: /checkout
pubsubname: order-pub-sub
bulkSubscribe:
enabled: true
maxMessagesCount: 100
maxAwaitDurationMs: 40
scopes:
- orderprocessing
- checkout
在上面的示例中,bulkSubscribe
是_可选的_。如果您使用bulkSubscribe
,那么:
enabled
是必需的,用于启用或禁用此主题的批量订阅。maxMessagesCount
)。
对于不支持批量订阅的组件,maxMessagesCount
的默认值为100,即应用程序和Dapr之间的默认批量事件。请参阅组件如何处理发布和订阅批量消息。
如果组件支持批量订阅,则该参数的默认值可以在该组件文档中找到。maxAwaitDurationMs
)。
对于不支持批量订阅的组件,maxAwaitDurationMs
的默认值为1000,即应用程序和Dapr之间的默认批量事件。请参阅组件如何处理发布和订阅批量消息。
如果组件支持批量订阅,则该参数的默认值可以在该组件文档中找到。应用程序接收与批量消息中的每个条目(单个消息)关联的EntryId
。应用程序必须使用此EntryId
来传达该特定条目的状态。如果应用程序未能通知EntryId
状态,则被视为RETRY
。
需要发送一个带有每个条目处理状态的JSON编码的有效负载体:
{
"statuses":
[
{
"entryId": "<entryId1>",
"status": "<status>"
},
{
"entryId": "<entryId2>",
"status": "<status>"
}
]
}
可能的状态值:
状态 | 描述 |
---|---|
SUCCESS |
消息处理成功 |
RETRY |
消息由Dapr重试 |
DROP |
记录警告并丢弃消息 |
请参阅批量订阅的预期HTTP响应以获取更多见解。
以下代码示例演示如何使用批量订阅。
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import reactor.core.publisher.Mono;
class BulkSubscriber {
@BulkSubscribe()
// @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 40)
@Topic(name = "topicbulk", pubsubName = "orderPubSub")
@PostMapping(path = "/topicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("批量订阅者收到: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
import { DaprServer } from "@dapr/dapr";
const pubSubName = "orderPubSub";
const topic = "topicbulk";
const daprHost = process.env.DAPR_HOST || "127.0.0.1";
const daprPort = process.env.DAPR_HTTP_PORT || "3502";
const serverHost = process.env.SERVER_HOST || "127.0.0.1";
const serverPort = process.env.APP_PORT || 5001;
async function start() {
const server = new DaprServer({
serverHost,
serverPort,
clientOptions: {
daprHost,
daprPort,
},
});
// 使用默认配置向主题发布多个消息。
await client.pubsub.bulkSubscribeWithDefaultConfig(pubSubName, topic, (data) => console.log("订阅者收到: " + JSON.stringify(data)));
// 使用特定的maxMessagesCount和maxAwaitDurationMs向主题发布多个消息。
await client.pubsub.bulkSubscribeWithConfig(pubSubName, topic, (data) => console.log("订阅者收到: " + JSON.stringify(data)), 100, 40);
}
using Microsoft.AspNetCore.Mvc;
using Dapr.AspNetCore;
using Dapr;
namespace DemoApp.Controllers;
[ApiController]
[Route("[controller]")]
public class BulkMessageController : ControllerBase
{
private readonly ILogger<BulkMessageController> logger;
public BulkMessageController(ILogger<BulkMessageController> logger)
{
this.logger = logger;
}
[BulkSubscribe("messages", 10, 10)]
[Topic("pubsub", "messages")]
public ActionResult<BulkSubscribeAppResponse> HandleBulkMessages([FromBody] BulkSubscribeMessage<BulkMessageModel<BulkMessageModel>> bulkMessages)
{
List<BulkSubscribeAppResponseEntry> responseEntries = new List<BulkSubscribeAppResponseEntry>();
logger.LogInformation($"收到 {bulkMessages.Entries.Count()} 条消息");
foreach (var message in bulkMessages.Entries)
{
try
{
logger.LogInformation($"收到一条数据为 '{message.Event.Data.MessageData}' 的消息");
responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
}
catch (Exception e)
{
logger.LogError(e.Message);
responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(responseEntries);
}
public class BulkMessageModel
{
public string MessageData { get; set; }
}
}
目前,您只能使用HTTP客户端在Python中进行批量订阅。
import json
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
# 定义批量订阅配置
subscriptions = [{
"pubsubname": "pubsub",
"topic": "TOPIC_A",
"route": "/checkout",
"bulkSubscribe": {
"enabled": True,
"maxMessagesCount": 3,
"maxAwaitDurationMs": 40
}
}]
print('Dapr pub/sub已订阅: ' + json.dumps(subscriptions))
return jsonify(subscriptions)
# 定义处理传入消息的端点
@app.route('/checkout', methods=['POST'])
def checkout():
messages = request.json
print(messages)
for message in messages:
print(f"收到消息: {message}")
return json.dumps({'success': True}), 200, {'ContentType': 'application/json'}
if __name__ == '__main__':
app.run(port=5000)
对于事件发布/订阅,涉及两种网络传输。
这些是可以进行优化的机会。当优化时,进行批量请求,从而减少总体调用次数,从而提高吞吐量并提供更好的延迟。
启用批量发布和/或批量订阅时,应用程序和Dapr sidecar之间的通信(上面第1点)针对所有组件进行了优化。
从Dapr sidecar到pubsub代理的优化取决于许多因素,例如:
目前,以下组件已更新以支持此级别的优化:
组件 | 批量发布 | 批量订阅 |
---|---|---|
Kafka | 是 | 是 |
Azure Servicebus | 是 | 是 |
Azure Eventhubs | 是 | 是 |
观看以下关于批量pubsub的演示和演讲。