This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

发布/订阅代理组件规范

支持与Dapr接口的发布/订阅代理

下表展示了Dapr发布/订阅模块支持的发布和订阅代理。了解如何为Dapr配置不同的发布/订阅代理。

Table headers to note:

HeaderDescriptionExample
StatusComponent certification statusAlpha
Beta
Stable
Component versionThe version of the componentv1
Since runtime versionThe version of the Dapr runtime when the component status was set or updated1.11

Generic

ComponentStatusComponent versionSince runtime version
Apache KafkaStablev11.5
In-memoryStablev11.7
JetStreamBetav11.10
KubeMQBetav11.10
MQTT3Stablev11.7
PulsarStablev11.10
RabbitMQStablev11.7
Redis StreamsStablev11.0
RocketMQAlphav11.8
Solace-AMQPBetav11.10

Amazon Web Services (AWS)

ComponentStatusComponent versionSince runtime version
AWS SNS/SQSStablev11.10

Google Cloud Platform (GCP)

ComponentStatusComponent versionSince runtime version
GCP Pub/SubStablev11.11

Microsoft Azure

ComponentStatusComponent versionSince runtime version
Azure Event HubsStablev11.8
Azure Service Bus QueuesBetav11.10
Azure Service Bus TopicsStablev11.0

1 - Apache Kafka

Apache Kafka pubsub 组件的详细说明文档

组件格式

要设置 Apache Kafka 的发布/订阅功能,您需要创建一个类型为 pubsub.kafka 的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 如何:发布和订阅指南 了解如何创建和应用发布/订阅配置。

所有组件的元数据字段值可以使用 模板化的元数据值,这些值会在 Dapr sidecar 启动时解析。例如,您可以选择使用 {namespace} 作为 consumerGroup,以便在不同命名空间中使用相同的 appId 和主题,如 本文 所述。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # 必需字段,Kafka broker 连接设置
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # 可选字段,用于输入绑定。
    value: "{namespace}"
  - name: consumerID # 可选字段。如果未提供,运行时将自动创建。
    value: "channel1"
  - name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
    value: "my-dapr-app-id"
  - name: authType # 必需字段。
    value: "password"
  - name: saslUsername # 如果 authType 是 `password`,则必需。
    value: "adminuser"
  - name: saslPassword # 如果 authType 是 `password`,则必需。
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: saslMechanism
    value: "SHA-512"
  - name: maxMessageBytes # 可选字段。
    value: 1024
  - name: consumeRetryInterval # 可选字段。
    value: 200ms
  - name: heartbeatInterval # 可选字段。
    value: 5s
  - name: sessionTimeout # 可选字段。
    value: 15s
  - name: version # 可选字段。
    value: 2.0.0
  - name: disableTls # 可选字段。禁用 TLS。在生产环境中不安全!请阅读 `Mutual TLS` 部分以了解如何使用 TLS。
    value: "true"
  - name: consumerFetchMin # 可选字段。高级设置。请求中要获取的最小消息字节数 - broker 将等待直到至少有这么多可用。
    value: 1
  - name: consumerFetchDefault # 可选字段。高级设置。每个请求中从 broker 获取的默认消息字节数。
    value: 2097152
  - name: channelBufferSize # 可选字段。高级设置。内部和外部通道中要缓冲的事件数量。
    value: 512
  - name: schemaRegistryURL # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。Schema Registry URL。
    value: http://localhost:8081
  - name: schemaRegistryAPIKey # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。Schema Registry API Key。
    value: XYAXXAZ
  - name: schemaRegistryAPISecret # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。Schema Registry 凭证 API Secret。
    value: "ABCDEFGMEADFF"
  - name: schemaCachingEnabled # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。启用模式缓存。
    value: true
  - name: schemaLatestVersionCacheTTL # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。发布具有最新模式的消息时的模式缓存 TTL。
    value: 5m
  - name: escapeHeaders # 可选字段。
    value: false

有关使用 secretKeyRef 的详细信息,请参阅 如何在组件中引用 secrets 的指南。

规格元数据字段

