发布/订阅代理组件规范 支持与Dapr接口的发布/订阅代理
下表展示了Dapr发布/订阅模块支持的发布和订阅代理。了解如何为Dapr配置不同的发布/订阅代理。
发布/订阅组件的重试机制与入站弹性 每个发布/订阅组件都有其自带的重试机制。在应用
Dapr弹性策略 之前,请确保您了解所使用的发布/订阅组件的默认重试策略。Dapr弹性策略并不是替代这些内置重试,而是对其进行补充,这可能导致消息的重复处理。
Table headers to note:
Header Description Example Status Component certification statusAlpha Beta Stable Component version The version of the component v1 Since runtime version The version of the Dapr runtime when the component status was set or updated 1.11
Generic Amazon Web Services (AWS) Component Status Component version Since runtime version AWS SNS/SQS Stable v1 1.10
Google Cloud Platform (GCP) Component Status Component version Since runtime version GCP Pub/Sub Stable v1 1.11
Microsoft Azure 1 - Apache Kafka Apache Kafka pubsub 组件的详细说明文档
组件格式 要设置 Apache Kafka 的发布/订阅功能,您需要创建一个类型为 pubsub.kafka
的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 如何:发布和订阅指南 了解如何创建和应用发布/订阅配置。
所有组件的元数据字段值可以使用 模板化的元数据值 ,这些值会在 Dapr sidecar 启动时解析。例如,您可以选择使用 {namespace}
作为 consumerGroup
,以便在不同命名空间中使用相同的 appId
和主题,如 本文 所述。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub
spec :
type : pubsub.kafka
version : v1
metadata :
- name : brokers # 必需字段,Kafka broker 连接设置
value : "dapr-kafka.myapp.svc.cluster.local:9092"
- name : consumerGroup # 可选字段,用于输入绑定。
value : "{namespace}"
- name : consumerID # 可选字段。如果未提供,运行时将自动创建。
value : "channel1"
- name : clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value : "my-dapr-app-id"
- name : authType # 必需字段。
value : "password"
- name : saslUsername # 如果 authType 是 `password`,则必需。
value : "adminuser"
- name : saslPassword # 如果 authType 是 `password`,则必需。
secretKeyRef :
name : kafka-secrets
key : saslPasswordSecret
- name : saslMechanism
value : "SHA-512"
- name : maxMessageBytes # 可选字段。
value : 1024
- name : consumeRetryInterval # 可选字段。
value : 200ms
- name : heartbeatInterval # 可选字段。
value : 5s
- name : sessionTimeout # 可选字段。
value : 15s
- name : version # 可选字段。
value : 2.0.0
- name : disableTls # 可选字段。禁用 TLS。在生产环境中不安全!请阅读 `Mutual TLS` 部分以了解如何使用 TLS。
value : "true"
- name : consumerFetchMin # 可选字段。高级设置。请求中要获取的最小消息字节数 - broker 将等待直到至少有这么多可用。
value : 1
- name : consumerFetchDefault # 可选字段。高级设置。每个请求中从 broker 获取的默认消息字节数。
value : 2097152
- name : channelBufferSize # 可选字段。高级设置。内部和外部通道中要缓冲的事件数量。
value : 512
- name : schemaRegistryURL # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。Schema Registry URL。
value : http://localhost:8081
- name : schemaRegistryAPIKey # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。Schema Registry API Key。
value : XYAXXAZ
- name : schemaRegistryAPISecret # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。Schema Registry 凭证 API Secret。
value : "ABCDEFGMEADFF"
- name : schemaCachingEnabled # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。启用模式缓存。
value : true
- name : schemaLatestVersionCacheTTL # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。发布具有最新模式的消息时的模式缓存 TTL。
value : 5m
- name : escapeHeaders # 可选字段。
value : false
有关使用 secretKeyRef
的详细信息,请参阅 如何在组件中引用 secrets 的指南。
规格元数据字段 字段 必需 详情 示例 brokers Y 逗号分隔的 Kafka brokers 列表。 "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093"
consumerGroup N 监听的 kafka 消费者组。发布到主题的每条记录都会传递给订阅该主题的每个消费者组中的一个消费者。如果提供了 consumerGroup
的值,则忽略 consumerID
的任何值 - 将为 consumerID
设置消费者组和随机唯一标识符的组合。 "group1"
consumerID N 消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID
,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID
) 值。如果提供了 consumerGroup
的值,则忽略 consumerID
的任何值 - 将为 consumerID
设置消费者组和随机唯一标识符的组合。 可以设置为字符串值(例如上例中的 "channel1"
)或字符串格式值(例如 "{podName}"
等)。查看您可以在组件元数据中使用的所有模板标签。 clientID N 用户提供的字符串,随每个请求发送到 Kafka brokers,用于日志记录、调试和审计。默认为 Kubernetes 模式的 "namespace.appID"
或 Self-Hosted 模式的 "appID"
。 "my-namespace.my-dapr-app"
,"my-dapr-app"
authRequired N 已弃用 启用 SASL 认证与 Kafka brokers。"true"
,"false"
authType Y 配置或禁用认证。支持的值:none
,password
,mtls
,oidc
或 awsiam
"password"
,"none"
saslUsername N 用于认证的 SASL 用户名。仅在 authType
设置为 "password"
时需要。 "adminuser"
saslPassword N 用于认证的 SASL 密码。可以是 secretKeyRef
以使用 secret 引用 。仅在 authType
设置为 "password"
时需要。 ""
,"KeFg23!"
saslMechanism N 您希望使用的 SASL 认证机制。仅在 authType
设置为 "password"
时需要。默认为 PLAINTEXT
"SHA-512", "SHA-256", "PLAINTEXT"
initialOffset N 如果没有先前提交的偏移量,则使用的初始偏移量。应为 “newest” 或 “oldest”。默认为 “newest”。 "oldest"
maxMessageBytes N 允许的单个 Kafka 消息的最大字节大小。默认为 1024。 2048
consumeRetryInterval N 尝试消费主题时的重试间隔。将没有后缀的数字视为毫秒。默认为 100ms。 200ms
consumeRetryEnabled N 通过设置 "false"
禁用消费重试 "true"
,"false"
version N Kafka 集群版本。默认为 2.0.0。请注意,如果您使用 Azure EventHubs 和 Kafka,则必须将其设置为 1.0.0
。 0.10.2.0
caCert N 证书颁发机构证书,使用 TLS 时需要。可以是 secretKeyRef
以使用 secret 引用 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert N 客户端证书,authType
为 mtls
时需要。可以是 secretKeyRef
以使用 secret 引用 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey N 客户端密钥,authType
为 mtls
时需要。可以是 secretKeyRef
以使用 secret 引用 "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"
skipVerify N 跳过 TLS 验证,不建议在生产中使用。默认为 "false"
"true"
,"false"
disableTls N 禁用传输安全的 TLS。要禁用,您不需要将值设置为 "true"
。不建议在生产中使用。默认为 "false"
。 "true"
,"false"
oidcTokenEndpoint N OAuth2 身份提供者访问令牌端点的完整 URL。当 authType
设置为 oidc
时需要 “https://identity.example.com/v1/token" oidcClientID N 在身份提供者中配置的 OAuth2 客户端 ID。当 authType
设置为 oidc
时需要 dapr-kafka
oidcClientSecret N 在身份提供者中配置的 OAuth2 客户端 secret:当 authType
设置为 oidc
时需要 "KeFg23!"
oidcScopes N 用于请求访问令牌的 OAuth2/OIDC 范围的逗号分隔列表。当 authType
设置为 oidc
时推荐。默认为 "openid"
"openid,kafka-prod"
oidcExtensions N 包含 OAuth2/OIDC 扩展的 JSON 编码字典的字符串,用于请求访问令牌 {"cluster":"kafka","poolid":"kafkapool"}
awsRegion N 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘region’。Kafka 集群部署到的 AWS 区域。当 authType
设置为 awsiam
时需要 us-west-1
awsAccessKey N 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘accessKey’。与 IAM 账户关联的 AWS 访问密钥。 "accessKey"
awsSecretKey N 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘secretKey’。与访问密钥关联的 secret 密钥。 "secretKey"
awsSessionToken N 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘sessionToken’。要使用的 AWS 会话令牌。仅在使用临时安全凭证时需要会话令牌。 "sessionToken"
awsIamRoleArn N 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘assumeRoleArn’。具有访问 AWS 管理的 Apache Kafka (MSK) 的 IAM 角色。这是除 AWS 凭证外的另一种与 MSK 认证的选项。 "arn:aws:iam::123456789:role/mskRole"
awsStsSessionName N 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘sessionName’。表示假设角色的会话名称。 "DaprDefaultSession"
schemaRegistryURL N 使用 Schema Registry Avro 序列化/反序列化时需要。Schema Registry URL。 http://localhost:8081
schemaRegistryAPIKey N 使用 Schema Registry Avro 序列化/反序列化时。Schema Registry 凭证 API Key。 XYAXXAZ
schemaRegistryAPISecret N 使用 Schema Registry Avro 序列化/反序列化时。Schema Registry 凭证 API Secret。 ABCDEFGMEADFF
schemaCachingEnabled N 使用 Schema Registry Avro 序列化/反序列化时。启用模式缓存。默认为 true
true
schemaLatestVersionCacheTTL N 使用 Schema Registry Avro 序列化/反序列化时。发布具有最新模式的消息时的模式缓存 TTL。默认为 5 分钟 5m
clientConnectionTopicMetadataRefreshInterval N 客户端连接的主题元数据与 broker 刷新的间隔,以 Go 持续时间表示。默认为 9m
。 "4m"
clientConnectionKeepAliveInterval N 客户端连接与 broker 保持活动的最长时间,以 Go 持续时间表示,然后关闭连接。零值(默认)表示无限期保持活动。 "4m"
consumerFetchMin N 请求中要获取的最小消息字节数 - broker 将等待直到至少有这么多可用。默认值为 1
,因为 0
会导致消费者在没有消息可用时旋转。相当于 JVM 的 fetch.min.bytes
。 "2"
consumerFetchDefault N 每个请求中从 broker 获取的默认消息字节数。默认值为 "1048576"
字节。 "2097152"
channelBufferSize N 内部和外部通道中要缓冲的事件数量。这允许生产者和消费者在用户代码工作时继续在后台处理一些消息,从而大大提高吞吐量。默认为 256
。 "512"
heartbeatInterval N 向消费者协调器发送心跳的间隔。最多应将值设置为 sessionTimeout
值的 1/3。默认为 “3s”。 "5s"
sessionTimeout N 使用 Kafka 的组管理功能时用于检测客户端故障的超时时间。如果 broker 在此会话超时之前未收到任何来自消费者的心跳,则消费者将被移除并启动重新平衡。默认为 “10s”。 "20s"
escapeHeaders N 启用对消费者接收到的消息头值的 URL 转义。允许接收通常不允许在 HTTP 头中使用的特殊字符内容。默认为 false
。 true
上面的 secretKeyRef
引用了一个 kubernetes secrets store 以访问 tls 信息。访问 此处 了解有关如何配置 secret store 组件的更多信息。
注意 使用 Azure EventHubs 和 Kafka 时,元数据 version
必须设置为 1.0.0
。
认证 Kafka 支持多种认证方案,Dapr 支持几种:SASL 密码、mTLS、OIDC/OAuth2。随着添加的认证方法,authRequired
字段已从 v1.6 版本中弃用,取而代之的是 authType
字段。如果 authRequired
设置为 true
,Dapr 将尝试根据 saslPassword
的值正确配置 authType
。authType
的有效值为:
none
password
certificate
mtls
oidc
awsiam
注意 authType
仅用于 认证 。授权 仍在 Kafka 内配置,除了 awsiam
,它还可以驱动在 AWS IAM 中配置的授权决策。None 将 authType
设置为 none
将禁用任何认证。这在生产中不推荐 。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub-noauth
spec :
type : pubsub.kafka
version : v1
metadata :
- name : brokers # 必需字段,Kafka broker 连接设置
value : "dapr-kafka.myapp.svc.cluster.local:9092"
- name : consumerGroup # 可选字段,用于输入绑定。
value : "group1"
- name : clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value : "my-dapr-app-id"
- name : authType # 必需字段。
value : "none"
- name : maxMessageBytes # 可选字段。
value : 1024
- name : consumeRetryInterval # 可选字段。
value : 200ms
- name : heartbeatInterval # 可选字段。
value : 5s
- name : sessionTimeout # 可选字段。
value : 15s
- name : version # 可选字段。
value : 0.10.2.0
- name : disableTls
value : "true"
SASL 密码 将 authType
设置为 password
启用 SASL 认证。这需要设置 saslUsername
和 saslPassword
字段。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub-sasl
spec :
type : pubsub.kafka
version : v1
metadata :
- name : brokers # 必需字段,Kafka broker 连接设置
value : "dapr-kafka.myapp.svc.cluster.local:9092"
- name : consumerGroup # 可选字段,用于输入绑定。
value : "group1"
- name : clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value : "my-dapr-app-id"
- name : authType # 必需字段。
value : "password"
- name : saslUsername # 如果 authType 是 `password`,则必需。
value : "adminuser"
- name : saslPassword # 如果 authType 是 `password`,则必需。
secretKeyRef :
name : kafka-secrets
key : saslPasswordSecret
- name : saslMechanism
value : "SHA-512"
- name : maxMessageBytes # 可选字段。
value : 1024
- name : consumeRetryInterval # 可选字段。
value : 200ms
- name : heartbeatInterval # 可选字段。
value : 5s
- name : sessionTimeout # 可选字段。
value : 15s
- name : version # 可选字段。
value : 0.10.2.0
- name : caCert
secretKeyRef :
name : kafka-tls
key : caCert
Mutual TLS 将 authType
设置为 mtls
使用 x509 客户端证书(clientCert
字段)和密钥(clientKey
字段)进行认证。请注意,mTLS 作为认证机制与通过加密保护传输层的 TLS 使用是不同的。mTLS 需要 TLS 传输(意味着 disableTls
必须为 false
),但保护传输层不需要使用 mTLS。请参阅 使用 TLS 进行通信 以配置底层 TLS 传输。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub-mtls
spec :
type : pubsub.kafka
version : v1
metadata :
- name : brokers # 必需字段,Kafka broker 连接设置
value : "dapr-kafka.myapp.svc.cluster.local:9092"
- name : consumerGroup # 可选字段,用于输入绑定。
value : "group1"
- name : clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value : "my-dapr-app-id"
- name : authType # 必需字段。
value : "mtls"
- name : caCert
secretKeyRef :
name : kafka-tls
key : caCert
- name : clientCert
secretKeyRef :
name : kafka-tls
key : clientCert
- name : clientKey
secretKeyRef :
name : kafka-tls
key : clientKey
- name : maxMessageBytes # 可选字段。
value : 1024
- name : consumeRetryInterval # 可选字段。
value : 200ms
- name : heartbeatInterval # 可选字段。
value : 5s
- name : sessionTimeout # 可选字段。
value : 15s
- name : version # 可选字段。
value : 0.10.2.0
OAuth2 或 OpenID Connect 将 authType
设置为 oidc
启用通过 OAUTHBEARER 机制的 SASL 认证。这支持从外部 OAuth2 或 OIDC 身份提供者指定一个持有者令牌。目前,仅支持 client_credentials 授权。
配置 oidcTokenEndpoint
为身份提供者访问令牌端点的完整 URL。
设置 oidcClientID
和 oidcClientSecret
为在身份提供者中配置的客户端凭证。
如果在组件配置中指定了 caCert
,则证书将附加到系统 CA 信任中以验证身份提供者证书。同样,如果在组件配置中指定了 skipVerify
,则在访问身份提供者时也将跳过验证。
默认情况下,令牌请求的唯一范围是 openid
;强烈建议通过 oidcScopes
以逗号分隔的列表指定其他范围,并由 Kafka broker 验证。如果不使用其他范围来缩小访问令牌的有效性,
被破坏的 Kafka broker 可能会重放令牌以访问其他服务作为 Dapr clientID。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub
spec :
type : pubsub.kafka
version : v1
metadata :
- name : brokers # 必需字段,Kafka broker 连接设置
value : "dapr-kafka.myapp.svc.cluster.local:9092"
- name : consumerGroup # 可选字段,用于输入绑定。
value : "group1"
- name : clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value : "my-dapr-app-id"
- name : authType # 必需字段。
value : "oidc"
- name : oidcTokenEndpoint # 如果 authType 是 `oidc`,则必需。
value : "https://identity.example.com/v1/token"
- name : oidcClientID # 如果 authType 是 `oidc`,则必需。
value : "dapr-myapp"
- name : oidcClientSecret # 如果 authType 是 `oidc`,则必需。
secretKeyRef :
name : kafka-secrets
key : oidcClientSecret
- name : oidcScopes # 如果 authType 是 `oidc`,则推荐。
value : "openid,kafka-dev"
- name : caCert # 也应用于验证 OIDC 提供者证书
secretKeyRef :
name : kafka-tls
key : caCert
- name : maxMessageBytes # 可选字段。
value : 1024
- name : consumeRetryInterval # 可选字段。
value : 200ms
- name : heartbeatInterval # 可选字段。
value : 5s
- name : sessionTimeout # 可选字段。
value : 15s
- name : version # 可选字段。
value : 0.10.2.0
AWS IAM 支持使用 MSK 进行 AWS IAM 认证。将 authType
设置为 awsiam
使用 AWS SDK 生成认证令牌进行认证。
注意 唯一必需的元数据字段是 region
。如果没有提供 acessKey
和 secretKey
,您可以使用 AWS IAM 角色为服务账户提供无密码认证到您的 Kafka 集群。apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub-awsiam
spec :
type : pubsub.kafka
version : v1
metadata :
- name : brokers # 必需字段,Kafka broker 连接设置
value : "dapr-kafka.myapp.svc.cluster.local:9092"
- name : consumerGroup # 可选字段,用于输入绑定。
value : "group1"
- name : clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value : "my-dapr-app-id"
- name : authType # 必需字段。
value : "awsiam"
- name : region # 必需字段。
value : "us-west-1"
- name : accessKey # 可选字段。
value : <AWS_ACCESS_KEY>
- name : secretKey # 可选字段。
value : <AWS_SECRET_KEY>
- name : sessionToken # 可选字段。
value : <AWS_SESSION_KEY>
- name : assumeRoleArn # 可选字段。
value : "arn:aws:iam::123456789:role/mskRole"
- name : sessionName # 可选字段。
value : "DaprDefaultSession"
使用 TLS 进行通信 默认情况下,启用 TLS 以保护到 Kafka 的传输层。要禁用 TLS,请将 disableTls
设置为 true
。启用 TLS 时,您可以
使用 skipVerify
控制服务器证书验证以禁用验证(不推荐在生产环境中使用 )和 caCert
指定受信任的 TLS 证书颁发机构(CA)。如果没有指定 caCert
,将使用系统 CA 信任。要配置 mTLS 认证,
请参阅 认证 部分。
下面是一个配置为使用传输层 TLS 的 Kafka pubsub 组件示例:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub
spec :
type : pubsub.kafka
version : v1
metadata :
- name : brokers # 必需字段,Kafka broker 连接设置
value : "dapr-kafka.myapp.svc.cluster.local:9092"
- name : consumerGroup # 可选字段,用于输入绑定。
value : "group1"
- name : clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value : "my-dapr-app-id"
- name : authType # 必需字段。
value : "certificate"
- name : consumeRetryInterval # 可选字段。
value : 200ms
- name : heartbeatInterval # 可选字段。
value : 5s
- name : sessionTimeout # 可选字段。
value : 15s
- name : version # 可选字段。
value : 0.10.2.0
- name : maxMessageBytes # 可选字段。
value : 1024
- name : caCert # 证书颁发机构证书。
secretKeyRef :
name : kafka-tls
key : caCert
auth :
secretStore : <SECRET_STORE_NAME>
从多个主题消费 当使用单个 pub/sub 组件从多个主题消费时,无法保证您的消费者组中的消费者如何在主题分区之间平衡。
例如,假设您订阅了两个主题,每个主题有 10 个分区,并且您有 20 个服务副本从这两个主题消费。无法保证 10 个将分配给第一个主题,10 个将分配给第二个主题。相反,分区可能会不均匀地划分,超过 10 个分配给第一个主题,其余分配给第二个主题。
这可能导致第一个主题的消费者空闲,而第二个主题的消费者过度扩展,反之亦然。当使用自动缩放器(如 HPA 或 KEDA)时,也可以观察到这种行为。
如果您遇到此特定问题,建议您为每个主题配置一个单独的 pub/sub 组件,并为每个组件定义唯一的消费者组。这可以确保您的服务的所有副本都完全分配给唯一的消费者组,其中每个消费者组针对一个特定主题。
例如,您可以定义两个 Dapr 组件,具有以下配置:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub-topic-one
spec :
type : pubsub.kafka
version : v1
metadata :
- name : consumerGroup
value : "{appID}-topic-one"
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub-topic-two
spec :
type : pubsub.kafka
version : v1
metadata :
- name : consumerGroup
value : "{appID}-topic-two"
发送和接收多条消息 Apache Kafka 组件支持使用批量 Pub/sub API 在单个操作中发送和接收多条消息。
配置批量订阅 订阅主题时,您可以配置 bulkSubscribe
选项。有关更多详细信息,请参阅 批量订阅消息 。了解更多关于 批量订阅 API 的信息。
Apache Kafka 支持以下批量元数据选项:
配置 默认值 maxBulkAwaitDurationMs
10000
(10s)maxBulkSubCount
80
每次调用的元数据字段 分区键 调用 Kafka pub/sub 时,可以通过在请求 URL 中使用 metadata
查询参数提供可选的分区键。
参数名称可以是 partitionKey
或 __key
示例:
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey= key1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
消息头 所有其他元数据键/值对(不是 partitionKey
或 __key
)都设置为 Kafka 消息中的头。以下是为消息设置 correlationId
的示例。
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId= myCorrelationID& metadata.partitionKey= key1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
Kafka Pubsub 在消费者端接收到的特殊消息头 消费消息时,特殊消息元数据会自动作为头传递。这些是:
__key
:如果可用,消息键__topic
:消息的主题__partition
:消息的分区号__offset
:消息在分区中的偏移量__timestamp
:消息的时间戳您可以在消费者端点中访问它们,如下所示:
from fastapi import APIRouter , Body , Response , status
import json
import sys
app = FastAPI ()
router = APIRouter ()
@router.get ( '/dapr/subscribe' )
def subscribe ():
subscriptions = [{ 'pubsubname' : 'pubsub' ,
'topic' : 'my-topic' ,
'route' : 'my_topic_subscriber' ,
}]
return subscriptions
@router.post ( '/my_topic_subscriber' )
def my_topic_subscriber (
key : Annotated [ str , Header ( alias = "__key" )],
offset : Annotated [ int , Header ( alias = "__offset" )],
event_data = Body ()):
print ( f "key= { key } - offset= { offset } - data= { event_data } " , flush = True )
return Response ( status_code = status . HTTP_200_OK )
app . include_router ( router )
接收带有特殊字符的消息头 消费者应用程序可能需要接收包含特殊字符的消息头,这可能会导致 HTTP 协议验证错误。
HTTP 头值必须遵循规范,使得某些字符不被允许。了解更多关于协议的信息 。
在这种情况下,您可以启用 escapeHeaders
配置设置,该设置使用 URL 转义在消费者端对头值进行编码。
注意 使用此设置时,接收到的消息头是 URL 转义的,您需要对其进行 URL “反转义” 以获得原始值。将 escapeHeaders
设置为 true
以进行 URL 转义。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kafka-pubsub-escape-headers
spec :
type : pubsub.kafka
version : v1
metadata :
- name : brokers # 必需字段,Kafka broker 连接设置
value : "dapr-kafka.myapp.svc.cluster.local:9092"
- name : consumerGroup # 可选字段,用于输入绑定。
value : "group1"
- name : clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value : "my-dapr-app-id"
- name : authType # 必需字段。
value : "none"
- name : escapeHeaders
value : "true"
Avro Schema Registry 序列化/反序列化 您可以配置 pub/sub 以使用 Avro 二进制序列化 发布或消费数据,利用 Apache Schema Registry (例如,Confluent Schema Registry ,Apicurio )。
配置
重要 目前,仅支持消息值的序列化/反序列化。由于不支持云事件,发布 Avro 消息时必须传递 rawPayload=true
元数据。
请注意,消费者不应设置 rawPayload=true
,因为消息值将被包装到 CloudEvent 中并进行 base64 编码。将 rawPayload
保持为默认值(即 false
)将以 JSON 负载的形式将 Avro 解码的消息发送到应用程序。配置 Kafka pub/sub 组件元数据时,您必须定义:
Schema Registry URL API key/secret(如果适用) 模式主题是根据主题名称自动派生的,使用标准命名约定。例如,对于名为 my-topic
的主题,模式主题将是 my-topic-value
。
在服务中与消息负载交互时,它是 JSON 格式。负载在 Dapr 组件中透明地序列化/反序列化。
日期/日期时间字段必须作为其 Epoch Unix 时间戳 等效值传递(而不是典型的 Iso8601)。例如:
2024-01-10T04:36:05.986Z
应传递为 1704861365986
(自 1970 年 1 月 1 日以来的毫秒数)2024-01-10
应传递为 19732
(自 1970 年 1 月 1 日以来的天数)发布 Avro 消息 为了向 Kafka pub/sub 组件指示消息应使用 Avro 序列化,必须在 metadata
中设置 valueSchemaType
为 Avro
。
curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/my-topic?metadata.rawPayload= true& metadata.valueSchemaType= Avro -H "Content-Type: application/json" -d '{"order_number": "345", "created_date": 1704861365986}'
from dapr.clients import DaprClient
with DaprClient () as d :
req_data = {
'order_number' : '345' ,
'created_date' : 1704861365986
}
# 创建一个带有内容类型和主体的类型化消息
resp = d . publish_event (
pubsub_name = 'pubsub' ,
topic_name = 'my-topic' ,
data = json . dumps ( req_data ),
publish_metadata = { 'rawPayload' : 'true' , 'valueSchemaType' : 'Avro' }
)
# 打印请求
print ( req_data , flush = True )
订阅 Avro 主题 为了向 Kafka pub/sub 组件指示消息应使用 Avro 进行反序列化,必须在订阅元数据中设置 valueSchemaType
为 Avro
。
from fastapi import APIRouter , Body , Response , status
import json
import sys
app = FastAPI ()
router = APIRouter ()
@router.get ( '/dapr/subscribe' )
def subscribe ():
subscriptions = [{ 'pubsubname' : 'pubsub' ,
'topic' : 'my-topic' ,
'route' : 'my_topic_subscriber' ,
'metadata' : {
'valueSchemaType' : 'Avro' ,
} }]
return subscriptions
@router.post ( '/my_topic_subscriber' )
def my_topic_subscriber ( event_data = Body ()):
print ( event_data , flush = True )
return Response ( status_code = status . HTTP_200_OK )
app . include_router ( router )
创建一个 Kafka 实例 您可以使用 这个 Docker 镜像在本地运行 Kafka。
要在没有 Docker 的情况下运行,请参阅 此处 的入门指南。
要在 Kubernetes 上运行 Kafka,您可以使用任何 Kafka operator,例如 Strimzi 。
相关链接 2 - AWS SNS/SQS 关于AWS SNS/SQS pubsub组件的详细文档
组件格式 要设置AWS SNS/SQS pub/sub,创建一个类型为pubsub.aws.snssqs
的组件。
默认情况下,AWS SNS/SQS组件会:
注意 如果您只有发布者而没有订阅者,那么只会创建SNS主题。
但是,如果您有订阅者,则会生成SNS、SQS及其动态或静态订阅。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : snssqs-pubsub
spec :
type : pubsub.aws.snssqs
version : v1
metadata :
- name : accessKey
value : "AKIAIOSFODNN7EXAMPLE"
- name : secretKey
value : "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
- name : region
value : "us-east-1"
# - name: consumerID # 可选。如果未提供,运行时将创建一个。
# value: "channel1"
# - name: endpoint # 可选。
# value: "http://localhost:4566"
# - name: sessionToken # 可选(如果使用AssignedRole则必须;例如,临时accessKey和secretKey)
# value: "TOKEN"
# - name: messageVisibilityTimeout # 可选
# value: 10
# - name: messageRetryLimit # 可选
# value: 10
# - name: messageReceiveLimit # 可选
# value: 10
# - name: sqsDeadLettersQueueName # 可选
# - value: "myapp-dlq"
# - name: messageWaitTimeSeconds # 可选
# value: 1
# - name: messageMaxNumber # 可选
# value: 10
# - name: fifo # 可选
# value: "true"
# - name: fifoMessageGroupID # 可选
# value: "app1-mgi"
# - name: disableEntityManagement # 可选
# value: "false"
# - name: disableDeleteOnRetryLimit # 可选
# value: "false"
# - name: assetsManagementTimeoutSeconds # 可选
# value: 5
# - name: concurrencyMode # 可选
# value: "single"
# - name: concurrencyLimit # 可选
# value: "0"
规格元数据字段 字段 必需 详情 示例 accessKey Y 具有适当权限的AWS账户/角色的ID,用于SNS和SQS(见下文) "AKIAIOSFODNN7EXAMPLE"
secretKey Y AWS用户/角色的secret。如果使用AssumeRole
访问,还需要提供sessionToken
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
region Y SNS/SQS资产所在或将创建的AWS区域。请参阅此页面 以获取有效区域。确保SNS和SQS在该区域可用 "us-east-1"
consumerID N 消费者ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者ID的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供consumerID
,Dapr运行时将其设置为Dapr应用程序ID(appID
)值。请参阅pub/sub broker组件文件 以了解如何自动生成ConsumerID。 可以设置为字符串值(如上例中的"channel1"
)或字符串格式值(如"{podName}"
等)。查看您可以在组件元数据中使用的所有模板标签。 endpoint N 组件使用的AWS端点。仅用于本地开发,例如使用localstack 。在生产AWS上运行时不需要endpoint
"http://localhost:4566"
sessionToken N 要使用的AWS会话令牌。仅在使用临时安全凭证时需要会话令牌 "TOKEN"
messageReceiveLimit N 消息接收的次数,在处理该消息失败后,一旦达到该次数,将导致从队列中删除该消息。如果指定了sqsDeadLettersQueueName
,messageReceiveLimit
是消息接收的次数,在处理该消息失败后,一旦达到该次数,将导致将消息移动到SQS死信队列。默认值:10
10
sqsDeadLettersQueueName N 此应用程序的死信队列的名称 "myapp-dlq"
messageVisibilityTimeout N 消息在发送给订阅者后从接收请求中隐藏的时间(以秒为单位)。默认值:10
10
messageRetryLimit N 在处理消息失败后重新发送消息的次数,然后从队列中删除该消息。默认值:10
10
messageWaitTimeSeconds N 调用等待消息到达队列的持续时间(以秒为单位),然后返回。如果有消息可用,调用会比messageWaitTimeSeconds
更早返回。如果没有消息可用且等待时间到期,调用会成功返回一个空消息列表。默认值:1
1
messageMaxNumber N 一次从队列中接收的最大消息数。默认值:10
,最大值:10
10
fifo N 使用SQS FIFO队列提供消息排序和去重。默认值:"false"
。有关SQS FIFO 的更多详细信息 "true"
,"false"
fifoMessageGroupID N 如果启用了fifo
,指示Dapr为pubsub部署使用自定义消息组ID 。这不是强制性的,因为Dapr为每个生产者创建一个自定义消息组ID,从而确保每个Dapr生产者的消息排序。默认值:""
"app1-mgi"
disableEntityManagement N 当设置为true时,SNS主题、SQS队列和SQS到SNS的订阅不会自动创建。默认值:"false"
"true"
,"false"
disableDeleteOnRetryLimit N 当设置为true时,在重试并失败messageRetryLimit
次处理消息后,重置消息可见性超时,以便其他消费者可以尝试处理,而不是从SQS中删除消息(默认行为)。默认值:"false"
"true"
,"false"
assetsManagementTimeoutSeconds N AWS资产管理操作的超时时间(以秒为单位),在超时并取消之前。资产管理操作是对STS、SNS和SQS执行的任何操作,除了实现默认Dapr组件重试行为的消息发布和消费操作。该值可以设置为任何非负浮点数/整数。默认值:5
0.5
,10
concurrencyMode N 当从SQS批量接收消息时,按顺序调用订阅者(一次“单个”消息),或并发调用(“并行”)。默认值:"parallel"
"single"
,"parallel"
concurrencyLimit N 定义处理消息的最大并发工作者数量。当concurrencyMode设置为"single"
时,此值被忽略。要避免限制并发工作者的数量,请将其设置为0
。默认值:0
100
其他信息 符合AWS规范 Dapr创建的SNS主题和SQS队列名称符合AWS规范 。默认情况下,Dapr根据消费者app-id
创建SQS队列名称,因此Dapr可能会执行名称标准化以符合AWS规范。
SNS/SQS组件行为 当pub/sub SNS/SQS组件配置SNS主题时,SQS队列和订阅在组件代表消息生产者(没有订阅者应用程序部署)操作的情况下,与存在订阅者应用程序(没有发布者部署)的情况下表现不同。
由于SNS在没有SQS订阅的情况下的工作方式_仅发布者设置_,SQS队列和订阅表现为依赖于订阅者监听主题消息的“经典”pub/sub系统。没有这些订阅者,消息:
无法传递并有效地丢弃 不可用于未来的订阅者(当订阅者最终订阅时没有消息重播) SQS FIFO 根据AWS规范,使用SQS FIFO(fifo
元数据字段设置为"true"
)提供消息排序和去重,但会导致较低的SQS处理吞吐量,以及其他注意事项。
指定fifoMessageGroupID
限制FIFO队列的并发消费者数量为1,但保证应用程序的Dapr sidecar发布的消息的全局排序。请参阅这篇AWS博客文章 以更好地理解消息组ID和FIFO队列的主题。
为了避免丢失传递给消费者的消息顺序,SQS组件的FIFO配置要求将concurrencyMode
元数据字段设置为"single"
。
默认并行concurrencyMode
自v1.8.0以来,组件支持"parallel"
concurrencyMode
作为其默认模式。在之前的版本中,组件的默认行为是一次调用订阅者一个消息并等待其响应。
SQS死信队列 在使用SQS死信队列配置PubSub组件时,元数据字段messageReceiveLimit
和sqsDeadLettersQueueName
必须都设置为一个值。对于messageReceiveLimit
,值必须大于0
,而sqsDeadLettersQueueName
不能是空字符串。
重要 当在EKS(AWS Kubernetes)节点/Pod上运行Dapr sidecar(daprd
)时,已经附加了定义访问AWS资源的IAM策略,您不应 在组件规格的定义中提供AWS访问密钥、秘密密钥和令牌。SNS/SQS与Dapr的争用 从根本上说,SNS通过为这些主题创建SQS订阅,将来自多个发布者主题的消息聚合到一个SQS队列中。作为订阅者,SNS/SQS pub/sub组件从该唯一的SQS队列中消费消息。
然而,像任何SQS消费者一样,组件无法选择性地检索其特定订阅的SNS主题发布的消息。这可能导致组件接收到没有关联处理程序的主题发布的消息。通常,这发生在:
组件初始化: 如果基础设施订阅在组件订阅处理程序之前准备好,或关闭: 如果组件处理程序在基础设施订阅之前被移除。由于此问题影响任何多个SNS主题的SQS消费者,组件无法防止从缺少处理程序的主题中消费消息。当这种情况发生时,组件会记录一个错误,指示这些消息被错误地检索。
在这些情况下,未处理的消息将在每次拉取后以其接收计数 递减的状态重新出现在SQS中。因此,存在未处理的消息可能超过其messageReceiveLimit
并丢失的风险。
重要 在使用SNS/SQS与Dapr时,请考虑潜在的争用场景,并适当地配置messageReceiveLimit
。强烈建议通过设置sqsDeadLettersQueueName
来使用SQS死信队列,以防止丢失消息。创建SNS/SQS实例
Self-Hosted
Kubernetes
AWS 对于本地开发,localstack项目 用于集成AWS SNS/SQS。按照这些说明 运行localstack。
要从命令行使用Docker本地运行localstack,请应用以下命令:
docker run --rm -it -p 4566:4566 -p 4571:4571 -e SERVICES = "sts,sns,sqs" -e AWS_DEFAULT_REGION = "us-east-1" localstack/localstack
为了在您的pub/sub绑定中使用localstack,您需要在组件元数据中提供endpoint
配置。在生产AWS上运行时不需要endpoint
。
请参阅认证到AWS 以获取有关认证相关属性的信息。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : snssqs-pubsub
spec :
type : pubsub.aws.snssqs
version : v1
metadata :
- name : accessKey
value : "anyString"
- name : secretKey
value : "anyString"
- name : endpoint
value : http://localhost:4566
# 使用us-east-1或提供给localstack的任何其他区域,由"AWS_DEFAULT_REGION"环境变量定义
- name : region
value : us-east-1
要在Kubernetes上运行localstack,您可以应用以下配置。然后可以通过DNS名称http://localstack.default.svc.cluster.local:4566
(假设这是应用于默认命名空间)访问localstack,应将其用作endpoint
。
apiVersion : apps/v1
kind : Deployment
metadata :
name : localstack
spec :
# 使用选择器,我们将公开正在运行的部署
# 这就是Kubernetes知道给定服务属于部署的方式
selector :
matchLabels :
app : localstack
replicas : 1
template :
metadata :
labels :
app : localstack
spec :
containers :
- name : localstack
image : localstack/localstack:latest
ports :
# 暴露边缘端点
- containerPort : 4566
---
kind : Service
apiVersion : v1
metadata :
name : localstack
labels :
app : localstack
spec :
selector :
app : localstack
ports :
- protocol : TCP
port : 4566
targetPort : 4566
type : LoadBalancer
为了在AWS中运行,创建或分配一个具有SNS和SQS服务权限的IAM用户,策略如下:
{
"Version" : "2012-10-17" ,
"Statement" : [
{
"Sid" : "YOUR_POLICY_NAME" ,
"Effect" : "Allow" ,
"Action" : [
"sns:CreateTopic" ,
"sns:GetTopicAttributes" ,
"sns:ListSubscriptionsByTopic" ,
"sns:Publish" ,
"sns:Subscribe" ,
"sns:TagResource" ,
"sqs:ChangeMessageVisibility" ,
"sqs:CreateQueue" ,
"sqs:DeleteMessage" ,
"sqs:GetQueueAttributes" ,
"sqs:GetQueueUrl" ,
"sqs:ReceiveMessage" ,
"sqs:SetQueueAttributes" ,
"sqs:TagQueue"
],
"Resource" : [
"arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:*" ,
"arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:*"
]
}
]
}
将AWS账户ID
和AWS账户secret
插入组件元数据中的accessKey
和secretKey
,使用Kubernetes secret和secretKeyRef
。
或者,假设您希望使用自己的工具(例如Terraform)来配置SNS和SQS资产,同时防止Dapr动态执行此操作。您需要启用disableEntityManagement
并为使用Dapr的应用程序分配一个IAM角色,策略如下:
{
"Version" : "2012-10-17" ,
"Statement" : [
{
"Sid" : "YOUR_POLICY_NAME" ,
"Effect" : "Allow" ,
"Action" : [
"sqs:DeleteMessage" ,
"sqs:ReceiveMessage" ,
"sqs:ChangeMessageVisibility" ,
"sqs:GetQueueUrl" ,
"sqs:GetQueueAttributes" ,
"sns:Publish" ,
"sns:ListSubscriptionsByTopic" ,
"sns:GetTopicAttributes"
],
"Resource" : [
"arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:APP_TOPIC_NAME" ,
"arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:APP_ID"
]
}
]
}
在上述示例中,您在EKS集群上运行应用程序,并进行动态资产创建(默认Dapr行为)。
相关链接 3 - Azure Event Hubs Azure Event Hubs pubsub 组件的详细文档
组件格式 要配置 Azure Event Hubs 的发布/订阅功能,请创建一个类型为 pubsub.azure.eventhubs
的组件。有关 ConsumerID 自动生成的详细信息,请参阅 pub/sub broker 组件文件 。要了解如何创建和应用 pub/sub 配置,请阅读 发布和订阅指南 。
除了下文列出的配置元数据字段,Azure Event Hubs 还支持 Azure 身份验证 机制。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : eventhubs-pubsub
spec :
type : pubsub.azure.eventhubs
version : v1
metadata :
# connectionString 和 eventHubNamespace 必须二选一
# 不使用 Microsoft Entra ID 时使用 connectionString
- name : connectionString
value : "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}"
# 使用 Microsoft Entra ID 时使用 eventHubNamespace
- name : eventHubNamespace
value : "namespace"
- name : consumerID # 可选。如果未提供,运行时将使用 Dapr 应用程序 ID (`appID`)。
value : "channel1"
- name : enableEntityManagement
value : "false"
- name : enableInOrderMessageDelivery
value : "false"
# 仅当 enableEntityManagement 设置为 true 时才需要以下四个属性
- name : resourceGroupName
value : "test-rg"
- name : subscriptionID
value : "Azure 订阅 ID 的值"
- name : partitionCount
value : "1"
- name : messageRetentionInDays
value : "3"
# 检查点存储属性
- name : storageAccountName
value : "myeventhubstorage"
- name : storageAccountKey
value : "112233445566778899"
- name : storageContainerName
value : "myeventhubstoragecontainer"
# 传递 storageAccountKey 的替代方法
- name : storageConnectionString
value : "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<account-key>"
注意 上面的示例使用明文字符串作为密钥。建议使用密钥存储来保护密钥,具体方法请参阅
这里 。
规格元数据字段 字段 必需 详情 示例 connectionString
是* Event Hub 或 Event Hub 命名空间的连接字符串。 * 与 eventHubNamespace
字段互斥。 * 不使用 Microsoft Entra ID 身份验证 时必需 "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}"
或 "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key}"
eventHubNamespace
是* Event Hub 命名空间名称。 * 与 connectionString
字段互斥。 * 使用 Microsoft Entra ID 身份验证 时必需 "namespace"
consumerID
否 消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID
,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID
) 值。 可以设置为字符串值(如上例中的 "channel1"
)或字符串格式值(如 "{podName}"
等)。查看可以在组件元数据中使用的所有模板标签。 enableEntityManagement
否 布尔值,允许管理 EventHub 命名空间和存储帐户。默认值:false
"true", "false"
enableInOrderMessageDelivery
否 布尔值,允许消息按发布顺序传递。这假设在发布或发布时设置了 partitionKey
以确保跨分区的顺序。默认值:false
"true"
, "false"
storageAccountName
是 用于检查点存储的存储帐户名称。 "myeventhubstorage"
storageAccountKey
是* 检查点存储帐户的存储帐户密钥。 * 使用 Microsoft Entra ID 时,如果服务主体也有权访问存储帐户,则可以省略此项。 "112233445566778899"
storageConnectionString
是* 检查点存储的连接字符串,指定 storageAccountKey
的替代方法 "DefaultEndpointsProtocol=https;AccountName=myeventhubstorage;AccountKey=<account-key>"
storageContainerName
是 存储帐户名称的存储容器名称。 "myeventhubstoragecontainer"
resourceGroupName
否 Event Hub 命名空间所属的资源组名称。启用实体管理时必需 "test-rg"
subscriptionID
否 Azure 订阅 ID 值。启用实体管理时必需 "azure subscription id"
partitionCount
否 新 Event Hub 命名空间的分区数。仅在启用实体管理时使用。默认值:"1"
"2"
messageRetentionInDays
否 在新创建的 Event Hub 命名空间中保留消息的天数。仅在启用实体管理时使用。默认值:"1"
"90"
Microsoft Entra ID 身份验证 Azure Event Hubs pub/sub 组件支持使用所有 Microsoft Entra ID 机制进行身份验证。有关更多信息以及根据选择的 Microsoft Entra ID 身份验证机制提供的相关组件元数据字段,请参阅 Azure 身份验证文档 。
示例配置 apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : eventhubs-pubsub
spec :
type : pubsub.azure.eventhubs
version : v1
metadata :
# 使用 Azure 身份验证
- name : azureTenantId
value : "***"
- name : azureClientId
value : "***"
- name : azureClientSecret
value : "***"
- name : eventHubNamespace
value : "namespace"
- name : enableEntityManagement
value : "false"
# 仅当 enableEntityManagement 设置为 true 时才需要以下四个属性
- name : resourceGroupName
value : "test-rg"
- name : subscriptionID
value : "Azure 订阅 ID 的值"
- name : partitionCount
value : "1"
- name : messageRetentionInDays
# 检查点存储属性
# 在这种情况下,我们也使用 Microsoft Entra ID 访问存储帐户
- name : storageAccountName
value : "myeventhubstorage"
- name : storageContainerName
value : "myeventhubstoragecontainer"
发送和接收多条消息 Azure Eventhubs 支持使用批量 pub/sub API 在单个操作中发送和接收多条消息。
配置批量发布 要设置批量发布操作的元数据,请在 HTTP 请求或 gRPC 元数据上设置查询参数,如 API 参考中所述 。
元数据 默认值 metadata.maxBulkPubBytes
1000000
配置批量订阅 订阅主题时,可以配置 bulkSubscribe
选项。请参阅 批量订阅消息 了解更多详细信息,并了解 批量订阅 API 。
配置 默认值 maxMessagesCount
100
maxAwaitDurationMs
10000
配置检查点频率 订阅主题时,可以通过 在 HTTP 或 gRPC 订阅请求中设置元数据 来配置分区中的检查点频率。此元数据允许在分区事件序列中配置的事件数量后进行检查点。通过将频率设置为 0
来禁用检查点。
了解更多关于检查点的信息 。
元数据 默认值 metadata.checkPointFrequencyPerPartition
1
以下示例显示了一个使用 checkPointFrequencyPerPartition
元数据的 声明性订阅 示例订阅文件。同样,您也可以在 编程订阅 中传递元数据。
apiVersion : dapr.io/v2alpha1
kind : Subscription
metadata :
name : order-pub-sub
spec :
topic : orders
routes :
default : /checkout
pubsubname : order-pub-sub
metadata :
checkPointFrequencyPerPartition : 1
scopes :
- orderprocessing
- checkout
注意 使用 BulkSubscribe
订阅主题时,您可以配置检查点在指定的 批次 数量后进行,而不是事件,其中 批次 是指在单个请求中接收到的事件集合。创建 Azure Event Hub 按照 文档 中的说明设置 Azure Event Hubs。
由于此组件使用 Azure 存储作为检查点存储,您还需要一个 Azure 存储帐户 。按照 文档 中的说明管理存储帐户访问密钥。
请参阅 文档 了解如何获取 Event Hubs 连接字符串(注意这不是 Event Hubs 命名空间的连接字符串)。
为每个订阅者创建消费者组 对于每个想要订阅事件的 Dapr 应用程序,创建一个以 Dapr 应用程序 ID 命名的 Event Hubs 消费者组。例如,在 Kubernetes 上运行的 Dapr 应用程序,其 dapr.io/app-id: "myapp"
将需要一个名为 myapp
的 Event Hubs 消费者组。
注意:Dapr 将消费者组的名称传递给 Event Hub,因此这不在元数据中提供。
实体管理 当在元数据中启用实体管理时,只要应用程序具有操作 Event Hub 命名空间的正确角色和权限,Dapr 就可以自动为您创建 Event Hub 和消费者组。
Event Hub 名称是发布或订阅请求中的 topic
字段,而消费者组名称是订阅给定 Event Hub 的 Dapr 应用程序的名称。例如,在 Kubernetes 上运行的 Dapr 应用程序,其名称为 dapr.io/app-id: "myapp"
需要一个名为 myapp
的 Event Hubs 消费者组。
实体管理仅在使用 Microsoft Entra ID 身份验证 且不使用连接字符串时才可能。
Dapr 将消费者组的名称传递给 Event Hub,因此这不在元数据中提供。
订阅 Azure IoT Hub 事件 Azure IoT Hub 提供了一个 与 Event Hubs 兼容的端点 ,因此 Azure Event Hubs pubsub 组件也可以用于订阅 Azure IoT Hub 事件。
由 Azure IoT Hub 设备创建的设备到云事件将包含额外的 IoT Hub 系统属性 ,Dapr 的 Azure Event Hubs pubsub 组件将在响应元数据中返回以下内容:
系统属性名称 描述和路由查询关键字 iothub-connection-auth-generation-id
发送消息的设备的 connectionDeviceGenerationId 。请参阅 IoT Hub 设备身份属性 。 iothub-connection-auth-method
发送消息的设备使用的 connectionAuthMethod 。 iothub-connection-device-id
发送消息的设备的 deviceId 。请参阅 IoT Hub 设备身份属性 。 iothub-connection-module-id
发送消息的设备的 moduleId 。请参阅 IoT Hub 设备身份属性 。 iothub-enqueuedtime
设备到云消息被 IoT Hub 接收的 enqueuedTime ,格式为 RFC3339。 message-id
用户可设置的 AMQP messageId 。
例如,传递的 HTTP 订阅消息的头将包含:
{
'user-agent' : 'fasthttp' ,
'host' : '127.0.0.1:3000' ,
'content-type' : 'application/json' ,
'content-length' : '120' ,
'iothub-connection-device-id' : 'my-test-device' ,
'iothub-connection-auth-generation-id' : '637618061680407492' ,
'iothub-connection-auth-method' : '{"scope":"module","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}' ,
'iothub-connection-module-id' : 'my-test-module-a' ,
'iothub-enqueuedtime' : '2021-07-13T22:08:09Z' ,
'message-id' : 'my-custom-message-id' ,
'x-opt-sequence-number' : '35' ,
'x-opt-enqueued-time' : '2021-07-13T22:08:09Z' ,
'x-opt-offset' : '21560' ,
'traceparent' : '00-4655608164bc48b985b42d39865f3834-ed6cf3697c86e7bd-01'
}
相关链接 4 - Azure Service Bus Topics Azure Service Bus Topics pubsub 组件的详细文档
组件格式 要配置 Azure Service Bus Topics pub/sub,需创建一个类型为 pubsub.azure.servicebus.topics
的组件。请参考 pub/sub broker 组件文件 了解 ConsumerID 的自动生成方式。阅读 发布和订阅指南 以获取创建和应用 pub/sub 配置的步骤。
此组件使用 Azure Service Bus 的主题功能;请查看官方文档了解 主题和队列 的区别。 如需使用队列,请参阅 Azure Service Bus Queues pubsub 组件 。
连接字符串认证 apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : servicebus-pubsub
spec :
type : pubsub.azure.servicebus.topics
version : v1
metadata :
# 不使用 Microsoft Entra ID 认证时必需
- name : connectionString
value : "Endpoint=sb://{ServiceBusNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={ServiceBus}"
# - name: consumerID # 可选:默认为应用程序自身的 ID
# value: channel1
# - name: timeoutInSec # 可选
# value: 60
# - name: handlerTimeoutInSec # 可选
# value: 60
# - name: disableEntityManagement # 可选
# value: "false"
# - name: maxDeliveryCount # 可选
# value: 3
# - name: lockDurationInSec # 可选
# value: 60
# - name: lockRenewalInSec # 可选
# value: 20
# - name: maxActiveMessages # 可选
# value: 10000
# - name: maxConcurrentHandlers # 可选
# value: 10
# - name: defaultMessageTimeToLiveInSec # 可选
# value: 10
# - name: autoDeleteOnIdleInSec # 可选
# value: 3600
# - name: minConnectionRecoveryInSec # 可选
# value: 2
# - name: maxConnectionRecoveryInSec # 可选
# value: 300
# - name: maxRetriableErrorsPerSec # 可选
# value: 10
# - name: publishMaxRetries # 可选
# value: 5
# - name: publishInitialRetryIntervalInMs # 可选
# value: 500
注意: 上述设置适用于使用此组件的所有主题。
警告 上述示例中使用了明文字符串作为 secret。建议使用 secret 存储来保护 secret,具体方法请参阅
此处 。
规格元数据字段 字段 必需 详情 示例 connectionString
是 Service Bus 的共享访问策略连接字符串。除非使用 Microsoft Entra ID 认证,否则必需。 见上例 namespaceName
否 设置 Service Bus 命名空间地址的参数,作为完全限定的域名。使用 Microsoft Entra ID 认证时必需。 "namespace.servicebus.windows.net"
consumerID
否 消费者 ID 用于将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID
,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID
) 值。 可以设置为字符串值(如上例中的 "channel1"
)或字符串格式值(如 "{podName}"
等)。查看您可以在组件元数据中使用的所有模板标签。 timeoutInSec
否 发送消息和管理操作的超时时间。默认:60
30
handlerTimeoutInSec
否 调用应用程序处理程序的超时时间。默认:60
30
lockRenewalInSec
否 定义缓冲消息锁将被续订的频率。默认:20
。 20
maxActiveMessages
否 定义一次处理或缓冲的最大消息数。此值应至少与最大并发处理程序一样大。默认:1000
2000
maxConcurrentHandlers
否 定义最大并发消息处理程序数。默认:0
(无限制) 10
disableEntityManagement
否 设置为 true 时,队列和订阅不会自动创建。默认:"false"
"true"
,"false"
defaultMessageTimeToLiveInSec
否 默认消息生存时间,以秒为单位。仅在订阅创建期间使用。 10
autoDeleteOnIdleInSec
否 在自动删除空闲订阅之前等待的时间,以秒为单位。仅在订阅创建期间使用。必须为 300 秒或更长。默认:0
(禁用) 3600
maxDeliveryCount
否 定义服务器尝试传递消息的次数。仅在订阅创建期间使用。由服务器设置默认值。 10
lockDurationInSec
否 定义消息在过期前被锁定的时间长度,以秒为单位。仅在订阅创建期间使用。由服务器设置默认值。 30
minConnectionRecoveryInSec
否 在连接失败的情况下,尝试重新连接到 Azure Service Bus 之前等待的最小间隔(以秒为单位)。默认:2
5
maxConnectionRecoveryInSec
否 在连接失败的情况下,尝试重新连接到 Azure Service Bus 之前等待的最大间隔(以秒为单位)。每次尝试后,组件在最小和最大之间等待一个随机秒数,每次增加。默认:300
(5 分钟) 600
maxRetriableErrorsPerSec
否 每秒处理的最大可重试错误数。如果消息因可重试错误而无法处理,组件会在开始处理另一条消息之前添加延迟,以避免立即重新处理失败的消息。默认:10
10
publishMaxRetries
否 当 Azure Service Bus 响应“过于繁忙”以限制消息时的最大重试次数。默认:5
5
publishInitialRetryIntervalInMs
否 当 Azure Service Bus 限制消息时,初始指数退避的时间(以毫秒为单位)。默认:500
500
Microsoft Entra ID 认证 Azure Service Bus Topics pubsub 组件支持使用所有 Microsoft Entra ID 机制进行认证,包括托管身份。有关更多信息以及根据选择的 Microsoft Entra ID 认证机制提供的相关组件元数据字段,请参阅 Azure 认证文档 。
示例配置 apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : servicebus-pubsub
spec :
type : pubsub.azure.servicebus.topics
version : v1
metadata :
- name : namespaceName
# 使用 Azure 认证时必需。
# 必须是完全限定的域名
value : "servicebusnamespace.servicebus.windows.net"
- name : azureTenantId
value : "***"
- name : azureClientId
value : "***"
- name : azureClientSecret
value : "***"
消息元数据 Azure Service Bus 消息通过附加上下文元数据扩展了 Dapr 消息格式。一些元数据字段由 Azure Service Bus 自行设置(只读),其他字段可以在发布消息时由客户端设置。
发送带有元数据的消息 要在发送消息时设置 Azure Service Bus 元数据,请在 HTTP 请求上设置查询参数或 gRPC 元数据,如此处 所述。
metadata.MessageId
metadata.CorrelationId
metadata.SessionId
metadata.Label
metadata.ReplyTo
metadata.PartitionKey
metadata.To
metadata.ContentType
metadata.ScheduledEnqueueTimeUtc
metadata.ReplyToSessionId
注意: metadata.MessageId
属性不会设置 Dapr 返回的云事件的 id
属性,应单独处理。
注意: 如果未设置 metadata.SessionId
属性,但主题需要会话,则将使用空会话 ID。
注意: metadata.ScheduledEnqueueTimeUtc
属性支持 RFC1123 和 RFC3339 时间戳格式。
接收带有元数据的消息 当 Dapr 调用您的应用程序时,它将使用 HTTP 头或 gRPC 元数据将 Azure Service Bus 消息元数据附加到请求中。
除了上述可设置的元数据 外,您还可以访问以下只读消息元数据。
metadata.DeliveryCount
metadata.LockedUntilUtc
metadata.LockToken
metadata.EnqueuedTimeUtc
metadata.SequenceNumber
要了解这些元数据属性的详细用途,请参阅官方 Azure Service Bus 文档 。
此外,原始 Azure Service Bus 消息的所有 ApplicationProperties
条目都作为 metadata.<application property's name>
附加。
注意:所有时间均由服务器填充,并未调整时钟偏差。
订阅启用会话的主题 要订阅启用会话 的主题,您可以在订阅元数据中提供以下属性。
requireSessions (默认: false)
sessionIdleTimeoutInSec (默认: 60)
maxConcurrentSessions (默认: 8)
为主题创建 Azure Service Bus broker 请按照此处 的说明设置 Azure Service Bus Topics。
相关链接 5 - Azure Service Bus 队列 关于 Azure Service Bus 队列发布/订阅组件的详细文档
组件格式 要配置 Azure Service Bus 队列的发布/订阅功能,创建一个类型为 pubsub.azure.servicebus.queues
的组件。请参考 发布/订阅代理组件文件 了解 ConsumerID 是如何自动生成的。请阅读 发布和订阅指南 了解如何创建和应用发布/订阅配置。
该组件在 Azure Service Bus 上使用队列;请查看官方文档了解 主题和队列 之间的区别。
若要使用主题,请参阅 Azure Service Bus 主题发布/订阅组件 。
连接字符串认证 apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : servicebus-pubsub
spec :
type : pubsub.azure.servicebus.queues
version : v1
metadata :
# 不使用 Microsoft Entra ID 认证时必需
- name : connectionString
value : "Endpoint=sb://{ServiceBusNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={ServiceBus}"
# - name: consumerID # 可选
# value: channel1
# - name: timeoutInSec # 可选
# value: 60
# - name: handlerTimeoutInSec # 可选
# value: 60
# - name: disableEntityManagement # 可选
# value: "false"
# - name: maxDeliveryCount # 可选
# value: 3
# - name: lockDurationInSec # 可选
# value: 60
# - name: lockRenewalInSec # 可选
# value: 20
# - name: maxActiveMessages # 可选
# value: 10000
# - name: maxConcurrentHandlers # 可选
# value: 10
# - name: defaultMessageTimeToLiveInSec # 可选
# value: 10
# - name: autoDeleteOnIdleInSec # 可选
# value: 3600
# - name: minConnectionRecoveryInSec # 可选
# value: 2
# - name: maxConnectionRecoveryInSec # 可选
# value: 300
# - name: maxRetriableErrorsPerSec # 可选
# value: 10
# - name: publishMaxRetries # 可选
# value: 5
# - name: publishInitialRetryIntervalInMs # 可选
# value: 500
警告 上述示例使用明文字符串作为密钥。建议使用密钥存储来存储密钥,如
此处 所述。
规格元数据字段 字段 必需 详情 示例 connectionString
Y Service Bus 的共享访问策略连接字符串。除非使用 Microsoft Entra ID 认证,否则必需。 见上例 consumerID
N 消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID
,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID
) 值。 可以设置为字符串值(如上例中的 "channel1"
)或字符串格式值(如 "{podName}"
等)。查看您可以在组件元数据中使用的所有模板标签。 namespaceName
N 设置 Service Bus 命名空间地址的参数,作为完全限定的域名。使用 Microsoft Entra ID 认证时必需。 "namespace.servicebus.windows.net"
timeoutInSec
N 发送消息和管理操作的超时时间。默认:60
30
handlerTimeoutInSec
N 调用应用程序处理程序的超时时间。默认:60
30
lockRenewalInSec
N 定义缓冲消息锁将被续订的频率。默认:20
。 20
maxActiveMessages
N 定义一次处理或缓冲的最大消息数。此值应至少与最大并发处理程序一样大。默认:1000
2000
maxConcurrentHandlers
N 定义最大并发消息处理程序数。默认:0
(无限制) 10
disableEntityManagement
N 设置为 true 时,队列和订阅不会自动创建。默认:"false"
"true"
,"false"
defaultMessageTimeToLiveInSec
N 默认消息生存时间,以秒为单位。仅在订阅创建期间使用。 10
autoDeleteOnIdleInSec
N 在自动删除空闲订阅之前等待的时间,以秒为单位。仅在订阅创建期间使用。必须为 300 秒或更长。默认:0
(禁用) 3600
maxDeliveryCount
N 定义服务器尝试传递消息的次数。仅在订阅创建期间使用。服务器默认设置。 10
lockDurationInSec
N 定义消息在过期前被锁定的时间长度,以秒为单位。仅在订阅创建期间使用。服务器默认设置。 30
minConnectionRecoveryInSec
N 在连接失败的情况下,尝试重新连接到 Azure Service Bus 之前等待的最小间隔(以秒为单位)。默认:2
5
maxConnectionRecoveryInSec
N 在连接失败的情况下,尝试重新连接到 Azure Service Bus 之前等待的最大间隔(以秒为单位)。每次尝试后,组件在最小和最大之间等待一个随机秒数,每次增加。默认:300
(5 分钟) 600
maxRetriableErrorsPerSec
N 每秒处理的最大可重试错误数。如果消息处理失败并出现可重试错误,组件会在开始处理另一条消息之前添加延迟,以避免立即重新处理失败的消息。默认:10
10
publishMaxRetries
N 当 Azure Service Bus 响应“过于繁忙”以限制消息时的最大重试次数。默认:5
5
publishInitialRetryIntervalInMs
N 当 Azure Service Bus 限制消息时,初始指数回退的时间(以毫秒为单位)。默认:500
500
Microsoft Entra ID 认证 Azure Service Bus 队列发布/订阅组件支持使用所有 Microsoft Entra ID 机制进行认证,包括托管身份。有关更多信息以及根据选择的 Microsoft Entra ID 认证机制提供的相关组件元数据字段,请参阅 Azure 认证文档 。
示例配置 apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : servicebus-pubsub
spec :
type : pubsub.azure.servicebus.queues
version : v1
metadata :
- name : namespaceName
# 使用 Azure 认证时必需。
# 必须是完全限定的域名
value : "servicebusnamespace.servicebus.windows.net"
- name : azureTenantId
value : "***"
- name : azureClientId
value : "***"
- name : azureClientSecret
value : "***"
消息元数据 Azure Service Bus 消息在 Dapr 消息格式的基础上增加了上下文元数据。一些元数据字段由 Azure Service Bus 本身设置(只读),其他字段可以在发布消息时由客户端设置。
发送带有元数据的消息 要在发送消息时设置 Azure Service Bus 元数据,请在 HTTP 请求或 gRPC 元数据上设置查询参数,如 此处 所述。
metadata.MessageId
metadata.CorrelationId
metadata.SessionId
metadata.Label
metadata.ReplyTo
metadata.PartitionKey
metadata.To
metadata.ContentType
metadata.ScheduledEnqueueTimeUtc
metadata.ReplyToSessionId
注意 metadata.MessageId
属性不会设置 Dapr 返回的云事件的 id
属性,应单独处理。metadata.ScheduledEnqueueTimeUtc
属性支持 RFC1123 和 RFC3339 时间戳格式。接收带有元数据的消息 当 Dapr 调用您的应用程序时,它使用 HTTP 头或 gRPC 元数据将 Azure Service Bus 消息元数据附加到请求中。
除了 上述可设置的元数据 外,您还可以访问以下只读消息元数据。
metadata.DeliveryCount
metadata.LockedUntilUtc
metadata.LockToken
metadata.EnqueuedTimeUtc
metadata.SequenceNumber
要了解这些元数据属性的用途的更多详细信息,请参阅 官方 Azure Service Bus 文档 。
此外,原始 Azure Service Bus 消息的所有 ApplicationProperties
条目都作为 metadata.<application property's name>
附加。
注意 所有时间均由服务器填充,并未调整时钟偏差。发送和接收多条消息 Azure Service Bus 支持使用批量发布/订阅 API 在单个操作中发送和接收多条消息。
配置批量发布 要为批量发布操作设置元数据,请在 HTTP 请求或 gRPC 元数据上设置查询参数,如 此处 所述。
元数据 默认值 metadata.maxBulkPubBytes
131072
(128 KiB)
配置批量订阅 订阅主题时,您可以配置 bulkSubscribe
选项。有关更多详细信息,请参阅 批量订阅消息 。了解更多关于 批量订阅 API 的信息。
创建 Azure Service Bus 队列代理 按照 此处 的说明设置 Azure Service Bus 队列。
注意 您的队列名称必须与您使用 Dapr 发布的主题名称相同。例如,如果您在发布/订阅 "myPubsub"
上发布到主题 "orders"
,则您的队列必须命名为 "orders"
。
如果您使用共享访问策略连接到队列,则该策略必须能够“管理”队列。要使用死信队列,该策略必须位于包含主队列和死信队列的 Service Bus 命名空间中。重试策略和死信队列 默认情况下,Azure Service Bus 队列有一个死信队列。消息会根据 maxDeliveryCount
的值进行重试。默认的 maxDeliveryCount
值为 10,但可以设置为最多 2000。这些重试发生得非常迅速,如果没有成功返回,消息将被放入死信队列。
Dapr 发布/订阅提供了自己的死信队列概念,允许您控制重试策略并通过 Dapr 订阅死信队列。
在 Azure Service Bus 命名空间中设置一个单独的队列作为死信队列,并定义一个弹性策略来定义如何重试。 订阅主题以获取失败的消息并处理它们。 例如,在订阅中设置一个死信队列 orders-dlq
和一个弹性策略,允许您订阅主题 orders-dlq
以处理失败的消息。
有关设置死信队列的更多详细信息,请参阅 死信文章 。
相关链接 6 - GCP 关于 GCP Pub/Sub 组件的详细文档
创建 Dapr 组件 要配置 GCP pub/sub,需创建一个类型为 pubsub.gcp.pubsub
的组件。参考 pub/sub broker 组件文件 了解 ConsumerID 的自动生成方式。查看 发布和订阅指南 了解如何创建和应用 pub/sub 配置。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : gcp-pubsub
spec :
type : pubsub.gcp.pubsub
version : v1
metadata :
- name : type
value : service_account
- name : projectId
value : <PROJECT_ID> # 替换
- name : endpoint # 可选
value : "http://localhost:8085"
- name : consumerID # 可选 - 默认为应用程序自身的 ID
value : <CONSUMER_ID>
- name : identityProjectId
value : <IDENTITY_PROJECT_ID> # 替换
- name : privateKeyId
value : <PRIVATE_KEY_ID> # 替换
- name : clientEmail
value : <CLIENT_EMAIL> # 替换
- name : clientId
value : <CLIENT_ID> # 替换
- name : authUri
value : https://accounts.google.com/o/oauth2/auth
- name : tokenUri
value : https://oauth2.googleapis.com/token
- name : authProviderX509CertUrl
value : https://www.googleapis.com/oauth2/v1/certs
- name : clientX509CertUrl
value : https://www.googleapis.com/robot/v1/metadata/x509/<PROJECT_NAME>.iam.gserviceaccount.com # 替换 PROJECT_NAME
- name : privateKey
value : <PRIVATE_KEY> # 替换 x509 证书
- name : disableEntityManagement
value : "false"
- name : enableMessageOrdering
value : "false"
- name : orderingKey # 可选
value : <ORDERING_KEY>
- name : maxReconnectionAttempts # 可选
value : 30
- name : connectionRecoveryInSec # 可选
value : 2
- name : deadLetterTopic # 可选
value : <EXISTING_PUBSUB_TOPIC>
- name : maxDeliveryAttempts # 可选
value : 5
- name : maxOutstandingMessages # 可选
value : 1000
- name : maxOutstandingBytes # 可选
value : 1000000000
- name : maxConcurrentConnections # 可选
value : 10
警告 上述示例中,secret 以明文字符串形式使用。建议使用 secret 存储来存储 secret,具体方法请参考
这里 。
规格元数据字段 字段 必需 详情 示例 projectId Y GCP 项目 ID myproject-123
endpoint N 组件使用的 GCP 端点。仅用于本地开发(例如)与 GCP Pub/Sub Emulator 一起使用。运行 GCP 生产 API 时不需要 endpoint
。 "http://localhost:8085"
consumerID
N Consumer ID 将一个或多个消费者组织成一个组。具有相同 consumer ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID
,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID
) 值。consumerID
与请求中提供的 topic
一起用于构建 Pub/Sub 订阅 ID 可以设置为字符串值(例如 "channel1"
)或字符串格式值(例如 "{podName}"
等)。查看可以在组件元数据中使用的所有模板标签。 identityProjectId N 如果 GCP pubsub 项目与身份项目不同,使用此属性指定身份项目 "myproject-123"
privateKeyId N 如果使用显式凭据,此字段应包含服务账户 JSON 文档中的 private_key_id
字段 "my-private-key"
privateKey N 如果使用显式凭据,此字段应包含服务账户 JSON 中的 private_key
字段 -----BEGIN PRIVATE KEY-----MIIBVgIBADANBgkqhkiG9w0B
clientEmail N 如果使用显式凭据,此字段应包含服务账户 JSON 中的 client_email
字段 "myservice@myproject-123.iam.gserviceaccount.com"
clientId N 如果使用显式凭据,此字段应包含服务账户 JSON 中的 client_id
字段 106234234234
authUri N 如果使用显式凭据,此字段应包含服务账户 JSON 中的 auth_uri
字段 https://accounts.google.com/o/oauth2/auth
tokenUri N 如果使用显式凭据,此字段应包含服务账户 JSON 中的 token_uri
字段 https://oauth2.googleapis.com/token
authProviderX509CertUrl N 如果使用显式凭据,此字段应包含服务账户 JSON 中的 auth_provider_x509_cert_url
字段 https://www.googleapis.com/oauth2/v1/certs
clientX509CertUrl N 如果使用显式凭据,此字段应包含服务账户 JSON 中的 client_x509_cert_url
字段 https://www.googleapis.com/robot/v1/metadata/x509/myserviceaccount%40myproject.iam.gserviceaccount.com
disableEntityManagement N 设置为 "true"
时,主题和订阅不会自动创建。默认值:"false"
"true"
,"false"
enableMessageOrdering N 设置为 "true"
时,订阅的消息将按顺序接收,具体取决于发布和权限配置。 "true"
,"false"
orderingKey N 请求中提供的键。当 enableMessageOrdering
设置为 true
时,用于根据该键对消息进行排序。 “my-orderingkey” maxReconnectionAttempts N 定义最大重连尝试次数。默认值:30
30
connectionRecoveryInSec N 连接恢复尝试之间的等待时间(以秒为单位)。默认值:2
2
deadLetterTopic N GCP Pub/Sub 主题的名称。此主题在使用此组件之前必须 存在。 "myapp-dlq"
maxDeliveryAttempts N 消息传递的最大尝试次数。如果指定了 deadLetterTopic
,maxDeliveryAttempts
是消息处理失败的最大尝试次数。一旦达到该次数,消息将被移至死信主题。默认值:5
5
type N 已弃用 GCP 凭据类型。仅支持 service_account
。默认为 service_account
service_account
maxOutstandingMessages N 给定 streaming-pull 连接可以拥有的最大未完成消息数。默认值:1000
50
maxOutstandingBytes N 给定 streaming-pull 连接可以拥有的最大未完成字节数。默认值:1000000000
1000000000
maxConcurrentConnections N 要维护的最大并发 streaming-pull 连接数。默认值:10
2
ackDeadline N 消息确认持续时间截止时间。默认值:20s
1m
警告 如果 enableMessageOrdering
设置为 “true”,则需要在服务账户上授予 roles/viewer 或 roles/pubsub.viewer 角色,以确保在消息中未嵌入顺序令牌的情况下保证顺序。如果未授予此角色,或调用 Subscription.Config() 失败的任何其他原因,嵌入顺序令牌的排序仍将正常工作。GCP 凭据 由于 GCP Pub/Sub 组件使用 GCP Go 客户端库,默认情况下它使用 应用程序默认凭据 进行身份验证。这在 使用客户端库对 GCP 云服务进行身份验证 指南中有进一步解释。
创建 GCP Pub/Sub 对于本地开发,使用 GCP Pub/Sub Emulator 来测试 GCP Pub/Sub 组件。按照 这些说明 运行 GCP Pub/Sub Emulator。
要在本地使用 Docker 运行 GCP Pub/Sub Emulator,请使用以下 docker-compose.yaml
:
version : '3'
services :
pubsub :
image : gcr.io/google.com/cloudsdktool/cloud-sdk:422.0.0-emulators
ports :
- "8085:8085"
container_name : gcp-pubsub
entrypoint : gcloud beta emulators pubsub start --project local-test-prj --host-port 0.0.0.0:8085
为了使用 GCP Pub/Sub Emulator 与您的 pub/sub 绑定,您需要在组件元数据中提供 endpoint
配置。运行 GCP 生产 API 时不需要 endpoint
。
projectId 属性必须与 docker-compose.yaml
或 Docker 命令中使用的 --project
匹配。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : gcp-pubsub
spec :
type : pubsub.gcp.pubsub
version : v1
metadata :
- name : projectId
value : "local-test-prj"
- name : consumerID
value : "testConsumer"
- name : endpoint
value : "localhost:8085"
您可以使用“显式”或“隐式”凭据来配置对 GCP pubsub 实例的访问。如果使用显式,大多数字段是必需的。隐式依赖于 dapr 在映射到具有访问 pubsub 所需权限的 Google 服务账户 (GSA) 的 Kubernetes 服务账户 (KSA) 下运行。在隐式模式下,只需要 projectId
属性,其他所有都是可选的。
按照 此处 的说明设置 Google Cloud Pub/Sub 系统。
相关链接 7 - JetStream NATS JetStream 组件的详细说明文档
组件格式 要配置 JetStream 的发布/订阅功能,需要创建一个类型为 pubsub.jetstream
的组件。请参考 pubsub broker 组件文件 以了解 ConsumerID 的自动生成方式。阅读 发布和订阅指南 以获取创建和应用 pubsub 配置的步骤。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : jetstream-pubsub
spec :
type : pubsub.jetstream
version : v1
metadata :
- name : natsURL
value : "nats://localhost:4222"
- name : jwt # 可选。用于分布式 JWT 认证。
value : "eyJhbGciOiJ...6yJV_adQssw5c"
- name : seedKey # 可选。用于分布式 JWT 认证。
value : "SUACS34K232O...5Z3POU7BNIL4Y"
- name : tls_client_cert # 可选。用于 TLS 客户端认证。
value : "/path/to/tls.crt"
- name : tls_client_key # 可选。用于 TLS 客户端认证。
value : "/path/to/tls.key"
- name : token # 可选。用于基于令牌的认证。
value : "my-token"
- name : name
value : "my-conn-name"
- name : streamName
value : "my-stream"
- name : durableName
value : "my-durable-subscription"
- name : queueGroupName
value : "my-queue-group"
- name : startSequence
value : 1
- name : startTime # Unix 时间戳格式
value : 1630349391
- name : flowControl
value : false
- name : ackWait
value : 10s
- name : maxDeliver
value : 5
- name : backOff
value : "50ms, 1s, 10s"
- name : maxAckPending
value : 5000
- name : replicas
value : 1
- name : memoryStorage
value : false
- name : rateLimit
value : 1024
- name : heartbeat
value : 15s
- name : ackPolicy
value : explicit
- name : deliverPolicy
value : all
- name : domain
value : hub
- name : apiPrefix
value : PREFIX
规格元数据字段 字段 必需 详情 示例 natsURL 是 NATS 服务器地址 URL "nats://localhost:4222"
jwt 否 NATS 分布式认证 JWT "eyJhbGciOiJ...6yJV_adQssw5c"
seedKey 否 NATS 分布式认证种子密钥 "SUACS34K232O...5Z3POU7BNIL4Y"
tls_client_cert 否 NATS TLS 客户端认证证书 "/path/to/tls.crt"
tls_client_key 否 NATS TLS 客户端认证密钥 "/path/to/tls.key"
token 否 NATS 基于令牌的认证 "my-token"
name 否 NATS 连接名称 "my-conn-name"
streamName 否 要绑定的 JetStream 流名称 "my-stream"
durableName 否 持久名称 "my-durable"
queueGroupName 否 队列组名称 "my-queue"
startSequence 否 开始序列 1
startTime 否 开始时间 ,Unix 时间戳格式1630349391
flowControl 否 流量控制 true
ackWait 否 确认等待 10s
maxDeliver 否 最大投递次数 15
backOff 否 退避 "50ms, 1s, 5s, 10s"
maxAckPending 否 最大确认待处理 5000
replicas 否 副本 3
memoryStorage 否 内存存储 false
rateLimit 否 速率限制 1024
heartbeat 否 心跳 10s
ackPolicy 否 确认策略 explicit
deliverPolicy 否 其中之一:all, last, new, sequence, time all
domain 否 JetStream Leafonodes HUB
apiPrefix 否 [JetStream Leafnodes] PREFIX
创建 NATS 服务器 您可以使用 Docker 在本地运行启用 JetStream 的 NATS 服务器:
docker run -d -p 4222:4222 nats:latest -js
然后,您可以通过客户端端口与服务器交互:localhost:4222
。
使用 helm 在 Kubernetes 上安装 NATS JetStream:
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install --set nats.jetstream.enabled= true my-nats nats/nats
这将在 default
命名空间中安装一个 NATS 服务器。要与 NATS 交互,请找到服务:
有关 helm chart 设置的更多信息,请参阅 Helm chart 文档 。
创建 JetStream 为特定主题创建 NATS JetStream 是至关重要的。例如,对于在本地运行的 NATS 服务器,使用:
nats -s localhost:4222 stream add myStream --subjects mySubject
示例:竞争消费者模式 假设您希望每条消息仅由具有相同 app-id 的一个应用程序或 pod 处理。通常,consumerID
元数据规范可以帮助您定义竞争消费者。
由于 NATS JetStream 不支持 consumerID
,您需要指定 durableName
和 queueGroupName
来实现竞争消费者模式。例如:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : pubsub
spec :
type : pubsub.jetstream
version : v1
metadata :
- name : name
value : "my-conn-name"
- name : streamName
value : "my-stream"
- name : durableName
value : "my-durable-subscription"
- name : queueGroupName
value : "my-queue-group"
相关链接 8 - KubeMQ KubeMQ pubsub 组件的详细说明
组件格式 要配置 KubeMQ pub/sub,需创建一个类型为 pubsub.kubemq
的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 如何发布和订阅指南 以了解如何创建和应用 pub/sub 配置。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : kubemq-pubsub
spec :
type : pubsub.kubemq
version : v1
metadata :
- name : address
value : localhost:50000
- name : store
value : false
- name : consumerID
value : channel1
规格元数据字段 字段 必需 详情 示例 address Y KubeMQ 服务器的地址 "localhost:50000"
store N pubsub 类型,true: pubsub 持久化 (EventsStore),false: pubsub 内存中 (Events) true
或 false
(默认是 false
)consumerID N 消费者 ID(消费者标签)用于将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID
,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID
) 值。 可以设置为字符串值(如上例中的 "channel1"
)或字符串格式值(如 "{podName}"
等)。查看可以在组件元数据中使用的所有模板标签。 clientID N 客户端 ID 连接的名称 sub-client-12345
authToken N 连接的 Auth JWT 令牌 查看 KubeMQ 认证 ew...
group N 用于负载均衡的订阅者组 g1
disableReDelivery N 设置是否在应用程序出错时重新传递消息 true
或 false
(默认是 false
)
创建 KubeMQ broker 获取 KubeMQ 密钥 。等待电子邮件确认您的密钥 您可以使用 Docker 运行 KubeMQ broker:
docker run -d -p 8080:8080 -p 50000:50000 -p 9090:9090 -e KUBEMQ_TOKEN = <your-key> kubemq/kubemq
然后您可以使用客户端端口与服务器交互:localhost:50000
获取 KubeMQ 密钥 。等待电子邮件确认您的密钥 然后运行以下 kubectl 命令:
kubectl apply -f https://deploy.kubemq.io/init
kubectl apply -f https://deploy.kubemq.io/key/<your-key>
安装 KubeMQ CLI 前往 KubeMQ CLI 并下载最新版本的 CLI。
浏览 KubeMQ 仪表板 安装 KubeMQCTL 后,运行以下命令:
或者,安装 kubectl 后,运行端口转发命令:
kubectl port-forward svc/kubemq-cluster-api -n kubemq 8080:8080
KubeMQ 文档 访问 KubeMQ 文档 了解更多信息。
相关链接 9 - MQTT MQTT pubsub组件的详细文档
组件格式 要配置MQTT pub/sub,您需要创建一个类型为pubsub.mqtt
的组件。请参阅pub/sub broker组件文件 以了解ConsumerID的自动生成方式。阅读操作指南:发布和订阅指南 以了解如何创建和应用pub/sub配置。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : mqtt-pubsub
spec :
type : pubsub.mqtt
version : v1
metadata :
- name : url
value : "tcp://[username][:password]@host.domain[:port]"
- name : qos
value : 1
- name : retain
value : "false"
- name : cleanSession
value : "false"
- name : consumerID
value : "channel1"
警告 上述示例中使用了明文字符串作为密钥。建议使用密钥存储来保护密钥,详情请参阅
这里 。
规格元数据字段 字段 必需 详情 示例 url Y MQTT broker的地址。可以使用secretKeyRef
来引用密钥。 对于非TLS通信,使用**tcp://
** URI方案。 对于TLS通信,使用**ssl://
** URI方案。 "tcp://[username][:password]@host.domain[:port]"
consumerID N 用于连接到MQTT broker的消费者连接的客户端ID。默认为Dapr应用ID。 注意:如果未设置producerID
,则在此值后附加-consumer
用于消费者连接 可以设置为字符串值(如上例中的"channel1"
)或字符串格式值(如"{podName}"
等)。查看可以在组件元数据中使用的所有模板标签。 producerID N 用于连接到MQTT broker的生产者连接的客户端ID。默认为{consumerID}-producer
。 "myMqttProducerApp"
qos N 表示消息的服务质量级别(QoS)(更多信息 )。默认为1
。 0
, 1
, 2
retain N 定义broker是否将消息保存为指定主题的最后已知良好值。默认为"false"
。 "true"
, "false"
cleanSession N 如果为"true"
,则在连接消息中设置clean_session
标志到MQTT broker(更多信息 )。默认为"false"
。 "true"
, "false"
caCert 使用TLS时必需 用于验证服务器TLS证书的证书颁发机构(CA)证书,格式为PEM。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert 使用TLS时必需 TLS客户端证书,格式为PEM。必须与clientKey
一起使用。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey 使用TLS时必需 TLS客户端密钥,格式为PEM。必须与clientCert
一起使用。可以使用secretKeyRef
来引用密钥。 "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"
启用消息传递重试 MQTT pub/sub组件不支持内置的重试策略。这意味着sidecar只会向服务发送一次消息。如果服务标记消息为未处理,则消息不会被确认回broker。只有当broker重新发送消息时,才会重试。
要使Dapr使用更复杂的重试策略,可以将重试弹性策略 应用于MQTT pub/sub组件。
两种重试方式之间有一个关键区别:
未确认消息的重新传递完全依赖于broker。Dapr不保证这一点。一些broker如emqx 、vernemq 等支持它,但它不是MQTT3规范 的一部分。
使用重试弹性策略 使得同一个Dapr sidecar重试重新传递消息。因此是同一个Dapr sidecar和同一个应用接收相同的消息。
使用TLS进行通信 要配置使用TLS进行通信,请确保MQTT broker(例如,mosquitto)配置为支持证书,并在组件配置中提供caCert
、clientCert
、clientKey
元数据。例如:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : mqtt-pubsub
spec :
type : pubsub.mqtt
version : v1
metadata :
- name : url
value : "ssl://host.domain[:port]"
- name : qos
value : 1
- name : retain
value : "false"
- name : cleanSession
value : "false"
- name : caCert
value : ${{ myLoadedCACert }}
- name : clientCert
value : ${{ myLoadedClientCert }}
- name : clientKey
secretKeyRef :
name : myMqttClientKey
key : myMqttClientKey
auth :
secretStore : <SECRET_STORE_NAME>
注意,虽然caCert
和clientCert
值可能不是密钥,但为了方便起见,它们也可以从Dapr密钥存储中引用。
消费共享主题 在消费共享主题时,每个消费者必须有一个唯一标识符。默认情况下,应用ID用于唯一标识每个消费者和发布者。在selfhost模式下,调用每个dapr run
时使用不同的应用ID即可使它们从同一个共享主题中消费。然而,在Kubernetes上,应用pod的多个实例将共享相同的应用ID,禁止所有实例消费同一个主题。为了解决这个问题,配置组件的consumerID
元数据为{uuid}
标签,这将在启动时为每个实例提供一个随机生成的consumerID
值。例如:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : mqtt-pubsub
spec :
type : pubsub.mqtt
version : v1
metadata :
- name : consumerID
value : "{uuid}"
- name : url
value : "tcp://admin:public@localhost:1883"
- name : qos
value : 1
- name : retain
value : "false"
- name : cleanSession
value : "true"
警告 上述示例中使用了明文字符串作为密钥。建议使用密钥存储来保护密钥,详情请参阅
这里 。
注意,在这种情况下,每次Dapr重启时,consumer ID的值都是随机的,因此我们也将cleanSession
设置为true。
创建MQTT broker 您可以使用Docker本地运行 MQTT broker:
docker run -d -p 1883:1883 -p 9001:9001 --name mqtt eclipse-mosquitto:1.6
然后您可以使用客户端端口与服务器交互:mqtt://localhost:1883
您可以在kubernetes中使用以下yaml运行MQTT broker:
apiVersion : apps/v1
kind : Deployment
metadata :
name : mqtt-broker
labels :
app-name : mqtt-broker
spec :
replicas : 1
selector :
matchLabels :
app-name : mqtt-broker
template :
metadata :
labels :
app-name : mqtt-broker
spec :
containers :
- name : mqtt
image : eclipse-mosquitto:1.6
imagePullPolicy : IfNotPresent
ports :
- name : default
containerPort : 1883
protocol : TCP
- name : websocket
containerPort : 9001
protocol : TCP
---
apiVersion : v1
kind : Service
metadata :
name : mqtt-broker
labels :
app-name : mqtt-broker
spec :
type : ClusterIP
selector :
app-name : mqtt-broker
ports :
- port : 1883
targetPort : default
name : default
protocol : TCP
- port : 9001
targetPort : websocket
name : websocket
protocol : TCP
然后您可以使用客户端端口与服务器交互:tcp://mqtt-broker.default.svc.cluster.local:1883
相关链接 10 - MQTT3 MQTT3 发布订阅组件的详细文档
组件格式 要配置一个MQTT3发布/订阅组件,请创建一个类型为pubsub.mqtt3
的组件。请参阅发布/订阅代理组件文件 以了解如何自动生成ConsumerID。阅读操作指南:发布和订阅指南 以了解如何创建和应用发布/订阅配置。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : mqtt-pubsub
spec :
type : pubsub.mqtt3
version : v1
metadata :
- name : url
value : "tcp://[username][:password]@host.domain[:port]"
# 可选
- name : retain
value : "false"
- name : cleanSession
value : "false"
- name : qos
value : "1"
- name : consumerID
value : "channel1"
警告 上述示例中使用了明文字符串作为密钥。建议使用密钥存储来管理密钥,如
此处 所述。
规格元数据字段 字段 必需 详情 示例 url
Y MQTT broker的地址。可以使用secretKeyRef
来引用密钥。 对于非TLS通信,使用**tcp://
** URI方案。 对于TLS通信,使用**ssl://
** URI方案。 "tcp://[username][:password]@host.domain[:port]"
consumerID
N 用于连接到MQTT broker的客户端ID。默认为Dapr应用ID。 可以设置为字符串值(如上例中的"channel1"
)或字符串格式值(如"{podName}"
等)。查看您可以在组件元数据中使用的所有模板标签。 retain
N 定义消息是否由broker保存为指定主题的最后已知良好值。默认为"false"
。 "true"
,"false"
cleanSession
N 如果为"true"
,则在连接消息中设置clean_session
标志到MQTT broker(更多信息 )。默认为"false"
。 "true"
,"false"
caCert
使用TLS时必需 用于验证服务器TLS证书的PEM格式的证书颁发机构(CA)证书。 参见下面的示例 clientCert
使用TLS时必需 PEM格式的TLS客户端证书。必须与clientKey
一起使用。 参见下面的示例 clientKey
使用TLS时必需 PEM格式的TLS客户端密钥。必须与clientCert
一起使用。可以使用secretKeyRef
来引用密钥。 参见下面的示例 qos
N 表示消息的服务质量级别(QoS)(更多信息 )。默认为1
。 0
,1
,2
使用TLS进行通信 要配置使用TLS进行通信,请确保MQTT broker(例如emqx)配置为支持证书,并在组件配置中提供caCert
,clientCert
,clientKey
元数据。例如:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : mqtt-pubsub
spec :
type : pubsub.mqtt3
version : v1
metadata :
- name : url
value : "ssl://host.domain[:port]"
# TLS配置
- name : caCert
value : |
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
- name : clientCert
value : |
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
- name : clientKey
secretKeyRef :
name : myMqttClientKey
key : myMqttClientKey
# 可选
- name : retain
value : "false"
- name : cleanSession
value : "false"
- name : qos
value : 1
请注意,虽然caCert
和clientCert
的值可能不是密钥,但为了方便起见,它们也可以从Dapr密钥存储中引用。
消费共享主题 在消费共享主题时,每个消费者必须有一个唯一标识符。默认情况下,应用ID用于唯一标识每个消费者和发布者。在selfhost模式下,调用每个dapr run
时使用不同的应用ID即可让它们从同一个共享主题中消费。然而,在Kubernetes上,应用Pod的多个实例将共享相同的应用ID,禁止所有实例消费相同的主题。为了解决这个问题,可以在组件的consumerID
元数据中配置一个{uuid}
标签(这将在启动时为每个实例生成一个随机值)或{podName}
(这将在Kubernetes上使用Pod的名称)。例如:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : mqtt-pubsub
spec :
type : pubsub.mqtt3
version : v1
metadata :
- name : consumerID
value : "{uuid}"
- name : cleanSession
value : "true"
- name : url
value : "tcp://admin:public@localhost:1883"
- name : qos
value : 1
- name : retain
value : "false"
警告 上述示例中使用了明文字符串作为密钥。建议使用密钥存储来管理密钥,如
此处 所述。
请注意,在这种情况下,每次Dapr重启时,consumer ID的值都是随机的,因此您也应该将cleanSession
设置为true
。
建议使用StatefulSets 进行共享订阅。
创建一个MQTT3 broker 您可以使用Docker在本地运行一个像emqx这样的MQTT broker:
docker run -d -p 1883:1883 --name mqtt emqx:latest
然后您可以使用客户端端口与服务器交互:tcp://localhost:1883
您可以使用以下yaml在Kubernetes中运行一个MQTT3 broker:
apiVersion : apps/v1
kind : Deployment
metadata :
name : mqtt-broker
labels :
app-name : mqtt-broker
spec :
replicas : 1
selector :
matchLabels :
app-name : mqtt-broker
template :
metadata :
labels :
app-name : mqtt-broker
spec :
containers :
- name : mqtt
image : emqx:latest
imagePullPolicy : IfNotPresent
ports :
- name : default
containerPort : 1883
protocol : TCP
---
apiVersion : v1
kind : Service
metadata :
name : mqtt-broker
labels :
app-name : mqtt-broker
spec :
type : ClusterIP
selector :
app-name : mqtt-broker
ports :
- port : 1883
targetPort : default
name : default
protocol : TCP
然后您可以使用客户端端口与服务器交互:tcp://mqtt-broker.default.svc.cluster.local:1883
相关链接 11 - Pulsar 关于 Pulsar 发布/订阅组件的详细文档
组件格式 要配置 Apache Pulsar 的发布/订阅(pub/sub)功能,需要创建一个类型为 pubsub.pulsar
的组件。请参阅 pub/sub broker 组件文件 以了解 ConsumerID 的自动生成方式。阅读 操作指南:发布和订阅 以了解如何创建和应用 pub/sub 配置。
有关 Apache Pulsar 的更多信息,请阅读官方文档 。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : pulsar-pubsub
spec :
type : pubsub.pulsar
version : v1
metadata :
- name : host
value : "localhost:6650"
- name : enableTLS
value : "false"
- name : tenant
value : "public"
- name : token
value : "eyJrZXlJZCI6InB1bHNhci1wajU0cXd3ZHB6NGIiLCJhbGciOiJIUzI1NiJ9.eyJzd"
- name : consumerID
value : "channel1"
- name : namespace
value : "default"
- name : persistent
value : "true"
- name : disableBatching
value : "false"
- name : receiverQueueSize
value : "1000"
- name : <topic-name>.jsonschema # 为配置的主题设置 JSON schema 验证
value : |
{
"type": "record",
"name": "Example",
"namespace": "test",
"fields": [
{"name": "ID","type": "int"},
{"name": "Name","type": "string"}
]
}
- name : <topic-name>.avroschema # 为配置的主题设置 Avro schema 验证
value : |
{
"type": "record",
"name": "Example",
"namespace": "test",
"fields": [
{"name": "ID","type": "int"},
{"name": "Name","type": "string"}
]
}
警告 上面的示例使用了明文字符串作为 secret。建议使用
secret 存储 来存储 secret。此组件支持将
token
参数和其他敏感数据存储为 Kubernetes Secrets。
规格元数据字段 字段 必需 详情 示例 host Y Pulsar broker 的地址。默认值为 "localhost:6650"
"localhost:6650"
或 "http://pulsar-pj54qwwdpz4b-pulsar.ap-sg.public.pulsar.com:8080"
enableTLS N 是否启用 TLS。默认值: "false"
"true"
, "false"
tenant N 主题的租户。租户是 Pulsar 多租户的关键,并跨集群分布。默认值: "public"
"public"
consumerID N 用于设置订阅名称或消费者 ID。 可以设置为字符串值(如上例中的 "channel1"
)或字符串格式值(如 "{podName}"
等)。查看可以在组件元数据中使用的所有模板标签。 namespace N 主题的管理单元,作为相关主题的分组机制。默认值: "default"
"default"
persistent N Pulsar 支持两种类型的主题:持久化 和 非持久化 。持久化主题的所有消息都存储在磁盘上,而非持久化主题的数据不会存储到磁盘。 disableBatching N 是否禁用批处理。启用批处理时,默认批处理延迟为 10 毫秒,默认批处理大小为 1000 条消息,设置 disableBatching: true
将使生产者单独发送消息。默认值: "false"
"true"
, "false"
receiverQueueSize N 设置消费者接收队列的大小。控制消费者在被 Dapr 显式调用读取消息之前可以累积多少消息。默认值: "1000"
"1000"
batchingMaxPublishDelay N 设置消息发送的批处理时间段(如果启用了批处理消息)。如果设置为非零值,消息将排队直到此时间间隔或 batchingMaxMessages(见下文)或 batchingMaxSize(见下文)。有两种有效格式,一种是带单位后缀的分数格式,另一种是纯数字格式,处理为毫秒。有效的时间单位有 “ns”, “us” (或 “µs”), “ms”, “s”, “m”, “h”。默认值: "10ms"
"10ms"
, "10"
batchingMaxMessages N 设置批处理中允许的最大消息数。如果设置为大于 1 的值,消息将排队直到达到此阈值或 batchingMaxSize(见下文)或批处理间隔已过。默认值: "1000"
"1000"
batchingMaxSize N 设置批处理中允许的最大字节数。如果设置为大于 1 的值,消息将排队直到达到此阈值或 batchingMaxMessages(见上文)或批处理间隔已过。默认值: "128KB"
"131072"
.jsonschema N 为配置的主题强制执行 JSON schema 验证。 .avroschema N 为配置的主题强制执行 Avro schema 验证。 publicKey N 用于发布者和消费者加密的公钥。值可以是两种选项之一:本地 PEM 证书的文件路径,或证书数据字符串值 privateKey N 用于消费者加密的私钥。值可以是两种选项之一:本地 PEM 证书的文件路径,或证书数据字符串值 keys N 包含 Pulsar 会话密钥 名称的逗号分隔字符串。与 publicKey
一起用于发布者加密 processMode N 是否支持同时处理多条消息。默认值: "async"
"async"
, "sync"
subscribeType N Pulsar 支持四种 订阅类型 。默认值: "shared"
"shared"
, "exclusive"
, "failover"
, "key_shared"
partitionKey N 设置消息的路由策略键。默认值: ""
maxConcurrentHandlers
N 定义并发消息处理程序的最大数量。默认值: 100
10
使用 Token 进行身份验证 要使用静态 JWT token 进行 Pulsar 身份验证,可以使用以下元数据字段:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : messagebus
spec :
type : pubsub.pulsar
version : v1
metadata :
- name : host
value : "pulsar.example.com:6650"
- name : token
secretKeyRef :
name : pulsar
key : token
使用 OIDC 进行身份验证 自 v3.0
起,Pulsar 支持 OIDC 身份验证 。
要启用 OIDC 身份验证,您需要向组件规范提供以下 OAuth2 参数。
OAuth2 身份验证不能与 token 身份验证结合使用。
建议您使用 secret 引用来获取客户端 secret。
Pulsar 的 OAuth2 身份验证器不完全符合 OIDC,因此您有责任确保字段符合要求。例如,发行者 URL 必须使用 https
协议,请求的范围包括 openid
等。
如果省略 oauth2TokenCAPEM
字段,则系统的证书池将用于连接到 OAuth2 发行者(如果使用 https
)。
字段 必需 详情 示例 oauth2TokenURL N 请求 OIDC client_credentials token 的 URL。不能为空。 “https://oauth.example.com/o/oauth2/token"` oauth2TokenCAPEM N 连接到 OAuth2 发行者的 CA PEM 证书包。如果未定义,将使用系统的证书池。 "---BEGIN CERTIFICATE---\n...\n---END CERTIFICATE---"
oauth2ClientID N OIDC 客户端 ID。不能为空。 "my-client-id"
oauth2ClientSecret N OIDC 客户端 secret。不能为空。 "my-client-secret"
oauth2Audiences N 请求的受众的逗号分隔列表。不能为空。 "my-audience-1,my-audience-2"
oauth2Scopes N 请求的范围的逗号分隔列表。不能为空。 "openid,profile,email"
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : messagebus
spec :
type : pubsub.pulsar
version : v1
metadata :
- name : host
value : "pulsar.example.com:6650"
- name : oauth2TokenURL
value : https://oauth.example.com/o/oauth2/token
- name : oauth2TokenCAPEM
value : "---BEGIN CERTIFICATE---\n...\n---END CERTIFICATE---"
- name : oauth2ClientID
value : my-client-id
- name : oauth2ClientSecret
secretKeyRef :
name : pulsar-oauth2
key : my-client-secret
- name : oauth2Audiences
value : "my.pulsar.example.com,another.pulsar.example.com"
- name : oauth2Scopes
value : "openid,profile,email"
启用消息传递重试 Pulsar pub/sub 组件没有内置的重试策略支持。这意味着 sidecar 仅向服务发送一次消息,失败时不会重试。要使 Dapr 使用更复杂的重试策略,您可以将 重试弹性策略 应用于 Pulsar pub/sub 组件。请注意,这将是同一个 Dapr sidecar 重试将消息重新传递到同一个应用实例,而不是其他实例。
延迟队列 在调用 Pulsar pub/sub 时,可以通过请求 URL 中的 metadata
查询参数提供可选的延迟队列。
这些可选参数名称是 metadata.deliverAt
或 metadata.deliverAfter
:
deliverAt
: 延迟消息在指定时间(RFC3339 格式)交付;例如,"2021-09-01T10:00:00Z"
deliverAfter
: 延迟消息在指定时间后交付;例如,"4h5m3s"
示例:
curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAt= '2021-09-01T10:00:00Z' \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
或
curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAfter= '4h5m3s' \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
端到端加密 Dapr 支持设置公钥和私钥对以启用 Pulsar 的 端到端加密功能 。
从文件证书启用发布者加密 apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : messagebus
spec :
type : pubsub.pulsar
version : v1
metadata :
- name : host
value : "localhost:6650"
- name : publicKey
value : ./public.key
- name : keys
value : myapp.key
从文件证书启用消费者加密 apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : messagebus
spec :
type : pubsub.pulsar
version : v1
metadata :
- name : host
value : "localhost:6650"
- name : publicKey
value : ./public.key
- name : privateKey
value : ./private.key
从值启用发布者加密 注意:建议 从 secret 引用公钥 。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : messagebus
spec :
type : pubsub.pulsar
version : v1
metadata :
- name : host
value : "localhost:6650"
- name : publicKey
value : "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
- name : keys
value : myapp.key
从值启用消费者加密 注意:建议 从 secret 引用公钥和私钥 。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : messagebus
spec :
type : pubsub.pulsar
version : v1
metadata :
- name : host
value : "localhost:6650"
- name : publicKey
value : "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
- name : privateKey
value : "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA1KDAM4L8RtJ+nLaXBrBhzVpvTemsKVZoAct8A+ShepOHT9lg\nHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDdruXSflvSdmYeFAw3Ypphc1A5oM53\nwSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/a3golb36GYFrY0MLFTv7wZ87pmMI\nPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eKjpwcg35gccvR6o/UhbKAuc60V1J9\nWof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0QiCdpIrXvYtANq0Id6gP8zJvUEdPIg\nNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ3QIDAQABAoIBAQCKuHnM4ac/eXM7\nQPDVX1vfgyHc3hgBPCtNCHnXfGFRvFBqavKGxIElBvGOcBS0CWQ+Rg1Ca5kMx3TQ\njSweSYhH5A7pe3Sa5FK5V6MGxJvRhMSkQi/lJZUBjzaIBJA9jln7pXzdHx8ekE16\nBMPONr6g2dr4nuI9o67xKrtfViwRDGaG6eh7jIMlEqMMc6WqyhvI67rlVDSTHFKX\njlMcozJ3IT8BtTzKg2Tpy7ReVuJEpehum8yn1ZVdAnotBDJxI07DC1cbOP4M2fHM\ngfgPYWmchauZuTeTFu4hrlY5jg0/WLs6by8r/81+vX3QTNvejX9UdTHMSIfQdX82\nAfkCKUVhAoGBAOvGv+YXeTlPRcYC642x5iOyLQm+BiSX4jKtnyJiTU2s/qvvKkIu\nxAOk3OtniT9NaUAHEZE9tI71dDN6IgTLQlAcPCzkVh6Sc5eG0MObqOO7WOMCWBkI\nlaAKKBbd6cGDJkwGCJKnx0pxC9f8R4dw3fmXWgWAr8ENiekMuvjSfjZ5AoGBAObd\ns2L5uiUPTtpyh8WZ7rEvrun3djBhzi+d7rgxEGdditeiLQGKyZbDPMSMBuus/5wH\nwfi0xUq50RtYDbzQQdC3T/C20oHmZbjWK5mDaLRVzWS89YG/NT2Q8eZLBstKqxkx\ngoT77zoUDfRy+CWs1xvXzgxagD5Yg8/OrCuXOqWFAoGAPIw3r6ELknoXEvihASxU\nS4pwInZYIYGXpygLG8teyrnIVOMAWSqlT8JAsXtPNaBtjPHDwyazfZrvEmEk51JD\nX0tA8M5ah1NYt+r5JaKNxp3P/8wUT6lyszyoeubWJsnFRfSusuq/NRC+1+KDg/aq\nKnSBu7QGbm9JoT2RrmBv5RECgYBRn8Lj1I1muvHTNDkiuRj2VniOSirkUkA2/6y+\nPMKi+SS0tqcY63v4rNCYYTW1L7Yz8V44U5mJoQb4lvpMbolGhPljjxAAU3hVkItb\nvGVRlSCIZHKczADD4rJUDOS7DYxO3P1bjUN4kkyYx+lKUMDBHFzCa2D6Kgt4dobS\n5qYajQKBgQC7u7MFPkkEMqNqNGu5erytQkBq1v1Ipmf9rCi3iIj4XJLopxMgw0fx\n6jwcwNInl72KzoUBLnGQ9PKGVeBcgEgdI+a+tq+1TJo6Ta+hZSx+4AYiKY18eRKG\neNuER9NOcSVJ7Eqkcw4viCGyYDm2vgNV9HJ0VlAo3RDh8x5spEN+mg==\n-----END RSA PRIVATE KEY-----\n"
分区键 在调用 Pulsar pub/sub 时,可以通过请求 URL 中的 metadata
查询参数提供可选的分区键。
参数名称是 partitionKey
。
示例:
curl -X POST http://localhost:3500/v1.0/publish/myPlusar/myTopic?metadata.partitionKey= key1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
消息头 所有其他元数据键/值对(不是 partitionKey
)都设置为 Pulsar 消息中的头。例如,为消息设置 correlationId
:
curl -X POST http://localhost:3500/v1.0/publish/myPlusar/myTopic?metadata.correlationId= myCorrelationID& metadata.partitionKey= key1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
顺序保证 为了确保消息按顺序到达订阅特定键的每个消费者,必须满足三个条件。
subscribeType
应设置为 key_shared
。必须设置 partitionKey
。 processMode
应设置为 sync
。创建一个 Pulsar 实例 docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.5.1 \
bin/pulsar standalone
相关链接 12 - RabbitMQ RabbitMQ pubsub 组件的详细说明文档
组件格式 要设置 RabbitMQ 的发布/订阅功能,请创建一个类型为 pubsub.rabbitmq
的组件。请参阅 pub/sub broker 组件文件 以了解消费者ID(ConsumerID)是如何自动生成的。阅读 How-to: 发布和订阅指南 以了解如何创建和应用 pub/sub 配置。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : rabbitmq-pubsub
spec :
type : pubsub.rabbitmq
version : v1
metadata :
- name : connectionString
value : "amqp://localhost:5672"
- name : protocol
value : amqp
- name : hostname
value : localhost
- name : username
value : username
- name : password
value : password
- name : consumerID
value : channel1
- name : durable
value : false
- name : deletedWhenUnused
value : false
- name : autoAck
value : false
- name : deliveryMode
value : 0
- name : requeueInFailure
value : false
- name : prefetchCount
value : 0
- name : reconnectWait
value : 0
- name : concurrencyMode
value : parallel
- name : publisherConfirm
value : false
- name : enableDeadLetter # 可选,是否启用死信
value : true
- name : maxLen # 可选,队列中的最大消息数
value : 3000
- name : maxLenBytes # 可选,队列的最大字节长度
value : 10485760
- name : exchangeKind
value : fanout
- name : saslExternal
value : false
- name : ttlInSeconds
value : 60
- name : clientName
value : {podName}
- name : heartBeat
value : 10s
警告 上述示例使用明文字符串作为 secrets。建议使用 secret 存储来存储 secrets,如
此处 所述。
规格元数据字段 字段 必需 详情 示例 connectionString Y* RabbitMQ 连接字符串。*与 protocol、hostname、username、password 字段互斥 amqp://user:pass@localhost:5672
protocol N* RabbitMQ 协议。*与 connectionString 字段互斥 amqp
hostname N* RabbitMQ 主机名。*与 connectionString 字段互斥 localhost
username N* RabbitMQ 用户名。*与 connectionString 字段互斥 username
password N* RabbitMQ 密码。*与 connectionString 字段互斥 password
consumerID N 消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID
,Dapr 运行时会将其设置为 Dapr 应用程序 ID (appID
) 的值。 可以设置为字符串值(如上例中的 "channel1"
)或字符串格式值(如 "{podName}"
等)。查看您可以在组件元数据中使用的所有模板标签。 durable N 是否使用 持久化 队列。默认为 "false"
"true"
,"false"
deletedWhenUnused N 队列是否应配置为 自动删除 默认为 "true"
"true"
,"false"
autoAck N 队列消费者是否应 自动确认 消息。默认为 "false"
"true"
,"false"
deliveryMode N 发布消息时的持久性模式。默认为 "0"
。RabbitMQ 将 "2"
视为持久性,其他数字视为非持久性 "0"
,"2"
requeueInFailure N 在失败情况下发送 负确认 时是否重新排队。默认为 "false"
"true"
,"false"
prefetchCount N 要 预取 的消息数量。考虑将其更改为非零值以用于生产环境。默认为 "0"
,这意味着将预取所有可用消息。 "2"
publisherConfirm N 如果启用,客户端在发布消息后等待 发布者确认 。默认为 "false"
"true"
,"false"
reconnectWait N 如果发生连接故障,重新连接前等待的时间(以秒为单位) "0"
concurrencyMode N parallel
是默认值,允许并行处理多个消息(如果配置了 app-max-concurrency
注释,则受其限制)。设置为 single
以禁用并行处理。在大多数情况下,没有理由更改此设置。parallel
,single
enableDeadLetter N 启用将无法处理的消息转发到死信主题。默认为 "false"
"true"
,"false"
maxLen N 队列及其死信队列(如果启用了死信)的最大消息数。如果同时设置了 maxLen
和 maxLenBytes
,则两者都将适用;首先达到的限制将被强制执行。默认为无限制。 "1000"
maxLenBytes N 队列及其死信队列(如果启用了死信)的最大字节长度。如果同时设置了 maxLen
和 maxLenBytes
,则两者都将适用;首先达到的限制将被强制执行。默认为无限制。 "1048576"
exchangeKind N rabbitmq 交换的交换类型。默认为 "fanout"
。 "fanout"
,"topic"
saslExternal N 使用 TLS 时,用户名是否应从附加字段(例如 CN)中获取。请参阅 RabbitMQ 认证机制 。默认为 "false"
。 "true"
,"false"
ttlInSeconds N 在组件级别设置消息 TTL,可以通过每个请求的消息级别 TTL 覆盖。 "60"
caCert 使用 TLS 时必需 用于验证服务器 TLS 证书的 PEM 格式的证书颁发机构(CA)证书。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert 使用 TLS 时必需 PEM 格式的 TLS 客户端证书。必须与 clientKey
一起使用。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey 使用 TLS 时必需 PEM 格式的 TLS 客户端密钥。必须与 clientCert
一起使用。可以是 secretKeyRef
以使用 secret 引用。 "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"
clientName N 这个 RabbitMQ 客户端提供的连接名称 是一个自定义标识符。如果设置,标识符将在 RabbitMQ 服务器日志条目和管理 UI 中提及。可以设置为 {uuid}、{podName} 或 {appID},Dapr 运行时将其替换为实际值。 "app1"
,{uuid}
,{podName}
,{appID}
heartBeat N 定义与服务器的心跳间隔,检测与 RabbitMQ 服务器的对等 TCP 连接的存活性。默认为 10s
。 "10s"
使用 TLS 进行通信 要配置使用 TLS 的通信,请确保 RabbitMQ 节点已启用 TLS,并在组件配置中提供 caCert
、clientCert
、clientKey
元数据。例如:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : rabbitmq-pubsub
spec :
type : pubsub.rabbitmq
version : v1
metadata :
- name : host
value : "amqps://localhost:5671"
- name : consumerID
value : myapp
- name : durable
value : false
- name : deletedWhenUnused
value : false
- name : autoAck
value : false
- name : deliveryMode
value : 0
- name : requeueInFailure
value : false
- name : prefetchCount
value : 0
- name : reconnectWait
value : 0
- name : concurrencyMode
value : parallel
- name : publisherConfirm
value : false
- name : enableDeadLetter # 可选,是否启用死信
value : true
- name : maxLen # 可选,队列中的最大消息数
value : 3000
- name : maxLenBytes # 可选,队列的最大字节长度
value : 10485760
- name : exchangeKind
value : fanout
- name : saslExternal
value : false
- name : caCert
value : ${{ myLoadedCACert }}
- name : clientCert
value : ${{ myLoadedClientCert }}
- name : clientKey
secretKeyRef :
name : myRabbitMQClientKey
key : myRabbitMQClientKey
请注意,虽然 caCert
和 clientCert
值可能不是 secrets,但为了方便起见,它们也可以从 Dapr secret 存储中引用。
启用消息传递重试 RabbitMQ pub/sub 组件不支持内置的重试策略。这意味着 sidecar 仅将消息发送到服务一次。当服务返回结果时,无论消息是否正确处理,消息都将被标记为已消费。请注意,这在所有 Dapr PubSub 组件中都是常见的,而不仅仅是 RabbitMQ。
当 autoAck
设置为 false
且 requeueInFailure
设置为 true
时,Dapr 可以尝试第二次重新传递消息。
要使 Dapr 使用更复杂的重试策略,您可以将 重试弹性策略 应用于 RabbitMQ pub/sub 组件。
两种重试消息的方法之间有一个关键区别:
使用 autoAck = false
和 requeueInFailure = true
时,RabbitMQ 负责重新传递消息,任何 订阅者都可以获取重新传递的消息。如果您的消费者有多个实例,那么可能会有另一个消费者获取它。这通常是更好的方法,因为如果存在瞬态故障,另一个工作者更有可能成功处理消息。 使用 Resiliency 使同一个 Dapr sidecar 重试重新传递消息。因此,将是同一个 Dapr sidecar 和同一个应用程序接收相同的消息。 创建 RabbitMQ 服务器 您可以使用 Docker 在本地运行 RabbitMQ 服务器:
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3
然后,您可以使用客户端端口与服务器交互:localhost:5672
。
在 Kubernetes 上安装 RabbitMQ 的最简单方法是使用 Helm chart :
helm install rabbitmq stable/rabbitmq
查看 chart 输出并获取用户名和密码。
这将 RabbitMQ 安装到 default
命名空间。要与 RabbitMQ 交互,请使用以下命令查找服务:kubectl get svc rabbitmq
。
例如,如果使用上述示例进行安装,RabbitMQ 服务器客户端地址将是:
rabbitmq.default.svc.cluster.local:5672
使用主题交换路由消息 将 exchangeKind
设置为 "topic"
使用主题交换,这通常用于消息的多播路由。为了使用主题交换路由消息,您必须设置以下元数据:
例如,如果应用程序配置了路由键 keyA
和 queueName
为 queue-A
:
apiVersion : dapr.io/v2alpha1
kind : Subscription
metadata :
name : orderspubsub
spec :
topic : B
routes :
default : /B
pubsubname : pubsub
metadata :
routingKey : keyA
queueName : queue-A
它将接收路由键为 keyA
的消息,而其他路由键的消息将不被接收。
// 发布路由键为 `keyA` 的消息,这些消息将被上述示例接收。
client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is a message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyA"}))
// 发布路由键为 `keyB` 的消息,这些消息将不被上述示例接收。
client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is another message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyB"}))
绑定多个 routingKey
多个路由键可以用逗号分隔。 下面的示例绑定了三个 routingKey
:keyA
、keyB
和 ""
。请注意空键的绑定方法。
apiVersion : dapr.io/v2alpha1
kind : Subscription
metadata :
name : orderspubsub
spec :
topic : B
routes :
default : /B
pubsubname : pubsub
metadata :
routingKey : keyA,keyB,
有关更多信息,请参阅 rabbitmq 交换 。
使用优先级队列 Dapr 支持 RabbitMQ 优先级队列 。要为队列设置优先级,请使用 maxPriority
主题订阅元数据。
声明式优先级队列示例 apiVersion : dapr.io/v2alpha1
kind : Subscription
metadata :
name : pubsub
spec :
topic : checkout
routes :
default : /orders
pubsubname : order-pub-sub
metadata :
maxPriority : 3
编程优先级队列示例 @app.route ( '/dapr/subscribe' , methods = [ 'GET' ])
def subscribe ():
subscriptions = [
{
'pubsubname' : 'pubsub' ,
'topic' : 'checkout' ,
'routes' : {
'default' : '/orders'
},
'metadata' : { 'maxPriority' : '3' }
}
]
return jsonify ( subscriptions )
const express = require ( 'express' )
const bodyParser = require ( 'body-parser' )
const app = express ()
app . use ( bodyParser . json ({ type : 'application/*+json' }));
const port = 3000
app . get ( '/dapr/subscribe' , ( req , res ) => {
res . json ([
{
pubsubname : "pubsub" ,
topic : "checkout" ,
routes : {
default : '/orders'
},
metadata : {
maxPriority : '3'
}
}
]);
})
package main
"encoding/json"
"net/http"
const appPort = 3000
type subscription struct {
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map [ string ] string `json:"metadata,omitempty"`
Routes routes `json:"routes"`
}
type routes struct {
Rules [] rule `json:"rules,omitempty"`
Default string `json:"default,omitempty"`
}
// 处理 /dapr/subscribe
func configureSubscribeHandler ( w http . ResponseWriter , _ * http . Request ) {
t := [] subscription {
{
PubsubName : "pubsub" ,
Topic : "checkout" ,
Routes : routes {
Default : "/orders" ,
},
Metadata : map [ string ] string {
"maxPriority" : "3"
},
},
}
w . WriteHeader ( http . StatusOK )
json . NewEncoder ( w ). Encode ( t )
}
发布消息时设置优先级 要在消息上设置优先级,请将发布元数据键 maxPriority
添加到发布端点或 SDK 方法。
HTTP API (Bash)
Python
JavaScript
Go curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders?metadata.priority= 3 -H "Content-Type: application/json" -d '{"orderId": "100"}'
with DaprClient () as client :
result = client . publish_event (
pubsub_name = PUBSUB_NAME ,
topic_name = TOPIC_NAME ,
data = json . dumps ( orderId ),
data_content_type = 'application/json' ,
metadata = { 'priority' : '3' })
await client . pubsub . publish ( PUBSUB_NAME , TOPIC_NAME , orderId , { 'priority' : '3' });
client . PublishEvent ( ctx , PUBSUB_NAME , TOPIC_NAME , [] byte ( strconv . Itoa ( orderId )), map [ string ] string { "priority" : "3" })
使用仲裁队列 默认情况下,Dapr 创建 经典
队列。要创建 仲裁
队列,请将以下元数据添加到您的 pub/sub 订阅
apiVersion : dapr.io/v2alpha1
kind : Subscription
metadata :
name : pubsub
spec :
topic : checkout
routes :
default : /orders
pubsubname : order-pub-sub
metadata :
queueType : quorum
生存时间 您可以在消息级别或组件级别设置生存时间(TTL)值。使用组件规范 ttlInSeconds
字段在组件中设置默认组件级别 TTL。
注意 如果同时设置了组件级别和消息级别 TTL,则默认组件级别 TTL 将被忽略,以消息级别 TTL 为准。单一活动消费者 RabbitMQ 单一活动消费者 设置确保一次只有一个消费者从队列中处理消息,并在活动消费者被取消或失败时切换到另一个注册的消费者。当消息必须按到达队列的确切顺序消费且不支持多实例分布式处理时,可能需要这种方法。
当 Dapr 在队列上启用此选项时,Dapr 运行时的一个实例将是单一活动消费者。为了在故障情况下允许另一个应用程序实例接管,Dapr 运行时必须 探测应用程序的健康状况 并从 pub/sub 组件中取消订阅。
注意 这种模式将阻止应用程序扩展,因为只有一个实例可以处理负载。虽然对于 Dapr 与遗留或敏感应用程序的集成可能很有趣,但如果您需要可扩展性,您应该考虑允许分布式处理的设计。apiVersion : dapr.io/v2alpha1
kind : Subscription
metadata :
name : pubsub
spec :
topic : orders
routes :
default : /orders
pubsubname : order-pub-sub
metadata :
singleActiveConsumer : "true"
相关链接 13 - Redis Streams 关于 Redis Streams pubsub 组件的详细文档
组件格式 要设置 Redis Streams pub/sub,创建一个类型为 pubsub.redis
的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 操作指南:发布和订阅指南 了解如何创建和应用 pub/sub 配置。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : redis-pubsub
spec :
type : pubsub.redis
version : v1
metadata :
- name : redisHost
value : localhost:6379
- name : redisPassword
value : "KeFg23!"
- name : consumerID
value : "channel1"
- name : useEntraID
value : "true"
- name : enableTLS
value : "false"
警告 上述示例使用了明文字符串作为密钥。建议使用密钥存储来保护密钥,具体方法请参阅
此处 。
规格元数据字段 字段 必需 详情 示例 redisHost Y Redis 主机的连接字符串。如果 "redisType"
是 "cluster"
,可以是多个主机用逗号分隔,或仅一个主机 localhost:6379
, redis-master.default.svc.cluster.local:6379
redisPassword N Redis 主机的密码。无默认值。可以是 secretKeyRef
以使用密钥引用 ""
, "KeFg23!"
redisUsername N Redis 主机的用户名。默认为空。确保您的 Redis 服务器版本为 6 或更高,并正确创建了 ACL 规则。 ""
, "default"
consumerID N 消费者组 ID。 可以设置为字符串值(如上例中的 "channel1"
)或字符串格式值(如 "{podName}"
等)。查看您可以在组件元数据中使用的所有模板标签。 useEntraID N 实现对 Azure Cache for Redis 的 EntraID 支持。启用此功能之前:必须以 "server:port"
的形式指定 redisHost
名称 必须启用 TLS 了解更多关于此设置的信息,请参阅 创建 Redis 实例 > Azure Cache for Redis "true"
, "false"
enableTLS N 如果 Redis 实例支持带有公共证书的 TLS,可以配置为启用或禁用。默认为 "false"
"true"
, "false"
clientCert N 客户端证书的内容,用于需要客户端证书的 Redis 实例。必须与 clientKey
一起使用,并且 enableTLS
必须设置为 true。建议使用密钥存储,如 此处 所述 "----BEGIN CERTIFICATE-----\nMIIC..."
clientKey N 客户端私钥的内容,与 clientCert
一起用于身份验证。建议使用密钥存储,如 此处 所述 "----BEGIN PRIVATE KEY-----\nMIIE..."
redeliverInterval N 检查待处理消息以重新传递的间隔。可以使用 Go duration 字符串(例如 “ms”, “s”, “m”)或毫秒数。默认为 "60s"
。"0"
禁用重新传递。 "30s"
, "5000"
processingTimeout N 消息在尝试重新传递之前必须挂起的时间量。可以使用 Go duration 字符串(例如 “ms”, “s”, “m”)或毫秒数。默认为 "15s"
。"0"
禁用重新传递。 "60s"
, "600000"
queueDepth N 处理消息的队列大小。默认为 "100"
。 "1000"
concurrency N 处理消息的并发工作者数量。默认为 "10"
。 "15"
redisType N Redis 的类型。有两个有效值,一个是 "node"
表示单节点模式,另一个是 "cluster"
表示 Redis 集群模式。默认为 "node"
。 "cluster"
redisDB N 连接到 Redis 后选择的数据库。如果 "redisType"
是 "cluster"
,此选项将被忽略。默认为 "0"
。 "0"
redisMaxRetries N 在放弃之前重试命令的最大次数。默认情况下不重试失败的命令。 "5"
redisMinRetryInterval N 每次重试之间 Redis 命令的最小回退时间。默认为 "8ms"
;"-1"
禁用回退。 "8ms"
redisMaxRetryInterval N 每次重试之间 Redis 命令的最大回退时间。默认为 "512ms"
;"-1"
禁用回退。 "5s"
dialTimeout N 建立新连接的拨号超时时间。默认为 "5s"
。 "5s"
readTimeout N 套接字读取的超时时间。如果达到,Redis 命令将因超时而失败而不是阻塞。默认为 "3s"
,"-1"
表示无超时。 "3s"
writeTimeout N 套接字写入的超时时间。如果达到,Redis 命令将因超时而失败而不是阻塞。默认值为 readTimeout。 "3s"
poolSize N 最大套接字连接数。默认是每个 CPU 10 个连接,由 runtime.NumCPU 报告。 "20"
poolTimeout N 如果所有连接都忙,客户端等待连接的时间量,然后返回错误。默认是 readTimeout + 1 秒。 "5s"
maxConnAge N 客户端退役(关闭)连接的连接年龄。默认是不关闭老化连接。 "30m"
minIdleConns N 为了避免创建新连接的性能下降,保持打开的最小空闲连接数。默认为 "0"
。 "2"
idleCheckFrequency N 空闲连接清理器进行空闲检查的频率。默认为 "1m"
。"-1"
禁用空闲连接清理器。 "-1"
idleTimeout N 客户端关闭空闲连接的时间量。应小于服务器的超时时间。默认为 "5m"
。"-1"
禁用空闲超时检查。 "10m"
failover N 启用故障转移配置的属性。需要设置 sentinalMasterName。默认为 "false"
"true"
, "false"
sentinelMasterName N Sentinel 主名称。参见 Redis Sentinel 文档 ""
, "127.0.0.1:6379"
maxLenApprox N 流内的最大项目数。当达到指定长度时,旧条目会自动被驱逐,以便流保持恒定大小。默认为无限制。 "10000"
创建 Redis 实例 Dapr 可以使用任何 Redis 实例 - 无论是容器化的、在本地开发机器上运行的,还是托管的云服务,只要 Redis 的版本是 5.x 或 6.x。
Self-Hosted
Kubernetes
AWS
Azure
GCP Dapr CLI 会自动为您创建并设置一个 Redis Streams 实例。
当您运行 dapr init
时,Redis 实例将通过 Docker 安装,并且组件文件将创建在默认目录中。($HOME/.dapr/components
目录 (Mac/Linux) 或 %USERPROFILE%\.dapr\components
在 Windows 上)。
您可以使用 Helm 快速在 Kubernetes 集群中创建一个 Redis 实例。此方法需要 安装 Helm 。
将 Redis 安装到您的集群中。
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install redis bitnami/redis --set image.tag= 6.2
运行 kubectl get pods
查看现在在您的集群中运行的 Redis 容器。
在您的 redis.yaml 文件中将 redis-master:6379
添加为 redisHost
。例如:
metadata :
- name : redisHost
value : redis-master:6379
接下来,我们将获取我们的 Redis 密码,这在不同操作系统上略有不同:
Windows : 运行 kubectl get secret --namespace default redis -o jsonpath="{.data.redis-password}" > encoded.b64
,这将创建一个包含您编码密码的文件。接下来,运行 certutil -decode encoded.b64 password.txt
,这将把您的 Redis 密码放入一个名为 password.txt
的文本文件中。复制密码并删除这两个文件。
Linux/MacOS : 运行 kubectl get secret --namespace default redis -o jsonpath="{.data.redis-password}" | base64 --decode
并复制输出的密码。
将此密码作为 redisPassword
值添加到您的 redis.yaml 文件中。例如:
- name : redisPassword
value : "lhDOkwTlp0"
使用官方 Microsoft 文档创建 Azure Cache for Redis 实例。
一旦您的实例创建完成,从 Azure 门户获取主机名(FQDN)和您的访问密钥。
将您的密钥和主机名添加到 Dapr 可以应用到您集群的 redis.yaml
文件中。
如果您正在运行一个示例,将主机和密钥添加到提供的 redis.yaml
中。 如果您从头开始创建项目,请按照 组件格式部分 中的说明创建一个 redis.yaml
文件。 将 redisHost
键设置为 [上一步中的主机名]:6379
,并将 redisPassword
键设置为您之前保存的密钥。
注意: 在生产级应用程序中,请按照 密钥管理 指南安全地管理您的密钥。
启用 EntraID 支持:
在您的 Azure Redis 服务器上启用 Entra ID 身份验证。这可能需要几分钟。 将 useEntraID
设置为 "true"
以实现对 Azure Cache for Redis 的 EntraID 支持。 将 enableTLS
设置为 "true"
以支持 TLS。
注意: useEntraID
假设您的 UserPrincipal(通过 AzureCLICredential)或 SystemAssigned 托管身份具有 RedisDataOwner 角色权限。如果使用用户分配的身份,您需要指定 azureClientID
属性 。
注意 Dapr CLI 在 selfhost 模式下作为 dapr init
命令的一部分自动部署本地 redis 实例。相关链接 14 - RocketMQ 关于 RocketMQ pubsub 组件的详细文档
组件格式 要设置 RocketMQ pub/sub,创建一个类型为 pubsub.rocketmq
的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 如何:发布和订阅指南 了解如何创建和应用 pub/sub 配置。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : rocketmq-pubsub
spec :
type : pubsub.rocketmq
version : v1
metadata :
- name : instanceName
value : dapr-rocketmq-test
- name : consumerGroup
value : dapr-rocketmq-test-g-c
- name : producerGroup
value : dapr-rocketmq-test-g-p
- name : consumerID
value : channel1
- name : nameSpace
value : dapr-test
- name : nameServer
value : "127.0.0.1:9876,127.0.0.2:9876"
- name : retries
value : 3
- name : consumerModel
value : "clustering"
- name : consumeOrderly
value : false
警告 上述示例中,secret 使用了明文字符串。建议使用 secret 存储来存储 secret,如
此处 所述。
规格元数据字段 字段 必需 详情 默认值 示例 instanceName N 实例名称 time.Now().String()
dapr-rocketmq-test
consumerGroup N 消费者组名称。建议使用。如果 producerGroup
为 null
,则使用 groupName
。 dapr-rocketmq-test-g-c
producerGroup (consumerID) N 生产者组名称。建议使用。如果 producerGroup
为 null
,则使用 consumerID
。如果 consumerID
也为 null,则使用 groupName
。 dapr-rocketmq-test-g-p
consumerID N 消费者 ID(消费者标签)用于将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,一条消息仅由组中的一个消费者处理一次。如果未提供 consumerID
,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID
) 值。 可以设置为字符串值(如上例中的 "channel1"
)或字符串格式值(如 "{podName}"
等)。查看可以在组件元数据中使用的所有模板标签。 groupName N 消费者/生产者组名称。已弃用 。 dapr-rocketmq-test-g
nameSpace N RocketMQ 命名空间 dapr-rocketmq
nameServerDomain N RocketMQ 名称服务器域名 https://my-app.net:8080/nsaddr
nameServer N RocketMQ 名称服务器,使用 “,” 或 “;” 分隔 127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877
accessKey N 访问密钥(用户名) "admin"
secretKey N 密钥(密码) "password"
securityToken N 安全令牌 retries N 向 broker 发送消息的重试次数 3
3
producerQueueSelector (queueSelector) N 生产者队列选择器。有五种队列选择器实现:hash
、random
、manual
、roundRobin
、dapr
。 dapr
hash
consumerModel N 定义消息如何传递到每个消费者客户端的消息模型。RocketMQ 支持两种消息模型:clustering
和 broadcasting
。 clustering
broadcasting
, clustering
fromWhere (consumeFromWhere) N 消费者启动时的消费点。有三个消费点:CONSUME_FROM_LAST_OFFSET
、CONSUME_FROM_FIRST_OFFSET
、CONSUME_FROM_TIMESTAMP
CONSUME_FROM_LAST_OFFSET
CONSUME_FROM_LAST_OFFSET
consumeTimestamp N 以秒为精度回溯消费时间。时间格式为 yyyymmddhhmmss
。例如,20131223171201
表示 2013 年 12 月 23 日 17:12:01 time.Now().Add(time.Minute * (-30)).Format("20060102150405")
20131223171201
consumeOrderly N 确定是否使用 FIFO 顺序的有序消息。 false
false
consumeMessageBatchMaxSize N 批量消费大小,范围 [1, 1024]
512
10
consumeConcurrentlyMaxSpan N 并发最大跨度偏移。这对顺序消费没有影响。范围:[1, 65535]
1000
1000
maxReconsumeTimes N 最大重新消费次数。-1
表示 16 次。如果消息在成功前被重新消费超过 {@link maxReconsumeTimes} 次,它们将被定向到删除队列。 顺序消息为 MaxInt32
;并发消息为 16
16
autoCommit N 启用自动提交 true
false
consumeTimeout N 消息可能阻塞消费线程的最大时间。时间单位:分钟 15
15
consumerPullTimeout N 套接字超时时间,单位为毫秒 pullInterval N 消息拉取间隔 100
100
pullBatchSize N 一次从 broker 拉取的消息数量。如果 pullBatchSize
为 null
,使用 ConsumerBatchSize
。pullBatchSize
范围 [1, 1024]
32
10
pullThresholdForQueue N 队列级别的流量控制阈值。默认情况下,每个消息队列将缓存最多 1000 条消息。考虑 PullBatchSize
- 瞬时值可能超过限制。范围:[1, 65535]
1024
1000
pullThresholdForTopic N 主题级别的流量控制阈值。如果 pullThresholdForQueue
不是无限制的,将被 pullThresholdForTopic
的值覆盖并计算。例如,如果 pullThresholdForTopic
的值为 1000,并且为此消费者分配了 10 个消息队列,则 pullThresholdForQueue
将设置为 100。范围:[1, 6553500]
-1(无限制)
10
pullThresholdSizeForQueue N 限制队列级别的缓存消息大小。考虑 pullBatchSize
- 瞬时值可能超过限制。消息的大小仅通过消息体测量,因此不准确。范围:[1, 1024]
100
100
pullThresholdSizeForTopic N 限制主题级别的缓存消息大小。如果 pullThresholdSizeForQueue
不是无限制的,将被 pullThresholdSizeForTopic
的值覆盖并计算。例如,如果 pullThresholdSizeForTopic
的值为 1000 MiB,并且为此消费者分配了 10 个消息队列,则 pullThresholdSizeForQueue
将设置为 100 MiB。范围:[1, 102400]
-1
100
content-type N 消息内容类型。 "text/plain"
"application/cloudevents+json; charset=utf-8"
, "application/octet-stream"
logLevel N 日志级别 warn
info
sendTimeOut N 连接 RocketMQ 的 broker 发送消息超时,以纳秒为单位。已弃用 。 3 秒 10000000000
sendTimeOutSec N 发布消息的超时时间,以秒为单位。如果 sendTimeOutSec
为 null
,则使用 sendTimeOut
。 3 秒 3
mspProperties N RocketMQ 消息属性集合中的属性传递给应用程序,数据用 “,” 分隔多个属性 key,mkey
出于向后兼容的原因,元数据中的以下值仍然支持,尽管不推荐使用。
字段(支持但已弃用) 必需 详情 示例 groupName N RocketMQ 发布者的生产者组名称 "my_unique_group_name"
sendTimeOut N 发布消息的超时时间,以纳秒为单位 0
consumerBatchSize N 一次从 broker 拉取的消息数量 32
设置 RocketMQ 请参阅 https://rocketmq.apache.org/docs/quick-start/ 以设置本地 RocketMQ 实例。
每次调用的元数据字段 分区键 在调用 RocketMQ pub/sub 时,可以通过在请求 URL 中使用 metadata
查询参数提供可选的分区键。
您需要在 metadata
中指定 rocketmq-tag
、"rocketmq-key"
、rocketmq-shardingkey
、rocketmq-queue
示例:
curl -X POST http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-tag= ?& metadata.rocketmq-key= ?& metadata.rocketmq-shardingkey= key& metadata.rocketmq-queue= 1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
队列选择器 RocketMQ 组件提供了五种队列选择器。RocketMQ 客户端提供以下队列选择器:
HashQueueSelector
RandomQueueSelector
RoundRobinQueueSelector
ManualQueueSelector
要了解有关这些 RocketMQ 客户端队列选择器的更多信息,请阅读 RocketMQ 文档 。
Dapr RocketMQ 组件实现了以下队列选择器:
本文重点介绍 DaprQueueSelector
的设计。
DaprQueueSelector DaprQueueSelector
集成了三个队列选择器:
HashQueueSelector
RoundRobinQueueSelector
ManualQueueSelector
DaprQueueSelector
从请求参数中获取队列 ID。您可以通过运行以下命令设置队列 ID:
http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-queue=1
ManualQueueSelector
是通过上述方法实现的。
接下来,DaprQueueSelector
尝试:
获取 ShardingKey
哈希 ShardingKey
以确定队列 ID。 您可以通过以下方式设置 ShardingKey
:
http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-shardingkey=key
如果 ShardingKey
不存在,则使用 RoundRobin
算法确定队列 ID。
相关链接 15 - Solace-AMQP 关于 Solace-AMQP 发布/订阅组件的详细文档
组件格式 要配置 Solace-AMQP 发布/订阅组件,请创建一个类型为 pubsub.solace.amqp
的组件。请参考 发布/订阅代理组件文件 了解 ConsumerID 的自动生成方式。参阅 操作指南:发布和订阅指南 以获取创建和应用发布/订阅配置的步骤。
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : solace
spec :
type : pubsub.solace.amqp
version : v1
metadata :
- name : url
value : 'amqp://localhost:5672'
- name : username
value : 'default'
- name : password
value : 'default'
- name : consumerID
value : 'channel1'
警告 上述示例中使用了明文字符串作为密钥。建议使用密钥存储来保护密钥,具体方法请参见
此处 。
规格元数据字段 字段 必需 详情 示例 url Y AMQP 代理的地址。可以使用 secretKeyRef
引用密钥。 使用 amqp://
URI 方案进行非 TLS 通信。 使用 amqps://
URI 方案进行 TLS 通信。 "amqp://host.domain[:port]"
username Y 连接到代理的用户名。仅在未启用匿名连接或设置为 false
时需要。 default
password Y 连接到代理的密码。仅在未启用匿名连接或设置为 false
时需要。 default
consumerID N 消费者 ID(消费者标签)用于将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID
,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID
) 值。 可以设置为字符串值(如上例中的 "channel1"
)或字符串格式值(如 "{podName}"
等)。查看您可以在组件元数据中使用的所有模板标签。 anonymous N 在不进行凭证验证的情况下连接到代理。仅在代理上启用时有效。如果设置为 true
,则不需要用户名和密码。 true
caCert 使用 TLS 时必需 用于验证服务器 TLS 证书的 PEM 格式的证书颁发机构 (CA) 证书。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert 使用 TLS 时必需 PEM 格式的 TLS 客户端证书。必须与 clientKey
一起使用。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey 使用 TLS 时必需 PEM 格式的 TLS 客户端密钥。必须与 clientCert
一起使用。可以使用 secretKeyRef
引用密钥。 "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"
使用 TLS 进行通信 要配置使用 TLS 进行通信:
确保 Solace 代理已配置为支持证书。 在组件配置中提供 caCert
、clientCert
和 clientKey
元数据。 例如:
apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : solace
spec :
type : pubsub.solace.amqp
version : v1
metadata :
- name : url
value : "amqps://host.domain[:port]"
- name : username
value : 'default'
- name : password
value : 'default'
- name : caCert
value : ${{ myLoadedCACert }}
- name : clientCert
value : ${{ myLoadedClientCert }}
- name : clientKey
secretKeyRef :
name : mySolaceClientKey
key : mySolaceClientKey
auth :
secretStore : <SECRET_STORE_NAME>
虽然 caCert
和 clientCert
的值可能不是密钥,但为了方便起见,它们也可以从 Dapr 密钥存储中引用。
发布/订阅主题和队列 默认情况下,消息通过主题发布和订阅。如果您希望目标是队列,请在主题前加上 queue:
前缀,Solace AMQP 组件将连接到队列。
创建 Solace 代理 您可以使用 Docker 本地运行 Solace 代理 :
docker run -d -p 8080:8080 -p 55554:55555 -p 8008:8008 -p 1883:1883 -p 8000:8000 -p 5672:5672 -p 9000:9000 -p 2222:2222 --shm-size= 2g --env username_admin_globalaccesslevel = admin --env username_admin_password = admin --name= solace solace/solace-pubsub-standard
然后您可以使用客户端端口与服务器交互:mqtt://localhost:5672
您还可以在 Solace Cloud 上注册一个免费的 SaaS 代理。
相关链接 16 - 内存 关于内存 pubsub 组件的详细文档
内存 pub/sub 组件运行在单个 Dapr sidecar 中。这主要用于开发目的。状态不会在多个 sidecar 之间复制,并且在 Dapr sidecar 重启时会丢失。
组件格式 apiVersion : dapr.io/v1alpha1
kind : Component
metadata :
name : pubsub
spec :
type : pubsub.in-memory
version : v1
metadata : []
注意:内存组件不需要特定的元数据即可工作,但 spec.metadata 是必填字段。
相关链接