要设置 Apache Kafka 的发布/订阅功能,您需要创建一个类型为 pubsub.kafka
的组件。请参阅 pub/sub broker 组件文件 了解 ConsumerID 是如何自动生成的。阅读 如何:发布和订阅指南 了解如何创建和应用发布/订阅配置。
所有组件的元数据字段值可以使用 模板化的元数据值,这些值会在 Dapr sidecar 启动时解析。例如,您可以选择使用 {namespace}
作为 consumerGroup
,以便在不同命名空间中使用相同的 appId
和主题,如 本文 所述。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # 必需字段,Kafka broker 连接设置
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # 可选字段,用于输入绑定。
value: "{namespace}"
- name: consumerID # 可选字段。如果未提供,运行时将自动创建。
value: "channel1"
- name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value: "my-dapr-app-id"
- name: authType # 必需字段。
value: "password"
- name: saslUsername # 如果 authType 是 `password`,则必需。
value: "adminuser"
- name: saslPassword # 如果 authType 是 `password`,则必需。
secretKeyRef:
name: kafka-secrets
key: saslPasswordSecret
- name: saslMechanism
value: "SHA-512"
- name: maxMessageBytes # 可选字段。
value: 1024
- name: consumeRetryInterval # 可选字段。
value: 200ms
- name: heartbeatInterval # 可选字段。
value: 5s
- name: sessionTimeout # 可选字段。
value: 15s
- name: version # 可选字段。
value: 2.0.0
- name: disableTls # 可选字段。禁用 TLS。在生产环境中不安全!请阅读 `Mutual TLS` 部分以了解如何使用 TLS。
value: "true"
- name: consumerFetchMin # 可选字段。高级设置。请求中要获取的最小消息字节数 - broker 将等待直到至少有这么多可用。
value: 1
- name: consumerFetchDefault # 可选字段。高级设置。每个请求中从 broker 获取的默认消息字节数。
value: 2097152
- name: channelBufferSize # 可选字段。高级设置。内部和外部通道中要缓冲的事件数量。
value: 512
- name: schemaRegistryURL # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。Schema Registry URL。
value: http://localhost:8081
- name: schemaRegistryAPIKey # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。Schema Registry API Key。
value: XYAXXAZ
- name: schemaRegistryAPISecret # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。Schema Registry 凭证 API Secret。
value: "ABCDEFGMEADFF"
- name: schemaCachingEnabled # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。启用模式缓存。
value: true
- name: schemaLatestVersionCacheTTL # 可选字段。当使用 Schema Registry Avro 序列化/反序列化时。发布具有最新模式的消息时的模式缓存 TTL。
value: 5m
- name: escapeHeaders # 可选字段。
value: false
有关使用
secretKeyRef
的详细信息,请参阅 如何在组件中引用 secrets 的指南。
字段 | 必需 | 详情 | 示例 |
---|---|---|---|
brokers | Y | 逗号分隔的 Kafka brokers 列表。 | "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093" |
consumerGroup | N | 监听的 kafka 消费者组。发布到主题的每条记录都会传递给订阅该主题的每个消费者组中的一个消费者。如果提供了 consumerGroup 的值,则忽略 consumerID 的任何值 - 将为 consumerID 设置消费者组和随机唯一标识符的组合。 | "group1" |
consumerID | N | 消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,消息仅由组中的一个消费者处理一次。如果未提供 consumerID ,Dapr 运行时将其设置为 Dapr 应用程序 ID (appID ) 值。如果提供了 consumerGroup 的值,则忽略 consumerID 的任何值 - 将为 consumerID 设置消费者组和随机唯一标识符的组合。 | 可以设置为字符串值(例如上例中的 "channel1" )或字符串格式值(例如 "{podName}" 等)。查看您可以在组件元数据中使用的所有模板标签。 |
clientID | N | 用户提供的字符串,随每个请求发送到 Kafka brokers,用于日志记录、调试和审计。默认为 Kubernetes 模式的 "namespace.appID" 或 Self-Hosted 模式的 "appID" 。 | "my-namespace.my-dapr-app" ,"my-dapr-app" |
authRequired | N | 已弃用 启用 SASL 认证与 Kafka brokers。 | "true" ,"false" |
authType | Y | 配置或禁用认证。支持的值:none ,password ,mtls ,oidc 或 awsiam | "password" ,"none" |
saslUsername | N | 用于认证的 SASL 用户名。仅在 authType 设置为 "password" 时需要。 | "adminuser" |
saslPassword | N | 用于认证的 SASL 密码。可以是 secretKeyRef 以使用 secret 引用。仅在 authType 设置为 "password" 时需要。 | "" ,"KeFg23!" |
saslMechanism | N | 您希望使用的 SASL 认证机制。仅在 authType 设置为 "password" 时需要。默认为 PLAINTEXT | "SHA-512", "SHA-256", "PLAINTEXT" |
initialOffset | N | 如果没有先前提交的偏移量,则使用的初始偏移量。应为 “newest” 或 “oldest”。默认为 “newest”。 | "oldest" |
maxMessageBytes | N | 允许的单个 Kafka 消息的最大字节大小。默认为 1024。 | 2048 |
consumeRetryInterval | N | 尝试消费主题时的重试间隔。将没有后缀的数字视为毫秒。默认为 100ms。 | 200ms |
consumeRetryEnabled | N | 通过设置 "false" 禁用消费重试 | "true" ,"false" |
version | N | Kafka 集群版本。默认为 2.0.0。请注意,如果您使用 Azure EventHubs 和 Kafka,则必须将其设置为 1.0.0 。 | 0.10.2.0 |
caCert | N | 证书颁发机构证书,使用 TLS 时需要。可以是 secretKeyRef 以使用 secret 引用 | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
clientCert | N | 客户端证书,authType 为 mtls 时需要。可以是 secretKeyRef 以使用 secret 引用 | "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
clientKey | N | 客户端密钥,authType 为 mtls 时需要。可以是 secretKeyRef 以使用 secret 引用 | "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----" |
skipVerify | N | 跳过 TLS 验证,不建议在生产中使用。默认为 "false" | "true" ,"false" |
disableTls | N | 禁用传输安全的 TLS。要禁用,您不需要将值设置为 "true" 。不建议在生产中使用。默认为 "false" 。 | "true" ,"false" |
oidcTokenEndpoint | N | OAuth2 身份提供者访问令牌端点的完整 URL。当 authType 设置为 oidc 时需要 | “https://identity.example.com/v1/token" |
oidcClientID | N | 在身份提供者中配置的 OAuth2 客户端 ID。当 authType 设置为 oidc 时需要 | dapr-kafka |
oidcClientSecret | N | 在身份提供者中配置的 OAuth2 客户端 secret:当 authType 设置为 oidc 时需要 | "KeFg23!" |
oidcScopes | N | 用于请求访问令牌的 OAuth2/OIDC 范围的逗号分隔列表。当 authType 设置为 oidc 时推荐。默认为 "openid" | "openid,kafka-prod" |
oidcExtensions | N | 包含 OAuth2/OIDC 扩展的 JSON 编码字典的字符串,用于请求访问令牌 | {"cluster":"kafka","poolid":"kafkapool"} |
awsRegion | N | 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘region’。Kafka 集群部署到的 AWS 区域。当 authType 设置为 awsiam 时需要 | us-west-1 |
awsAccessKey | N | 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘accessKey’。与 IAM 账户关联的 AWS 访问密钥。 | "accessKey" |
awsSecretKey | N | 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘secretKey’。与访问密钥关联的 secret 密钥。 | "secretKey" |
awsSessionToken | N | 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘sessionToken’。要使用的 AWS 会话令牌。仅在使用临时安全凭证时需要会话令牌。 | "sessionToken" |
awsIamRoleArn | N | 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘assumeRoleArn’。具有访问 AWS 管理的 Apache Kafka (MSK) 的 IAM 角色。这是除 AWS 凭证外的另一种与 MSK 认证的选项。 | "arn:aws:iam::123456789:role/mskRole" |
awsStsSessionName | N | 这保持与现有字段的向后兼容性。它将在 Dapr 1.17 中被弃用。请改用 ‘sessionName’。表示假设角色的会话名称。 | "DaprDefaultSession" |
schemaRegistryURL | N | 使用 Schema Registry Avro 序列化/反序列化时需要。Schema Registry URL。 | http://localhost:8081 |
schemaRegistryAPIKey | N | 使用 Schema Registry Avro 序列化/反序列化时。Schema Registry 凭证 API Key。 | XYAXXAZ |
schemaRegistryAPISecret | N | 使用 Schema Registry Avro 序列化/反序列化时。Schema Registry 凭证 API Secret。 | ABCDEFGMEADFF |
schemaCachingEnabled | N | 使用 Schema Registry Avro 序列化/反序列化时。启用模式缓存。默认为 true | true |
schemaLatestVersionCacheTTL | N | 使用 Schema Registry Avro 序列化/反序列化时。发布具有最新模式的消息时的模式缓存 TTL。默认为 5 分钟 | 5m |
clientConnectionTopicMetadataRefreshInterval | N | 客户端连接的主题元数据与 broker 刷新的间隔,以 Go 持续时间表示。默认为 9m 。 | "4m" |
clientConnectionKeepAliveInterval | N | 客户端连接与 broker 保持活动的最长时间,以 Go 持续时间表示,然后关闭连接。零值(默认)表示无限期保持活动。 | "4m" |
consumerFetchMin | N | 请求中要获取的最小消息字节数 - broker 将等待直到至少有这么多可用。默认值为 1 ,因为 0 会导致消费者在没有消息可用时旋转。相当于 JVM 的 fetch.min.bytes 。 | "2" |
consumerFetchDefault | N | 每个请求中从 broker 获取的默认消息字节数。默认值为 "1048576" 字节。 | "2097152" |
channelBufferSize | N | 内部和外部通道中要缓冲的事件数量。这允许生产者和消费者在用户代码工作时继续在后台处理一些消息,从而大大提高吞吐量。默认为 256 。 | "512" |
heartbeatInterval | N | 向消费者协调器发送心跳的间隔。最多应将值设置为 sessionTimeout 值的 1/3。默认为 “3s”。 | "5s" |
sessionTimeout | N | 使用 Kafka 的组管理功能时用于检测客户端故障的超时时间。如果 broker 在此会话超时之前未收到任何来自消费者的心跳,则消费者将被移除并启动重新平衡。默认为 “10s”。 | "20s" |
escapeHeaders | N | 启用对消费者接收到的消息头值的 URL 转义。允许接收通常不允许在 HTTP 头中使用的特殊字符内容。默认为 false 。 | true |
上面的 secretKeyRef
引用了一个 kubernetes secrets store 以访问 tls 信息。访问 此处 了解有关如何配置 secret store 组件的更多信息。
使用 Azure EventHubs 和 Kafka 时,元数据 version
必须设置为 1.0.0
。
Kafka 支持多种认证方案,Dapr 支持几种:SASL 密码、mTLS、OIDC/OAuth2。随着添加的认证方法,authRequired
字段已从 v1.6 版本中弃用,取而代之的是 authType
字段。如果 authRequired
设置为 true
,Dapr 将尝试根据 saslPassword
的值正确配置 authType
。authType
的有效值为:
none
password
certificate
mtls
oidc
awsiam
authType
仅用于 认证。授权 仍在 Kafka 内配置,除了 awsiam
,它还可以驱动在 AWS IAM 中配置的授权决策。将 authType
设置为 none
将禁用任何认证。这在生产中不推荐。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-noauth
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # 必需字段,Kafka broker 连接设置
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # 可选字段,用于输入绑定。
value: "group1"
- name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value: "my-dapr-app-id"
- name: authType # 必需字段。
value: "none"
- name: maxMessageBytes # 可选字段。
value: 1024
- name: consumeRetryInterval # 可选字段。
value: 200ms
- name: heartbeatInterval # 可选字段。
value: 5s
- name: sessionTimeout # 可选字段。
value: 15s
- name: version # 可选字段。
value: 0.10.2.0
- name: disableTls
value: "true"
将 authType
设置为 password
启用 SASL 认证。这需要设置 saslUsername
和 saslPassword
字段。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-sasl
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # 必需字段,Kafka broker 连接设置
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # 可选字段,用于输入绑定。
value: "group1"
- name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value: "my-dapr-app-id"
- name: authType # 必需字段。
value: "password"
- name: saslUsername # 如果 authType 是 `password`,则必需。
value: "adminuser"
- name: saslPassword # 如果 authType 是 `password`,则必需。
secretKeyRef:
name: kafka-secrets
key: saslPasswordSecret
- name: saslMechanism
value: "SHA-512"
- name: maxMessageBytes # 可选字段。
value: 1024
- name: consumeRetryInterval # 可选字段。
value: 200ms
- name: heartbeatInterval # 可选字段。
value: 5s
- name: sessionTimeout # 可选字段。
value: 15s
- name: version # 可选字段。
value: 0.10.2.0
- name: caCert
secretKeyRef:
name: kafka-tls
key: caCert
将 authType
设置为 mtls
使用 x509 客户端证书(clientCert
字段)和密钥(clientKey
字段)进行认证。请注意,mTLS 作为认证机制与通过加密保护传输层的 TLS 使用是不同的。mTLS 需要 TLS 传输(意味着 disableTls
必须为 false
),但保护传输层不需要使用 mTLS。请参阅 使用 TLS 进行通信 以配置底层 TLS 传输。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-mtls
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # 必需字段,Kafka broker 连接设置
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # 可选字段,用于输入绑定。
value: "group1"
- name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value: "my-dapr-app-id"
- name: authType # 必需字段。
value: "mtls"
- name: caCert
secretKeyRef:
name: kafka-tls
key: caCert
- name: clientCert
secretKeyRef:
name: kafka-tls
key: clientCert
- name: clientKey
secretKeyRef:
name: kafka-tls
key: clientKey
- name: maxMessageBytes # 可选字段。
value: 1024
- name: consumeRetryInterval # 可选字段。
value: 200ms
- name: heartbeatInterval # 可选字段。
value: 5s
- name: sessionTimeout # 可选字段。
value: 15s
- name: version # 可选字段。
value: 0.10.2.0
将 authType
设置为 oidc
启用通过 OAUTHBEARER 机制的 SASL 认证。这支持从外部 OAuth2 或 OIDC 身份提供者指定一个持有者令牌。目前,仅支持 client_credentials 授权。
配置 oidcTokenEndpoint
为身份提供者访问令牌端点的完整 URL。
设置 oidcClientID
和 oidcClientSecret
为在身份提供者中配置的客户端凭证。
如果在组件配置中指定了 caCert
,则证书将附加到系统 CA 信任中以验证身份提供者证书。同样,如果在组件配置中指定了 skipVerify
,则在访问身份提供者时也将跳过验证。
默认情况下,令牌请求的唯一范围是 openid
;强烈建议通过 oidcScopes
以逗号分隔的列表指定其他范围,并由 Kafka broker 验证。如果不使用其他范围来缩小访问令牌的有效性,
被破坏的 Kafka broker 可能会重放令牌以访问其他服务作为 Dapr clientID。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # 必需字段,Kafka broker 连接设置
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # 可选字段,用于输入绑定。
value: "group1"
- name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value: "my-dapr-app-id"
- name: authType # 必需字段。
value: "oidc"
- name: oidcTokenEndpoint # 如果 authType 是 `oidc`,则必需。
value: "https://identity.example.com/v1/token"
- name: oidcClientID # 如果 authType 是 `oidc`,则必需。
value: "dapr-myapp"
- name: oidcClientSecret # 如果 authType 是 `oidc`,则必需。
secretKeyRef:
name: kafka-secrets
key: oidcClientSecret
- name: oidcScopes # 如果 authType 是 `oidc`,则推荐。
value: "openid,kafka-dev"
- name: caCert # 也应用于验证 OIDC 提供者证书
secretKeyRef:
name: kafka-tls
key: caCert
- name: maxMessageBytes # 可选字段。
value: 1024
- name: consumeRetryInterval # 可选字段。
value: 200ms
- name: heartbeatInterval # 可选字段。
value: 5s
- name: sessionTimeout # 可选字段。
value: 15s
- name: version # 可选字段。
value: 0.10.2.0
支持使用 MSK 进行 AWS IAM 认证。将 authType
设置为 awsiam
使用 AWS SDK 生成认证令牌进行认证。
region
。如果没有提供 acessKey
和 secretKey
,您可以使用 AWS IAM 角色为服务账户提供无密码认证到您的 Kafka 集群。apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-awsiam
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # 必需字段,Kafka broker 连接设置
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # 可选字段,用于输入绑定。
value: "group1"
- name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value: "my-dapr-app-id"
- name: authType # 必需字段。
value: "awsiam"
- name: region # 必需字段。
value: "us-west-1"
- name: accessKey # 可选字段。
value: <AWS_ACCESS_KEY>
- name: secretKey # 可选字段。
value: <AWS_SECRET_KEY>
- name: sessionToken # 可选字段。
value: <AWS_SESSION_KEY>
- name: assumeRoleArn # 可选字段。
value: "arn:aws:iam::123456789:role/mskRole"
- name: sessionName # 可选字段。
value: "DaprDefaultSession"
默认情况下,启用 TLS 以保护到 Kafka 的传输层。要禁用 TLS,请将 disableTls
设置为 true
。启用 TLS 时,您可以
使用 skipVerify
控制服务器证书验证以禁用验证(不推荐在生产环境中使用)和 caCert
指定受信任的 TLS 证书颁发机构(CA)。如果没有指定 caCert
,将使用系统 CA 信任。要配置 mTLS 认证,
请参阅 认证 部分。
下面是一个配置为使用传输层 TLS 的 Kafka pubsub 组件示例:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # 必需字段,Kafka broker 连接设置
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # 可选字段,用于输入绑定。
value: "group1"
- name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value: "my-dapr-app-id"
- name: authType # 必需字段。
value: "certificate"
- name: consumeRetryInterval # 可选字段。
value: 200ms
- name: heartbeatInterval # 可选字段。
value: 5s
- name: sessionTimeout # 可选字段。
value: 15s
- name: version # 可选字段。
value: 0.10.2.0
- name: maxMessageBytes # 可选字段。
value: 1024
- name: caCert # 证书颁发机构证书。
secretKeyRef:
name: kafka-tls
key: caCert
auth:
secretStore: <SECRET_STORE_NAME>
当使用单个 pub/sub 组件从多个主题消费时,无法保证您的消费者组中的消费者如何在主题分区之间平衡。
例如,假设您订阅了两个主题,每个主题有 10 个分区,并且您有 20 个服务副本从这两个主题消费。无法保证 10 个将分配给第一个主题,10 个将分配给第二个主题。相反,分区可能会不均匀地划分,超过 10 个分配给第一个主题,其余分配给第二个主题。
这可能导致第一个主题的消费者空闲,而第二个主题的消费者过度扩展,反之亦然。当使用自动缩放器(如 HPA 或 KEDA)时,也可以观察到这种行为。
如果您遇到此特定问题,建议您为每个主题配置一个单独的 pub/sub 组件,并为每个组件定义唯一的消费者组。这可以确保您的服务的所有副本都完全分配给唯一的消费者组,其中每个消费者组针对一个特定主题。
例如,您可以定义两个 Dapr 组件,具有以下配置:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-topic-one
spec:
type: pubsub.kafka
version: v1
metadata:
- name: consumerGroup
value: "{appID}-topic-one"
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-topic-two
spec:
type: pubsub.kafka
version: v1
metadata:
- name: consumerGroup
value: "{appID}-topic-two"
Apache Kafka 组件支持使用批量 Pub/sub API 在单个操作中发送和接收多条消息。
订阅主题时,您可以配置 bulkSubscribe
选项。有关更多详细信息,请参阅 批量订阅消息。了解更多关于 批量订阅 API 的信息。
Apache Kafka 支持以下批量元数据选项:
配置 | 默认值 |
---|---|
maxBulkAwaitDurationMs | 10000 (10s) |
maxBulkSubCount | 80 |
调用 Kafka pub/sub 时,可以通过在请求 URL 中使用 metadata
查询参数提供可选的分区键。
参数名称可以是 partitionKey
或 __key
示例:
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey=key1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
所有其他元数据键/值对(不是 partitionKey
或 __key
)都设置为 Kafka 消息中的头。以下是为消息设置 correlationId
的示例。
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
消费消息时,特殊消息元数据会自动作为头传递。这些是:
__key
:如果可用,消息键__topic
:消息的主题__partition
:消息的分区号__offset
:消息在分区中的偏移量__timestamp
:消息的时间戳您可以在消费者端点中访问它们,如下所示:
from fastapi import APIRouter, Body, Response, status
import json
import sys
app = FastAPI()
router = APIRouter()
@router.get('/dapr/subscribe')
def subscribe():
subscriptions = [{'pubsubname': 'pubsub',
'topic': 'my-topic',
'route': 'my_topic_subscriber',
}]
return subscriptions
@router.post('/my_topic_subscriber')
def my_topic_subscriber(
key: Annotated[str, Header(alias="__key")],
offset: Annotated[int, Header(alias="__offset")],
event_data=Body()):
print(f"key={key} - offset={offset} - data={event_data}", flush=True)
return Response(status_code=status.HTTP_200_OK)
app.include_router(router)
消费者应用程序可能需要接收包含特殊字符的消息头,这可能会导致 HTTP 协议验证错误。
HTTP 头值必须遵循规范,使得某些字符不被允许。了解更多关于协议的信息。
在这种情况下,您可以启用 escapeHeaders
配置设置,该设置使用 URL 转义在消费者端对头值进行编码。
将 escapeHeaders
设置为 true
以进行 URL 转义。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-escape-headers
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # 必需字段,Kafka broker 连接设置
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # 可选字段,用于输入绑定。
value: "group1"
- name: clientID # 可选字段,用于 Kafka brokers 的客户端跟踪 ID。
value: "my-dapr-app-id"
- name: authType # 必需字段。
value: "none"
- name: escapeHeaders
value: "true"
您可以配置 pub/sub 以使用 Avro 二进制序列化 发布或消费数据,利用 Apache Schema Registry(例如,Confluent Schema Registry,Apicurio)。
rawPayload=true
元数据。
请注意,消费者不应设置 rawPayload=true
,因为消息值将被包装到 CloudEvent 中并进行 base64 编码。将 rawPayload
保持为默认值(即 false
)将以 JSON 负载的形式将 Avro 解码的消息发送到应用程序。配置 Kafka pub/sub 组件元数据时,您必须定义:
模式主题是根据主题名称自动派生的,使用标准命名约定。例如,对于名为 my-topic
的主题,模式主题将是 my-topic-value
。
在服务中与消息负载交互时,它是 JSON 格式。负载在 Dapr 组件中透明地序列化/反序列化。
日期/日期时间字段必须作为其 Epoch Unix 时间戳 等效值传递(而不是典型的 Iso8601)。例如:
2024-01-10T04:36:05.986Z
应传递为 1704861365986
(自 1970 年 1 月 1 日以来的毫秒数)2024-01-10
应传递为 19732
(自 1970 年 1 月 1 日以来的天数)为了向 Kafka pub/sub 组件指示消息应使用 Avro 序列化,必须在 metadata
中设置 valueSchemaType
为 Avro
。
curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/my-topic?metadata.rawPayload=true&metadata.valueSchemaType=Avro -H "Content-Type: application/json" -d '{"order_number": "345", "created_date": 1704861365986}'
from dapr.clients import DaprClient
with DaprClient() as d:
req_data = {
'order_number': '345',
'created_date': 1704861365986
}
# 创建一个带有内容类型和主体的类型化消息
resp = d.publish_event(
pubsub_name='pubsub',
topic_name='my-topic',
data=json.dumps(req_data),
publish_metadata={'rawPayload': 'true', 'valueSchemaType': 'Avro'}
)
# 打印请求
print(req_data, flush=True)
为了向 Kafka pub/sub 组件指示消息应使用 Avro 进行反序列化,必须在订阅元数据中设置 valueSchemaType
为 Avro
。
from fastapi import APIRouter, Body, Response, status
import json
import sys
app = FastAPI()
router = APIRouter()
@router.get('/dapr/subscribe')
def subscribe():
subscriptions = [{'pubsubname': 'pubsub',
'topic': 'my-topic',
'route': 'my_topic_subscriber',
'metadata': {
'valueSchemaType': 'Avro',
} }]
return subscriptions
@router.post('/my_topic_subscriber')
def my_topic_subscriber(event_data=Body()):
print(event_data, flush=True)
return Response(status_code=status.HTTP_200_OK)
app.include_router(router)
要在 Kubernetes 上运行 Kafka,您可以使用任何 Kafka operator,例如 Strimzi。