字段必需详情示例
brokersY逗号分隔的 Kafka brokers 列表。"localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093"
consumerGroupN监听的 kafka 消费者组。发布到主题的每条记录都会传递给订阅该主题的每个消费者组中的一个消费者。如果提供了 consumerGroup 的值,则忽略 consumerID 的任何值 - 将为 consumerID 设置消费者组和随机唯一标识符的组合。"group1"
consumerIDN消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID) 值。如果提供了 consumerGroup 的值,则忽略 consumerID 的任何值 - 将为 consumerID 设置消费者组和随机唯一标识符的组合。可以设置为字符串值(例如上例中的 "channel1")或字符串格式值(例如 "{podName}" 等)。查看您可以在组件元数据中使用的所有模板标签。
clientIDN用户提供的字符串,随每个请求发送到 Kafka brokers,用于日志记录、调试和审计。默认为 Kubernetes 模式的 "namespace.appID" 或 Self-Hosted 模式的 "appID""my-namespace.my-dapr-app""my-dapr-app"
authRequiredN已弃用 启用 SASL 认证与 Kafka brokers。"true""false"
authTypeY配置或禁用认证。支持的值:nonepasswordmtlsoidcawsiam"password""none"
saslUsernameN用于认证的 SASL 用户名。仅在 authType 设置为 "password" 时需要。"adminuser"
saslPasswordN用于认证的 SASL 密码。可以是 secretKeyRef 以使用 secret 引用。仅在 authType 设置为 "password" 时需要。"""KeFg23!"
saslMechanismN您希望使用的 SASL 认证机制。仅在 authType 设置为 "password" 时需要。默认为 PLAINTEXT"SHA-512", "SHA-256", "PLAINTEXT"
initialOffsetN如果没有先前提交的偏移量,则使用的初始偏移量。应为 “newest” 或 “oldest”。默认为 “newest”。"oldest"
maxMessageBytesN允许的单个 Kafka 消息的最大字节大小。默认为 1024。2048
consumeRetryIntervalN尝试消费主题时的重试间隔。将没有后缀的数字视为毫秒。默认为 100ms。200ms
consumeRetryEnabledN通过设置 "false" 禁用消费重试"true""false"
versionNKafka 集群版本。默认为 2.0.0。请注意,如果您使用 Azure EventHubs 和 Kafka,则必须将其设置为 1.0.00.10.2.0
caCertN证书颁发机构证书,使用 TLS 时需要。可以是 secretKeyRef 以使用 secret 引用"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCertN客户端证书,authTypemtls 时需要。可以是 secretKeyRef 以使用 secret 引用"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKeyN客户端密钥,authTypemtls 时需要。可以是 secretKeyRef 以使用 secret 引用"-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"
skipVerifyN跳过 TLS 验证,不建议在生产中使用。默认为 "false""true""false"
disableTlsN禁用传输安全的 TLS。要禁用,您不需要将值设置为 "true"。不建议在生产中使用。默认为 "false""true""false"
oidcTokenEndpointNOAuth2 身份提供者访问令牌端点的完整 URL。当 authType 设置为 oidc 时需要https://identity.example.com/v1/token"
oidcClientIDN在身份提供者中配置的 OAuth2 客户端 ID。当 authType 设置为 oidc 时需要dapr-kafka
oidcClientSecretN在身份提供者中配置的 OAuth2 客户端 secret:当 authType 设置为 oidc 时需要"KeFg23!"
oidcScopesN用于请求访问令牌的 OAuth2/OIDC 范围的逗号分隔列表。当 authType 设置为 oidc 时推荐。默认为 "openid""openid,kafka-prod"
oidcExtensionsN包含 OAuth2/OIDC 扩展的 JSON 编码字典的字符串,用于请求访问令牌{"cluster":"kafka","poolid":"kafkapool"}
awsRegionN这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘region’。Kafka 集群部署到的 AWS 区域。当 authType 设置为 awsiam 时需要us-west-1
awsAccessKeyN这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘accessKey’。与 IAM 账户关联的 AWS 访问密钥。"accessKey"
awsSecretKeyN这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘secretKey’。与访问密钥关联的 secret 密钥。"secretKey"
awsSessionTokenN这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘sessionToken’。要使用的 AWS 会话令牌。仅在使用临时安全凭证时需要会话令牌。"sessionToken"
awsIamRoleArnN这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘assumeRoleArn’。具有访问 AWS 管理的 Apache Kafka (MSK) 的 IAM 角色。这是除 AWS 凭证外的另一种与 MSK 认证的选项。"arn:aws:iam::123456789:role/mskRole"
awsStsSessionNameN这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘sessionName’。表示假设角色的会话名称。"DaprDefaultSession"
schemaRegistryURLN使用 Schema Registry Avro 序列化/反序列化时需要。Schema Registry URL。http://localhost:8081
schemaRegistryAPIKeyN使用 Schema Registry Avro 序列化/反序列化时。Schema Registry 凭证 API Key。XYAXXAZ
schemaRegistryAPISecretN使用 Schema Registry Avro 序列化/反序列化时。Schema Registry 凭证 API Secret。ABCDEFGMEADFF
schemaCachingEnabledN使用 Schema Registry Avro 序列化/反序列化时。启用模式缓存。默认为 truetrue
schemaLatestVersionCacheTTLN使用 Schema Registry Avro 序列化/反序列化时。发布具有最新模式的消息时的模式缓存 TTL。默认为 5 分钟5m
clientConnectionTopicMetadataRefreshIntervalN客户端连接的主题元数据与 broker 刷新的间隔,以 Go 持续时间表示。默认为 9m"4m"
clientConnectionKeepAliveIntervalN客户端连接与 broker 保持活动的最长时间,以 Go 持续时间表示,然后关闭连接。零值(默认)表示无限期保持活动。"4m"
consumerFetchMinN请求中要获取的最小消息字节数 - broker 将等待直到至少有这么多可用。默认值为 1,因为 0 会导致消费者在没有消息可用时旋转。相当于 JVM 的 fetch.min.bytes"2"
consumerFetchDefaultN每个请求中从 broker 获取的默认消息字节数。默认值为 "1048576" 字节。"2097152"
channelBufferSizeN内部和外部通道中要缓冲的事件数量。这允许生产者和消费者在用户代码工作时继续在后台处理一些消息,从而大大提高吞吐量。默认为 256"512"
heartbeatIntervalN向消费者协调器发送心跳的间隔。最多应将值设置为 sessionTimeout 值的 1/3。默认为 “3s”。"5s"
sessionTimeoutN使用 Kafka 的组管理功能时用于检测客户端故障的超时时间。如果 broker 在此会话超时之前未收到任何来自消费者的心跳,则消费者将被移除并启动重新平衡。默认为 “10s”。"20s"
escapeHeadersN启用对消费者接收到的消息头值的 URL 转义。允许接收通常不允许在 HTTP 头中使用的特殊字符内容。默认为 falsetrue

上面的 secretKeyRef 引用了一个 kubernetes secrets store 以访问 tls 信息。访问 此处 了解有关如何配置 secret store 组件的更多信息。

注意

使用 Azure EventHubs 和 Kafka 时,元数据 version 必须设置为 1.0.0

认证

Kafka 支持多种认证方案,Dapr 支持几种:SASL 密码、mTLS、OIDC/OAuth2。随着添加的认证方法,authRequired 字段已从 v1.6 版本中弃用,取而代之的是 authType 字段。如果 authRequired 设置为 true,Dapr 将尝试根据 saslPassword 的值正确配置 authTypeauthType 的有效值为:

  • none
  • password
  • certificate
  • mtls
  • oidc
  • awsiam

None

authType 设置为 none 将禁用任何认证。这在生产中不推荐

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-noauth
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # 必需字段,Kafka broker 连接设置
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # 可选字段,用于输入绑定。
    value: "group1"
  - name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
    value: "my-dapr-app-id"
  - name: authType # 必需字段。
    value: "none"
  - name: maxMessageBytes # 可选字段。
    value: 1024
  - name: consumeRetryInterval # 可选字段。
    value: 200ms
  - name: heartbeatInterval # 可选字段。
    value: 5s
  - name: sessionTimeout # 可选字段。
    value: 15s
  - name: version # 可选字段。
    value: 0.10.2.0
  - name: disableTls
    value: "true"

SASL 密码

authType 设置为 password 启用 SASL 认证。这需要设置 saslUsernamesaslPassword 字段。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-sasl
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # 必需字段,Kafka broker 连接设置
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # 可选字段,用于输入绑定。
    value: "group1"
  - name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
    value: "my-dapr-app-id"
  - name: authType # 必需字段。
    value: "password"
  - name: saslUsername # 如果 authType 是 `password`,则必需。
    value: "adminuser"
  - name: saslPassword # 如果 authType 是 `password`,则必需。
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: saslMechanism
    value: "SHA-512"
  - name: maxMessageBytes # 可选字段。
    value: 1024
  - name: consumeRetryInterval # 可选字段。
    value: 200ms
  - name: heartbeatInterval # 可选字段。
    value: 5s
  - name: sessionTimeout # 可选字段。
    value: 15s
  - name: version # 可选字段。
    value: 0.10.2.0
  - name: caCert
    secretKeyRef:
      name: kafka-tls
      key: caCert

Mutual TLS

authType 设置为 mtls 使用 x509 客户端证书(clientCert 字段)和密钥(clientKey 字段)进行认证。请注意,mTLS 作为认证机制与通过加密保护传输层的 TLS 使用是不同的。mTLS 需要 TLS 传输(意味着 disableTls 必须为 false),但保护传输层不需要使用 mTLS。请参阅 使用 TLS 进行通信 以配置底层 TLS 传输。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-mtls
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # 必需字段,Kafka broker 连接设置
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # 可选字段,用于输入绑定。
    value: "group1"
  - name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
    value: "my-dapr-app-id"
  - name: authType # 必需字段。
    value: "mtls"
  - name: caCert
    secretKeyRef:
      name: kafka-tls
      key: caCert
  - name: clientCert
    secretKeyRef:
      name: kafka-tls
      key: clientCert
  - name: clientKey
    secretKeyRef:
      name: kafka-tls
      key: clientKey
  - name: maxMessageBytes # 可选字段。
    value: 1024
  - name: consumeRetryInterval # 可选字段。
    value: 200ms
  - name: heartbeatInterval # 可选字段。
    value: 5s
  - name: sessionTimeout # 可选字段。
    value: 15s
  - name: version # 可选字段。
    value: 0.10.2.0

OAuth2 或 OpenID Connect

authType 设置为 oidc 启用通过 OAUTHBEARER 机制的 SASL 认证。这支持从外部 OAuth2 或 OIDC 身份提供者指定一个持有者令牌。目前,仅支持 client_credentials 授权。

配置 oidcTokenEndpoint 为身份提供者访问令牌端点的完整 URL。

设置 oidcClientIDoidcClientSecret 为在身份提供者中配置的客户端凭证。

如果在组件配置中指定了 caCert,则证书将附加到系统 CA 信任中以验证身份提供者证书。同样,如果在组件配置中指定了 skipVerify,则在访问身份提供者时也将跳过验证。

默认情况下,令牌请求的唯一范围是 openid;强烈建议通过 oidcScopes 以逗号分隔的列表指定其他范围,并由 Kafka broker 验证。如果不使用其他范围来缩小访问令牌的有效性, 被破坏的 Kafka broker 可能会重放令牌以访问其他服务作为 Dapr clientID。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # 必需字段,Kafka broker 连接设置
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # 可选字段,用于输入绑定。
    value: "group1"
  - name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
    value: "my-dapr-app-id"
  - name: authType # 必需字段。
    value: "oidc"
  - name: oidcTokenEndpoint # 如果 authType 是 `oidc`,则必需。
    value: "https://identity.example.com/v1/token"
  - name: oidcClientID      # 如果 authType 是 `oidc`,则必需。
    value: "dapr-myapp"
  - name: oidcClientSecret  # 如果 authType 是 `oidc`,则必需。
    secretKeyRef:
      name: kafka-secrets
      key: oidcClientSecret
  - name: oidcScopes        # 如果 authType 是 `oidc`,则推荐。
    value: "openid,kafka-dev"
  - name: caCert            # 也应用于验证 OIDC 提供者证书
    secretKeyRef:
      name: kafka-tls
      key: caCert
  - name: maxMessageBytes # 可选字段。
    value: 1024
  - name: consumeRetryInterval # 可选字段。
    value: 200ms
  - name: heartbeatInterval # 可选字段。
    value: 5s
  - name: sessionTimeout # 可选字段。
    value: 15s
  - name: version # 可选字段。
    value: 0.10.2.0

AWS IAM

支持使用 MSK 进行 AWS IAM 认证。将 authType 设置为 awsiam 使用 AWS SDK 生成认证令牌进行认证。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-awsiam
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # 必需字段,Kafka broker 连接设置
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # 可选字段,用于输入绑定。
    value: "group1"
  - name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
    value: "my-dapr-app-id"
  - name: authType # 必需字段。
    value: "awsiam"
  - name: region # 必需字段。
    value: "us-west-1"
  - name: accessKey # 可选字段。
    value: <AWS_ACCESS_KEY>
  - name: secretKey # 可选字段。
    value: <AWS_SECRET_KEY>
  - name: sessionToken # 可选字段。
    value: <AWS_SESSION_KEY>
  - name: assumeRoleArn # 可选字段。
    value: "arn:aws:iam::123456789:role/mskRole"
  - name: sessionName # 可选字段。
    value: "DaprDefaultSession"

使用 TLS 进行通信

默认情况下,启用 TLS 以保护到 Kafka 的传输层。要禁用 TLS,请将 disableTls 设置为 true。启用 TLS 时,您可以 使用 skipVerify 控制服务器证书验证以禁用验证(不推荐在生产环境中使用)和 caCert 指定受信任的 TLS 证书颁发机构(CA)。如果没有指定 caCert,将使用系统 CA 信任。要配置 mTLS 认证, 请参阅 认证 部分。 下面是一个配置为使用传输层 TLS 的 Kafka pubsub 组件示例:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # 必需字段,Kafka broker 连接设置
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # 可选字段,用于输入绑定。
    value: "group1"
  - name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
    value: "my-dapr-app-id"
  - name: authType # 必需字段。
    value: "certificate"
  - name: consumeRetryInterval # 可选字段。
    value: 200ms
  - name: heartbeatInterval # 可选字段。
    value: 5s
  - name: sessionTimeout # 可选字段。
    value: 15s
  - name: version # 可选字段。
    value: 0.10.2.0
  - name: maxMessageBytes # 可选字段。
    value: 1024
  - name: caCert # 证书颁发机构证书。
    secretKeyRef:
      name: kafka-tls
      key: caCert
auth:
  secretStore: <SECRET_STORE_NAME>

从多个主题消费

当使用单个 pub/sub 组件从多个主题消费时,无法保证您的消费者组中的消费者如何在主题分区之间平衡。

例如,假设您订阅了两个主题,每个主题有 10 个分区,并且您有 20 个服务副本从这两个主题消费。无法保证 10 个将分配给第一个主题,10 个将分配给第二个主题。相反,分区可能会不均匀地划分,超过 10 个分配给第一个主题,其余分配给第二个主题。

这可能导致第一个主题的消费者空闲,而第二个主题的消费者过度扩展,反之亦然。当使用自动缩放器(如 HPA 或 KEDA)时,也可以观察到这种行为。

如果您遇到此特定问题,建议您为每个主题配置一个单独的 pub/sub 组件,并为每个组件定义唯一的消费者组。这可以确保您的服务的所有副本都完全分配给唯一的消费者组,其中每个消费者组针对一个特定主题。

例如,您可以定义两个 Dapr 组件,具有以下配置:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-topic-one
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: consumerGroup
    value: "{appID}-topic-one"
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-topic-two
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: consumerGroup
    value: "{appID}-topic-two"

发送和接收多条消息

Apache Kafka 组件支持使用批量 Pub/sub API 在单个操作中发送和接收多条消息。

配置批量订阅

订阅主题时,您可以配置 bulkSubscribe 选项。有关更多详细信息,请参阅 批量订阅消息。了解更多关于 批量订阅 API 的信息。

Apache Kafka 支持以下批量元数据选项:

配置默认值
maxBulkAwaitDurationMs10000 (10s)
maxBulkSubCount80

每次调用的元数据字段

分区键

调用 Kafka pub/sub 时,可以通过在请求 URL 中使用 metadata 查询参数提供可选的分区键。

参数名称可以是 partitionKey__key

示例:

curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

消息头

所有其他元数据键/值对(不是 partitionKey__key)都设置为 Kafka 消息中的头。以下是为消息设置 correlationId 的示例。

curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

Kafka Pubsub 在消费者端接收到的特殊消息头

消费消息时,特殊消息元数据会自动作为头传递。这些是:

  • __key:如果可用,消息键
  • __topic:消息的主题
  • __partition:消息的分区号
  • __offset:消息在分区中的偏移量
  • __timestamp:消息的时间戳

您可以在消费者端点中访问它们,如下所示:

from fastapi import APIRouter, Body, Response, status
import json
import sys

app = FastAPI()

router = APIRouter()


@router.get('/dapr/subscribe')
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'my-topic',
                      'route': 'my_topic_subscriber',
                      }]
    return subscriptions

@router.post('/my_topic_subscriber')
def my_topic_subscriber(
      key: Annotated[str, Header(alias="__key")],
      offset: Annotated[int, Header(alias="__offset")],
      event_data=Body()):
    print(f"key={key} - offset={offset} - data={event_data}", flush=True)
      return Response(status_code=status.HTTP_200_OK)

app.include_router(router)

接收带有特殊字符的消息头

消费者应用程序可能需要接收包含特殊字符的消息头,这可能会导致 HTTP 协议验证错误。 HTTP 头值必须遵循规范,使得某些字符不被允许。了解更多关于协议的信息。 在这种情况下,您可以启用 escapeHeaders 配置设置,该设置使用 URL 转义在消费者端对头值进行编码。

escapeHeaders 设置为 true 以进行 URL 转义。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-escape-headers
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # 必需字段,Kafka broker 连接设置
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # 可选字段,用于输入绑定。
    value: "group1"
  - name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
    value: "my-dapr-app-id"
  - name: authType # 必需字段。
    value: "none"
  - name: escapeHeaders
    value: "true"

Avro Schema Registry 序列化/反序列化

您可以配置 pub/sub 以使用 Avro 二进制序列化 发布或消费数据,利用 Apache Schema Registry(例如,Confluent Schema RegistryApicurio)。

配置

配置 Kafka pub/sub 组件元数据时,您必须定义:

  • Schema Registry URL
  • API key/secret(如果适用)

模式主题是根据主题名称自动派生的,使用标准命名约定。例如,对于名为 my-topic 的主题,模式主题将是 my-topic-value。 在服务中与消息负载交互时,它是 JSON 格式。负载在 Dapr 组件中透明地序列化/反序列化。 日期/日期时间字段必须作为其 Epoch Unix 时间戳 等效值传递(而不是典型的 Iso8601)。例如:

  • 2024-01-10T04:36:05.986Z 应传递为 1704861365986(自 1970 年 1 月 1 日以来的毫秒数)
  • 2024-01-10 应传递为 19732(自 1970 年 1 月 1 日以来的天数)

发布 Avro 消息

为了向 Kafka pub/sub 组件指示消息应使用 Avro 序列化,必须在 metadata 中设置 valueSchemaTypeAvro

curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/my-topic?metadata.rawPayload=true&metadata.valueSchemaType=Avro -H "Content-Type: application/json" -d '{"order_number": "345", "created_date": 1704861365986}'
from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {
        'order_number': '345',
        'created_date': 1704861365986
    }
    # 创建一个带有内容类型和主体的类型化消息
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic_name='my-topic',
        data=json.dumps(req_data),
        publish_metadata={'rawPayload': 'true', 'valueSchemaType': 'Avro'}
    )
    # 打印请求
    print(req_data, flush=True)

订阅 Avro 主题

为了向 Kafka pub/sub 组件指示消息应使用 Avro 进行反序列化,必须在订阅元数据中设置 valueSchemaTypeAvro

from fastapi import APIRouter, Body, Response, status
import json
import sys

app = FastAPI()

router = APIRouter()


@router.get('/dapr/subscribe')
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'my-topic',
                      'route': 'my_topic_subscriber',
                      'metadata': {
                          'valueSchemaType': 'Avro',
                      } }]
    return subscriptions

@router.post('/my_topic_subscriber')
def my_topic_subscriber(event_data=Body()):
    print(event_data, flush=True)
      return Response(status_code=status.HTTP_200_OK)

app.include_router(router)

创建一个 Kafka 实例

您可以使用 这个 Docker 镜像在本地运行 Kafka。 要在没有 Docker 的情况下运行,请参阅 此处 的入门指南。

要在 Kubernetes 上运行 Kafka,您可以使用任何 Kafka operator,例如 Strimzi

相关链接

2 - AWS SNS/SQS

关于AWS SNS/SQS pubsub组件的详细文档

组件格式

要设置AWS SNS/SQS pub/sub,创建一个类型为pubsub.aws.snssqs的组件。

默认情况下,AWS SNS/SQS组件会:

  • 创建SNS主题
  • 配置SQS队列
  • 设置队列到主题的订阅
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: snssqs-pubsub
spec:
  type: pubsub.aws.snssqs
  version: v1
  metadata:
    - name: accessKey
      value: "AKIAIOSFODNN7EXAMPLE"
    - name: secretKey
      value: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    - name: region
      value: "us-east-1"
    # - name: consumerID # 可选。如果未提供,运行时将创建一个。
    #   value: "channel1"
    # - name: endpoint # 可选。
    #   value: "http://localhost:4566"
    # - name: sessionToken  # 可选(如果使用AssignedRole则必须;例如,临时accessKey和secretKey)
    #   value: "TOKEN"
    # - name: messageVisibilityTimeout # 可选
    #   value: 10
    # - name: messageRetryLimit # 可选
    #   value: 10
    # - name: messageReceiveLimit # 可选
    #   value: 10
    # - name: sqsDeadLettersQueueName # 可选
    # - value: "myapp-dlq"
    # - name: messageWaitTimeSeconds # 可选
    #   value: 1
    # - name: messageMaxNumber # 可选
    #   value: 10
    # - name: fifo # 可选
    #   value: "true"
    # - name: fifoMessageGroupID # 可选
    #   value: "app1-mgi"
    # - name: disableEntityManagement # 可选
    #   value: "false"
    # - name: disableDeleteOnRetryLimit # 可选
    #   value: "false"
    # - name: assetsManagementTimeoutSeconds # 可选
    #   value: 5
    # - name: concurrencyMode # 可选
    #   value: "single"
    # - name: concurrencyLimit # 可选
    #   value: "0"

规格元数据字段

字段必需详情示例
accessKeyY具有适当权限的AWS账户/角色的ID,用于SNS和SQS(见下文)"AKIAIOSFODNN7EXAMPLE"
secretKeyYAWS用户/角色的secret。如果使用AssumeRole访问,还需要提供sessionToken"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
regionYSNS/SQS资产所在或将创建的AWS区域。请参阅此页面以获取有效区域。确保SNS和SQS在该区域可用"us-east-1"
consumerIDN消费者ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者ID的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供consumerID,Dapr运行时将其设置为Dapr应用程序ID(appID)值。请参阅pub/sub broker组件文件以了解如何自动生成ConsumerID。可以设置为字符串值(如上例中的"channel1")或字符串格式值(如"{podName}"等)。查看您可以在组件元数据中使用的所有模板标签。
endpointN组件使用的AWS端点。仅用于本地开发,例如使用localstack。在生产AWS上运行时不需要endpoint"http://localhost:4566"
sessionTokenN要使用的AWS会话令牌。仅在使用临时安全凭证时需要会话令牌"TOKEN"
messageReceiveLimitN消息接收的次数,在处理该消息失败后,一旦达到该次数,将导致从队列中删除该消息。如果指定了sqsDeadLettersQueueNamemessageReceiveLimit是消息接收的次数,在处理该消息失败后,一旦达到该次数,将导致将消息移动到SQS死信队列。默认值:1010
sqsDeadLettersQueueNameN此应用程序的死信队列的名称"myapp-dlq"
messageVisibilityTimeoutN消息在发送给订阅者后从接收请求中隐藏的时间(以秒为单位)。默认值:1010
messageRetryLimitN在处理消息失败后重新发送消息的次数,然后从队列中删除该消息。默认值:1010
messageWaitTimeSecondsN调用等待消息到达队列的持续时间(以秒为单位),然后返回。如果有消息可用,调用会比messageWaitTimeSeconds更早返回。如果没有消息可用且等待时间到期,调用会成功返回一个空消息列表。默认值:11
messageMaxNumberN一次从队列中接收的最大消息数。默认值:10,最大值:1010
fifoN使用SQS FIFO队列提供消息排序和去重。默认值:"false"。有关SQS FIFO的更多详细信息"true""false"
fifoMessageGroupIDN如果启用了fifo,指示Dapr为pubsub部署使用自定义消息组ID。这不是强制性的,因为Dapr为每个生产者创建一个自定义消息组ID,从而确保每个Dapr生产者的消息排序。默认值:"""app1-mgi"
disableEntityManagementN当设置为true时,SNS主题、SQS队列和SQS到SNS的订阅不会自动创建。默认值:"false""true""false"
disableDeleteOnRetryLimitN当设置为true时,在重试并失败messageRetryLimit次处理消息后,重置消息可见性超时,以便其他消费者可以尝试处理,而不是从SQS中删除消息(默认行为)。默认值:"false""true""false"
assetsManagementTimeoutSecondsNAWS资产管理操作的超时时间(以秒为单位),在超时并取消之前。资产管理操作是对STS、SNS和SQS执行的任何操作,除了实现默认Dapr组件重试行为的消息发布和消费操作。该值可以设置为任何非负浮点数/整数。默认值:50.510
concurrencyModeN当从SQS批量接收消息时,按顺序调用订阅者(一次“单个”消息),或并发调用(“并行”)。默认值:"parallel""single""parallel"
concurrencyLimitN定义处理消息的最大并发工作者数量。当concurrencyMode设置为"single"时,此值被忽略。要避免限制并发工作者的数量,请将其设置为0。默认值:0100

其他信息

符合AWS规范

Dapr创建的SNS主题和SQS队列名称符合AWS规范。默认情况下,Dapr根据消费者app-id创建SQS队列名称,因此Dapr可能会执行名称标准化以符合AWS规范。

SNS/SQS组件行为

当pub/sub SNS/SQS组件配置SNS主题时,SQS队列和订阅在组件代表消息生产者(没有订阅者应用程序部署)操作的情况下,与存在订阅者应用程序(没有发布者部署)的情况下表现不同。

由于SNS在没有SQS订阅的情况下的工作方式_仅发布者设置_,SQS队列和订阅表现为依赖于订阅者监听主题消息的“经典”pub/sub系统。没有这些订阅者,消息:

  • 无法传递并有效地丢弃
  • 不可用于未来的订阅者(当订阅者最终订阅时没有消息重播)

SQS FIFO

根据AWS规范,使用SQS FIFO(fifo元数据字段设置为"true")提供消息排序和去重,但会导致较低的SQS处理吞吐量,以及其他注意事项。

指定fifoMessageGroupID限制FIFO队列的并发消费者数量为1,但保证应用程序的Dapr sidecar发布的消息的全局排序。请参阅这篇AWS博客文章以更好地理解消息组ID和FIFO队列的主题。

为了避免丢失传递给消费者的消息顺序,SQS组件的FIFO配置要求将concurrencyMode元数据字段设置为"single"

默认并行concurrencyMode

自v1.8.0以来,组件支持"parallel" concurrencyMode作为其默认模式。在之前的版本中,组件的默认行为是一次调用订阅者一个消息并等待其响应。

SQS死信队列

在使用SQS死信队列配置PubSub组件时,元数据字段messageReceiveLimitsqsDeadLettersQueueName必须都设置为一个值。对于messageReceiveLimit,值必须大于0,而sqsDeadLettersQueueName不能是空字符串。

