Each pub/sub component has its own built-in retry behaviors, unique to the message broker solution and unrelated to Dapr. Before explicitly applying a Dapr resiliency policy, make sure you understand the implicit retry policy of the pub/sub component you're using. Instead of overriding these built-in retries, Dapr resiliency policies augment them, which can result in repeated clusters of messages.
| Component | Status | Component version | Since runtime version |
|-----------|--------|-------------------|-----------------------|
| [Azure Event Hubs](/reference/components-reference/supported-pubsub/setup-azure-eventhubs/) | Stable | v1 | 1.8 |
| [Azure Service Bus Queues](/reference/components-reference/supported-pubsub/setup-azure-servicebus-queues/) | Beta | v1 | 1.10 |
| [Azure Service Bus Topics](/reference/components-reference/supported-pubsub/setup-azure-servicebus-topics/) | Stable | v1 | 1.0 |
1 - Apache Kafka
Detailed documentation on the Apache Kafka pubsub component
Component format
To set up Apache Kafka pub/sub, create a component of type pubsub.kafka. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
All component metadata field values can carry templated metadata values, which are resolved on Dapr sidecar startup.
For example, you can choose to use {namespace} as the consumerGroup to enable using the same appId in different namespaces using the same topics as described in this article.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "{namespace}"
  - name: consumerID # Optional. If not supplied, runtime will create one.
    value: "channel1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "password"
  - name: saslUsername # Required if authType is `password`.
    value: "adminuser"
  - name: saslPassword # Required if authType is `password`.
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: saslMechanism
    value: "SHA-512"
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 2.0.0
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"
  - name: consumerFetchMin # Optional. Advanced setting. The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
    value: 1
  - name: consumerFetchDefault # Optional. Advanced setting. The default number of message bytes to fetch from the broker in each request.
    value: 2097152
  - name: channelBufferSize # Optional. Advanced setting. The number of events to buffer in internal and external channels.
    value: 512
  - name: consumerGroupRebalanceStrategy # Optional. Advanced setting. The strategy to use for consumer group rebalancing.
    value: sticky
  - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
    value: http://localhost:8081
  - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API Key.
    value: XYAXXAZ
  - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret.
    value: "ABCDEFGMEADFF"
  - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
    value: true
  - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
    value: 5m
  - name: useAvroJson # Optional. Enables Avro JSON schema for serialization as opposed to Standard JSON default. Only applicable when the subscription uses valueSchemaType=Avro
    value: "true"
  - name: escapeHeaders # Optional.
    value: false
```
| Field | Required | Details | Example |
|-------|----------|---------|---------|
| consumerGroup | N | A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. If a value for consumerGroup is provided, any value for consumerID is ignored - a combination of the consumer group and a random unique identifier will be set for the consumerID instead. | "group1" |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. If a value for consumerGroup is provided, any value for consumerID is ignored - a combination of the consumer group and a random unique identifier will be set for the consumerID instead. | |
| clientID | N | A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. Defaults to "namespace.appID" for Kubernetes mode or "appID" for Self-Hosted mode. | "my-namespace.my-dapr-app", "my-dapr-app" |
| authRequired | N | Deprecated. Enable SASL authentication with the Kafka brokers. | "true", "false" |
| authType | Y | Configure or disable authentication. Supported values: none, password, mtls, oidc or awsiam | "password", "none" |
| saslUsername | N | The SASL username used for authentication. Only required if authType is set to "password". | "adminuser" |
| saslPassword | N | The SASL password used for authentication. Can be secretKeyRef to use a secret reference. Only required if authType is set to "password". | "", "KeFg23!" |
| saslMechanism | N | The SASL authentication mechanism you wish to use. Only required if authType is set to "password". Defaults to PLAINTEXT. | "SHA-512", "SHA-256", "PLAINTEXT" |
| initialOffset | N | The initial offset to use if no offset was previously committed. Should be "newest" or "oldest". Defaults to "newest". | "oldest" |
| maxMessageBytes | N | The maximum size in bytes allowed for a single Kafka message. Defaults to 1024. | 2048 |
| consumeRetryInterval | N | The interval between retries when attempting to consume topics. Treats numbers without suffix as milliseconds. Defaults to 100ms. | 200ms |
| consumeRetryEnabled | N | Disable consume retry by setting "false". | "true", "false" |
| version | N | Kafka cluster version. Defaults to 2.0.0. Note that this must be set to 1.0.0 if you are using Azure Event Hubs with Kafka. | 0.10.2.0 |
| caCert | N | Certificate authority certificate, required for using TLS. Can be secretKeyRef to use a secret reference. | |
| skipVerify | N | Skip TLS verification; this is not recommended for use in production. Defaults to "false". | "true", "false" |
| disableTls | N | Disable TLS for transport security. To disable, you're not required to set value to "true". This is not recommended for use in production. Defaults to "false". | "true", "false" |
| oidcTokenEndpoint | N | Full URL to an OAuth2 identity provider access token endpoint. Required when authType is set to oidc. | |
| oidcClientID | N | The OAuth2 client ID that has been provisioned in the identity provider. Required when authType is set to oidc. | dapr-kafka |
| oidcClientSecret | N | The OAuth2 client secret that has been provisioned in the identity provider. Required when authType is set to oidc. | "KeFg23!" |
| oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when authType is set to oidc. Defaults to "openid". | "openid,kafka-prod" |
| oidcExtensions | N | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token. | {"cluster":"kafka","poolid":"kafkapool"} |
| awsRegion | N | This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use `region` instead. The AWS region where the Kafka cluster is deployed to. Required when authType is set to awsiam. | us-west-1 |
| awsAccessKey | N | This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use `accessKey` instead. AWS access key associated with an IAM account. | "accessKey" |
| awsSecretKey | N | This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use `secretKey` instead. The secret key associated with the access key. | "secretKey" |
| awsSessionToken | N | This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use `sessionToken` instead. AWS session token to use. A session token is only required if you are using temporary security credentials. | "sessionToken" |
| awsIamRoleArn | N | This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use `assumeRoleArn` instead. IAM role that has access to AWS Managed Streaming for Apache Kafka (MSK). This is another option to authenticate with MSK aside from the AWS Credentials. | "arn:aws:iam::123456789:role/mskRole" |
| awsStsSessionName | N | This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use `sessionName` instead. Represents the session name for assuming a role. | "DaprDefaultSession" |
| schemaRegistryURL | N | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | http://localhost:8081 |
| schemaRegistryAPIKey | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key. | XYAXXAZ |
| schemaRegistryAPISecret | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. | ABCDEFGMEADFF |
| schemaCachingEnabled | N | When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is true. | true |
| schemaLatestVersionCacheTTL | N | When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. Default is 5 min. | 5m |
| useAvroJson | N | Enables Avro JSON schema for serialization as opposed to Standard JSON default. Only applicable when the subscription uses valueSchemaType=Avro. Default is "false". | "true" |
| clientConnectionTopicMetadataRefreshInterval | N | The interval for the client connection's topic metadata to be refreshed with the broker as a Go duration. Defaults to 9m. | "4m" |
| clientConnectionKeepAliveInterval | N | The maximum time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely. | "4m" |
| consumerFetchMin | N | The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available. The default is 1, as 0 causes the consumer to spin when no messages are available. Equivalent to the JVM's fetch.min.bytes. | "2" |
| consumerFetchDefault | N | The default number of message bytes to fetch from the broker in each request. Default is "1048576" bytes. | "2097152" |
| channelBufferSize | N | The number of events to buffer in internal and external channels. This permits the producer and consumer to continue processing some messages in the background while user code is working, greatly improving throughput. Defaults to 256. | "512" |
| heartbeatInterval | N | The interval between heartbeats to the consumer coordinator. At most, the value should be set to 1/3 of the sessionTimeout value. Defaults to "3s". | "5s" |
| sessionTimeout | N | The timeout used to detect client failures when using Kafka's group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to "10s". | "20s" |
| consumerGroupRebalanceStrategy | N | The strategy to use for consumer group rebalancing. Supported values: range, sticky, roundrobin. Default is range. | "sticky" |
| escapeHeaders | N | Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is false. | true |
The secretKeyRef above references a Kubernetes secrets store to access the TLS information. Visit here to learn more about how to configure a secret store component.
Note
The metadata version must be set to 1.0.0 when using Azure EventHubs with Kafka.
Authentication
Kafka supports a variety of authentication schemes, and Dapr supports several of them: SASL password, mTLS, and OIDC/OAuth2. With these additional authentication methods, the authRequired field was deprecated in the v1.6 release; use the authType field instead. If authRequired is set to true, Dapr attempts to configure authType correctly
based on the value of saslPassword. The valid values for authType are:
none
password
certificate
mtls
oidc
awsiam
Note
authType is authentication only. Authorization is still configured within Kafka, except for awsiam, which can also drive authorization decisions configured in AWS IAM.
None
Setting authType to none will disable any authentication. This is NOT recommended in production.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-noauth
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "none"
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0
  - name: disableTls
    value: "true"
```
SASL Password
Setting authType to password enables SASL authentication. This requires setting the saslUsername and saslPassword fields.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-sasl
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "password"
  - name: saslUsername # Required if authType is `password`.
    value: "adminuser"
  - name: saslPassword # Required if authType is `password`.
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: saslMechanism
    value: "SHA-512"
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0
  - name: caCert
    secretKeyRef:
      name: kafka-tls
      key: caCert
```
Mutual TLS
Setting authType to mtls uses an x509 client certificate (the clientCert field) and key (the clientKey field) to authenticate. Note that mTLS as an
authentication mechanism is distinct from using TLS to secure the transport layer via encryption. mTLS requires TLS transport (meaning disableTls must be false), but securing
the transport layer does not require using mTLS. See Communication using TLS for configuring underlying TLS transport.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-mtls
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "mtls"
  - name: caCert
    secretKeyRef:
      name: kafka-tls
      key: caCert
  - name: clientCert
    secretKeyRef:
      name: kafka-tls
      key: clientCert
  - name: clientKey
    secretKeyRef:
      name: kafka-tls
      key: clientKey
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0
```
OAuth2 or OpenID Connect
Setting authType to oidc enables SASL authentication via the OAUTHBEARER mechanism. This supports specifying a bearer token from an external OAuth2 or OIDC identity provider. Currently, only the client_credentials grant is supported.
Configure oidcTokenEndpoint to the full URL for the identity provider access token endpoint.
Set oidcClientID and oidcClientSecret to the client credentials provisioned in the identity provider.
If caCert is specified in the component configuration, the certificate is appended to the system CA trust for verifying the identity provider certificate. Similarly, if skipVerify is specified in the component configuration, verification will also be skipped when accessing the identity provider.
By default, the only scope requested for the token is openid; it is highly recommended that additional scopes be specified via oidcScopes in a comma-separated list and validated by the Kafka broker. If additional scopes are not used to narrow the validity of the access token,
a compromised Kafka broker could replay the token to access other services as the Dapr clientID.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "oidc"
  - name: oidcTokenEndpoint # Required if authType is `oidc`.
    value: "https://identity.example.com/v1/token"
  - name: oidcClientID # Required if authType is `oidc`.
    value: "dapr-myapp"
  - name: oidcClientSecret # Required if authType is `oidc`.
    secretKeyRef:
      name: kafka-secrets
      key: oidcClientSecret
  - name: oidcScopes # Recommended if authType is `oidc`.
    value: "openid,kafka-dev"
  - name: caCert # Also applied to verifying OIDC provider certificate
    secretKeyRef:
      name: kafka-tls
      key: caCert
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0
```
AWS IAM
Authenticating with AWS IAM is supported with MSK. Setting authType to awsiam uses the AWS SDK to generate auth tokens to authenticate.
Note
The only required metadata field is region. If no accessKey and secretKey are provided, you can use AWS IAM roles for service accounts to have password-less authentication to your Kafka cluster.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-awsiam
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "awsiam"
  - name: region # Required.
    value: "us-west-1"
  - name: accessKey # Optional.
    value: <AWS_ACCESS_KEY>
  - name: secretKey # Optional.
    value: <AWS_SECRET_KEY>
  - name: sessionToken # Optional.
    value: <AWS_SESSION_KEY>
  - name: assumeRoleArn # Optional.
    value: "arn:aws:iam::123456789:role/mskRole"
  - name: sessionName # Optional.
    value: "DaprDefaultSession"
```
Communication using TLS
By default TLS is enabled to secure the transport layer to Kafka. To disable TLS, set disableTls to true. When TLS is enabled, you can
control server certificate verification using skipVerify to disable verification (NOT recommended in production environments) and caCert to
specify a trusted TLS certificate authority (CA). If no caCert is specified, the system CA trust will be used. To also configure mTLS authentication,
see the section under Authentication.
Below is an example of a Kafka pubsub component configured to use transport layer TLS:
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "certificate"
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: caCert # Certificate authority certificate.
    secretKeyRef:
      name: kafka-tls
      key: caCert
auth:
  secretStore: <SECRET_STORE_NAME>
```
Consuming from multiple topics
When consuming from multiple topics using a single pub/sub component, there is no guarantee about how the consumers in your consumer group are balanced across the topic partitions.
For instance, let’s say you are subscribing to two topics with 10 partitions per topic and you have 20 replicas of your service consuming from the two topics. There is no guarantee that 10 will be assigned to the first topic and 10 to the second topic. Instead, the partitions could be divided unequally, with more than 10 assigned to the first topic and the rest assigned to the second topic.
This can result in idle consumers listening to the first topic and over-extended consumers on the second topic, or vice versa. This same behavior can be observed when using auto-scalers such as HPA or KEDA.
If you run into this particular issue, it is recommended that you configure a single pub/sub component per topic with uniquely defined consumer groups per component. This guarantees that all replicas of your service are fully allocated to the unique consumer group, where each consumer group targets one specific topic.
For example, you may define two Dapr components with the following configuration:
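For instance, a minimal sketch of two such components could look like the following (the component names, consumer groups, and broker address are illustrative):

```yaml
# Component dedicated to the first topic, with its own consumer group
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-topic-a   # illustrative name
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # unique consumer group for topic A subscribers
    value: "topic-a-group"
  - name: authType
    value: "none"
---
# Component dedicated to the second topic, with a different consumer group
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-topic-b   # illustrative name
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # unique consumer group for topic B subscribers
    value: "topic-b-group"
  - name: authType
    value: "none"
```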
All other metadata key/value pairs (that are not partitionKey or __key) are set as headers in the Kafka message. Here is an example setting a correlationId for the message.
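As a sketch of what that publish call could look like with the Dapr Python SDK (the pub/sub name, topic, and correlation value are illustrative):

```python
import json

from dapr.clients import DaprClient

with DaprClient() as d:
    # Any publish metadata key that is not partitionKey or __key
    # is forwarded as a header on the Kafka message.
    d.publish_event(
        pubsub_name='pubsub',          # illustrative pub/sub component name
        topic_name='orders',           # illustrative topic
        data=json.dumps({'orderId': '123'}),
        publish_metadata={'correlationId': 'myCorrelationID'}
    )
```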
The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.
HTTP header values must follow specifications, making some characters not allowed. Learn more about the protocols.
In this case, you can enable the escapeHeaders configuration setting, which uses URL escaping to encode header values on the consumer side.
Note
When using this setting, the received message headers are URL escaped, and you need to URL "un-escape" them to get the original values.
Set escapeHeaders to true to URL escape.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-escape-headers
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "none"
  - name: escapeHeaders
    value: "true"
```
Currently, only message value serialization/deserialization is supported. Since cloud events are not supported, the rawPayload=true metadata must be passed when publishing Avro messages.
Please note that rawPayload=true should NOT be set for consumers, as the message value will be wrapped into a CloudEvent and base64-encoded. Leaving rawPayload as default (i.e. false) will send the Avro-decoded message to the application as a JSON payload.
When setting the useAvroJson component metadata to true, the inbound/outbound Avro binary is converted into/from Avro JSON encoding.
This can be preferable when accurate type mapping is desirable.
The default is standard JSON which is typically easier to bind to a native type in an application.
When configuring the Kafka pub/sub component metadata, you must define:
The schema registry URL
The API key/secret, if applicable
Schema subjects are automatically derived from topic names, using the standard naming convention. For example, for a topic named my-topic, the schema subject will be my-topic-value.
When interacting with the message payload within the service, it is in JSON format. The payload is transparently serialized/deserialized within the Dapr component.
Date/Datetime fields must be passed as their Epoch Unix timestamp equivalent (rather than typical ISO 8601). For example:
2024-01-10T04:36:05.986Z should be passed as 1704861365986 (the number of milliseconds since Jan 1st, 1970)
2024-01-10 should be passed as 19732 (the number of days since Jan 1st, 1970)
Publishing Avro messages
In order to indicate to the Kafka pub/sub component that the message should be using Avro serialization, the valueSchemaType metadata must be set to Avro.
```python
import json

from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {'order_number': '345', 'created_date': 1704861365986}
    # Create a typed message with content type and body
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic_name='my-topic',
        data=json.dumps(req_data),
        publish_metadata={'rawPayload': 'true', 'valueSchemaType': 'Avro'}
    )
    # Print the request
    print(req_data, flush=True)
```
Subscribing to Avro topics
In order to indicate to the Kafka pub/sub component that the message should be deserialized using Avro, the valueSchemaType metadata must be set to Avro in the subscription metadata.
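A minimal programmatic-subscription sketch (Flask-style, with illustrative pub/sub, topic, and route names) that sets this metadata could look like:

```python
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    # Declare the subscription programmatically and ask the Kafka component
    # to Avro-deserialize the message value for this topic.
    subscriptions = [{
        'pubsubname': 'pubsub',          # illustrative pub/sub component name
        'topic': 'my-topic',             # illustrative topic
        'route': '/my-topic-handler',
        'metadata': {'valueSchemaType': 'Avro'}
    }]
    return jsonify(subscriptions)

@app.route('/my-topic-handler', methods=['POST'])
def my_topic_handler():
    # The Avro-decoded value arrives as JSON inside the CloudEvent payload.
    event = request.json
    print(event['data'], flush=True)
    return jsonify({'success': True})

app.run(port=5000)
```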
In Kafka, rebalancing strategies determine how partitions are assigned to consumers within a consumer group. The default strategy is “range”, but “roundrobin” and “sticky” are also available.
Range:
Partitions are assigned to consumers based on their lexicographical order.
If you have three partitions (0, 1, 2) and two consumers (A, B), consumer A might get partitions 0 and 1, while consumer B gets partition 2.
RoundRobin:
Partitions are assigned to consumers in a round-robin fashion.
With the same example above, consumer A might get partitions 0 and 2, while consumer B gets partition 1.
Sticky:
This strategy aims to preserve previous assignments as much as possible while still maintaining a balanced distribution.
If a consumer leaves or joins the group, only the affected partitions are reassigned, minimizing disruption.
Choosing a Strategy:
Range:
Simple to understand and implement, but can lead to uneven distribution if partition sizes vary significantly.
RoundRobin:
Provides a good balance in many cases, but might not be optimal if message keys are unevenly distributed.
Sticky:
Generally preferred for its ability to minimize disruption during rebalances, especially when dealing with a large number of partitions or frequent consumer group changes.
Create a Kafka instance
You can run Kafka locally using this Docker image.
To run without Docker, see the getting started guide here.
To run Kafka on Kubernetes, you can use any Kafka operator, such as Strimzi.
| Field | Required | Details | Example |
|-------|----------|---------|---------|
| accessKey | Y | ID of the AWS account/role with appropriate permissions to SNS and SQS (see below) | "AKIAIOSFODNN7EXAMPLE" |
| secretKey | Y | Secret for the AWS user/role. If using an AssumeRole access, you will also need to provide a sessionToken | "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" |
| region | Y | The AWS region where the SNS/SQS assets are located or will be created in. See this page for valid regions. Ensure that SNS and SQS are available in that region | "us-east-1" |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. See the pub/sub broker component file to learn how ConsumerID is automatically generated. | |
| endpoint | N | AWS endpoint for the component to use. Only used for local development with, for example, localstack. The endpoint is unnecessary when running against production AWS | "http://localhost:4566" |
| sessionToken | N | AWS session token to use. A session token is only required if you are using temporary security credentials | "TOKEN" |
| messageReceiveLimit | N | Number of times a message is received, after processing of that message fails, that once reached, results in removing of that message from the queue. If sqsDeadLettersQueueName is specified, messageReceiveLimit is the number of times a message is received, after processing of that message fails, that once reached, results in moving of the message to the SQS dead-letters queue. Default: 10 | 10 |
| sqsDeadLettersQueueName | N | Name of the dead letters queue for this application | "myapp-dlq" |
| messageVisibilityTimeout | N | Amount of time in seconds that a message is hidden from receive requests after it is sent to a subscriber. Default: 10 | 10 |
| messageRetryLimit | N | Number of times to resend a message after processing of that message fails before removing that message from the queue. Default: 10 | 10 |
| messageWaitTimeSeconds | N | The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than messageWaitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. Default: 1 | 1 |
| messageMaxNumber | N | Maximum number of messages to receive from the queue at a time. Default: 10, Maximum: 10 | 10 |
| fifo | N | Use SQS FIFO queue to provide message ordering and deduplication. Default: "false". See further details about SQS FIFO | "true", "false" |
| fifoMessageGroupID | N | If fifo is enabled, instructs Dapr to use a custom Message Group ID for the pubsub deployment. This is not mandatory as Dapr creates a custom Message Group ID for each producer, thus ensuring ordering of messages per a Dapr producer. Default: "" | "app1-mgi" |
| disableEntityManagement | N | When set to true, SNS topics, SQS queues and the SQS subscriptions to SNS do not get created automatically. Default: "false" | "true", "false" |
| disableDeleteOnRetryLimit | N | When set to true, after retrying and failing messageRetryLimit times to process a message, reset the message visibility timeout so that other consumers can try processing, instead of deleting the message from SQS (the default behavior). Default: "false" | "true", "false" |
| assetsManagementTimeoutSeconds | N | Amount of time in seconds, for an AWS asset management operation, before it times out and is cancelled. Asset management operations are any operations performed on STS, SNS and SQS, except message publish and consume operations that implement the default Dapr component retry behavior. The value can be set to any non-negative float/integer. Default: 5 | 0.5, 10 |
| concurrencyMode | N | When messages are received in bulk from SQS, call the subscriber sequentially ("single" message at a time), or concurrently (in "parallel"). Default: "parallel" | "single", "parallel" |
| concurrencyLimit | N | Defines the maximum number of concurrent workers handling messages. This value is ignored when concurrencyMode is set to "single". To avoid limiting the number of concurrent workers, set this to 0. Default: 0 | 100 |
Additional info
Conforming with AWS specifications
Dapr-created SNS topic and SQS queue names conform to AWS specifications. By default, Dapr creates an SQS queue name based on the consumer app-id, therefore Dapr might perform name standardization to meet AWS specifications.
SNS/SQS component behavior
When the pub/sub SNS/SQS component provisions SNS topics, the SQS queues and the subscription behave differently when the component operates on behalf of a message producer only (with no subscriber app deployed) than when a subscriber app is present (with no publisher deployed).
Due to how SNS works without an SQS subscription in a publisher-only setup, the SQS queues and the subscription behave as a "classic" pub/sub system that relies on subscribers listening to topic messages. Without those subscribers, messages:
Cannot be passed onwards and are effectively dropped
Are not available for future subscribers (no replay of message when the subscriber finally subscribes)
SQS FIFO
Using SQS FIFO (fifo metadata field set to "true") per AWS specifications provides message ordering and deduplication, but incurs a lower SQS processing throughput, among other caveats.
Specifying fifoMessageGroupID limits the number of concurrent consumers of the FIFO queue used to only one but guarantees global ordering of messages published by the app’s Dapr sidecars. See this AWS blog post to better understand the topic of Message Group IDs and FIFO queues.
To avoid losing the order of messages delivered to consumers, the FIFO configuration for the SQS Component requires the concurrencyMode metadata field set to "single".
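A sketch of the relevant SNS/SQS component metadata for a FIFO setup (the component name and values are illustrative):

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: snssqs-pubsub-fifo   # illustrative name
spec:
  type: pubsub.aws.snssqs
  version: v1
  metadata:
  - name: region
    value: "us-east-1"
  - name: fifo                  # use SQS FIFO queues for ordering and deduplication
    value: "true"
  - name: fifoMessageGroupID    # optional custom Message Group ID
    value: "app1-mgi"
  - name: concurrencyMode       # required to preserve ordering on the consumer side
    value: "single"
```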
Default parallel concurrencyMode
Since v1.8.0, the component supports the "parallel" concurrencyMode as its default mode. In prior versions, the component default behavior was calling the subscriber one message at a time and waiting for its response.
SQS dead-letter Queues
When configuring the PubSub component with SQS dead-letter queues, the metadata fields messageReceiveLimit and sqsDeadLettersQueueName must both be set. The messageReceiveLimit value must be greater than 0, and sqsDeadLettersQueueName must not be an empty string.
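For example, a component sketch with both fields set (names and values are illustrative) could look like:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: snssqs-pubsub-dlq   # illustrative name
spec:
  type: pubsub.aws.snssqs
  version: v1
  metadata:
  - name: region
    value: "us-east-1"
  - name: messageReceiveLimit      # must be greater than 0
    value: 10
  - name: sqsDeadLettersQueueName  # must not be empty
    value: "myapp-dlq"
```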
Important
When running the Dapr sidecar (daprd) with your application on EKS (AWS Kubernetes) node/pod already attached to an IAM policy defining access to AWS resources, you must not provide AWS access-key, secret-key, and tokens in the definition of the component spec.
SNS/SQS Contention with Dapr
Fundamentally, SNS aggregates messages from multiple publisher topics into a single SQS queue by creating SQS subscriptions to those topics. As a subscriber, the SNS/SQS pub/sub component consumes messages from that sole SQS queue.
However, like any SQS consumer, the component cannot selectively retrieve the messages published to the SNS topics to which it is specifically subscribed. This can result in the component receiving messages originating from topics without associated handlers. Typically, this occurs during:
Component initialization: If infrastructure subscriptions are ready before component subscription handlers, or
Shutdown: If component handlers are removed before infrastructure subscriptions.
Since this issue affects any SQS consumer of multiple SNS topics, the component cannot prevent consuming messages from topics lacking handlers. When this happens, the component logs an error indicating such messages were erroneously retrieved.
In these situations, the unhandled messages reappear in SQS, and each additional pull counts against their receive limit. Thus, there is a risk that an unhandled message could exceed its messageReceiveLimit and be lost.
Important
Consider potential contention scenarios when using SNS/SQS with Dapr, and configure messageReceiveLimit appropriately. It is highly recommended to use SQS dead-letter queues by setting sqsDeadLettersQueueName to prevent losing messages.
In order to use localstack with your pub/sub binding, you need to provide the endpoint configuration in the component metadata. The endpoint is unnecessary when running against production AWS.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: snssqs-pubsub
spec:
  type: pubsub.aws.snssqs
  version: v1
  metadata:
  - name: accessKey
    value: "anyString"
  - name: secretKey
    value: "anyString"
  - name: endpoint
    value: http://localhost:4566
  # Use us-east-1 or any other region if provided to localstack as defined by "AWS_DEFAULT_REGION" envvar
  - name: region
    value: us-east-1
```
To run localstack on Kubernetes, you can apply the configuration below. Localstack is then reachable at the DNS name http://localstack.default.svc.cluster.local:4566 (assuming this was applied to the default namespace), which should be used as the endpoint.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: localstack
spec:
  # using the selector, we will expose the running deployments
  # this is how Kubernetes knows, that a given service belongs to a deployment
  selector:
    matchLabels:
      app: localstack
  replicas: 1
  template:
    metadata:
      labels:
        app: localstack
    spec:
      containers:
      - name: localstack
        image: localstack/localstack:latest
        ports:
        # Expose the edge endpoint
        - containerPort: 4566
---
kind: Service
apiVersion: v1
metadata:
  name: localstack
  labels:
    app: localstack
spec:
  selector:
    app: localstack
  ports:
  - protocol: TCP
    port: 4566
    targetPort: 4566
  type: LoadBalancer
```
In order to run in AWS, create or assign an IAM user with permissions to the SNS and SQS services, with a policy like:
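A hedged sketch of such a policy is shown below; in practice you would scope the Action and Resource entries down to your own account, topics, and queues:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sns:*",
        "sqs:*"
      ],
      "Resource": "*"
    }
  ]
}
```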
Plug the AWS account ID and AWS account secret into the accessKey and secretKey in the component metadata, using Kubernetes secrets and secretKeyRef.
Alternatively, let's say you want to provision the SNS and SQS assets using your own tool of choice (for example, Terraform) while preventing Dapr from doing so dynamically. You need to enable disableEntityManagement and assign an IAM role to your Dapr-using application, with a policy like:
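A sketch of a narrower, publish/consume-only policy (the account ID and resource ARNs are placeholders you would replace):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sns:Publish",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes"
      ],
      "Resource": [
        "arn:aws:sns:*:123456789012:*",
        "arn:aws:sqs:*:123456789012:*"
      ]
    }
  ]
}
```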
Detailed documentation on the Azure Event Hubs pubsub component
Component format
To set up an Azure Event Hubs pub/sub, create a component of type pubsub.azure.eventhubs. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
Apart from the configuration metadata fields shown below, Azure Event Hubs also supports Azure Authentication mechanisms.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: eventhubs-pubsub
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
  # Either connectionString or eventHubNamespace is required
  # Use connectionString when *not* using Microsoft Entra ID
  - name: connectionString
    value: "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}"
  # Use eventHubNamespace when using Microsoft Entra ID
  - name: eventHubNamespace
    value: "namespace"
  - name: consumerID # Optional. If not supplied, the runtime will create one.
    value: "channel1"
  - name: enableEntityManagement
    value: "false"
  - name: enableInOrderMessageDelivery
    value: "false"
  # The following four properties are needed only if enableEntityManagement is set to true
  - name: resourceGroupName
    value: "test-rg"
  - name: subscriptionID
    value: "value of Azure subscription ID"
  - name: partitionCount
    value: "1"
  - name: messageRetentionInDays
    value: "3"
  # Checkpoint store attributes
  - name: storageAccountName
    value: "myeventhubstorage"
  - name: storageAccountKey
    value: "112233445566778899"
  - name: storageContainerName
    value: "myeventhubstoragecontainer"
  # Alternative to passing storageAccountKey
  - name: storageConnectionString
    value: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<account-key>"
```
Warning
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
| Field | Required | Details | Example |
|-------|----------|---------|---------|
| connectionString | Y* | Connection string for the Event Hub or the Event Hub namespace. * Mutually exclusive with eventHubNamespace field. * Required when not using Microsoft Entra ID Authentication | "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}" or "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key}" |
| eventHubNamespace | Y* | The Event Hub Namespace name. * Mutually exclusive with connectionString field. * Required when using Microsoft Entra ID Authentication | "namespace" |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. | |
| enableEntityManagement | N | Boolean value to allow management of the EventHub namespace and storage account. Default: false | "true", "false" |
| enableInOrderMessageDelivery | N | Boolean value to allow messages to be delivered in the order in which they were posted. This assumes partitionKey is set when publishing or posting to ensure ordering across partitions. Default: false | |
| storageAccountName | Y | Storage account name to use for the checkpoint store. | "myeventhubstorage" |
| storageAccountKey | Y* | Storage account key for the checkpoint store account. * When using Microsoft Entra ID, it's possible to omit this if the service principal has access to the storage account too. | "112233445566778899" |
| storageConnectionString | Y* | Connection string for the checkpoint store, alternative to specifying storageAccountKey | |
| storageContainerName | Y | Storage container name for the storage account name. | "myeventhubstoragecontainer" |
| resourceGroupName | N | Name of the resource group the Event Hub namespace is part of. Required when entity management is enabled | "test-rg" |
| subscriptionID | N | Azure subscription ID value. Required when entity management is enabled | "azure subscription id" |
| partitionCount | N | Number of partitions for the new Event Hub namespace. Used only when entity management is enabled. Default: "1" | "2" |
| messageRetentionInDays | N | Number of days to retain messages for in the newly created Event Hub namespace. Used only when entity management is enabled. Default: "1" | "90" |
Microsoft Entra ID authentication
The Azure Event Hubs pub/sub component supports authentication using all Microsoft Entra ID mechanisms. For further information and the relevant component metadata fields to provide depending on the choice of Microsoft Entra ID authentication mechanism, see the docs for authenticating to Azure.
Example Configuration
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: eventhubs-pubsub
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
  # Azure Authentication Used
  - name: azureTenantId
    value: "***"
  - name: azureClientId
    value: "***"
  - name: azureClientSecret
    value: "***"
  - name: eventHubNamespace
    value: "namespace"
  - name: enableEntityManagement
    value: "false"
  # The following four properties are needed only if enableEntityManagement is set to true
  - name: resourceGroupName
    value: "test-rg"
  - name: subscriptionID
    value: "value of Azure subscription ID"
  - name: partitionCount
    value: "1"
  - name: messageRetentionInDays
  # Checkpoint store attributes
  # In this case, we're using Microsoft Entra ID to access the storage account too
  - name: storageAccountName
    value: "myeventhubstorage"
  - name: storageContainerName
    value: "myeventhubstoragecontainer"
```
Sending and receiving multiple messages
Azure Event Hubs supports sending and receiving multiple messages in a single operation using the bulk pub/sub API.
Configuring bulk publish
To set the metadata for bulk publish operation, set the query parameters on the HTTP request or the gRPC metadata, as documented in the API reference.
When subscribing to a topic, you can configure the checkpointing frequency in a partition by setting the metadata in the HTTP or gRPC subscribe request. This metadata enables checkpointing after the configured number of events within a partition event sequence. Disable checkpointing by setting the frequency to 0.
The following example shows a sample subscription file for a Declarative subscription using the checkPointFrequencyPerPartition metadata. Similarly, you can also pass the metadata in Programmatic subscriptions.
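A sketch of such a declarative subscription (the subscription name, topic, route, and scope are illustrative):

```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub   # illustrative name
spec:
  topic: orders
  routes:
    default: /checkout
  pubsubname: order-pub-sub
  metadata:
    checkPointFrequencyPerPartition: 1   # checkpoint after every event in a partition
scopes:
- orderprocessing
```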
When subscribing to a topic using BulkSubscribe, you configure the checkpointing to occur after the specified number of batches, instead of events, where batch means the collection of events received in a single request.
Create an Azure Event Hub
Follow the instructions on the documentation to set up Azure Event Hubs.
Because this component uses Azure Storage as checkpoint store, you will also need an Azure Storage Account. Follow the instructions on the documentation to manage the storage account access keys.
See the documentation on how to get the Event Hubs connection string (note this is not for the Event Hubs namespace).
Create consumer groups for each subscriber
For every Dapr app that wants to subscribe to events, create an Event Hubs consumer group with the name of the Dapr app ID. For example, a Dapr app running on Kubernetes with dapr.io/app-id: "myapp" will need an Event Hubs consumer group named myapp.
Note: Dapr passes the name of the consumer group to the Event Hub, so this is not supplied in the metadata.
Entity Management
When entity management is enabled in the metadata, as long as the application has the right role and permissions to manipulate the Event Hub namespace, Dapr can automatically create the Event Hub and consumer group for you.
The Event Hub name is the topic field in the incoming request to publish or subscribe to, while the consumer group name is the name of the Dapr app which subscribes to a given Event Hub. For example, a Dapr app running on Kubernetes with name dapr.io/app-id: "myapp" requires an Event Hubs consumer group named myapp.
Dapr passes the name of the consumer group to the Event Hub, so this is not supplied in the metadata.
Receiving custom properties
By default, Dapr does not forward custom properties. However, by setting the subscription metadata requireAllProperties to "true", you can receive custom properties as HTTP headers.
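For example, a declarative subscription could set this metadata as sketched below (the names, topic, and route are illustrative):

```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: eventhubs-subscription   # illustrative name
spec:
  topic: orders
  routes:
    default: /checkout
  pubsubname: eventhubs-pubsub
  metadata:
    requireAllProperties: "true"   # forward custom Event Hubs properties as headers
scopes:
- checkout
```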
The device-to-cloud events created by Azure IoT Hub devices will contain additional IoT Hub System Properties, and the Azure Event Hubs pubsub component for Dapr will return the following as part of the response metadata:
Detailed documentation on the Azure Service Bus Queues pubsub component
Component format
To set up Azure Service Bus Queues pub/sub, create a component of type pubsub.azure.servicebus.queues. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
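A minimal component sketch using a connection string (the value is a placeholder; see the warning below about storing secrets):

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.queues
  version: v1
  metadata:
  # Required when not using Microsoft Entra ID authentication
  - name: connectionString
    value: "Endpoint=sb://{ServiceBusNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key}"
```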
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
| Field | Required | Details | Example |
|-------|----------|---------|---------|
| connectionString | Y | Shared access policy connection string for the Service Bus. Required unless using Microsoft Entra ID authentication. | See example above |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. | |
| namespaceName | N | Parameter to set the address of the Service Bus namespace, as a fully-qualified domain name. Required if using Microsoft Entra ID authentication. | "namespace.servicebus.windows.net" |
| timeoutInSec | N | Timeout for sending messages and for management operations. Default: 60 | 30 |
| handlerTimeoutInSec | N | Timeout for invoking the app's handler. Default: 60 | 30 |
| lockRenewalInSec | N | Defines the frequency at which buffered message locks will be renewed. Default: 20. | 20 |
| maxActiveMessages | N | Defines the maximum number of messages to be processing or in the buffer at once. This should be at least as big as the maximum concurrent handlers. Default: 1000 | 2000 |
| maxConcurrentHandlers | N | Defines the maximum number of concurrent message handlers. Default: 0 (unlimited) | 10 |
| disableEntityManagement | N | When set to true, queues and subscriptions do not get created automatically. Default: "false" | "true", "false" |
| defaultMessageTimeToLiveInSec | N | Default message time to live, in seconds. Used during subscription creation only. | 10 |
| autoDeleteOnIdleInSec | N | Time in seconds to wait before auto deleting idle subscriptions. Used during subscription creation only. Must be 300s or greater. Default: 0 (disabled) | 3600 |
| maxDeliveryCount | N | Defines the number of attempts the server will make to deliver a message. Used during subscription creation only. Default set by server. | 10 |
| lockDurationInSec | N | Defines the length in seconds that a message will be locked for before expiring. Used during subscription creation only. Default set by server. | 30 |
| minConnectionRecoveryInSec | N | Minimum interval (in seconds) to wait before attempting to reconnect to Azure Service Bus in case of a connection failure. Default: 2 | 5 |
| maxConnectionRecoveryInSec | N | Maximum interval (in seconds) to wait before attempting to reconnect to Azure Service Bus in case of a connection failure. After each attempt, the component waits a random number of seconds, increasing every time, between the minimum and the maximum. Default: 300 (5 minutes) | 600 |
| maxRetriableErrorsPerSec | N | Maximum number of retriable errors that are processed per second. If a message fails to be processed with a retriable error, the component adds a delay before it starts processing another message, to avoid immediately re-processing messages that have failed. Default: 10 | 10 |
| publishMaxRetries | N | The max number of retries for when Azure Service Bus responds with "too busy" in order to throttle messages. Default: 5 | 5 |
| publishInitialRetryIntervalInMs | N | Time in milliseconds for the initial exponential backoff when Azure Service Bus throttles messages. Default: 500 | 500 |
Microsoft Entra ID authentication
The Azure Service Bus Queues pubsub component supports authentication using all Microsoft Entra ID mechanisms, including Managed Identities. For further information and the relevant component metadata fields to provide depending on the choice of Microsoft Entra ID authentication mechanism, see the docs for authenticating to Azure.
Example Configuration
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.queues
  version: v1
  metadata:
  - name: namespaceName
    # Required when using Azure Authentication.
    # Must be a fully-qualified domain name
    value: "servicebusnamespace.servicebus.windows.net"
  - name: azureTenantId
    value: "***"
  - name: azureClientId
    value: "***"
  - name: azureClientSecret
    value: "***"
```
Message metadata
Azure Service Bus messages extend the Dapr message format with additional contextual metadata. Some metadata fields are set by Azure Service Bus itself (read-only) and others can be set by the client when publishing a message.
Sending a message with metadata
To set Azure Service Bus metadata when sending a message, set the query parameters on the HTTP request or the gRPC metadata as documented here.
metadata.MessageId
metadata.CorrelationId
metadata.SessionId
metadata.Label
metadata.ReplyTo
metadata.PartitionKey
metadata.To
metadata.ContentType
metadata.ScheduledEnqueueTimeUtc
metadata.ReplyToSessionId
Note
The metadata.MessageId property does not set the id property of the cloud event returned by Dapr and should be treated in isolation.
The metadata.ScheduledEnqueueTimeUtc property supports the RFC1123 and RFC3339 timestamp formats.
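As a sketch, publishing through the Dapr HTTP API with some of these metadata query parameters could look like the following (the pub/sub name, topic, and values are illustrative):

```python
import requests

# Publish an order and set Service Bus message metadata via query parameters.
requests.post(
    "http://localhost:3500/v1.0/publish/servicebus-pubsub/orders",
    params={
        "metadata.MessageId": "order-123",
        "metadata.ScheduledEnqueueTimeUtc": "2030-01-01T00:00:00Z",  # RFC3339 format
    },
    json={"orderId": 123},
)
```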
Receiving a message with metadata
When Dapr calls your application, it attaches Azure Service Bus message metadata to the request using either HTTP headers or gRPC metadata.
In addition to the settable metadata listed above, you can also access the following read-only message metadata.
Follow the instructions here on setting up Azure Service Bus Queues.
Note
Your queue must have the same name as the topic you are publishing to with Dapr. For example, if you are publishing to the pub/sub "myPubsub" on the topic "orders", your queue must be named "orders".
If you are using a shared access policy to connect to the queue, that policy must be able to “manage” the queue. To work with a dead-letter queue, the policy must live on the Service Bus Namespace that contains both the main queue and the dead-letter queue.
Retry policy and dead-letter queues
By default, an Azure Service Bus Queue has a dead-letter queue. Messages are retried up to the number given by maxDeliveryCount, which defaults to 10 but can be set as high as 2000. These retries happen very rapidly, and the message is put in the dead-letter queue if no success is returned.
Dapr Pub/sub offers its own dead-letter queue concept that lets you control the retry policy and subscribe to the dead-letter queue through Dapr.
Set up a separate queue as that dead-letter queue in the Azure Service Bus namespace, and a resiliency policy that defines how to retry.
Subscribe to the topic to get the failed messages and deal with them.
For example, setting up a dead-letter queue orders-dlq in the subscription and a resiliency policy lets you subscribe to the topic orders-dlq to handle failed messages.
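A sketch of a declarative subscription that routes failed messages to such a dead-letter topic (the names and route are illustrative):

```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: orders-subscription   # illustrative name
spec:
  topic: orders
  routes:
    default: /orders
  pubsubname: servicebus-pubsub
  deadLetterTopic: orders-dlq   # failed messages are published here
scopes:
- orderprocessing
```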
For more details on setting up dead-letter queues, see the dead-letter article.
Read this guide for instructions on configuring pub/sub components
5 - Azure Service Bus Topics
Detailed documentation on the Azure Service Bus Topics pubsub component
Component format
To set up Azure Service Bus Topics pub/sub, create a component of type pubsub.azure.servicebus.topics. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
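As with the queues component, a minimal sketch using a connection string (placeholder value; see the warning below) could look like:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
  # Required when not using Microsoft Entra ID authentication
  - name: connectionString
    value: "Endpoint=sb://{ServiceBusNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key}"
```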
NOTE: The above settings are shared across all topics that use this component.
Warning
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
| Field | Required | Details | Example |
|-------|----------|---------|---------|
| connectionString | Y | Shared access policy connection string for the Service Bus. Required unless using Microsoft Entra ID authentication. | See example above |
| namespaceName | N | Parameter to set the address of the Service Bus namespace, as a fully-qualified domain name. Required if using Microsoft Entra ID authentication. | "namespace.servicebus.windows.net" |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. | |
| timeoutInSec | N | Timeout for sending messages and for management operations. Default: 60 | 30 |
| handlerTimeoutInSec | N | Timeout for invoking the app's handler. Default: 60 | 30 |
| lockRenewalInSec | N | Defines the frequency at which buffered message locks will be renewed. Default: 20. | 20 |
| maxActiveMessages | N | Defines the maximum number of messages to be processing or in the buffer at once. This should be at least as big as the maximum concurrent handlers. Default: 1000 | 2000 |
| maxConcurrentHandlers | N | Defines the maximum number of concurrent message handlers. Default: 0 (unlimited) | 10 |
| disableEntityManagement | N | When set to true, queues and subscriptions do not get created automatically. Default: "false" | "true", "false" |
| defaultMessageTimeToLiveInSec | N | Default message time to live, in seconds. Used during subscription creation only. | 10 |
| autoDeleteOnIdleInSec | N | Time in seconds to wait before auto deleting idle subscriptions. Used during subscription creation only. Must be 300s or greater. Default: 0 (disabled) | 3600 |
| maxDeliveryCount | N | Defines the number of attempts the server makes to deliver a message. Used during subscription creation only. Default set by server. | 10 |
| lockDurationInSec | N | Defines the length in seconds that a message will be locked for before expiring. Used during subscription creation only. Default set by server. | 30 |
| minConnectionRecoveryInSec | N | Minimum interval (in seconds) to wait before attempting to reconnect to Azure Service Bus in case of a connection failure. Default: 2 | 5 |
| maxConnectionRecoveryInSec | N | Maximum interval (in seconds) to wait before attempting to reconnect to Azure Service Bus in case of a connection failure. After each attempt, the component waits a random number of seconds, increasing every time, between the minimum and the maximum. Default: 300 (5 minutes) | 600 |
| maxRetriableErrorsPerSec | N | Maximum number of retriable errors that are processed per second. If a message fails to be processed with a retriable error, the component adds a delay before it starts processing another message, to avoid immediately re-processing messages that have failed. Default: 10 | 10 |
| publishMaxRetries | N | The max number of retries for when Azure Service Bus responds with "too busy" in order to throttle messages. Default: 5 | 5 |
| publishInitialRetryIntervalInMs | N | Time in milliseconds for the initial exponential backoff when Azure Service Bus throttles messages. Default: 500 | 500 |
Microsoft Entra ID authentication
The Azure Service Bus Topics pubsub component supports authentication using all Microsoft Entra ID mechanisms, including Managed Identities. For further information and the relevant component metadata fields to provide depending on the choice of Microsoft Entra ID authentication mechanism, see the docs for authenticating to Azure.
Example Configuration
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
  - name: namespaceName
    # Required when using Azure Authentication.
    # Must be a fully-qualified domain name
    value: "servicebusnamespace.servicebus.windows.net"
  - name: azureTenantId
    value: "***"
  - name: azureClientId
    value: "***"
  - name: azureClientSecret
    value: "***"
```
Message metadata
Azure Service Bus messages extend the Dapr message format with additional contextual metadata. Some metadata fields are set by Azure Service Bus itself (read-only) and others can be set by the client when publishing a message.
Sending a message with metadata
To set Azure Service Bus metadata when sending a message, set the query parameters on the HTTP request or the gRPC metadata as documented here.
metadata.MessageId
metadata.CorrelationId
metadata.SessionId
metadata.Label
metadata.ReplyTo
metadata.PartitionKey
metadata.To
metadata.ContentType
metadata.ScheduledEnqueueTimeUtc
metadata.ReplyToSessionId
Note: The metadata.MessageId property does not set the id property of the cloud event returned by Dapr and should be treated in isolation.
NOTE: If the metadata.SessionId property is not set but the topic requires sessions then an empty session id will be used.
NOTE: The metadata.ScheduledEnqueueTimeUtc property supports the RFC1123 and RFC3339 timestamp formats.
Receiving a message with metadata
When Dapr calls your application, it will attach Azure Service Bus message metadata to the request using either HTTP headers or gRPC metadata.
In addition to the settable metadata listed above, you can also access the following read-only message metadata.
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
Field
Required
Details
Example
projectId
Y
GCP project ID
myproject-123
endpoint
N
GCP endpoint for the component to use. Only used for local development, for example with the GCP Pub/Sub Emulator. The endpoint is unnecessary when running against the GCP production API.
"http://localhost:8085"
consumerID
N
The Consumer ID organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. The consumerID, along with the topic provided as part of the request, is used to build the Pub/Sub subscription ID.
disableEntityManagement
N
When set to "true", topics and subscriptions do not get created automatically. Default: "false"
"true", "false"
enableMessageOrdering
N
When set to "true", subscribed messages will be received in order, depending on publishing and permissions configuration.
"true", "false"
orderingKey
N
The key provided in the request. It’s used when enableMessageOrdering is set to true to order messages based on that key.
“my-orderingkey”
maxReconnectionAttempts
N
Defines the maximum number of reconnect attempts. Default: 30
30
connectionRecoveryInSec
N
Time in seconds to wait between connection recovery attempts. Default: 2
2
deadLetterTopic
N
Name of the GCP Pub/Sub Topic. This topic must exist before using this component.
"myapp-dlq"
maxDeliveryAttempts
N
Maximum number of attempts to deliver the message. If deadLetterTopic is specified, maxDeliveryAttempts is the maximum number of attempts for failed processing of messages. Once that number is reached, the message will be moved to the dead-letter topic. Default: 5
5
type
N
DEPRECATED GCP credentials type. Only service_account is supported. Defaults to service_account
service_account
maxOutstandingMessages
N
Maximum number of outstanding messages a given streaming-pull connection can have. Default: 1000
50
maxOutstandingBytes
N
Maximum number of outstanding bytes a given streaming-pull connection can have. Default: 1000000000
1000000000
maxConcurrentConnections
N
Maximum number of concurrent streaming-pull connections to be maintained. Default: 10
If enableMessageOrdering is set to “true”, the roles/viewer or roles/pubsub.viewer role will be required on the service account in order to guarantee ordering in cases where order tokens are not embedded in the messages. If this role is not given, or the call to Subscription.Config() fails for any other reason, ordering by embedded order tokens will still function correctly.
GCP Credentials
Since the GCP Pub/Sub component uses the GCP Go Client Libraries, by default it authenticates using Application Default Credentials. This is explained further in the Authenticate to GCP Cloud services using client libraries guide.
In order to use the GCP Pub/Sub Emulator with your pub/sub binding, you need to provide the endpoint configuration in the component metadata. The endpoint is unnecessary when running against the GCP Production API.
The projectId attribute must match the --project used in either the docker-compose.yaml or Docker command.
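For instance, a minimal sketch of an emulator-oriented component, assuming the pubsub.gcp.pubsub component type and reusing the projectId and endpoint values from the table above (both values are illustrative):

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: gcp-pubsub
spec:
  type: pubsub.gcp.pubsub
  version: v1
  metadata:
  - name: projectId
    value: "myproject-123"   # must match the --project used to start the emulator
  - name: endpoint
    value: "http://localhost:8085"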
You can use either “explicit” or “implicit” credentials to configure access to your GCP pubsub instance. If using explicit, most fields are required. Implicit relies on Dapr running under a Kubernetes service account (KSA) mapped to a Google service account (GSA) that has the necessary permissions to access pubsub. In implicit mode, only the projectId attribute is needed; all others are optional.
Follow the instructions here on setting up a Google Cloud Pub/Sub system.
Detailed documentation on the In Memory pubsub component
The in-memory pub/sub component operates within a single Dapr sidecar. This is primarily meant for development purposes. State is not replicated across multiple sidecars and is lost when the Dapr sidecar is restarted.
Detailed documentation on the NATS JetStream component
Component format
To set up JetStream pub/sub, create a component of type pubsub.jetstream. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: jetstream-pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: jwt # Optional. Used for decentralized JWT authentication.
    value: "eyJhbGciOiJ...6yJV_adQssw5c"
  - name: seedKey # Optional. Used for decentralized JWT authentication.
    value: "SUACS34K232O...5Z3POU7BNIL4Y"
  - name: tls_client_cert # Optional. Used for TLS Client authentication.
    value: "/path/to/tls.crt"
  - name: tls_client_key # Optional. Used for TLS Client authentication.
    value: "/path/to/tls.key"
  - name: token # Optional. Used for token based authentication.
    value: "my-token"
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"
  - name: startSequence
    value: 1
  - name: startTime # In Unix format
    value: 1630349391
  - name: flowControl
    value: false
  - name: ackWait
    value: 10s
  - name: maxDeliver
    value: 5
  - name: backOff
    value: "50ms, 1s, 10s"
  - name: maxAckPending
    value: 5000
  - name: replicas
    value: 1
  - name: memoryStorage
    value: false
  - name: rateLimit
    value: 1024
  - name: heartbeat
    value: 15s
  - name: ackPolicy
    value: explicit
  - name: deliverPolicy
    value: all
  - name: domain
    value: hub
  - name: apiPrefix
    value: PREFIX
Let’s say you’d like each message to be processed by only one application or pod with the same app-id. Typically, the consumerID metadata spec helps you define competing consumers.
Since consumerID is not supported in NATS JetStream, you need to specify durableName and queueGroupName to achieve the competing consumers pattern. For example:
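A minimal sketch, reusing the durableName and queueGroupName fields from the component definition above (the connection, stream, and group names are illustrative):

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: jetstream-pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: streamName
    value: "my-stream"
  - name: durableName          # all instances share this durable consumer
    value: "my-durable-subscription"
  - name: queueGroupName       # instances in this queue group compete for messages
    value: "my-queue-group"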
Type of pub/sub. true: persisted pub/sub (EventsStore); false: in-memory pub/sub (Events)
true or false (default is false)
consumerID
N
Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value.
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
Field
Required
Details
Example
url
Y
Address of the MQTT broker. Can be secretKeyRef to use a secret reference. Use the tcp:// URI scheme for non-TLS communication. Use the ssl:// URI scheme for TLS communication.
"tcp://[username][:password]@host.domain[:port]"
consumerID
N
The client ID used to connect to the MQTT broker for the consumer connection. Defaults to the Dapr app ID. Note: if producerID is not set, -consumer is appended to this value for the consumer connection.
The MQTT pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once. If the service marks the message as not processed, the message won’t be acknowledged back to the broker. Only if the broker resends the message will it be retried.
To make Dapr use more sophisticated retry policies, you can apply a retry resiliency policy to the MQTT pub/sub component.
There is a crucial difference between the two ways of retries:
Re-delivery of unacknowledged messages is completely dependent on the broker. Dapr does not guarantee it. Some brokers, like emqx and vernemq, support it, but it is not part of the MQTT3 spec.
Using a retry resiliency policy makes the same Dapr sidecar retry redelivering the messages. So it is the same Dapr sidecar and the same app receiving the same message.
Communication using TLS
To configure communication using TLS, ensure that the MQTT broker (for example, mosquitto) is configured to support certificates and provide the caCert, clientCert, clientKey metadata in the component configuration. For example:
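A minimal sketch of the TLS-related metadata; the component type shown is assumed for illustration, the certificate values are placeholders, and the caCert/clientCert are inlined only for brevity:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3    # assumed component type for this section
  version: v1
  metadata:
  - name: url
    value: "ssl://host.domain[:port]"
  - name: caCert
    value: |
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----
  - name: clientCert
    value: |
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----
  - name: clientKey
    secretKeyRef:
      name: myMQTTClientKey   # hypothetical secret name
      key: myMQTTClientKey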
Note that while the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.
Consuming a shared topic
When consuming a shared topic, each consumer must have a unique identifier. By default, the application ID is used to uniquely identify each consumer and publisher. In self-hosted mode, invoking each dapr run with a different application ID is sufficient to have them consume from the same shared topic. However, on Kubernetes, multiple instances of an application pod will share the same application ID, prohibiting all instances from consuming the same topic. To overcome this, configure the component’s consumerID metadata with a {uuid} tag, which will give each instance a randomly generated consumerID value on start up. For example:
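For instance, a sketch of the relevant metadata entry (the surrounding component definition is assumed):

  metadata:
  - name: consumerID
    value: "{uuid}"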
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
Field
Required
Details
Example
url
Y
Address of the MQTT broker. Can be secretKeyRef to use a secret reference. Use the tcp:// URI scheme for non-TLS communication. Use the ssl:// URI scheme for TLS communication.
"tcp://[username][:password]@host.domain[:port]"
consumerID
N
The client ID used to connect to the MQTT broker. Defaults to the Dapr app ID.
retain
N
Defines whether the message is saved by the broker as the last known good value for a specified topic. Defaults to "false".
"true", "false"
cleanSession
N
Sets the clean_session flag in the connection message to the MQTT broker if "true" (more info). Defaults to "false".
"true", "false"
caCert
Required for using TLS
Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates.
See example below
clientCert
Required for using TLS
TLS client certificate in PEM format. Must be used with clientKey.
See example below
clientKey
Required for using TLS
TLS client key in PEM format. Must be used with clientCert. Can be secretKeyRef to use a secret reference.
See example below
qos
N
Indicates the Quality of Service Level (QoS) of the message (more info). Defaults to 1.
0, 1, 2
Communication using TLS
To configure communication using TLS, ensure that the MQTT broker (for example, emqx) is configured to support certificates and provide the caCert, clientCert, clientKey metadata in the component configuration. For example:
Note that while the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.
Consuming a shared topic
When consuming a shared topic, each consumer must have a unique identifier. By default, the application ID is used to uniquely identify each consumer and publisher. In self-hosted mode, invoking each dapr run with a different application ID is sufficient to have them consume from the same shared topic. However, on Kubernetes, multiple instances of an application pod will share the same application ID, prohibiting all instances from consuming the same topic. To overcome this, configure the component’s consumerID metadata with a {uuid} tag (which will give each instance a randomly generated value on start up) or {podName} (which will use the Pod’s name on Kubernetes). For example:
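As a sketch, the metadata entry for the Kubernetes case might use the {podName} tag (the rest of the component definition is assumed):

  metadata:
  - name: consumerID
    value: "{podName}"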
Detailed documentation on the Pulsar pubsub component
Component format
To set up Apache Pulsar pub/sub, create a component of type pubsub.pulsar. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets. This component supports storing the token parameter and any other sensitive parameter and data as Kubernetes Secrets.
Spec metadata fields
Field
Required
Details
Example
host
Y
Address of the Pulsar broker. Default is "localhost:6650"
"localhost:6650" OR "http://pulsar-pj54qwwdpz4b-pulsar.ap-sg.public.pulsar.com:8080"
enableTLS
N
Enable TLS. Default: "false"
"true", "false"
tenant
N
The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and are spread across clusters. Default: "public"
namespace
N
The administrative unit of the topic, which acts as a grouping mechanism for related topics. Default: "default"
"default"
persistent
N
Pulsar supports two kinds of topics: persistent and non-persistent. With persistent topics, all messages are durably persisted on disks (if the broker is not standalone, messages are durably persisted on multiple disks), whereas data for non-persistent topics is not persisted to storage disks.
disableBatching
N
Disable batching. When batching is enabled, the default batch delay is 10 ms and the default batch size is 1000 messages. Setting disableBatching: true makes the producer send messages individually. Default: "false"
"true", "false"
receiverQueueSize
N
Sets the size of the consumer receiver queue. Controls how many messages can be accumulated by the consumer before it is explicitly called to read messages by Dapr. Default: "1000"
"1000"
batchingMaxPublishDelay
N
batchingMaxPublishDelay sets the time period within which sent messages are batched, if batching is enabled. If set to a non-zero value, messages are queued until this time interval elapses, or until batchingMaxMessages (see below) or batchingMaxSize (see below) is reached. There are two valid formats: a duration with a unit suffix, or a plain number that is interpreted as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Default: "10ms"
"10ms", "10"
batchingMaxMessages
N
batchingMaxMessages sets the maximum number of messages permitted in a batch. If set to a value greater than 1, messages are queued until this threshold is reached, batchingMaxSize (see below) has been reached, or the batch interval has elapsed. Default: "1000"
"1000"
batchingMaxSize
N
batchingMaxSize sets the maximum number of bytes permitted in a batch. If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxMessages (see above) has been reached or the batch interval has elapsed. Default: "128KB"
"131072"
<topic-name>.jsonschema
N
Enforces JSON schema validation for the configured topic.
<topic-name>.avroschema
N
Enforces Avro schema validation for the configured topic.
publicKey
N
A public key to be used for publisher and consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value
privateKey
N
A private key to be used for consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value
keys
N
A comma delimited string containing names of Pulsar session keys. Used in conjunction with publicKey for publisher encryption
processMode
N
Enable processing multiple messages at once. Default: "async"
Since v3.0, Pulsar supports OIDC authentication.
To enable OIDC authentication, you need to provide the following OAuth2 parameters to the component spec.
OAuth2 authentication cannot be used in combination with token authentication.
It is recommended that you use a secret reference for the client secret.
The Pulsar OAuth2 authenticator is not specifically compliant with OIDC, so it is your responsibility to ensure fields are compliant; for example, the issuer URL must use the https protocol and the requested scopes must include openid.
If the oauth2TokenCAPEM field is omitted then the system’s certificate pool is used for connecting to the OAuth2 issuer if using https.
Field
Required
Details
Example
oauth2TokenURL
N
URL to request the OIDC client_credentials token from. Must not be empty.
The Pulsar pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once, and it is not retried in case of failure. To make Dapr use more sophisticated retry policies, you can apply a retry resiliency policy to the Pulsar pub/sub component. Note that it is the same Dapr sidecar retrying redelivery of the message to the same app instance, not other instances.
Delay queue
When invoking the Pulsar pub/sub, it’s possible to provide an optional delay queue by using the metadata query parameters in the request URL.
These optional parameter names are metadata.deliverAt or metadata.deliverAfter:
deliverAt: Delay message to deliver at a specified time (RFC3339 format); for example, "2021-09-01T10:00:00Z"
deliverAfter: Delay message to deliver after a specified amount of time; for example,"4h5m3s"
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: publicKey
    value: "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
  - name: keys
    value: myapp.key
All other metadata key/value pairs (that are not partitionKey) are set as headers in the Pulsar message. For example, set a correlationId for the message:
Detailed documentation on the RabbitMQ pubsub component
Component format
To set up RabbitMQ pub/sub, create a component of type pubsub.rabbitmq. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbitmq-pubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: connectionString
    value: "amqp://localhost:5672"
  - name: protocol
    value: amqp
  - name: hostname
    value: localhost
  - name: username
    value: username
  - name: password
    value: password
  - name: consumerID
    value: channel1
  - name: durable
    value: false
  - name: deletedWhenUnused
    value: false
  - name: autoAck
    value: false
  - name: deliveryMode
    value: 0
  - name: requeueInFailure
    value: false
  - name: prefetchCount
    value: 0
  - name: reconnectWait
    value: 0
  - name: concurrencyMode
    value: parallel
  - name: publisherConfirm
    value: false
  - name: enableDeadLetter # Optional enable dead Letter or not
    value: true
  - name: maxLen # Optional max message count in a queue
    value: 3000
  - name: maxLenBytes # Optional maximum length in bytes of a queue.
    value: 10485760
  - name: exchangeKind
    value: fanout
  - name: saslExternal
    value: false
  - name: ttlInSeconds
    value: 60
  - name: clientName
    value: {podName}
  - name: heartBeat
    value: 10s
Warning
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
Field
Required
Details
Example
connectionString
Y*
The RabbitMQ connection string. *Mutually exclusive with the protocol, hostname, username, and password fields
amqp://user:pass@localhost:5672
protocol
N*
The RabbitMQ protocol. *Mutually exclusive with the connectionString field
amqp
hostname
N*
The RabbitMQ hostname. *Mutually exclusive with the connectionString field
localhost
username
N*
The RabbitMQ username. *Mutually exclusive with the connectionString field
username
password
N*
The RabbitMQ password. *Mutually exclusive with the connectionString field
password
consumerID
N
Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value.
durable
N
Whether or not to use durable queues. Defaults to "false"
"true", "false"
deletedWhenUnused
N
Whether or not the queue should be configured to auto-delete. Defaults to "true"
"true", "false"
autoAck
N
Whether or not the queue consumer should auto-ack messages. Defaults to "false"
"true", "false"
deliveryMode
N
Persistence mode when publishing messages. Defaults to "0". RabbitMQ treats "2" as persistent, all other numbers as non-persistent
"0", "2"
requeueInFailure
N
Whether or not to requeue when sending a negative acknowledgement in case of a failure. Defaults to "false"
"true", "false"
prefetchCount
N
Number of messages to prefetch. Consider changing this to a non-zero value for production environments. Defaults to "0", which means that all available messages will be pre-fetched.
"2"
publisherConfirm
N
If enabled, client waits for publisher confirms after publishing a message. Defaults to "false"
"true", "false"
reconnectWait
N
How long to wait (in seconds) before reconnecting if a connection failure occurs
"0"
concurrencyMode
N
parallel is the default, and allows processing multiple messages in parallel (limited by the app-max-concurrency annotation, if configured). Set to single to disable parallel processing. In most situations there’s no reason to change this.
parallel, single
enableDeadLetter
N
Enable forwarding Messages that cannot be handled to a dead-letter topic. Defaults to "false"
"true", "false"
maxLen
N
The maximum number of messages of a queue and its dead letter queue (if dead letter enabled). If both maxLen and maxLenBytes are set then both will apply; whichever limit is hit first will be enforced. Defaults to no limit.
"1000"
maxLenBytes
N
Maximum length in bytes of a queue and its dead letter queue (if dead letter enabled). If both maxLen and maxLenBytes are set then both will apply; whichever limit is hit first will be enforced. Defaults to no limit.
"1048576"
exchangeKind
N
Exchange kind of the RabbitMQ exchange. Defaults to "fanout".
"fanout","topic"
saslExternal
N
With TLS, should the username be taken from an additional field (for example, CN). See RabbitMQ Authentication Mechanisms. Defaults to "false".
"true", "false"
ttlInSeconds
N
Set message TTL at the component level, which can be overwritten by message level TTL per request.
"60"
caCert
Required for using TLS
Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates.
clientName
N
This RabbitMQ client-provided connection name is a custom identifier. If set, the identifier is mentioned in RabbitMQ server log entries and the management UI. Can be set to {uuid}, {podName}, or {appID}, which is replaced by the Dapr runtime with the real value.
"app1", {uuid}, {podName}, {appID}
heartBeat
N
Defines the heartbeat interval with the server, detecting the aliveness of the peer TCP connection with the RabbitMQ server. Defaults to 10s.
"10s"
Communication using TLS
To configure communication using TLS, ensure that the RabbitMQ nodes have TLS enabled and provide the caCert, clientCert, clientKey metadata in the component configuration. For example:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbitmq-pubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqps://localhost:5671"
  - name: consumerID
    value: myapp
  - name: durable
    value: false
  - name: deletedWhenUnused
    value: false
  - name: autoAck
    value: false
  - name: deliveryMode
    value: 0
  - name: requeueInFailure
    value: false
  - name: prefetchCount
    value: 0
  - name: reconnectWait
    value: 0
  - name: concurrencyMode
    value: parallel
  - name: publisherConfirm
    value: false
  - name: enableDeadLetter # Optional enable dead Letter or not
    value: true
  - name: maxLen # Optional max message count in a queue
    value: 3000
  - name: maxLenBytes # Optional maximum length in bytes of a queue.
    value: 10485760
  - name: exchangeKind
    value: fanout
  - name: saslExternal
    value: false
  - name: caCert
    value: ${{ myLoadedCACert }}
  - name: clientCert
    value: ${{ myLoadedClientCert }}
  - name: clientKey
    secretKeyRef:
      name: myRabbitMQClientKey
      key: myRabbitMQClientKey
Note that while the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.
Enabling message delivery retries
The RabbitMQ pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once. When the service returns a result, the message will be marked as consumed regardless of whether it was processed correctly or not. Note that this is common among all Dapr PubSub components and not just RabbitMQ.
Dapr can try redelivering a message a second time, when autoAck is set to false and requeueInFailure is set to true.
To make Dapr use more sophisticated retry policies, you can apply a retry resiliency policy to the RabbitMQ pub/sub component.
There is a crucial difference between the two ways to retry messages:
When using autoAck = false and requeueInFailure = true, RabbitMQ is the one responsible for re-delivering messages and any subscriber can get the redelivered message. If you have more than one instance of your consumer, then it’s possible that another consumer will get it. This is usually the better approach because if there’s a transient failure, it’s more likely that a different worker will be in a better position to successfully process the message.
Using Resiliency makes the same Dapr sidecar retry redelivering the messages. So it will be the same Dapr sidecar and the same app receiving the same message.
Create a RabbitMQ server
You can run a RabbitMQ server locally using Docker:
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3
You can then interact with the server using the client port: localhost:5672.
The easiest way to install RabbitMQ on Kubernetes is by using the Helm chart:
helm install rabbitmq stable/rabbitmq
Look at the chart output and get the username and password.
This will install RabbitMQ into the default namespace. To interact with RabbitMQ, find the service with: kubectl get svc rabbitmq.
For example, if installing using the example above, the RabbitMQ server client address would be:
rabbitmq.default.svc.cluster.local:5672
Use topic exchange to route messages
Setting exchangeKind to "topic" uses the topic exchanges, which are commonly used for the multicast routing of messages. In order to route messages using topic exchange, you must set the following metadata:
routingKey:
Messages with a routing key are routed to one or many queues based on the routing key defined in the metadata when subscribing.
queueName:
If you don’t set the queueName, only one queue is created, and all routing keys will route to that queue. This means all subscribers will bind to that queue, which won’t give the desired results.
For example, if an app is configured with a routing key keyA and queueName of queue-A:
It will receive messages with routing key keyA, and messages with other routing keys are not received.
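A sketch of a declarative subscription carrying that configuration; the Subscription resource and its route path shown here are assumptions for illustration, while the topic name, pubsub name, routingKey, and queueName come from the surrounding example:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: order-sub           # hypothetical subscription name
spec:
  topic: B
  route: /orders            # assumed handler route
  pubsubname: pubsub
  metadata:
    routingKey: keyA
    queueName: queue-A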
// publish messages with routing key `keyA`, and these will be received by the above example.
client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is a message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyA"}))
// publish messages with routing key `keyB`, and these will not be received by the above example.
client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is another message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyB"}))
Bind multiple routingKey
Multiple routing keys can be separated by commas.
The example below binds three routingKey: keyA, keyB, and "". Note the binding method of empty keys.
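A sketch of the corresponding subscription metadata, under the same assumed Subscription format as above; the trailing comma in routingKey binds the empty key:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: order-sub           # hypothetical subscription name
spec:
  topic: B
  route: /orders            # assumed handler route
  pubsubname: pubsub
  metadata:
    routingKey: "keyA,keyB,"

The Go handler that follows is a separate, programmatic subscription example: it declares maxPriority in the subscription metadata, which relates to the priority settings described next.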
package main

import (
	"encoding/json"
	"net/http"
)

const appPort = 3000

type subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Routes     routes            `json:"routes"`
}

type routes struct {
	Rules   []rule `json:"rules,omitempty"`
	Default string `json:"default,omitempty"`
}

// rule is referenced but not defined in the original snippet; a minimal definition is assumed here.
type rule struct {
	Match string `json:"match"`
	Path  string `json:"path"`
}

// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "checkout",
			Routes: routes{
				Default: "/orders",
			},
			Metadata: map[string]string{"maxPriority": "3"},
		},
	}
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(t)
}
Setting a priority when publishing a message
To set a priority on a message, add the publish metadata key priority to the publish endpoint or SDK method, as in the example below.
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders?metadata.priority=3 -H "Content-Type: application/json" -d '{"orderId": "100"}'
You can set a time-to-live (TTL) value at either the message or component level. Set default component-level TTL using the component spec ttlInSeconds field in your component.
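For example, the component-level default can be set with the ttlInSeconds metadata field shown in the component definition earlier:

  metadata:
  - name: ttlInSeconds
    value: 60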
Note
If you set both component-level and message-level TTL, the default component-level TTL is ignored in favor of the message-level TTL.
Single Active Consumer
The RabbitMQ Single Active Consumer setup ensures that only one consumer at a time processes messages from a queue and switches to another registered consumer if the active one is canceled or fails. This approach might be required when it is crucial for messages to be consumed in the exact order they arrive in the queue and if distributed processing with multiple instances is not supported.
When this option is enabled on a queue by Dapr, an instance of the Dapr runtime will be the single active consumer. To allow another application instance to take over in case of failure, Dapr runtime must probe the application’s health and unsubscribe from the pub/sub component.
Note
This pattern prevents the application from scaling, as only one instance can process the load. While it might be interesting for Dapr integration with legacy or sensitive applications, you should consider a design that allows distributed processing if you need scalability.
Detailed documentation on the Redis Streams pubsub component
Component format
To set up Redis Streams pub/sub, create a component of type pubsub.redis. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
enableTLS
N
If the Redis instance supports TLS with public certificates, this can be configured to be enabled or disabled. Defaults to "false"
"true", "false"
clientCert
N
The content of the client certificate, used for Redis instances that require client-side certificates. Must be used with clientKey and enableTLS must be set to true. It is recommended to use a secret store as described here
"----BEGIN CERTIFICATE-----\nMIIC..."
clientKey
N
The content of the client private key, used in conjunction with clientCert for authentication. It is recommended to use a secret store as described here
"----BEGIN PRIVATE KEY-----\nMIIE..."
redeliverInterval
N
The interval between checking for pending messages to redeliver. Can be either a Go duration string (using units such as “ms”, “s”, “m”) or a number of milliseconds. Defaults to "60s". "0" disables redelivery.
"30s", "5000"
processingTimeout
N
The amount of time that a message must be pending before attempting to redeliver it. Can be either a Go duration string (using units such as “ms”, “s”, “m”) or a number of milliseconds. Defaults to "15s". "0" disables redelivery.
"60s", "600000"
queueDepth
N
The size of the message queue for processing. Defaults to "100".
"1000"
concurrency
N
The number of concurrent workers that are processing messages. Defaults to "10".
"15"
redisType
N
The type of redis. There are two valid values, one is "node" for single node mode, the other is "cluster" for redis cluster mode. Defaults to "node".
"cluster"
redisDB
N
Database selected after connecting to redis. If "redisType" is "cluster" this option is ignored. Defaults to "0".
"0"
redisMaxRetries
N
Maximum number of times to retry commands before giving up. Default is to not retry failed commands.
"5"
redisMinRetryInterval
N
Minimum backoff for redis commands between each retry. Default is "8ms"; "-1" disables backoff.
"8ms"
redisMaxRetryInterval
N
Maximum backoff for redis commands between each retry. Default is "512ms";"-1" disables backoff.
"5s"
dialTimeout
N
Dial timeout for establishing new connections. Defaults to "5s".
"5s"
readTimeout
N
Timeout for socket reads. If reached, redis commands will fail with a timeout instead of blocking. Defaults to "3s", "-1" for no timeout.
"3s"
writeTimeout
N
Timeout for socket writes. If reached, redis commands will fail with a timeout instead of blocking. Default is readTimeout.
"3s"
poolSize
N
Maximum number of socket connections. Default is 10 connections per CPU as reported by runtime.NumCPU.
"20"
poolTimeout
N
Amount of time client waits for a connection if all connections are busy before returning an error. Default is readTimeout + 1 second.
"5s"
maxConnAge
N
Connection age at which the client retires (closes) the connection. Default is to not close aged connections.
"30m"
minIdleConns
N
Minimum number of idle connections to keep open in order to avoid the performance degradation associated with creating new connections. Defaults to "0".
"2"
idleCheckFrequency
N
Frequency of idle checks made by idle connections reaper. Default is "1m". "-1" disables idle connections reaper.
"-1"
idleTimeout
N
Amount of time after which the client closes idle connections. Should be less than server’s timeout. Default is "5m". "-1" disables idle timeout check.
"10m"
failover
N
Property to enable failover configuration. Requires sentinelMasterName to be set. Defaults to "false"
maxLenApprox
N
Maximum number of items inside a stream. The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
"10000"
streamTTL
N
TTL duration for stream entries. Entries older than this duration will be evicted. This is an approximate value, as it’s implemented using Redis stream’s MINID trimming with the ‘~’ modifier. The actual retention may include slightly more entries than strictly defined by the TTL, as Redis optimizes the trimming operation for efficiency by potentially keeping some additional entries.
"30d"
Create a Redis instance
Dapr can use any Redis instance - containerized, running on your local dev machine, or a managed cloud service, provided the version of Redis is 5.x or 6.x.
The Dapr CLI automatically creates and sets up a Redis Streams instance for you.
The Redis instance is installed via Docker when you run dapr init, and the component file is created in the default components directory ($HOME/.dapr/components on Mac/Linux, or %USERPROFILE%\.dapr\components on Windows).
You can use Helm to quickly create a Redis instance in your Kubernetes cluster. This approach requires installing Helm.
Run kubectl get pods to see the Redis containers now running in your cluster.
Add redis-master:6379 as the redisHost in your redis.yaml file. For example:
metadata:
- name: redisHost
  value: redis-master:6379
Next, we’ll get our Redis password, which is slightly different depending on the OS we’re using:
Windows: Run kubectl get secret --namespace default redis -o jsonpath="{.data.redis-password}" > encoded.b64, which will create a file with your encoded password. Next, run certutil -decode encoded.b64 password.txt, which will put your redis password in a text file called password.txt. Copy the password and delete the two files.
Linux/MacOS: Run kubectl get secret --namespace default redis -o jsonpath="{.data.redis-password}" | base64 --decode and copy the outputted password.
Add this password as the redisPassword value in your redis.yaml file. For example:
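For example, extending the earlier redisHost fragment (the password value shown is a placeholder):

metadata:
- name: redisHost
  value: redis-master:6379
- name: redisPassword
  value: "<your-redis-password>"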
Once your instance is created, grab the Host name (FQDN) and your access key from the Azure portal.
For the Host name:
Navigate to the resource’s Overview page.
Copy the Host name value.
For your access key:
Navigate to Settings > Access Keys.
Copy and save your key.
Add your key and your host name to a redis.yaml file that Dapr can apply to your cluster.
If you’re running a sample, add the host and key to the provided redis.yaml.
If you’re creating a project from the ground up, create a redis.yaml file as specified in the Component format section.
Set the redisHost key to [HOST NAME FROM PREVIOUS STEP]:6379 and the redisPassword key to the key you saved earlier.
Note: In a production-grade application, follow secret management instructions to securely manage your secrets.
Enable EntraID support:
Enable Entra ID authentication on your Azure Redis server. This may take a few minutes.
Set useEntraID to "true" to implement EntraID support for Azure Cache for Redis.
Set enableTLS to "true" to support TLS.
Note: useEntraID assumes that either your UserPrincipal (via AzureCLICredential) or the SystemAssigned managed identity has the RedisDataOwner role permission. If a user-assigned identity is used, you need to specify the azureClientID property.
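Putting the steps above together, a sketch of the relevant metadata for Azure Cache for Redis (host name is a placeholder, following the [HOST NAME FROM PREVIOUS STEP]:6379 pattern described above):

metadata:
- name: redisHost
  value: "[HOST NAME FROM PREVIOUS STEP]:6379"
- name: useEntraID
  value: "true"
- name: enableTLS
  value: "true"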
Detailed documentation on the RocketMQ pubsub component
Component format
To set up RocketMQ pub/sub, create a component of type pubsub.rocketmq. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
Field
Required
Details
default
Example
instanceName
N
Instance name
time.Now().String()
dapr-rocketmq-test
consumerGroup
N
Consumer group name. Recommended. If producerGroup is null, groupName is used.
dapr-rocketmq-test-g-c
producerGroup (consumerID)
N
Producer group name. Recommended. If producerGroup is null, consumerID is used. If consumerID is also null, groupName is used.
dapr-rocketmq-test-g-p
consumerID
N
Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value.
queueSelector
N
Producer queue selector. There are five implementations of queue selector: hash, random, manual, roundRobin, dapr.
dapr
hash
consumerModel
N
Message model that defines how messages are delivered to each consumer client. RocketMQ supports two message models: clustering and broadcasting.
clustering
broadcasting , clustering
fromWhere (consumeFromWhere)
N
Consuming point on consumer booting. There are three consuming points: CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP
CONSUME_FROM_LAST_OFFSET
CONSUME_FROM_LAST_OFFSET
consumeTimestamp
N
Backtracks consumption time with second precision. Time format is yyyymmddhhmmss. For example, 20131223171201 implies the time of 17:12:01 and date of December 23, 2013
orderly
N
Determines if it’s an ordered message using FIFO order.
false
false
consumeMessageBatchMaxSize
N
Batch consumption size, in the range [1, 1024]
512
10
consumeConcurrentlyMaxSpan
N
Maximum span offset for concurrent consumption. This has no effect on sequential consumption. Range: [1, 65535]
1000
1000
maxReconsumeTimes
N
Max re-consume times. -1 means 16 times. If messages are re-consumed more than maxReconsumeTimes before success, they’ll be directed to a deletion queue.
Orderly message is MaxInt32; Concurrently message is 16
16
autoCommit
N
Enable auto commit
true
false
consumeTimeout
N
Maximum amount of time a message may block the consuming thread. Time unit: Minute
15
15
consumerPullTimeout
N
The socket timeout in milliseconds
pullInterval
N
Message pull interval
100
100
pullBatchSize
N
The number of messages pulled from the broker at a time. If pullBatchSize is null, ConsumerBatchSize is used. pullBatchSize must be in the range [1, 1024]
32
10
pullThresholdForQueue
N
Flow control threshold on queue level. Each message queue will cache a maximum of 1000 messages by default. Consider the PullBatchSize - the instantaneous value may exceed the limit. Range: [1, 65535]
1024
1000
pullThresholdForTopic
N
Flow control threshold on topic level. The value of pullThresholdForQueue will be overwritten and calculated based on pullThresholdForTopic if it isn’t unlimited. For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer, then pullThresholdForQueue will be set to 100. Range: [1, 6553500]
-1(Unlimited)
10
pullThresholdSizeForQueue
N
Limit the cached message size on queue level. Consider the pullBatchSize - the instantaneous value may exceed the limit. The size of a message is only measured by message body, so it’s not accurate. Range: [1, 1024]
100
100
pullThresholdSizeForTopic
N
Limit the cached message size on topic level. The value of pullThresholdSizeForQueue will be overwritten and calculated based on pullThresholdSizeForTopic if it isn’t unlimited. For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB. Range: [1, 102400]
Read this guide for instructions on configuring pub/sub components
16 - Solace-AMQP
Detailed documentation on the Solace-AMQP pubsub component
Component format
To set up Solace-AMQP pub/sub, create a component of type pubsub.solace.amqp. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
Field
Required
Details
Example
url
Y
Address of the AMQP broker. Can be secretKeyRef to use a secret reference. Use the amqp:// URI scheme for non-TLS communication. Use the amqps:// URI scheme for TLS communication.
"amqp://host.domain[:port]"
username
Y
The username to connect to the broker. Only required if anonymous is not specified or set to false.
default
password
Y
The password to connect to the broker. Only required if anonymous is not specified or set to false.
default
consumerID
N
Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value.
anonymous
N
To connect to the broker without credential validation. Only works if enabled on the broker. A username and password are not required if this is set to true.
true
caCert
Required for using TLS
Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates.
While the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.
Publishing/subscribing to topics and queues
By default, messages are published and subscribed over topics. If you would like your destination to be a queue, prefix the topic with queue: and the Solace AMQP component will connect to a queue.