SNS/SQS与Dapr的争用

从根本上说,SNS通过为这些主题创建SQS订阅,将来自多个发布者主题的消息聚合到一个SQS队列中。作为订阅者,SNS/SQS pub/sub组件从该唯一的SQS队列中消费消息。

然而,像任何SQS消费者一样,组件无法选择性地检索其特定订阅的SNS主题发布的消息。这可能导致组件接收到没有关联处理程序的主题发布的消息。通常,这发生在:

  • 组件初始化: 如果基础设施订阅在组件订阅处理程序之前准备好,或
  • 关闭: 如果组件处理程序在基础设施订阅之前被移除。

由于此问题影响任何多个SNS主题的SQS消费者,组件无法防止从缺少处理程序的主题中消费消息。当这种情况发生时,组件会记录一个错误,指示这些消息被错误地检索。

在这些情况下,未处理的消息将在每次拉取后以其接收计数递减的状态重新出现在SQS中。因此,存在未处理的消息可能超过其messageReceiveLimit并丢失的风险。

创建SNS/SQS实例

对于本地开发,localstack项目用于集成AWS SNS/SQS。按照这些说明运行localstack。

要从命令行使用Docker本地运行localstack,请应用以下命令:

docker run --rm -it -p 4566:4566 -p 4571:4571 -e SERVICES="sts,sns,sqs" -e AWS_DEFAULT_REGION="us-east-1" localstack/localstack

为了在您的pub/sub绑定中使用localstack,您需要在组件元数据中提供endpoint配置。在生产AWS上运行时不需要endpoint

请参阅认证到AWS以获取有关认证相关属性的信息。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: snssqs-pubsub
spec:
  type: pubsub.aws.snssqs
  version: v1
  metadata:
    - name: accessKey
      value: "anyString"
    - name: secretKey
      value: "anyString"
    - name: endpoint
      value: http://localhost:4566
    # 使用us-east-1或提供给localstack的任何其他区域,由"AWS_DEFAULT_REGION"环境变量定义
    - name: region
      value: us-east-1

要在Kubernetes上运行localstack,您可以应用以下配置。然后可以通过DNS名称http://localstack.default.svc.cluster.local:4566(假设这是应用于默认命名空间)访问localstack,应将其用作endpoint

apiVersion: apps/v1
kind: Deployment
metadata:
  name: localstack
spec:
  # 使用选择器,我们将公开正在运行的部署
  # 这就是Kubernetes知道给定服务属于部署的方式
  selector:
    matchLabels:
      app: localstack
  replicas: 1
  template:
    metadata:
      labels:
        app: localstack
    spec:
      containers:
      - name: localstack
        image: localstack/localstack:latest
        ports:
          # 暴露边缘端点
          - containerPort: 4566
---
kind: Service
apiVersion: v1
metadata:
  name: localstack
  labels:
    app: localstack
spec:
  selector:
    app: localstack
  ports:
  - protocol: TCP
    port: 4566
    targetPort: 4566
  type: LoadBalancer

为了在AWS中运行,创建或分配一个具有SNS和SQS服务权限的IAM用户,策略如下:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "YOUR_POLICY_NAME",
      "Effect": "Allow",
      "Action": [
        "sns:CreateTopic",
        "sns:GetTopicAttributes",
        "sns:ListSubscriptionsByTopic",
        "sns:Publish",
        "sns:Subscribe",
        "sns:TagResource",
        "sqs:ChangeMessageVisibility",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ReceiveMessage",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue"
      ],
      "Resource": [
        "arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:*",
        "arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:*"
      ]
    }
  ]
}

AWS账户IDAWS账户secret插入组件元数据中的accessKeysecretKey,使用Kubernetes secret和secretKeyRef

或者,假设您希望使用自己的工具(例如Terraform)来配置SNS和SQS资产,同时防止Dapr动态执行此操作。您需要启用disableEntityManagement并为使用Dapr的应用程序分配一个IAM角色,策略如下:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "YOUR_POLICY_NAME",
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sns:Publish",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes"

      ],
      "Resource": [
        "arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:APP_TOPIC_NAME",
        "arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:APP_ID"
      ]
    }
  ]
}

在上述示例中,您在EKS集群上运行应用程序,并进行动态资产创建(默认Dapr行为)。

相关链接

3 - Azure Event Hubs

Azure Event Hubs pubsub 组件的详细文档

组件格式

要配置 Azure Event Hubs 的发布/订阅功能,请创建一个类型为 pubsub.azure.eventhubs 的组件。有关 ConsumerID 自动生成的详细信息,请参阅 pub/sub broker 组件文件。要了解如何创建和应用 pub/sub 配置,请阅读 发布和订阅指南

除了下文列出的配置元数据字段,Azure Event Hubs 还支持 Azure 身份验证 机制。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: eventhubs-pubsub
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
    # connectionString 和 eventHubNamespace 必须二选一
    # 不使用 Microsoft Entra ID 时使用 connectionString
    - name: connectionString
      value: "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}"
    # 使用 Microsoft Entra ID 时使用 eventHubNamespace
    - name: eventHubNamespace
      value: "namespace"
    - name: consumerID # 可选。如果未提供,运行时将使用 Dapr 应用程序 ID (`appID`)。
      value: "channel1"
    - name: enableEntityManagement
      value: "false"
    - name: enableInOrderMessageDelivery
      value: "false"
    # 仅当 enableEntityManagement 设置为 true 时才需要以下四个属性
    - name: resourceGroupName
      value: "test-rg"
    - name: subscriptionID
      value: "Azure 订阅 ID 的值"
    - name: partitionCount
      value: "1"
    - name: messageRetentionInDays
      value: "3"
    # 检查点存储属性
    - name: storageAccountName
      value: "myeventhubstorage"
    - name: storageAccountKey
      value: "112233445566778899"
    - name: storageContainerName
      value: "myeventhubstoragecontainer"
    # 传递 storageAccountKey 的替代方法
    - name: storageConnectionString
      value: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<account-key>"

规格元数据字段

字段必需详情示例
connectionString是*Event Hub 或 Event Hub 命名空间的连接字符串。
* 与 eventHubNamespace 字段互斥。
* 不使用 Microsoft Entra ID 身份验证 时必需
"Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}""Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key}"
eventHubNamespace是*Event Hub 命名空间名称。
* 与 connectionString 字段互斥。
* 使用 Microsoft Entra ID 身份验证 时必需
"namespace"
consumerID消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID) 值。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看可以在组件元数据中使用的所有模板标签。
enableEntityManagement布尔值,允许管理 EventHub 命名空间和存储帐户。默认值:false"true", "false"
enableInOrderMessageDelivery布尔值,允许消息按发布顺序传递。这假设在发布或发布时设置了 partitionKey 以确保跨分区的顺序。默认值:false"true", "false"
storageAccountName用于检查点存储的存储帐户名称。"myeventhubstorage"
storageAccountKey是*检查点存储帐户的存储帐户密钥。
* 使用 Microsoft Entra ID 时,如果服务主体也有权访问存储帐户,则可以省略此项。
"112233445566778899"
storageConnectionString是*检查点存储的连接字符串,指定 storageAccountKey 的替代方法"DefaultEndpointsProtocol=https;AccountName=myeventhubstorage;AccountKey=<account-key>"
storageContainerName存储帐户名称的存储容器名称。"myeventhubstoragecontainer"
resourceGroupNameEvent Hub 命名空间所属的资源组名称。启用实体管理时必需"test-rg"
subscriptionIDAzure 订阅 ID 值。启用实体管理时必需"azure subscription id"
partitionCount新 Event Hub 命名空间的分区数。仅在启用实体管理时使用。默认值:"1""2"
messageRetentionInDays在新创建的 Event Hub 命名空间中保留消息的天数。仅在启用实体管理时使用。默认值:"1""90"

Microsoft Entra ID 身份验证

Azure Event Hubs pub/sub 组件支持使用所有 Microsoft Entra ID 机制进行身份验证。有关更多信息以及根据选择的 Microsoft Entra ID 身份验证机制提供的相关组件元数据字段,请参阅 Azure 身份验证文档

示例配置

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: eventhubs-pubsub
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
    # 使用 Azure 身份验证
    - name: azureTenantId
      value: "***"
    - name: azureClientId
      value: "***"
    - name: azureClientSecret
      value: "***"
    - name: eventHubNamespace 
      value: "namespace"
    - name: enableEntityManagement
      value: "false"
    # 仅当 enableEntityManagement 设置为 true 时才需要以下四个属性
    - name: resourceGroupName
      value: "test-rg"
    - name: subscriptionID
      value: "Azure 订阅 ID 的值"
    - name: partitionCount
      value: "1"
    - name: messageRetentionInDays
    # 检查点存储属性
    # 在这种情况下,我们也使用 Microsoft Entra ID 访问存储帐户
    - name: storageAccountName
      value: "myeventhubstorage"
    - name: storageContainerName
      value: "myeventhubstoragecontainer"

发送和接收多条消息

Azure Eventhubs 支持使用批量 pub/sub API 在单个操作中发送和接收多条消息。

配置批量发布

要设置批量发布操作的元数据,请在 HTTP 请求或 gRPC 元数据上设置查询参数,如 API 参考中所述

元数据默认值
metadata.maxBulkPubBytes1000000

配置批量订阅

订阅主题时,可以配置 bulkSubscribe 选项。请参阅 批量订阅消息 了解更多详细信息,并了解 批量订阅 API

配置默认值
maxMessagesCount100
maxAwaitDurationMs10000

配置检查点频率

订阅主题时,可以通过 在 HTTP 或 gRPC 订阅请求中设置元数据 来配置分区中的检查点频率。此元数据允许在分区事件序列中配置的事件数量后进行检查点。通过将频率设置为 0 来禁用检查点。

了解更多关于检查点的信息

元数据默认值
metadata.checkPointFrequencyPerPartition1

以下示例显示了一个使用 checkPointFrequencyPerPartition 元数据的 声明性订阅 示例订阅文件。同样,您也可以在 编程订阅 中传递元数据。

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes: 
    default: /checkout
  pubsubname: order-pub-sub
  metadata:
    checkPointFrequencyPerPartition: 1
scopes:
- orderprocessing
- checkout

创建 Azure Event Hub

按照 文档 中的说明设置 Azure Event Hubs。

由于此组件使用 Azure 存储作为检查点存储,您还需要一个 Azure 存储帐户。按照 文档 中的说明管理存储帐户访问密钥。

请参阅 文档 了解如何获取 Event Hubs 连接字符串(注意这不是 Event Hubs 命名空间的连接字符串)。

为每个订阅者创建消费者组

对于每个想要订阅事件的 Dapr 应用程序,创建一个以 Dapr 应用程序 ID 命名的 Event Hubs 消费者组。例如,在 Kubernetes 上运行的 Dapr 应用程序,其 dapr.io/app-id: "myapp" 将需要一个名为 myapp 的 Event Hubs 消费者组。

注意:Dapr 将消费者组的名称传递给 Event Hub,因此这不在元数据中提供。

实体管理

当在元数据中启用实体管理时,只要应用程序具有操作 Event Hub 命名空间的正确角色和权限,Dapr 就可以自动为您创建 Event Hub 和消费者组。

Event Hub 名称是发布或订阅请求中的 topic 字段,而消费者组名称是订阅给定 Event Hub 的 Dapr 应用程序的名称。例如,在 Kubernetes 上运行的 Dapr 应用程序,其名称为 dapr.io/app-id: "myapp" 需要一个名为 myapp 的 Event Hubs 消费者组。

实体管理仅在使用 Microsoft Entra ID 身份验证 且不使用连接字符串时才可能。

Dapr 将消费者组的名称传递给 Event Hub,因此这不在元数据中提供。

订阅 Azure IoT Hub 事件

Azure IoT Hub 提供了一个 与 Event Hubs 兼容的端点,因此 Azure Event Hubs pubsub 组件也可以用于订阅 Azure IoT Hub 事件。

由 Azure IoT Hub 设备创建的设备到云事件将包含额外的 IoT Hub 系统属性,Dapr 的 Azure Event Hubs pubsub 组件将在响应元数据中返回以下内容:

系统属性名称描述和路由查询关键字
iothub-connection-auth-generation-id发送消息的设备的 connectionDeviceGenerationId。请参阅 IoT Hub 设备身份属性
iothub-connection-auth-method发送消息的设备使用的 connectionAuthMethod
iothub-connection-device-id发送消息的设备的 deviceId。请参阅 IoT Hub 设备身份属性
iothub-connection-module-id发送消息的设备的 moduleId。请参阅 IoT Hub 设备身份属性
iothub-enqueuedtime设备到云消息被 IoT Hub 接收的 enqueuedTime,格式为 RFC3339。
message-id用户可设置的 AMQP messageId

例如,传递的 HTTP 订阅消息的头将包含:

{
  'user-agent': 'fasthttp',
  'host': '127.0.0.1:3000',
  'content-type': 'application/json',
  'content-length': '120',
  'iothub-connection-device-id': 'my-test-device',
  'iothub-connection-auth-generation-id': '637618061680407492',
  'iothub-connection-auth-method': '{"scope":"module","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}',
  'iothub-connection-module-id': 'my-test-module-a',
  'iothub-enqueuedtime': '2021-07-13T22:08:09Z',
  'message-id': 'my-custom-message-id',
  'x-opt-sequence-number': '35',
  'x-opt-enqueued-time': '2021-07-13T22:08:09Z',
  'x-opt-offset': '21560',
  'traceparent': '00-4655608164bc48b985b42d39865f3834-ed6cf3697c86e7bd-01'
}

相关链接

4 - Azure Service Bus Topics

Azure Service Bus Topics pubsub 组件的详细文档

组件格式

要配置 Azure Service Bus Topics pub/sub,需创建一个类型为 pubsub.azure.servicebus.topics 的组件。请参考 pub/sub broker 组件文件 了解 ConsumerID 的自动生成方式。阅读 发布和订阅指南 以获取创建和应用 pub/sub 配置的步骤。

此组件使用 Azure Service Bus 的主题功能;请查看官方文档了解 主题和队列 的区别。
如需使用队列,请参阅 Azure Service Bus Queues pubsub 组件

连接字符串认证

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
  # 不使用 Microsoft Entra ID 认证时必需
  - name: connectionString
    value: "Endpoint=sb://{ServiceBusNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={ServiceBus}"
  # - name: consumerID # 可选:默认为应用程序自身的 ID
  #   value: channel1 
  # - name: timeoutInSec # 可选
  #   value: 60
  # - name: handlerTimeoutInSec # 可选
  #   value: 60
  # - name: disableEntityManagement # 可选
  #   value: "false"
  # - name: maxDeliveryCount # 可选
  #   value: 3
  # - name: lockDurationInSec # 可选
  #   value: 60
  # - name: lockRenewalInSec # 可选
  #   value: 20
  # - name: maxActiveMessages # 可选
  #   value: 10000
  # - name: maxConcurrentHandlers # 可选
  #   value: 10
  # - name: defaultMessageTimeToLiveInSec # 可选
  #   value: 10
  # - name: autoDeleteOnIdleInSec # 可选
  #   value: 3600
  # - name: minConnectionRecoveryInSec # 可选
  #   value: 2
  # - name: maxConnectionRecoveryInSec # 可选
  #   value: 300
  # - name: maxRetriableErrorsPerSec # 可选
  #   value: 10
  # - name: publishMaxRetries # 可选
  #   value: 5
  # - name: publishInitialRetryIntervalInMs # 可选
  #   value: 500

注意: 上述设置适用于使用此组件的所有主题。

规格元数据字段

字段必需详情示例
connectionStringService Bus 的共享访问策略连接字符串。除非使用 Microsoft Entra ID 认证,否则必需。见上例
namespaceName设置 Service Bus 命名空间地址的参数,作为完全限定的域名。使用 Microsoft Entra ID 认证时必需。"namespace.servicebus.windows.net"
consumerID消费者 ID 用于将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID) 值。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看您可以在组件元数据中使用的所有模板标签。
timeoutInSec发送消息和管理操作的超时时间。默认:6030
handlerTimeoutInSec调用应用程序处理程序的超时时间。默认:6030
lockRenewalInSec定义缓冲消息锁将被续订的频率。默认:2020
maxActiveMessages定义一次处理或缓冲的最大消息数。此值应至少与最大并发处理程序一样大。默认:10002000
maxConcurrentHandlers定义最大并发消息处理程序数。默认:0(无限制)10
disableEntityManagement设置为 true 时,队列和订阅不会自动创建。默认:"false""true""false"
defaultMessageTimeToLiveInSec默认消息生存时间,以秒为单位。仅在订阅创建期间使用。10
autoDeleteOnIdleInSec在自动删除空闲订阅之前等待的时间,以秒为单位。仅在订阅创建期间使用。必须为 300 秒或更长。默认:0(禁用)3600
maxDeliveryCount定义服务器尝试传递消息的次数。仅在订阅创建期间使用。由服务器设置默认值。10
lockDurationInSec定义消息在过期前被锁定的时间长度,以秒为单位。仅在订阅创建期间使用。由服务器设置默认值。30
minConnectionRecoveryInSec在连接失败的情况下,尝试重新连接到 Azure Service Bus 之前等待的最小间隔(以秒为单位)。默认:25
maxConnectionRecoveryInSec在连接失败的情况下,尝试重新连接到 Azure Service Bus 之前等待的最大间隔(以秒为单位)。每次尝试后,组件在最小和最大之间等待一个随机秒数,每次增加。默认:300(5 分钟)600
maxRetriableErrorsPerSec每秒处理的最大可重试错误数。如果消息因可重试错误而无法处理,组件会在开始处理另一条消息之前添加延迟,以避免立即重新处理失败的消息。默认:1010
publishMaxRetries当 Azure Service Bus 响应“过于繁忙”以限制消息时的最大重试次数。默认:55
publishInitialRetryIntervalInMs当 Azure Service Bus 限制消息时,初始指数退避的时间(以毫秒为单位)。默认:500500

Microsoft Entra ID 认证

Azure Service Bus Topics pubsub 组件支持使用所有 Microsoft Entra ID 机制进行认证,包括托管身份。有关更多信息以及根据选择的 Microsoft Entra ID 认证机制提供的相关组件元数据字段,请参阅 Azure 认证文档

示例配置

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
  - name: namespaceName
    # 使用 Azure 认证时必需。
    # 必须是完全限定的域名
    value: "servicebusnamespace.servicebus.windows.net"
  - name: azureTenantId
    value: "***"
  - name: azureClientId
    value: "***"
  - name: azureClientSecret
    value: "***"

消息元数据

Azure Service Bus 消息通过附加上下文元数据扩展了 Dapr 消息格式。一些元数据字段由 Azure Service Bus 自行设置(只读),其他字段可以在发布消息时由客户端设置。

发送带有元数据的消息

要在发送消息时设置 Azure Service Bus 元数据,请在 HTTP 请求上设置查询参数或 gRPC 元数据,如此处所述。

  • metadata.MessageId
  • metadata.CorrelationId
  • metadata.SessionId
  • metadata.Label
  • metadata.ReplyTo
  • metadata.PartitionKey
  • metadata.To
  • metadata.ContentType
  • metadata.ScheduledEnqueueTimeUtc
  • metadata.ReplyToSessionId

注意: metadata.MessageId 属性不会设置 Dapr 返回的云事件的 id 属性,应单独处理。

注意: 如果未设置 metadata.SessionId 属性,但主题需要会话,则将使用空会话 ID。

注意: metadata.ScheduledEnqueueTimeUtc 属性支持 RFC1123RFC3339 时间戳格式。

接收带有元数据的消息

当 Dapr 调用您的应用程序时,它将使用 HTTP 头或 gRPC 元数据将 Azure Service Bus 消息元数据附加到请求中。 除了上述可设置的元数据外,您还可以访问以下只读消息元数据。

  • metadata.DeliveryCount
  • metadata.LockedUntilUtc
  • metadata.LockToken
  • metadata.EnqueuedTimeUtc
  • metadata.SequenceNumber

要了解这些元数据属性的详细用途,请参阅官方 Azure Service Bus 文档

此外,原始 Azure Service Bus 消息的所有 ApplicationProperties 条目都作为 metadata.<application property's name> 附加。

注意:所有时间均由服务器填充,并未调整时钟偏差。

订阅启用会话的主题

要订阅启用会话的主题,您可以在订阅元数据中提供以下属性。

  • requireSessions (默认: false)
  • sessionIdleTimeoutInSec (默认: 60)
  • maxConcurrentSessions (默认: 8)

为主题创建 Azure Service Bus broker

请按照此处的说明设置 Azure Service Bus Topics。

相关链接

5 - Azure Service Bus 队列

关于 Azure Service Bus 队列发布/订阅组件的详细文档

组件格式

要配置 Azure Service Bus 队列的发布/订阅功能,创建一个类型为 pubsub.azure.servicebus.queues 的组件。请参考 发布/订阅代理组件文件 了解 ConsumerID 是如何自动生成的。请阅读 发布和订阅指南 了解如何创建和应用发布/订阅配置。

该组件在 Azure Service Bus 上使用队列;请查看官方文档了解 主题和队列 之间的区别。 若要使用主题,请参阅 Azure Service Bus 主题发布/订阅组件

连接字符串认证

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.queues
  version: v1
  metadata:
  # 不使用 Microsoft Entra ID 认证时必需
  - name: connectionString
    value: "Endpoint=sb://{ServiceBusNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={ServiceBus}"
  # - name: consumerID # 可选
  #   value: channel1
  # - name: timeoutInSec # 可选
  #   value: 60
  # - name: handlerTimeoutInSec # 可选
  #   value: 60
  # - name: disableEntityManagement # 可选
  #   value: "false"
  # - name: maxDeliveryCount # 可选
  #   value: 3
  # - name: lockDurationInSec # 可选
  #   value: 60
  # - name: lockRenewalInSec # 可选
  #   value: 20
  # - name: maxActiveMessages # 可选
  #   value: 10000
  # - name: maxConcurrentHandlers # 可选
  #   value: 10
  # - name: defaultMessageTimeToLiveInSec # 可选
  #   value: 10
  # - name: autoDeleteOnIdleInSec # 可选
  #   value: 3600
  # - name: minConnectionRecoveryInSec # 可选
  #   value: 2
  # - name: maxConnectionRecoveryInSec # 可选
  #   value: 300
  # - name: maxRetriableErrorsPerSec # 可选
  #   value: 10
  # - name: publishMaxRetries # 可选
  #   value: 5
  # - name: publishInitialRetryIntervalInMs # 可选
  #   value: 500

规格元数据字段

字段必需详情示例
connectionStringYService Bus 的共享访问策略连接字符串。除非使用 Microsoft Entra ID 认证,否则必需。见上例
consumerIDN消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID) 值。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看您可以在组件元数据中使用的所有模板标签。
namespaceNameN设置 Service Bus 命名空间地址的参数,作为完全限定的域名。使用 Microsoft Entra ID 认证时必需。"namespace.servicebus.windows.net"
timeoutInSecN发送消息和管理操作的超时时间。默认:6030
handlerTimeoutInSecN调用应用程序处理程序的超时时间。默认:6030
lockRenewalInSecN定义缓冲消息锁将被续订的频率。默认:2020
maxActiveMessagesN定义一次处理或缓冲的最大消息数。此值应至少与最大并发处理程序一样大。默认:10002000
maxConcurrentHandlersN定义最大并发消息处理程序数。默认:0(无限制)10
disableEntityManagementN设置为 true 时,队列和订阅不会自动创建。默认:"false""true""false"
defaultMessageTimeToLiveInSecN默认消息生存时间,以秒为单位。仅在订阅创建期间使用。10
autoDeleteOnIdleInSecN在自动删除空闲订阅之前等待的时间,以秒为单位。仅在订阅创建期间使用。必须为 300 秒或更长。默认:0(禁用)3600
maxDeliveryCountN定义服务器尝试传递消息的次数。仅在订阅创建期间使用。服务器默认设置。10
lockDurationInSecN定义消息在过期前被锁定的时间长度,以秒为单位。仅在订阅创建期间使用。服务器默认设置。30
minConnectionRecoveryInSecN在连接失败的情况下,尝试重新连接到 Azure Service Bus 之前等待的最小间隔(以秒为单位)。默认:25
maxConnectionRecoveryInSecN在连接失败的情况下,尝试重新连接到 Azure Service Bus 之前等待的最大间隔(以秒为单位)。每次尝试后,组件在最小和最大之间等待一个随机秒数,每次增加。默认:300(5 分钟)600
maxRetriableErrorsPerSecN每秒处理的最大可重试错误数。如果消息处理失败并出现可重试错误,组件会在开始处理另一条消息之前添加延迟,以避免立即重新处理失败的消息。默认:1010
publishMaxRetriesN当 Azure Service Bus 响应“过于繁忙”以限制消息时的最大重试次数。默认:55
publishInitialRetryIntervalInMsN当 Azure Service Bus 限制消息时,初始指数回退的时间(以毫秒为单位)。默认:500500

Microsoft Entra ID 认证

Azure Service Bus 队列发布/订阅组件支持使用所有 Microsoft Entra ID 机制进行认证,包括托管身份。有关更多信息以及根据选择的 Microsoft Entra ID 认证机制提供的相关组件元数据字段,请参阅 Azure 认证文档

示例配置

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.queues
  version: v1
  metadata:
  - name: namespaceName
    # 使用 Azure 认证时必需。
    # 必须是完全限定的域名
    value: "servicebusnamespace.servicebus.windows.net"
  - name: azureTenantId
    value: "***"
  - name: azureClientId
    value: "***"
  - name: azureClientSecret
    value: "***"

消息元数据

Azure Service Bus 消息在 Dapr 消息格式的基础上增加了上下文元数据。一些元数据字段由 Azure Service Bus 本身设置(只读),其他字段可以在发布消息时由客户端设置。

发送带有元数据的消息

要在发送消息时设置 Azure Service Bus 元数据,请在 HTTP 请求或 gRPC 元数据上设置查询参数,如 此处 所述。

  • metadata.MessageId
  • metadata.CorrelationId
  • metadata.SessionId
  • metadata.Label
  • metadata.ReplyTo
  • metadata.PartitionKey
  • metadata.To
  • metadata.ContentType
  • metadata.ScheduledEnqueueTimeUtc
  • metadata.ReplyToSessionId

接收带有元数据的消息

当 Dapr 调用您的应用程序时,它使用 HTTP 头或 gRPC 元数据将 Azure Service Bus 消息元数据附加到请求中。 除了 上述可设置的元数据 外,您还可以访问以下只读消息元数据。

  • metadata.DeliveryCount
  • metadata.LockedUntilUtc
  • metadata.LockToken
  • metadata.EnqueuedTimeUtc
  • metadata.SequenceNumber

要了解这些元数据属性的用途的更多详细信息,请参阅 官方 Azure Service Bus 文档

此外,原始 Azure Service Bus 消息的所有 ApplicationProperties 条目都作为 metadata.<application property's name> 附加。

发送和接收多条消息

Azure Service Bus 支持使用批量发布/订阅 API 在单个操作中发送和接收多条消息。

配置批量发布

要为批量发布操作设置元数据,请在 HTTP 请求或 gRPC 元数据上设置查询参数,如 此处 所述。

元数据默认值
metadata.maxBulkPubBytes131072 (128 KiB)

配置批量订阅

订阅主题时,您可以配置 bulkSubscribe 选项。有关更多详细信息,请参阅 批量订阅消息。了解更多关于 批量订阅 API 的信息。

配置默认值
maxMessagesCount100

创建 Azure Service Bus 队列代理

按照 此处 的说明设置 Azure Service Bus 队列。

重试策略和死信队列

默认情况下,Azure Service Bus 队列有一个死信队列。消息会根据 maxDeliveryCount 的值进行重试。默认的 maxDeliveryCount 值为 10,但可以设置为最多 2000。这些重试发生得非常迅速,如果没有成功返回,消息将被放入死信队列。

Dapr 发布/订阅提供了自己的死信队列概念,允许您控制重试策略并通过 Dapr 订阅死信队列。

  1. 在 Azure Service Bus 命名空间中设置一个单独的队列作为死信队列,并定义一个弹性策略来定义如何重试。
  2. 订阅主题以获取失败的消息并处理它们。

例如,在订阅中设置一个死信队列 orders-dlq 和一个弹性策略,允许您订阅主题 orders-dlq 以处理失败的消息。

有关设置死信队列的更多详细信息,请参阅 死信文章

相关链接

6 - GCP

关于 GCP Pub/Sub 组件的详细文档

创建 Dapr 组件

要配置 GCP pub/sub,需创建一个类型为 pubsub.gcp.pubsub 的组件。参考 pub/sub broker 组件文件 了解 ConsumerID 的自动生成方式。查看 发布和订阅指南 了解如何创建和应用 pub/sub 配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: gcp-pubsub
spec:
  type: pubsub.gcp.pubsub
  version: v1
  metadata:
  - name: type
    value: service_account
  - name: projectId
    value: <PROJECT_ID> # 替换
  - name: endpoint # 可选
    value: "http://localhost:8085"
  - name: consumerID # 可选 - 默认为应用程序自身的 ID
    value: <CONSUMER_ID>
  - name: identityProjectId
    value: <IDENTITY_PROJECT_ID> # 替换
  - name: privateKeyId
    value: <PRIVATE_KEY_ID> # 替换
  - name: clientEmail
    value: <CLIENT_EMAIL> # 替换
  - name: clientId
    value: <CLIENT_ID> # 替换
  - name: authUri
    value: https://accounts.google.com/o/oauth2/auth
  - name: tokenUri
    value: https://oauth2.googleapis.com/token
  - name: authProviderX509CertUrl
    value: https://www.googleapis.com/oauth2/v1/certs
  - name: clientX509CertUrl
    value: https://www.googleapis.com/robot/v1/metadata/x509/<PROJECT_NAME>.iam.gserviceaccount.com # 替换 PROJECT_NAME
  - name: privateKey
    value: <PRIVATE_KEY> # 替换 x509 证书
  - name: disableEntityManagement
    value: "false"
  - name: enableMessageOrdering
    value: "false"
  - name: orderingKey # 可选
    value: <ORDERING_KEY>
  - name: maxReconnectionAttempts # 可选
    value: 30
  - name: connectionRecoveryInSec # 可选
    value: 2
  - name: deadLetterTopic # 可选
    value: <EXISTING_PUBSUB_TOPIC>
  - name: maxDeliveryAttempts # 可选
    value: 5
  - name: maxOutstandingMessages # 可选
    value: 1000
  - name: maxOutstandingBytes # 可选
    value: 1000000000
  - name: maxConcurrentConnections # 可选
    value: 10

规格元数据字段

字段必需详情示例
projectIdYGCP 项目 IDmyproject-123
endpointN组件使用的 GCP 端点。仅用于本地开发(例如)与 GCP Pub/Sub Emulator 一起使用。运行 GCP 生产 API 时不需要 endpoint"http://localhost:8085"
consumerIDNConsumer ID 将一个或多个消费者组织成一个组。具有相同 consumer ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID) 值。consumerID 与请求中提供的 topic 一起用于构建 Pub/Sub 订阅 ID可以设置为字符串值(例如 "channel1")或字符串格式值(例如 "{podName}" 等)。查看可以在组件元数据中使用的所有模板标签。
identityProjectIdN如果 GCP pubsub 项目与身份项目不同,使用此属性指定身份项目"myproject-123"
privateKeyIdN如果使用显式凭据,此字段应包含服务账户 JSON 文档中的 private_key_id 字段"my-private-key"
privateKeyN如果使用显式凭据,此字段应包含服务账户 JSON 中的 private_key 字段-----BEGIN PRIVATE KEY-----MIIBVgIBADANBgkqhkiG9w0B
clientEmailN如果使用显式凭据,此字段应包含服务账户 JSON 中的 client_email 字段"myservice@myproject-123.iam.gserviceaccount.com"
clientIdN如果使用显式凭据,此字段应包含服务账户 JSON 中的 client_id 字段106234234234
authUriN如果使用显式凭据,此字段应包含服务账户 JSON 中的 auth_uri 字段https://accounts.google.com/o/oauth2/auth
tokenUriN如果使用显式凭据,此字段应包含服务账户 JSON 中的 token_uri 字段https://oauth2.googleapis.com/token
authProviderX509CertUrlN如果使用显式凭据,此字段应包含服务账户 JSON 中的 auth_provider_x509_cert_url 字段https://www.googleapis.com/oauth2/v1/certs
clientX509CertUrlN如果使用显式凭据,此字段应包含服务账户 JSON 中的 client_x509_cert_url 字段https://www.googleapis.com/robot/v1/metadata/x509/myserviceaccount%40myproject.iam.gserviceaccount.com
disableEntityManagementN设置为 "true" 时,主题和订阅不会自动创建。默认值:"false""true""false"
enableMessageOrderingN设置为 "true" 时,订阅的消息将按顺序接收,具体取决于发布和权限配置。"true""false"
orderingKeyN请求中提供的键。当 enableMessageOrdering 设置为 true 时,用于根据该键对消息进行排序。“my-orderingkey”
maxReconnectionAttemptsN定义最大重连尝试次数。默认值:3030
connectionRecoveryInSecN连接恢复尝试之间的等待时间(以秒为单位)。默认值:22
deadLetterTopicNGCP Pub/Sub 主题的名称。此主题在使用此组件之前必须存在。"myapp-dlq"
maxDeliveryAttemptsN消息传递的最大尝试次数。如果指定了 deadLetterTopicmaxDeliveryAttempts 是消息处理失败的最大尝试次数。一旦达到该次数,消息将被移至死信主题。默认值:55
typeN已弃用 GCP 凭据类型。仅支持 service_account。默认为 service_accountservice_account
maxOutstandingMessagesN给定 streaming-pull 连接可以拥有的最大未完成消息数。默认值:100050
maxOutstandingBytesN给定 streaming-pull 连接可以拥有的最大未完成字节数。默认值:10000000001000000000
maxConcurrentConnectionsN要维护的最大并发 streaming-pull 连接数。默认值:102
ackDeadlineN消息确认持续时间截止时间。默认值:20s1m

GCP 凭据

由于 GCP Pub/Sub 组件使用 GCP Go 客户端库,默认情况下它使用 应用程序默认凭据 进行身份验证。这在 使用客户端库对 GCP 云服务进行身份验证 指南中有进一步解释。

创建 GCP Pub/Sub

对于本地开发,使用 GCP Pub/Sub Emulator 来测试 GCP Pub/Sub 组件。按照 这些说明 运行 GCP Pub/Sub Emulator。

要在本地使用 Docker 运行 GCP Pub/Sub Emulator,请使用以下 docker-compose.yaml

version: '3'
services:
  pubsub:
    image: gcr.io/google.com/cloudsdktool/cloud-sdk:422.0.0-emulators
    ports:
      - "8085:8085"
    container_name: gcp-pubsub
    entrypoint: gcloud beta emulators pubsub start --project local-test-prj --host-port 0.0.0.0:8085

为了使用 GCP Pub/Sub Emulator 与您的 pub/sub 绑定,您需要在组件元数据中提供 endpoint 配置。运行 GCP 生产 API 时不需要 endpoint

projectId 属性必须与 docker-compose.yaml 或 Docker 命令中使用的 --project 匹配。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: gcp-pubsub
spec:
  type: pubsub.gcp.pubsub
  version: v1
  metadata:
  - name: projectId
    value: "local-test-prj"
  - name: consumerID
    value: "testConsumer"
  - name: endpoint
    value: "localhost:8085"

您可以使用“显式”或“隐式”凭据来配置对 GCP pubsub 实例的访问。如果使用显式,大多数字段是必需的。隐式依赖于 dapr 在映射到具有访问 pubsub 所需权限的 Google 服务账户 (GSA) 的 Kubernetes 服务账户 (KSA) 下运行。在隐式模式下,只需要 projectId 属性,其他所有都是可选的。

按照 此处 的说明设置 Google Cloud Pub/Sub 系统。

相关链接

7 - JetStream

NATS JetStream 组件的详细说明文档

组件格式

要配置 JetStream 的发布/订阅功能,需要创建一个类型为 pubsub.jetstream 的组件。请参考 pubsub broker 组件文件 以了解 ConsumerID 的自动生成方式。阅读 发布和订阅指南 以获取创建和应用 pubsub 配置的步骤。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: jetstream-pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: jwt # 可选。用于分布式 JWT 认证。
    value: "eyJhbGciOiJ...6yJV_adQssw5c"
  - name: seedKey # 可选。用于分布式 JWT 认证。
    value: "SUACS34K232O...5Z3POU7BNIL4Y"
  - name: tls_client_cert # 可选。用于 TLS 客户端认证。
    value: "/path/to/tls.crt"
  - name: tls_client_key # 可选。用于 TLS 客户端认证。
    value: "/path/to/tls.key"
  - name: token # 可选。用于基于令牌的认证。
    value: "my-token"
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"
  - name: startSequence
    value: 1
  - name: startTime # Unix 时间戳格式
    value: 1630349391
  - name: flowControl
    value: false
  - name: ackWait
    value: 10s
  - name: maxDeliver
    value: 5
  - name: backOff
    value: "50ms, 1s, 10s"
  - name: maxAckPending
    value: 5000
  - name: replicas
    value: 1
  - name: memoryStorage
    value: false
  - name: rateLimit
    value: 1024
  - name: heartbeat
    value: 15s
  - name: ackPolicy
    value: explicit
  - name: deliverPolicy
    value: all
  - name: domain
    value: hub
  - name: apiPrefix
    value: PREFIX

规格元数据字段

字段必需详情示例
natsURLNATS 服务器地址 URL"nats://localhost:4222"
jwtNATS 分布式认证 JWT"eyJhbGciOiJ...6yJV_adQssw5c"
seedKeyNATS 分布式认证种子密钥"SUACS34K232O...5Z3POU7BNIL4Y"
tls_client_certNATS TLS 客户端认证证书"/path/to/tls.crt"
tls_client_keyNATS TLS 客户端认证密钥"/path/to/tls.key"
tokenNATS 基于令牌的认证"my-token"
nameNATS 连接名称"my-conn-name"
streamName要绑定的 JetStream 流名称"my-stream"
durableName持久名称"my-durable"
queueGroupName队列组名称"my-queue"
startSequence开始序列1
startTime开始时间,Unix 时间戳格式1630349391
flowControl流量控制true
ackWait确认等待10s
maxDeliver最大投递次数15
backOff退避"50ms, 1s, 5s, 10s"
maxAckPending最大确认待处理5000
replicas副本3
memoryStorage内存存储false
rateLimit速率限制1024
heartbeat心跳10s
ackPolicy确认策略explicit
deliverPolicy其中之一:all, last, new, sequence, timeall
domainJetStream LeafonodesHUB
apiPrefix[JetStream Leafnodes]PREFIX

创建 NATS 服务器

您可以使用 Docker 在本地运行启用 JetStream 的 NATS 服务器:

docker run -d -p 4222:4222 nats:latest -js

然后,您可以通过客户端端口与服务器交互:localhost:4222

使用 helm 在 Kubernetes 上安装 NATS JetStream:

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install --set nats.jetstream.enabled=true my-nats nats/nats

这将在 default 命名空间中安装一个 NATS 服务器。要与 NATS 交互,请找到服务:

kubectl get svc my-nats

有关 helm chart 设置的更多信息,请参阅 Helm chart 文档

创建 JetStream

为特定主题创建 NATS JetStream 是至关重要的。例如,对于在本地运行的 NATS 服务器,使用:

nats -s localhost:4222 stream add myStream --subjects mySubject

示例:竞争消费者模式

假设您希望每条消息仅由具有相同 app-id 的一个应用程序或 pod 处理。通常,consumerID 元数据规范可以帮助您定义竞争消费者。

由于 NATS JetStream 不支持 consumerID,您需要指定 durableNamequeueGroupName 来实现竞争消费者模式。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"

相关链接

8 - KubeMQ

KubeMQ pubsub 组件的详细说明

组件格式

要配置 KubeMQ pub/sub,需创建一个类型为 pubsub.kubemq 的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 如何发布和订阅指南 以了解如何创建和应用 pub/sub 配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kubemq-pubsub
spec:
  type: pubsub.kubemq
  version: v1
  metadata:
    - name: address
      value: localhost:50000
    - name: store
      value: false
    - name: consumerID
      value: channel1

规格元数据字段

字段必需详情示例
addressYKubeMQ 服务器的地址"localhost:50000"
storeNpubsub 类型,true: pubsub 持久化 (EventsStore),false: pubsub 内存中 (Events)truefalse (默认是 false)
consumerIDN消费者 ID(消费者标签)用于将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID) 值。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看可以在组件元数据中使用的所有模板标签。
clientIDN客户端 ID 连接的名称sub-client-12345
authTokenN连接的 Auth JWT 令牌 查看 KubeMQ 认证ew...
groupN用于负载均衡的订阅者组g1
disableReDeliveryN设置是否在应用程序出错时重新传递消息truefalse (默认是 false)

创建 KubeMQ broker

  1. 获取 KubeMQ 密钥
  2. 等待电子邮件确认您的密钥

您可以使用 Docker 运行 KubeMQ broker:

docker run -d -p 8080:8080 -p 50000:50000 -p 9090:9090 -e KUBEMQ_TOKEN=<your-key> kubemq/kubemq

然后您可以使用客户端端口与服务器交互:localhost:50000

  1. 获取 KubeMQ 密钥
  2. 等待电子邮件确认您的密钥

然后运行以下 kubectl 命令:

kubectl apply -f https://deploy.kubemq.io/init
kubectl apply -f https://deploy.kubemq.io/key/<your-key>

安装 KubeMQ CLI

前往 KubeMQ CLI 并下载最新版本的 CLI。

浏览 KubeMQ 仪表板

打开浏览器并导航到 http://localhost:8080

安装 KubeMQCTL 后,运行以下命令:

kubemqctl get dashboard

或者,安装 kubectl 后,运行端口转发命令:

kubectl port-forward svc/kubemq-cluster-api -n kubemq 8080:8080

KubeMQ 文档

访问 KubeMQ 文档 了解更多信息。

相关链接

9 - MQTT

MQTT pubsub组件的详细文档

组件格式

要配置MQTT pub/sub,您需要创建一个类型为pubsub.mqtt的组件。请参阅pub/sub broker组件文件以了解ConsumerID的自动生成方式。阅读操作指南:发布和订阅指南以了解如何创建和应用pub/sub配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "tcp://[username][:password]@host.domain[:port]"
  - name: qos
    value: 1
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: consumerID
    value: "channel1"

规格元数据字段

字段必需详情示例
urlYMQTT broker的地址。可以使用secretKeyRef来引用密钥。
对于非TLS通信,使用**tcp://** URI方案。
对于TLS通信,使用**ssl://** URI方案。
"tcp://[username][:password]@host.domain[:port]"
consumerIDN用于连接到MQTT broker的消费者连接的客户端ID。默认为Dapr应用ID。
注意:如果未设置producerID,则在此值后附加-consumer用于消费者连接
可以设置为字符串值(如上例中的"channel1")或字符串格式值(如"{podName}"等)。查看可以在组件元数据中使用的所有模板标签。
producerIDN用于连接到MQTT broker的生产者连接的客户端ID。默认为{consumerID}-producer"myMqttProducerApp"
qosN表示消息的服务质量级别(QoS)(更多信息)。默认为10, 1, 2
retainN定义broker是否将消息保存为指定主题的最后已知良好值。默认为"false""true", "false"
cleanSessionN如果为"true",则在连接消息中设置clean_session标志到MQTT broker(更多信息)。默认为"false""true", "false"
caCert使用TLS时必需用于验证服务器TLS证书的证书颁发机构(CA)证书,格式为PEM。"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert使用TLS时必需TLS客户端证书,格式为PEM。必须与clientKey一起使用。"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey使用TLS时必需TLS客户端密钥,格式为PEM。必须与clientCert一起使用。可以使用secretKeyRef来引用密钥。"-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"

启用消息传递重试

MQTT pub/sub组件不支持内置的重试策略。这意味着sidecar只会向服务发送一次消息。如果服务标记消息为未处理,则消息不会被确认回broker。只有当broker重新发送消息时,才会重试。

要使Dapr使用更复杂的重试策略,可以将重试弹性策略应用于MQTT pub/sub组件。

两种重试方式之间有一个关键区别:

  1. 未确认消息的重新传递完全依赖于broker。Dapr不保证这一点。一些broker如emqxvernemq等支持它,但它不是MQTT3规范的一部分。

  2. 使用重试弹性策略使得同一个Dapr sidecar重试重新传递消息。因此是同一个Dapr sidecar和同一个应用接收相同的消息。

使用TLS进行通信

要配置使用TLS进行通信,请确保MQTT broker(例如,mosquitto)配置为支持证书,并在组件配置中提供caCertclientCertclientKey元数据。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "ssl://host.domain[:port]"
  - name: qos
    value: 1
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: caCert
    value: ${{ myLoadedCACert }}
  - name: clientCert
    value: ${{ myLoadedClientCert }}
  - name: clientKey
    secretKeyRef:
      name: myMqttClientKey
      key: myMqttClientKey
auth:
  secretStore: <SECRET_STORE_NAME>

注意,虽然caCertclientCert值可能不是密钥,但为了方便起见,它们也可以从Dapr密钥存储中引用。

消费共享主题

在消费共享主题时,每个消费者必须有一个唯一标识符。默认情况下,应用ID用于唯一标识每个消费者和发布者。在selfhost模式下,调用每个dapr run时使用不同的应用ID即可使它们从同一个共享主题中消费。然而,在Kubernetes上,应用pod的多个实例将共享相同的应用ID,禁止所有实例消费同一个主题。为了解决这个问题,配置组件的consumerID元数据为{uuid}标签,这将在启动时为每个实例提供一个随机生成的consumerID值。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
    - name: consumerID
      value: "{uuid}"
    - name: url
      value: "tcp://admin:public@localhost:1883"
    - name: qos
      value: 1
    - name: retain
      value: "false"
    - name: cleanSession
      value: "true"

注意,在这种情况下,每次Dapr重启时,consumer ID的值都是随机的,因此我们也将cleanSession设置为true。

创建MQTT broker

您可以使用Docker本地运行MQTT broker:

docker run -d -p 1883:1883 -p 9001:9001 --name mqtt eclipse-mosquitto:1.6

然后您可以使用客户端端口与服务器交互:mqtt://localhost:1883

您可以在kubernetes中使用以下yaml运行MQTT broker:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app-name: mqtt-broker
  template:
    metadata:
      labels:
        app-name: mqtt-broker
    spec:
      containers:
        - name: mqtt
          image: eclipse-mosquitto:1.6
          imagePullPolicy: IfNotPresent
          ports:
            - name: default
              containerPort: 1883
              protocol: TCP
            - name: websocket
              containerPort: 9001
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  type: ClusterIP
  selector:
    app-name: mqtt-broker
  ports:
    - port: 1883
      targetPort: default
      name: default
      protocol: TCP
    - port: 9001
      targetPort: websocket
      name: websocket
      protocol: TCP

然后您可以使用客户端端口与服务器交互:tcp://mqtt-broker.default.svc.cluster.local:1883

相关链接

10 - MQTT3

MQTT3 发布订阅组件的详细文档

组件格式

要配置一个MQTT3发布/订阅组件,请创建一个类型为pubsub.mqtt3的组件。请参阅发布/订阅代理组件文件以了解如何自动生成ConsumerID。阅读操作指南:发布和订阅指南以了解如何创建和应用发布/订阅配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: url
      value: "tcp://[username][:password]@host.domain[:port]"
    # 可选
    - name: retain
      value: "false"
    - name: cleanSession
      value: "false"
    - name: qos
      value: "1"
    - name: consumerID
      value: "channel1"

规格元数据字段

字段必需详情示例
urlYMQTT broker的地址。可以使用secretKeyRef来引用密钥。
对于非TLS通信,使用**tcp://** URI方案。
对于TLS通信,使用**ssl://** URI方案。
"tcp://[username][:password]@host.domain[:port]"
consumerIDN用于连接到MQTT broker的客户端ID。默认为Dapr应用ID。可以设置为字符串值(如上例中的"channel1")或字符串格式值(如"{podName}"等)。查看您可以在组件元数据中使用的所有模板标签。
retainN定义消息是否由broker保存为指定主题的最后已知良好值。默认为"false""true""false"
cleanSessionN如果为"true",则在连接消息中设置clean_session标志到MQTT broker(更多信息)。默认为"false""true""false"
caCert使用TLS时必需用于验证服务器TLS证书的PEM格式的证书颁发机构(CA)证书。参见下面的示例
clientCert使用TLS时必需PEM格式的TLS客户端证书。必须与clientKey一起使用。参见下面的示例
clientKey使用TLS时必需PEM格式的TLS客户端密钥。必须与clientCert一起使用。可以使用secretKeyRef来引用密钥。参见下面的示例
qosN表示消息的服务质量级别(QoS)(更多信息)。默认为1012

使用TLS进行通信

要配置使用TLS进行通信,请确保MQTT broker(例如emqx)配置为支持证书,并在组件配置中提供caCertclientCertclientKey元数据。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: url
      value: "ssl://host.domain[:port]"
  # TLS配置
    - name: caCert
      value: |
        -----BEGIN CERTIFICATE-----
        ...
        -----END CERTIFICATE-----
    - name: clientCert
      value: |
        -----BEGIN CERTIFICATE-----
        ...
        -----END CERTIFICATE-----
    - name: clientKey
      secretKeyRef:
        name: myMqttClientKey
        key: myMqttClientKey
    # 可选
    - name: retain
      value: "false"
    - name: cleanSession
      value: "false"
    - name: qos
      value: 1

请注意,虽然caCertclientCert的值可能不是密钥,但为了方便起见,它们也可以从Dapr密钥存储中引用。

消费共享主题

在消费共享主题时,每个消费者必须有一个唯一标识符。默认情况下,应用ID用于唯一标识每个消费者和发布者。在selfhost模式下,调用每个dapr run时使用不同的应用ID即可让它们从同一个共享主题中消费。然而,在Kubernetes上,应用Pod的多个实例将共享相同的应用ID,禁止所有实例消费相同的主题。为了解决这个问题,可以在组件的consumerID元数据中配置一个{uuid}标签(这将在启动时为每个实例生成一个随机值)或{podName}(这将在Kubernetes上使用Pod的名称)。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: consumerID
      value: "{uuid}"
    - name: cleanSession
      value: "true"
    - name: url
      value: "tcp://admin:public@localhost:1883"
    - name: qos
      value: 1
    - name: retain
      value: "false"

请注意,在这种情况下,每次Dapr重启时,consumer ID的值都是随机的,因此您也应该将cleanSession设置为true

建议使用StatefulSets进行共享订阅。

创建一个MQTT3 broker

您可以使用Docker在本地运行一个像emqx这样的MQTT broker:

docker run -d -p 1883:1883 --name mqtt emqx:latest

然后您可以使用客户端端口与服务器交互:tcp://localhost:1883

您可以使用以下yaml在Kubernetes中运行一个MQTT3 broker:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app-name: mqtt-broker
  template:
    metadata:
      labels:
        app-name: mqtt-broker
    spec:
      containers:
        - name: mqtt
          image: emqx:latest
          imagePullPolicy: IfNotPresent
          ports:
            - name: default
              containerPort: 1883
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  type: ClusterIP
  selector:
    app-name: mqtt-broker
  ports:
    - port: 1883
      targetPort: default
      name: default
      protocol: TCP

然后您可以使用客户端端口与服务器交互:tcp://mqtt-broker.default.svc.cluster.local:1883

相关链接

11 - Pulsar

关于 Pulsar 发布/订阅组件的详细文档

组件格式

要配置 Apache Pulsar 的发布/订阅(pub/sub)功能,需要创建一个类型为 pubsub.pulsar 的组件。请参阅 pub/sub broker 组件文件 以了解 ConsumerID 的自动生成方式。阅读 操作指南:发布和订阅 以了解如何创建和应用 pub/sub 配置。

有关 Apache Pulsar 的更多信息,请阅读官方文档

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pulsar-pubsub
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: enableTLS
    value: "false"
  - name: tenant
    value: "public"
  - name: token
    value: "eyJrZXlJZCI6InB1bHNhci1wajU0cXd3ZHB6NGIiLCJhbGciOiJIUzI1NiJ9.eyJzd"
  - name: consumerID
    value: "channel1"
  - name: namespace
    value: "default"
  - name: persistent
    value: "true"
  - name: disableBatching
    value: "false"
  - name: receiverQueueSize
    value: "1000"
  - name: <topic-name>.jsonschema # 为配置的主题设置 JSON schema 验证
    value: |
      {
        "type": "record",
        "name": "Example",
        "namespace": "test",
        "fields": [
          {"name": "ID","type": "int"},
          {"name": "Name","type": "string"}
        ]
      }
  - name: <topic-name>.avroschema # 为配置的主题设置 Avro schema 验证
    value: |
      {
        "type": "record",
        "name": "Example",
        "namespace": "test",
        "fields": [
          {"name": "ID","type": "int"},
          {"name": "Name","type": "string"}
        ]
      }

规格元数据字段

字段必需详情示例
hostYPulsar broker 的地址。默认值为 "localhost:6650""localhost:6650""http://pulsar-pj54qwwdpz4b-pulsar.ap-sg.public.pulsar.com:8080"
enableTLSN是否启用 TLS。默认值: "false""true", "false"
tenantN主题的租户。租户是 Pulsar 多租户的关键,并跨集群分布。默认值: "public""public"
consumerIDN用于设置订阅名称或消费者 ID。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看可以在组件元数据中使用的所有模板标签。
namespaceN主题的管理单元,作为相关主题的分组机制。默认值: "default""default"
persistentNPulsar 支持两种类型的主题:持久化非持久化。持久化主题的所有消息都存储在磁盘上,而非持久化主题的数据不会存储到磁盘。
disableBatchingN是否禁用批处理。启用批处理时,默认批处理延迟为 10 毫秒,默认批处理大小为 1000 条消息,设置 disableBatching: true 将使生产者单独发送消息。默认值: "false""true", "false"
receiverQueueSizeN设置消费者接收队列的大小。控制消费者在被 Dapr 显式调用读取消息之前可以累积多少消息。默认值: "1000""1000"
batchingMaxPublishDelayN设置消息发送的批处理时间段(如果启用了批处理消息)。如果设置为非零值,消息将排队直到此时间间隔或 batchingMaxMessages(见下文)或 batchingMaxSize(见下文)。有两种有效格式,一种是带单位后缀的分数格式,另一种是纯数字格式,处理为毫秒。有效的时间单位有 “ns”, “us” (或 “µs”), “ms”, “s”, “m”, “h”。默认值: "10ms""10ms", "10"
batchingMaxMessagesN设置批处理中允许的最大消息数。如果设置为大于 1 的值,消息将排队直到达到此阈值或 batchingMaxSize(见下文)或批处理间隔已过。默认值: "1000""1000"
batchingMaxSizeN设置批处理中允许的最大字节数。如果设置为大于 1 的值,消息将排队直到达到此阈值或 batchingMaxMessages(见上文)或批处理间隔已过。默认值: "128KB""131072"
.jsonschemaN为配置的主题强制执行 JSON schema 验证。
.avroschemaN为配置的主题强制执行 Avro schema 验证。
publicKeyN用于发布者和消费者加密的公钥。值可以是两种选项之一:本地 PEM 证书的文件路径,或证书数据字符串值
privateKeyN用于消费者加密的私钥。值可以是两种选项之一:本地 PEM 证书的文件路径,或证书数据字符串值
keysN包含 Pulsar 会话密钥 名称的逗号分隔字符串。与 publicKey 一起用于发布者加密
processModeN是否支持同时处理多条消息。默认值: "async""async", "sync"
subscribeTypeNPulsar 支持四种 订阅类型。默认值: "shared""shared", "exclusive", "failover", "key_shared"
partitionKeyN设置消息的路由策略键。默认值: ""
maxConcurrentHandlersN定义并发消息处理程序的最大数量。默认值: 10010

使用 Token 进行身份验证

要使用静态 JWT token 进行 Pulsar 身份验证,可以使用以下元数据字段:

字段必需详情示例
tokenN用于身份验证的 token。如何创建 Pulsar token
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "pulsar.example.com:6650"
  - name: token
    secretKeyRef:
      name: pulsar
      key:  token

使用 OIDC 进行身份验证

v3.0 起,Pulsar 支持 OIDC 身份验证。 要启用 OIDC 身份验证,您需要向组件规范提供以下 OAuth2 参数。 OAuth2 身份验证不能与 token 身份验证结合使用。 建议您使用 secret 引用来获取客户端 secret。 Pulsar 的 OAuth2 身份验证器不完全符合 OIDC,因此您有责任确保字段符合要求。例如,发行者 URL 必须使用 https 协议,请求的范围包括 openid 等。 如果省略 oauth2TokenCAPEM 字段,则系统的证书池将用于连接到 OAuth2 发行者(如果使用 https)。

字段必需详情示例
oauth2TokenURLN请求 OIDC client_credentials token 的 URL。不能为空。https://oauth.example.com/o/oauth2/token"`
oauth2TokenCAPEMN连接到 OAuth2 发行者的 CA PEM 证书包。如果未定义,将使用系统的证书池。"---BEGIN CERTIFICATE---\n...\n---END CERTIFICATE---"
oauth2ClientIDNOIDC 客户端 ID。不能为空。"my-client-id"
oauth2ClientSecretNOIDC 客户端 secret。不能为空。"my-client-secret"
oauth2AudiencesN请求的受众的逗号分隔列表。不能为空。"my-audience-1,my-audience-2"
oauth2ScopesN请求的范围的逗号分隔列表。不能为空。"openid,profile,email"
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "pulsar.example.com:6650"
  - name: oauth2TokenURL
    value: https://oauth.example.com/o/oauth2/token
  - name: oauth2TokenCAPEM
    value: "---BEGIN CERTIFICATE---\n...\n---END CERTIFICATE---"
  - name: oauth2ClientID
    value: my-client-id
  - name: oauth2ClientSecret
    secretKeyRef:
      name: pulsar-oauth2
      key:  my-client-secret
  - name: oauth2Audiences
    value: "my.pulsar.example.com,another.pulsar.example.com"
  - name: oauth2Scopes
    value: "openid,profile,email"

启用消息传递重试

Pulsar pub/sub 组件没有内置的重试策略支持。这意味着 sidecar 仅向服务发送一次消息,失败时不会重试。要使 Dapr 使用更复杂的重试策略,您可以将 重试弹性策略 应用于 Pulsar pub/sub 组件。请注意,这将是同一个 Dapr sidecar 重试将消息重新传递到同一个应用实例,而不是其他实例。

延迟队列

在调用 Pulsar pub/sub 时,可以通过请求 URL 中的 metadata 查询参数提供可选的延迟队列。

这些可选参数名称是 metadata.deliverAtmetadata.deliverAfter

  • deliverAt: 延迟消息在指定时间(RFC3339 格式)交付;例如,"2021-09-01T10:00:00Z"
  • deliverAfter: 延迟消息在指定时间后交付;例如,"4h5m3s"

示例:

curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAt='2021-09-01T10:00:00Z' \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAfter='4h5m3s' \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

端到端加密

Dapr 支持设置公钥和私钥对以启用 Pulsar 的 端到端加密功能

从文件证书启用发布者加密

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: publicKey
    value: ./public.key
  - name: keys
    value: myapp.key

从文件证书启用消费者加密

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: publicKey
    value: ./public.key
  - name: privateKey
    value: ./private.key

从值启用发布者加密

注意:建议 从 secret 引用公钥

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: publicKey
    value:  "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
  - name: keys
    value: myapp.key

从值启用消费者加密

注意:建议 从 secret 引用公钥和私钥

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: publicKey
    value: "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
  - name: privateKey
    value: "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA1KDAM4L8RtJ+nLaXBrBhzVpvTemsKVZoAct8A+ShepOHT9lg\nHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDdruXSflvSdmYeFAw3Ypphc1A5oM53\nwSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/a3golb36GYFrY0MLFTv7wZ87pmMI\nPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eKjpwcg35gccvR6o/UhbKAuc60V1J9\nWof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0QiCdpIrXvYtANq0Id6gP8zJvUEdPIg\nNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ3QIDAQABAoIBAQCKuHnM4ac/eXM7\nQPDVX1vfgyHc3hgBPCtNCHnXfGFRvFBqavKGxIElBvGOcBS0CWQ+Rg1Ca5kMx3TQ\njSweSYhH5A7pe3Sa5FK5V6MGxJvRhMSkQi/lJZUBjzaIBJA9jln7pXzdHx8ekE16\nBMPONr6g2dr4nuI9o67xKrtfViwRDGaG6eh7jIMlEqMMc6WqyhvI67rlVDSTHFKX\njlMcozJ3IT8BtTzKg2Tpy7ReVuJEpehum8yn1ZVdAnotBDJxI07DC1cbOP4M2fHM\ngfgPYWmchauZuTeTFu4hrlY5jg0/WLs6by8r/81+vX3QTNvejX9UdTHMSIfQdX82\nAfkCKUVhAoGBAOvGv+YXeTlPRcYC642x5iOyLQm+BiSX4jKtnyJiTU2s/qvvKkIu\nxAOk3OtniT9NaUAHEZE9tI71dDN6IgTLQlAcPCzkVh6Sc5eG0MObqOO7WOMCWBkI\nlaAKKBbd6cGDJkwGCJKnx0pxC9f8R4dw3fmXWgWAr8ENiekMuvjSfjZ5AoGBAObd\ns2L5uiUPTtpyh8WZ7rEvrun3djBhzi+d7rgxEGdditeiLQGKyZbDPMSMBuus/5wH\nwfi0xUq50RtYDbzQQdC3T/C20oHmZbjWK5mDaLRVzWS89YG/NT2Q8eZLBstKqxkx\ngoT77zoUDfRy+CWs1xvXzgxagD5Yg8/OrCuXOqWFAoGAPIw3r6ELknoXEvihASxU\nS4pwInZYIYGXpygLG8teyrnIVOMAWSqlT8JAsXtPNaBtjPHDwyazfZrvEmEk51JD\nX0tA8M5ah1NYt+r5JaKNxp3P/8wUT6lyszyoeubWJsnFRfSusuq/NRC+1+KDg/aq\nKnSBu7QGbm9JoT2RrmBv5RECgYBRn8Lj1I1muvHTNDkiuRj2VniOSirkUkA2/6y+\nPMKi+SS0tqcY63v4rNCYYTW1L7Yz8V44U5mJoQb4lvpMbolGhPljjxAAU3hVkItb\nvGVRlSCIZHKczADD4rJUDOS7DYxO3P1bjUN4kkyYx+lKUMDBHFzCa2D6Kgt4dobS\n5qYajQKBgQC7u7MFPkkEMqNqNGu5erytQkBq1v1Ipmf9rCi3iIj4XJLopxMgw0fx\n6jwcwNInl72KzoUBLnGQ9PKGVeBcgEgdI+a+tq+1TJo6Ta+hZSx+4AYiKY18eRKG\neNuER9NOcSVJ7Eqkcw4viCGyYDm2vgNV9HJ0VlAo3RDh8x5spEN+mg==\n-----END RSA PRIVATE KEY-----\n"

分区键

在调用 Pulsar pub/sub 时,可以通过请求 URL 中的 metadata 查询参数提供可选的分区键。

参数名称是 partitionKey

示例:

curl -X POST http://localhost:3500/v1.0/publish/myPlusar/myTopic?metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

消息头

所有其他元数据键/值对(不是 partitionKey)都设置为 Pulsar 消息中的头。例如,为消息设置 correlationId

curl -X POST http://localhost:3500/v1.0/publish/myPlusar/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

顺序保证

为了确保消息按顺序到达订阅特定键的每个消费者,必须满足三个条件。

  1. subscribeType 应设置为 key_shared
  2. 必须设置 partitionKey
  3. processMode 应设置为 sync

创建一个 Pulsar 实例

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.5.1 \
  bin/pulsar standalone

请参考以下 Helm chart 文档。

相关链接

12 - RabbitMQ

RabbitMQ pubsub 组件的详细说明文档

组件格式

要设置 RabbitMQ 的发布/订阅功能,请创建一个类型为 pubsub.rabbitmq 的组件。请参阅 pub/sub broker 组件文件 以了解消费者ID(ConsumerID)是如何自动生成的。阅读 How-to: 发布和订阅指南 以了解如何创建和应用 pub/sub 配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbitmq-pubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: connectionString
    value: "amqp://localhost:5672"
  - name: protocol
    value: amqp  
  - name: hostname
    value: localhost 
  - name: username
    value: username
  - name: password
    value: password  
  - name: consumerID
    value: channel1
  - name: durable
    value: false
  - name: deletedWhenUnused
    value: false
  - name: autoAck
    value: false
  - name: deliveryMode
    value: 0
  - name: requeueInFailure
    value: false
  - name: prefetchCount
    value: 0
  - name: reconnectWait
    value: 0
  - name: concurrencyMode
    value: parallel
  - name: publisherConfirm
    value: false
  - name: enableDeadLetter # 可选,是否启用死信
    value: true
  - name: maxLen # 可选,队列中的最大消息数
    value: 3000
  - name: maxLenBytes # 可选,队列的最大字节长度
    value: 10485760
  - name: exchangeKind
    value: fanout
  - name: saslExternal
    value: false
  - name: ttlInSeconds
    value: 60
  - name: clientName
    value: {podName}
  - name: heartBeat
    value: 10s

规格元数据字段

字段必需详情示例
connectionStringY*RabbitMQ 连接字符串。*与 protocol、hostname、username、password 字段互斥amqp://user:pass@localhost:5672
protocolN*RabbitMQ 协议。*与 connectionString 字段互斥amqp
hostnameN*RabbitMQ 主机名。*与 connectionString 字段互斥localhost
usernameN*RabbitMQ 用户名。*与 connectionString 字段互斥username
passwordN*RabbitMQ 密码。*与 connectionString 字段互斥password
consumerIDN消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时会将其设置为 Dapr 应用程序 ID (appID) 的值。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看您可以在组件元数据中使用的所有模板标签。
durableN是否使用 持久化 队列。默认为 "false""true""false"
deletedWhenUnusedN队列是否应配置为 自动删除 默认为 "true""true""false"
autoAckN队列消费者是否应 自动确认 消息。默认为 "false""true""false"
deliveryModeN发布消息时的持久性模式。默认为 "0"。RabbitMQ 将 "2" 视为持久性,其他数字视为非持久性"0""2"
requeueInFailureN在失败情况下发送 负确认 时是否重新排队。默认为 "false""true""false"
prefetchCountN预取 的消息数量。考虑将其更改为非零值以用于生产环境。默认为 "0",这意味着将预取所有可用消息。"2"
publisherConfirmN如果启用,客户端在发布消息后等待 发布者确认。默认为 "false""true""false"
reconnectWaitN如果发生连接故障,重新连接前等待的时间(以秒为单位)"0"
concurrencyModeNparallel 是默认值,允许并行处理多个消息(如果配置了 app-max-concurrency 注释,则受其限制)。设置为 single 以禁用并行处理。在大多数情况下,没有理由更改此设置。parallelsingle
enableDeadLetterN启用将无法处理的消息转发到死信主题。默认为 "false""true""false"
maxLenN队列及其死信队列(如果启用了死信)的最大消息数。如果同时设置了 maxLenmaxLenBytes,则两者都将适用;首先达到的限制将被强制执行。默认为无限制。"1000"
maxLenBytesN队列及其死信队列(如果启用了死信)的最大字节长度。如果同时设置了 maxLenmaxLenBytes,则两者都将适用;首先达到的限制将被强制执行。默认为无限制。"1048576"
exchangeKindNrabbitmq 交换的交换类型。默认为 "fanout""fanout""topic"
saslExternalN使用 TLS 时,用户名是否应从附加字段(例如 CN)中获取。请参阅 RabbitMQ 认证机制。默认为 "false""true""false"
ttlInSecondsN在组件级别设置消息 TTL,可以通过每个请求的消息级别 TTL 覆盖。"60"
caCert使用 TLS 时必需用于验证服务器 TLS 证书的 PEM 格式的证书颁发机构(CA)证书。"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert使用 TLS 时必需PEM 格式的 TLS 客户端证书。必须与 clientKey 一起使用。"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey使用 TLS 时必需PEM 格式的 TLS 客户端密钥。必须与 clientCert 一起使用。可以是 secretKeyRef 以使用 secret 引用。"-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"
clientNameN这个 RabbitMQ 客户端提供的连接名称 是一个自定义标识符。如果设置,标识符将在 RabbitMQ 服务器日志条目和管理 UI 中提及。可以设置为 {uuid}、{podName} 或 {appID},Dapr 运行时将其替换为实际值。"app1"{uuid}{podName}{appID}
heartBeatN定义与服务器的心跳间隔,检测与 RabbitMQ 服务器的对等 TCP 连接的存活性。默认为 10s"10s"

使用 TLS 进行通信

要配置使用 TLS 的通信,请确保 RabbitMQ 节点已启用 TLS,并在组件配置中提供 caCertclientCertclientKey 元数据。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbitmq-pubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqps://localhost:5671"
  - name: consumerID
    value: myapp
  - name: durable
    value: false
  - name: deletedWhenUnused
    value: false
  - name: autoAck
    value: false
  - name: deliveryMode
    value: 0
  - name: requeueInFailure
    value: false
  - name: prefetchCount
    value: 0
  - name: reconnectWait
    value: 0
  - name: concurrencyMode
    value: parallel
  - name: publisherConfirm
    value: false
  - name: enableDeadLetter # 可选,是否启用死信
    value: true
  - name: maxLen # 可选,队列中的最大消息数
    value: 3000
  - name: maxLenBytes # 可选,队列的最大字节长度
    value: 10485760
  - name: exchangeKind
    value: fanout
  - name: saslExternal
    value: false
  - name: caCert
    value: ${{ myLoadedCACert }}
  - name: clientCert
    value: ${{ myLoadedClientCert }}
  - name: clientKey
    secretKeyRef:
      name: myRabbitMQClientKey
      key: myRabbitMQClientKey

请注意,虽然 caCertclientCert 值可能不是 secrets,但为了方便起见,它们也可以从 Dapr secret 存储中引用。

启用消息传递重试

RabbitMQ pub/sub 组件不支持内置的重试策略。这意味着 sidecar 仅将消息发送到服务一次。当服务返回结果时,无论消息是否正确处理,消息都将被标记为已消费。请注意,这在所有 Dapr PubSub 组件中都是常见的,而不仅仅是 RabbitMQ。 当 autoAck 设置为 falserequeueInFailure 设置为 true 时,Dapr 可以尝试第二次重新传递消息。

要使 Dapr 使用更复杂的重试策略,您可以将 重试弹性策略 应用于 RabbitMQ pub/sub 组件。

两种重试消息的方法之间有一个关键区别:

  1. 使用 autoAck = falserequeueInFailure = true 时,RabbitMQ 负责重新传递消息,任何 订阅者都可以获取重新传递的消息。如果您的消费者有多个实例,那么可能会有另一个消费者获取它。这通常是更好的方法,因为如果存在瞬态故障,另一个工作者更有可能成功处理消息。
  2. 使用 Resiliency 使同一个 Dapr sidecar 重试重新传递消息。因此,将是同一个 Dapr sidecar 和同一个应用程序接收相同的消息。

创建 RabbitMQ 服务器

您可以使用 Docker 在本地运行 RabbitMQ 服务器:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3

然后,您可以使用客户端端口与服务器交互:localhost:5672

在 Kubernetes 上安装 RabbitMQ 的最简单方法是使用 Helm chart

helm install rabbitmq stable/rabbitmq

查看 chart 输出并获取用户名和密码。

这将 RabbitMQ 安装到 default 命名空间。要与 RabbitMQ 交互,请使用以下命令查找服务:kubectl get svc rabbitmq

例如,如果使用上述示例进行安装,RabbitMQ 服务器客户端地址将是:

rabbitmq.default.svc.cluster.local:5672

使用主题交换路由消息

exchangeKind 设置为 "topic" 使用主题交换,这通常用于消息的多播路由。为了使用主题交换路由消息,您必须设置以下元数据:

  • routingKey:
    带有路由键的消息根据订阅时元数据中定义的 routing key 路由到一个或多个队列。

  • queueName:
    如果您没有设置 queueName,则只会创建一个队列,所有路由键将路由到该队列。这意味着所有订阅者将绑定到该队列,这不会产生预期的结果。

例如,如果应用程序配置了路由键 keyAqueueNamequeue-A

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: orderspubsub
spec:
  topic: B
  routes: 
    default: /B
  pubsubname: pubsub
  metadata:
    routingKey: keyA
    queueName: queue-A

它将接收路由键为 keyA 的消息,而其他路由键的消息将不被接收。

// 发布路由键为 `keyA` 的消息,这些消息将被上述示例接收。
client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is a message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyA"}))
// 发布路由键为 `keyB` 的消息,这些消息将不被上述示例接收。
client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is another message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyB"}))

绑定多个 routingKey

多个路由键可以用逗号分隔。
下面的示例绑定了三个 routingKeykeyAkeyB""。请注意空键的绑定方法。

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: orderspubsub
spec:
  topic: B
  routes: 
    default: /B
  pubsubname: pubsub
  metadata:
    routingKey: keyA,keyB,

有关更多信息,请参阅 rabbitmq 交换

使用优先级队列

Dapr 支持 RabbitMQ 优先级队列。要为队列设置优先级,请使用 maxPriority 主题订阅元数据。

声明式优先级队列示例

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: pubsub
spec:
  topic: checkout
  routes: 
    default: /orders
  pubsubname: order-pub-sub
  metadata:
    maxPriority: 3

编程优先级队列示例

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [
      {
        'pubsubname': 'pubsub',
        'topic': 'checkout',
        'routes': {
          'default': '/orders'
        },
        'metadata': {'maxPriority': '3'}
      }
    ]
    return jsonify(subscriptions)
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

app.get('/dapr/subscribe', (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "checkout",
      routes: {
        default: '/orders'
      },
      metadata: {
        maxPriority: '3'
      }
    }
  ]);
})
package main

	"encoding/json"
	"net/http"

const appPort = 3000

type subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Routes     routes            `json:"routes"`
}

type routes struct {
	Rules   []rule `json:"rules,omitempty"`
	Default string `json:"default,omitempty"`
}

// 处理 /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "checkout",
			Routes: routes{
				Default: "/orders",
			},
      Metadata: map[string]string{
        "maxPriority": "3"
      },
		},
	}

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(t)
}

发布消息时设置优先级

要在消息上设置优先级,请将发布元数据键 maxPriority 添加到发布端点或 SDK 方法。

curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders?metadata.priority=3 -H "Content-Type: application/json" -d '{"orderId": "100"}'
with DaprClient() as client:
        result = client.publish_event(
            pubsub_name=PUBSUB_NAME,
            topic_name=TOPIC_NAME,
            data=json.dumps(orderId),
            data_content_type='application/json',
            metadata= { 'priority': '3' })
await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId, { 'priority': '3' });
client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId)), map[string]string{"priority": "3"})

使用仲裁队列

默认情况下,Dapr 创建 经典 队列。要创建 仲裁 队列,请将以下元数据添加到您的 pub/sub 订阅

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: pubsub
spec:
  topic: checkout
  routes: 
    default: /orders
  pubsubname: order-pub-sub
  metadata:
    queueType: quorum

生存时间

您可以在消息级别或组件级别设置生存时间(TTL)值。使用组件规范 ttlInSeconds 字段在组件中设置默认组件级别 TTL。

单一活动消费者

RabbitMQ 单一活动消费者 设置确保一次只有一个消费者从队列中处理消息,并在活动消费者被取消或失败时切换到另一个注册的消费者。当消息必须按到达队列的确切顺序消费且不支持多实例分布式处理时,可能需要这种方法。 当 Dapr 在队列上启用此选项时,Dapr 运行时的一个实例将是单一活动消费者。为了在故障情况下允许另一个应用程序实例接管,Dapr 运行时必须 探测应用程序的健康状况 并从 pub/sub 组件中取消订阅。

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: pubsub
spec:
  topic: orders
  routes:
    default: /orders
  pubsubname: order-pub-sub
  metadata:
    singleActiveConsumer: "true"

相关链接

13 - Redis Streams

关于 Redis Streams pubsub 组件的详细文档

组件格式

要设置 Redis Streams pub/sub,创建一个类型为 pubsub.redis 的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 操作指南:发布和订阅指南 了解如何创建和应用 pub/sub 配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: "KeFg23!"
  - name: consumerID
    value: "channel1"
  - name: useEntraID
    value: "true"
  - name: enableTLS
    value: "false"

规格元数据字段

字段必需详情示例
redisHostYRedis 主机的连接字符串。如果 "redisType""cluster",可以是多个主机用逗号分隔,或仅一个主机localhost:6379, redis-master.default.svc.cluster.local:6379
redisPasswordNRedis 主机的密码。无默认值。可以是 secretKeyRef 以使用密钥引用"", "KeFg23!"
redisUsernameNRedis 主机的用户名。默认为空。确保您的 Redis 服务器版本为 6 或更高,并正确创建了 ACL 规则。"", "default"
consumerIDN消费者组 ID。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看您可以在组件元数据中使用的所有模板标签。
useEntraIDN实现对 Azure Cache for Redis 的 EntraID 支持。启用此功能之前:
  • 必须以 "server:port" 的形式指定 redisHost 名称
  • 必须启用 TLS
了解更多关于此设置的信息,请参阅 创建 Redis 实例 > Azure Cache for Redis
"true", "false"
enableTLSN如果 Redis 实例支持带有公共证书的 TLS,可以配置为启用或禁用。默认为 "false""true", "false"
clientCertN客户端证书的内容,用于需要客户端证书的 Redis 实例。必须与 clientKey 一起使用,并且 enableTLS 必须设置为 true。建议使用密钥存储,如 此处 所述"----BEGIN CERTIFICATE-----\nMIIC..."
clientKeyN客户端私钥的内容,与 clientCert 一起用于身份验证。建议使用密钥存储,如 此处 所述"----BEGIN PRIVATE KEY-----\nMIIE..."
redeliverIntervalN检查待处理消息以重新传递的间隔。可以使用 Go duration 字符串(例如 “ms”, “s”, “m”)或毫秒数。默认为 "60s""0" 禁用重新传递。"30s", "5000"
processingTimeoutN消息在尝试重新传递之前必须挂起的时间量。可以使用 Go duration 字符串(例如 “ms”, “s”, “m”)或毫秒数。默认为 "15s""0" 禁用重新传递。"60s", "600000"
queueDepthN处理消息的队列大小。默认为 "100""1000"
concurrencyN处理消息的并发工作者数量。默认为 "10""15"
redisTypeNRedis 的类型。有两个有效值,一个是 "node" 表示单节点模式,另一个是 "cluster" 表示 Redis 集群模式。默认为 "node""cluster"
redisDBN连接到 Redis 后选择的数据库。如果 "redisType""cluster",此选项将被忽略。默认为 "0""0"
redisMaxRetriesN在放弃之前重试命令的最大次数。默认情况下不重试失败的命令。"5"
redisMinRetryIntervalN每次重试之间 Redis 命令的最小回退时间。默认为 "8ms""-1" 禁用回退。"8ms"
redisMaxRetryIntervalN每次重试之间 Redis 命令的最大回退时间。默认为 "512ms""-1" 禁用回退。"5s"
dialTimeoutN建立新连接的拨号超时时间。默认为 "5s""5s"
readTimeoutN套接字读取的超时时间。如果达到,Redis 命令将因超时而失败而不是阻塞。默认为 "3s""-1" 表示无超时。"3s"
writeTimeoutN套接字写入的超时时间。如果达到,Redis 命令将因超时而失败而不是阻塞。默认值为 readTimeout。"3s"
poolSizeN最大套接字连接数。默认是每个 CPU 10 个连接,由 runtime.NumCPU 报告。"20"
poolTimeoutN如果所有连接都忙,客户端等待连接的时间量,然后返回错误。默认是 readTimeout + 1 秒。"5s"
maxConnAgeN客户端退役(关闭)连接的连接年龄。默认是不关闭老化连接。"30m"
minIdleConnsN为了避免创建新连接的性能下降,保持打开的最小空闲连接数。默认为 "0""2"
idleCheckFrequencyN空闲连接清理器进行空闲检查的频率。默认为 "1m""-1" 禁用空闲连接清理器。"-1"
idleTimeoutN客户端关闭空闲连接的时间量。应小于服务器的超时时间。默认为 "5m""-1" 禁用空闲超时检查。"10m"
failoverN启用故障转移配置的属性。需要设置 sentinalMasterName。默认为 "false""true", "false"
sentinelMasterNameNSentinel 主名称。参见 Redis Sentinel 文档"", "127.0.0.1:6379"
maxLenApproxN流内的最大项目数。当达到指定长度时,旧条目会自动被驱逐,以便流保持恒定大小。默认为无限制。"10000"

创建 Redis 实例

Dapr 可以使用任何 Redis 实例 - 无论是容器化的、在本地开发机器上运行的,还是托管的云服务,只要 Redis 的版本是 5.x 或 6.x。

Dapr CLI 会自动为您创建并设置一个 Redis Streams 实例。 当您运行 dapr init 时,Redis 实例将通过 Docker 安装,并且组件文件将创建在默认目录中。($HOME/.dapr/components 目录 (Mac/Linux) 或 %USERPROFILE%\.dapr\components 在 Windows 上)。

您可以使用 Helm 快速在 Kubernetes 集群中创建一个 Redis 实例。此方法需要 安装 Helm

  1. 将 Redis 安装到您的集群中。

    helm repo add bitnami https://charts.bitnami.com/bitnami
    helm install redis bitnami/redis --set image.tag=6.2
    
  2. 运行 kubectl get pods 查看现在在您的集群中运行的 Redis 容器。

  3. 在您的 redis.yaml 文件中将 redis-master:6379 添加为 redisHost。例如:

        metadata:
        - name: redisHost
          value: redis-master:6379
    
  4. 接下来,我们将获取我们的 Redis 密码,这在不同操作系统上略有不同:

    • Windows: 运行 kubectl get secret --namespace default redis -o jsonpath="{.data.redis-password}" > encoded.b64,这将创建一个包含您编码密码的文件。接下来,运行 certutil -decode encoded.b64 password.txt,这将把您的 Redis 密码放入一个名为 password.txt 的文本文件中。复制密码并删除这两个文件。

    • Linux/MacOS: 运行 kubectl get secret --namespace default redis -o jsonpath="{.data.redis-password}" | base64 --decode 并复制输出的密码。

    将此密码作为 redisPassword 值添加到您的 redis.yaml 文件中。例如:

        - name: redisPassword
          value: "lhDOkwTlp0"
    
  1. 使用官方 Microsoft 文档创建 Azure Cache for Redis 实例。

  2. 一旦您的实例创建完成,从 Azure 门户获取主机名(FQDN)和您的访问密钥。

    • 对于主机名:
      • 导航到资源的 概览 页面。
      • 复制 主机名 值。
    • 对于您的访问密钥:
      • 导航到 设置 > 访问密钥
      • 复制并保存您的密钥。
  3. 将您的密钥和主机名添加到 Dapr 可以应用到您集群的 redis.yaml 文件中。

    • 如果您正在运行一个示例,将主机和密钥添加到提供的 redis.yaml 中。
    • 如果您从头开始创建项目,请按照 组件格式部分 中的说明创建一个 redis.yaml 文件。
  4. redisHost 键设置为 [上一步中的主机名]:6379,并将 redisPassword 键设置为您之前保存的密钥。

    注意: 在生产级应用程序中,请按照 密钥管理 指南安全地管理您的密钥。

  5. 启用 EntraID 支持:

    • 在您的 Azure Redis 服务器上启用 Entra ID 身份验证。这可能需要几分钟。
    • useEntraID 设置为 "true" 以实现对 Azure Cache for Redis 的 EntraID 支持。
  6. enableTLS 设置为 "true" 以支持 TLS。

注意:useEntraID 假设您的 UserPrincipal(通过 AzureCLICredential)或 SystemAssigned 托管身份具有 RedisDataOwner 角色权限。如果使用用户分配的身份,您需要指定 azureClientID 属性

相关链接

14 - RocketMQ

关于 RocketMQ pubsub 组件的详细文档

组件格式

要设置 RocketMQ pub/sub,创建一个类型为 pubsub.rocketmq 的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 如何:发布和订阅指南 了解如何创建和应用 pub/sub 配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rocketmq-pubsub
spec:
  type: pubsub.rocketmq
  version: v1
  metadata:
    - name: instanceName
      value: dapr-rocketmq-test
    - name: consumerGroup
      value: dapr-rocketmq-test-g-c
    - name: producerGroup 
      value: dapr-rocketmq-test-g-p
    - name: consumerID
      value: channel1
    - name: nameSpace
      value: dapr-test
    - name: nameServer
      value: "127.0.0.1:9876,127.0.0.2:9876"
    - name: retries
      value: 3
    - name: consumerModel
      value: "clustering"
    - name: consumeOrderly
      value: false

规格元数据字段

字段必需详情默认值示例
instanceNameN实例名称time.Now().String()dapr-rocketmq-test
consumerGroupN消费者组名称。建议使用。如果 producerGroupnull,则使用 groupNamedapr-rocketmq-test-g-c
producerGroup (consumerID)N生产者组名称。建议使用。如果 producerGroupnull,则使用 consumerID。如果 consumerID 也为 null,则使用 groupNamedapr-rocketmq-test-g-p
consumerIDN消费者 ID(消费者标签)用于将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,一条消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID) 值。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看可以在组件元数据中使用的所有模板标签。
groupNameN消费者/生产者组名称。已弃用dapr-rocketmq-test-g
nameSpaceNRocketMQ 命名空间dapr-rocketmq
nameServerDomainNRocketMQ 名称服务器域名https://my-app.net:8080/nsaddr
nameServerNRocketMQ 名称服务器,使用 “,” 或 “;” 分隔127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877
accessKeyN访问密钥(用户名)"admin"
secretKeyN密钥(密码)"password"
securityTokenN安全令牌
retriesN向 broker 发送消息的重试次数33
producerQueueSelector (queueSelector)N生产者队列选择器。有五种队列选择器实现:hashrandommanualroundRobindaprdaprhash
consumerModelN定义消息如何传递到每个消费者客户端的消息模型。RocketMQ 支持两种消息模型:clusteringbroadcastingclusteringbroadcasting , clustering
fromWhere (consumeFromWhere)N消费者启动时的消费点。有三个消费点:CONSUME_FROM_LAST_OFFSETCONSUME_FROM_FIRST_OFFSETCONSUME_FROM_TIMESTAMPCONSUME_FROM_LAST_OFFSETCONSUME_FROM_LAST_OFFSET
consumeTimestampN以秒为精度回溯消费时间。时间格式为 yyyymmddhhmmss。例如,20131223171201 表示 2013 年 12 月 23 日 17:12:01time.Now().Add(time.Minute * (-30)).Format("20060102150405")20131223171201
consumeOrderlyN确定是否使用 FIFO 顺序的有序消息。falsefalse
consumeMessageBatchMaxSizeN批量消费大小,范围 [1, 1024]51210
consumeConcurrentlyMaxSpanN并发最大跨度偏移。这对顺序消费没有影响。范围:[1, 65535]10001000
maxReconsumeTimesN最大重新消费次数。-1 表示 16 次。如果消息在成功前被重新消费超过 {@link maxReconsumeTimes} 次,它们将被定向到删除队列。顺序消息为 MaxInt32;并发消息为 1616
autoCommitN启用自动提交truefalse
consumeTimeoutN消息可能阻塞消费线程的最大时间。时间单位:分钟1515
consumerPullTimeoutN套接字超时时间,单位为毫秒
pullIntervalN消息拉取间隔100100
pullBatchSizeN一次从 broker 拉取的消息数量。如果 pullBatchSizenull,使用 ConsumerBatchSizepullBatchSize 范围 [1, 1024]3210
pullThresholdForQueueN队列级别的流量控制阈值。默认情况下,每个消息队列将缓存最多 1000 条消息。考虑 PullBatchSize - 瞬时值可能超过限制。范围:[1, 65535]10241000
pullThresholdForTopicN主题级别的流量控制阈值。如果 pullThresholdForQueue 不是无限制的,将被 pullThresholdForTopic 的值覆盖并计算。例如,如果 pullThresholdForTopic 的值为 1000,并且为此消费者分配了 10 个消息队列,则 pullThresholdForQueue 将设置为 100。范围:[1, 6553500]-1(无限制)10
pullThresholdSizeForQueueN限制队列级别的缓存消息大小。考虑 pullBatchSize - 瞬时值可能超过限制。消息的大小仅通过消息体测量,因此不准确。范围:[1, 1024]100100
pullThresholdSizeForTopicN限制主题级别的缓存消息大小。如果 pullThresholdSizeForQueue 不是无限制的,将被 pullThresholdSizeForTopic 的值覆盖并计算。例如,如果 pullThresholdSizeForTopic 的值为 1000 MiB,并且为此消费者分配了 10 个消息队列,则 pullThresholdSizeForQueue 将设置为 100 MiB。范围:[1, 102400]-1100
content-typeN消息内容类型。"text/plain""application/cloudevents+json; charset=utf-8", "application/octet-stream"
logLevelN日志级别warninfo
sendTimeOutN连接 RocketMQ 的 broker 发送消息超时,以纳秒为单位。已弃用3 秒10000000000
sendTimeOutSecN发布消息的超时时间,以秒为单位。如果 sendTimeOutSecnull,则使用 sendTimeOut3 秒3
mspPropertiesNRocketMQ 消息属性集合中的属性传递给应用程序,数据用 “,” 分隔多个属性key,mkey

出于向后兼容的原因,元数据中的以下值仍然支持,尽管不推荐使用。

字段(支持但已弃用)必需详情示例
groupNameNRocketMQ 发布者的生产者组名称"my_unique_group_name"
sendTimeOutN发布消息的超时时间,以纳秒为单位0
consumerBatchSizeN一次从 broker 拉取的消息数量32

设置 RocketMQ

请参阅 https://rocketmq.apache.org/docs/quick-start/ 以设置本地 RocketMQ 实例。

每次调用的元数据字段

分区键

在调用 RocketMQ pub/sub 时,可以通过在请求 URL 中使用 metadata 查询参数提供可选的分区键。

您需要在 metadata 中指定 rocketmq-tag"rocketmq-key"rocketmq-shardingkeyrocketmq-queue

示例:

curl -X POST http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-tag=?&metadata.rocketmq-key=?&metadata.rocketmq-shardingkey=key&metadata.rocketmq-queue=1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

队列选择器

RocketMQ 组件提供了五种队列选择器。RocketMQ 客户端提供以下队列选择器:

  • HashQueueSelector
  • RandomQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

要了解有关这些 RocketMQ 客户端队列选择器的更多信息,请阅读 RocketMQ 文档

Dapr RocketMQ 组件实现了以下队列选择器:

  • DaprQueueSelector

本文重点介绍 DaprQueueSelector 的设计。

DaprQueueSelector

DaprQueueSelector 集成了三个队列选择器:

  • HashQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

DaprQueueSelector 从请求参数中获取队列 ID。您可以通过运行以下命令设置队列 ID:

http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-queue=1

ManualQueueSelector 是通过上述方法实现的。

接下来,DaprQueueSelector 尝试:

  • 获取 ShardingKey
  • 哈希 ShardingKey 以确定队列 ID。

您可以通过以下方式设置 ShardingKey

http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-shardingkey=key

如果 ShardingKey 不存在,则使用 RoundRobin 算法确定队列 ID。

相关链接

15 - Solace-AMQP

关于 Solace-AMQP 发布/订阅组件的详细文档

组件格式

要配置 Solace-AMQP 发布/订阅组件,请创建一个类型为 pubsub.solace.amqp 的组件。请参考 发布/订阅代理组件文件 了解 ConsumerID 的自动生成方式。参阅 操作指南:发布和订阅指南 以获取创建和应用发布/订阅配置的步骤。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: solace
spec:
  type: pubsub.solace.amqp
  version: v1
  metadata:
    - name: url
      value: 'amqp://localhost:5672'
    - name: username
      value: 'default'
    - name: password
      value: 'default'
    - name: consumerID
      value: 'channel1'

规格元数据字段

字段必需详情示例
urlYAMQP 代理的地址。可以使用 secretKeyRef 引用密钥。
使用 amqp:// URI 方案进行非 TLS 通信。
使用 amqps:// URI 方案进行 TLS 通信。
"amqp://host.domain[:port]"
usernameY连接到代理的用户名。仅在未启用匿名连接或设置为 false 时需要。default
passwordY连接到代理的密码。仅在未启用匿名连接或设置为 false 时需要。default
consumerIDN消费者 ID(消费者标签)用于将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID) 值。可以设置为字符串值(如上例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看您可以在组件元数据中使用的所有模板标签。
anonymousN在不进行凭证验证的情况下连接到代理。仅在代理上启用时有效。如果设置为 true,则不需要用户名和密码。true
caCert使用 TLS 时必需用于验证服务器 TLS 证书的 PEM 格式的证书颁发机构 (CA) 证书。"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert使用 TLS 时必需PEM 格式的 TLS 客户端证书。必须与 clientKey 一起使用。"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey使用 TLS 时必需PEM 格式的 TLS 客户端密钥。必须与 clientCert 一起使用。可以使用 secretKeyRef 引用密钥。"-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"

使用 TLS 进行通信

要配置使用 TLS 进行通信:

  1. 确保 Solace 代理已配置为支持证书。
  2. 在组件配置中提供 caCertclientCertclientKey 元数据。

例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: solace
spec:
  type: pubsub.solace.amqp
  version: v1
  metadata:
  - name: url
    value: "amqps://host.domain[:port]"
  - name: username
    value: 'default'
  - name: password
    value: 'default'
  - name: caCert
    value: ${{ myLoadedCACert }}
  - name: clientCert
    value: ${{ myLoadedClientCert }}
  - name: clientKey
    secretKeyRef:
      name: mySolaceClientKey
      key: mySolaceClientKey
auth:
  secretStore: <SECRET_STORE_NAME>

虽然 caCertclientCert 的值可能不是密钥,但为了方便起见,它们也可以从 Dapr 密钥存储中引用。

发布/订阅主题和队列

默认情况下,消息通过主题发布和订阅。如果您希望目标是队列,请在主题前加上 queue: 前缀,Solace AMQP 组件将连接到队列。

创建 Solace 代理

您可以使用 Docker 本地运行 Solace 代理

docker run -d -p 8080:8080 -p 55554:55555 -p 8008:8008 -p 1883:1883 -p 8000:8000 -p 5672:5672 -p 9000:9000 -p 2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard

然后您可以使用客户端端口与服务器交互:mqtt://localhost:5672

您还可以在 Solace Cloud 上注册一个免费的 SaaS 代理。

相关链接

16 - 内存

关于内存 pubsub 组件的详细文档

内存 pub/sub 组件运行在单个 Dapr sidecar 中。这主要用于开发目的。状态不会在多个 sidecar 之间复制,并且在 Dapr sidecar 重启时会丢失。

组件格式

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.in-memory
  version: v1
  metadata: []

注意:内存组件不需要特定的元数据即可工作,但 spec.metadata 是必填字段。

相关链接