Pub/sub brokers component specs

The supported pub/sub brokers that interface with Dapr

The following table lists publish and subscribe brokers supported by the Dapr pub/sub building block. Learn how to set up different brokers for Dapr publish and subscribe.

Table headers to note:

Header                  Description                                                                     Example
Status                  Component certification status                                                  Alpha, Beta, Stable
Component version       The version of the component                                                    v1
Since runtime version   The version of the Dapr runtime when the component status was set or updated    1.11

Generic

Component       Status   Component version   Since runtime version
Apache Kafka    Stable   v1                  1.5
In-memory       Stable   v1                  1.7
JetStream       Beta     v1                  1.10
KubeMQ          Beta     v1                  1.10
MQTT3           Stable   v1                  1.7
Pulsar          Stable   v1                  1.10
RabbitMQ        Stable   v1                  1.7
Redis Streams   Stable   v1                  1.0
RocketMQ        Alpha    v1                  1.8
Solace-AMQP     Beta     v1                  1.10

Amazon Web Services (AWS)

Component     Status   Component version   Since runtime version
AWS SNS/SQS   Stable   v1                  1.10

Google Cloud Platform (GCP)

Component     Status   Component version   Since runtime version
GCP Pub/Sub   Stable   v1                  1.11

Microsoft Azure

Component                  Status   Component version   Since runtime version
Azure Event Hubs           Stable   v1                  1.8
Azure Service Bus Queues   Beta     v1                  1.10
Azure Service Bus Topics   Stable   v1                  1.0

1 - Apache Kafka

Detailed documentation on the Apache Kafka pubsub component

Component format

To set up Apache Kafka pub/sub, create a component of type pubsub.kafka. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

All component metadata field values can carry templated metadata values, which are resolved on Dapr sidecar startup. For example, you can choose to use {namespace} as the consumerGroup to enable using the same appId in different namespaces using the same topics as described in this article.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "{namespace}"
  - name: consumerID # Optional. If not supplied, runtime will create one.
    value: "channel1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "password"
  - name: saslUsername # Required if authType is `password`.
    value: "adminuser"
  - name: saslPassword # Required if authType is `password`.
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: saslMechanism
    value: "SHA-512"
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 2.0.0
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"
  - name: consumerFetchMin # Optional. Advanced setting. The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
    value: 1
  - name: consumerFetchDefault # Optional. Advanced setting. The default number of message bytes to fetch from the broker in each request.
    value: 2097152
  - name: channelBufferSize # Optional. Advanced setting. The number of events to buffer in internal and external channels.
    value: 512
  - name: consumerGroupRebalanceStrategy # Optional. Advanced setting. The strategy to use for consumer group rebalancing.
    value: sticky
  - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
    value: http://localhost:8081
  - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API Key.
    value: XYAXXAZ
  - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret.
    value: "ABCDEFGMEADFF"
  - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
    value: true
  - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
    value: 5m
  - name: useAvroJson # Optional. Enables Avro JSON schema for serialization as opposed to Standard JSON default. Only applicable when the subscription uses valueSchemaType=Avro
    value: "true"
  - name: escapeHeaders # Optional.
    value: false
  

For details on using secretKeyRef, see the guide on how to reference secrets in components.

Spec metadata fields

Field Required Details Example
brokers Y A comma-separated list of Kafka brokers. "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093"
consumerGroup N A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. If a value for consumerGroup is provided, any value for consumerID is ignored - a combination of the consumer group and a random unique identifier will be set for the consumerID instead. "group1"
consumerID N Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. If a value for consumerGroup is provided, any value for consumerID is ignored - a combination of the consumer group and a random unique identifier will be set for the consumerID instead. Can be set to string value (such as "channel1" in the example above) or string format value (such as "{podName}", etc.). See all template tags you can use in your component metadata.
clientID N A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. Defaults to "namespace.appID" for Kubernetes mode or "appID" for Self-Hosted mode. "my-namespace.my-dapr-app", "my-dapr-app"
authRequired N Deprecated Enable SASL authentication with the Kafka brokers. "true", "false"
authType Y Configure or disable authentication. Supported values: none, password, mtls, oidc or awsiam "password", "none"
saslUsername N The SASL username used for authentication. Only required if authType is set to "password". "adminuser"
saslPassword N The SASL password used for authentication. Can be secretKeyRef to use a secret reference. Only required if authType is set to "password". "", "KeFg23!"
saslMechanism N The SASL Authentication Mechanism you wish to use. Only required if authType is set to "password". Defaults to PLAINTEXT "SHA-512", "SHA-256", "PLAINTEXT"
initialOffset N The initial offset to use if no offset was previously committed. Should be “newest” or “oldest”. Defaults to “newest”. "oldest"
maxMessageBytes N The maximum size in bytes allowed for a single Kafka message. Defaults to 1024. 2048
consumeRetryInterval N The interval between retries when attempting to consume topics. Treats numbers without suffix as milliseconds. Defaults to 100ms. 200ms
consumeRetryEnabled N Disable consume retry by setting "false" "true", "false"
version N Kafka cluster version. Defaults to 2.0.0. Note that this must be set to 1.0.0 if you are using Azure EventHubs with Kafka. 0.10.2.0
caCert N Certificate authority certificate, required for using TLS. Can be secretKeyRef to use a secret reference "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert N Client certificate, required for authType mtls. Can be secretKeyRef to use a secret reference "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey N Client key, required for authType mtls Can be secretKeyRef to use a secret reference "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"
skipVerify N Skip TLS verification, this is not recommended for use in production. Defaults to "false" "true", "false"
disableTls N Disable TLS for transport security. Set to "true" to disable. This is not recommended for use in production. Defaults to "false". "true", "false"
oidcTokenEndpoint N Full URL to an OAuth2 identity provider access token endpoint. Required when authType is set to oidc "https://identity.example.com/v1/token"
oidcClientID N The OAuth2 client ID that has been provisioned in the identity provider. Required when authType is set to oidc dapr-kafka
oidcClientSecret N The OAuth2 client secret that has been provisioned in the identity provider. Required when authType is set to oidc "KeFg23!"
oidcScopes N Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when authType is set to oidc. Defaults to "openid" "openid,kafka-prod"
oidcExtensions N String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token {"cluster":"kafka","poolid":"kafkapool"}
awsRegion N This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use ‘region’ instead. The AWS region where the Kafka cluster is deployed to. Required when authType is set to awsiam us-west-1
awsAccessKey N This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use ‘accessKey’ instead. AWS access key associated with an IAM account. "accessKey"
awsSecretKey N This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use ‘secretKey’ instead. The secret key associated with the access key. "secretKey"
awsSessionToken N This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use ‘sessionToken’ instead. AWS session token to use. A session token is only required if you are using temporary security credentials. "sessionToken"
awsIamRoleArn N This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use ‘assumeRoleArn’ instead. IAM role that has access to AWS Managed Streaming for Apache Kafka (MSK). This is another option to authenticate with MSK aside from the AWS Credentials. "arn:aws:iam::123456789:role/mskRole"
awsStsSessionName N This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use ‘sessionName’ instead. Represents the session name for assuming a role. "DaprDefaultSession"
schemaRegistryURL N Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. http://localhost:8081
schemaRegistryAPIKey N When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key. XYAXXAZ
schemaRegistryAPISecret N When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. ABCDEFGMEADFF
schemaCachingEnabled N When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is true true
schemaLatestVersionCacheTTL N When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. Default is 5 min 5m
useAvroJson N Enables Avro JSON schema for serialization as opposed to Standard JSON default. Only applicable when the subscription uses valueSchemaType=Avro. Default is "false" "true"
clientConnectionTopicMetadataRefreshInterval N The interval for the client connection’s topic metadata to be refreshed with the broker as a Go duration. Defaults to 9m. "4m"
clientConnectionKeepAliveInterval N The maximum time for the client connection to be kept alive with the broker, as a Go duration, before closing the connection. A zero value (default) means keeping alive indefinitely. "4m"
consumerFetchMin N The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available. The default is 1, as 0 causes the consumer to spin when no messages are available. Equivalent to the JVM’s fetch.min.bytes. "2"
consumerFetchDefault N The default number of message bytes to fetch from the broker in each request. Default is "1048576" bytes. "2097152"
channelBufferSize N The number of events to buffer in internal and external channels. This permits the producer and consumer to continue processing some messages in the background while user code is working, greatly improving throughput. Defaults to 256. "512"
heartbeatInterval N The interval between heartbeats to the consumer coordinator. At most, the value should be set to a 1/3 of the sessionTimeout value. Defaults to “3s”. "5s"
sessionTimeout N The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to “10s”. "20s"
consumerGroupRebalanceStrategy N The strategy to use for consumer group rebalancing. Supported values: range, sticky, roundrobin. Default is range "sticky"
escapeHeaders N Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is false. true

The secretKeyRef above references a Kubernetes secret store to access the TLS information. Visit here to learn more about how to configure a secret store component.

Note

The metadata version must be set to 1.0.0 when using Azure EventHubs with Kafka.
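
For example, when targeting Azure Event Hubs through its Kafka-compatible endpoint, the component metadata would include the following excerpt of the component spec shown above:

  - name: version # Must be 1.0.0 for Azure Event Hubs with Kafka.
    value: "1.0.0"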

Authentication

Kafka supports a variety of authentication schemes and Dapr supports several: SASL password, mTLS, OIDC/OAuth2, and AWS IAM. With the added authentication methods, the authRequired field was deprecated in the v1.6 release; use the authType field instead. If authRequired is set to true, Dapr attempts to configure authType correctly based on the value of saslPassword. The valid values for authType are:

  • none
  • password
  • certificate
  • mtls
  • oidc
  • awsiam

None

Setting authType to none will disable any authentication. This is NOT recommended in production.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-noauth
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "none"
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0
  - name: disableTls
    value: "true"

SASL Password

Setting authType to password enables SASL authentication. This requires setting the saslUsername and saslPassword fields.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-sasl
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "password"
  - name: saslUsername # Required if authType is `password`.
    value: "adminuser"
  - name: saslPassword # Required if authType is `password`.
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: saslMechanism
    value: "SHA-512"
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0
  - name: caCert
    secretKeyRef:
      name: kafka-tls
      key: caCert

Mutual TLS

Setting authType to mtls uses an x509 client certificate (the clientCert field) and key (the clientKey field) to authenticate. Note that mTLS as an authentication mechanism is distinct from using TLS to secure the transport layer via encryption. mTLS requires TLS transport (meaning disableTls must be false), but securing the transport layer does not require using mTLS. See Communication using TLS for configuring underlying TLS transport.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-mtls
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "mtls"
  - name: caCert
    secretKeyRef:
      name: kafka-tls
      key: caCert
  - name: clientCert
    secretKeyRef:
      name: kafka-tls
      key: clientCert
  - name: clientKey
    secretKeyRef:
      name: kafka-tls
      key: clientKey
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0

OAuth2 or OpenID Connect

Setting authType to oidc enables SASL authentication via the OAUTHBEARER mechanism. This supports specifying a bearer token from an external OAuth2 or OIDC identity provider. Currently, only the client_credentials grant is supported.

Configure oidcTokenEndpoint to the full URL for the identity provider access token endpoint.

Set oidcClientID and oidcClientSecret to the client credentials provisioned in the identity provider.

If caCert is specified in the component configuration, the certificate is appended to the system CA trust for verifying the identity provider certificate. Similarly, if skipVerify is enabled in the component configuration, verification is also skipped when accessing the identity provider.

By default, the only scope requested for the token is openid; it is highly recommended that additional scopes be specified via oidcScopes in a comma-separated list and validated by the Kafka broker. If additional scopes are not used to narrow the validity of the access token, a compromised Kafka broker could replay the token to access other services as the Dapr clientID.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "oidc"
  - name: oidcTokenEndpoint # Required if authType is `oidc`.
    value: "https://identity.example.com/v1/token"
  - name: oidcClientID      # Required if authType is `oidc`.
    value: "dapr-myapp"
  - name: oidcClientSecret  # Required if authType is `oidc`.
    secretKeyRef:
      name: kafka-secrets
      key: oidcClientSecret
  - name: oidcScopes        # Recommended if authType is `oidc`.
    value: "openid,kafka-dev"
  - name: caCert            # Also applied to verifying OIDC provider certificate
    secretKeyRef:
      name: kafka-tls
      key: caCert
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0

AWS IAM

Authenticating with AWS IAM is supported with MSK. Setting authType to awsiam uses the AWS SDK to generate auth tokens to authenticate.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-awsiam
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "awsiam"
  - name: region # Required.
    value: "us-west-1"
  - name: accessKey # Optional.
    value: <AWS_ACCESS_KEY>
  - name: secretKey # Optional.
    value: <AWS_SECRET_KEY>
  - name: sessionToken # Optional.
    value: <AWS_SESSION_KEY>
  - name: assumeRoleArn # Optional.
    value: "arn:aws:iam::123456789:role/mskRole"
  - name: sessionName # Optional.
    value: "DaprDefaultSession"

Communication using TLS

By default TLS is enabled to secure the transport layer to Kafka. To disable TLS, set disableTls to true. When TLS is enabled, you can control server certificate verification using skipVerify to disable verification (NOT recommended in production environments) and caCert to specify a trusted TLS certificate authority (CA). If no caCert is specified, the system CA trust will be used. To also configure mTLS authentication, see the section under Authentication. Below is an example of a Kafka pubsub component configured to use transport layer TLS:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "certificate"
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: heartbeatInterval # Optional.
    value: 5s
  - name: sessionTimeout # Optional.
    value: 15s
  - name: version # Optional.
    value: 0.10.2.0
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: caCert # Certificate authority certificate.
    secretKeyRef:
      name: kafka-tls
      key: caCert
auth:
  secretStore: <SECRET_STORE_NAME>

Consuming from multiple topics

When consuming from multiple topics using a single pub/sub component, there is no guarantee about how the consumers in your consumer group are balanced across the topic partitions.

For instance, let’s say you are subscribing to two topics with 10 partitions per topic and you have 20 replicas of your service consuming from the two topics. There is no guarantee that 10 will be assigned to the first topic and 10 to the second topic. Instead, the partitions could be divided unequally, with more than 10 assigned to the first topic and the rest assigned to the second topic.

This can result in idle consumers listening to the first topic and over-extended consumers on the second topic, or vice versa. This same behavior can be observed when using auto-scalers such as HPA or KEDA.

If you run into this particular issue, it is recommended that you configure a single pub/sub component per topic with uniquely defined consumer groups per component. This guarantees that all replicas of your service are fully allocated to the unique consumer group, where each consumer group targets one specific topic.

For example, you may define two Dapr components with the following configuration:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-topic-one
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: consumerGroup
    value: "{appID}-topic-one"
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-topic-two
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: consumerGroup
    value: "{appID}-topic-two"

Sending and receiving multiple messages

The Apache Kafka component supports sending and receiving multiple messages in a single operation using the bulk pub/sub API.

Configuring bulk subscribe

When subscribing to a topic, you can configure bulkSubscribe options. Refer to Subscribing messages in bulk for more details. Learn more about the bulk subscribe API.

Apache Kafka supports the following bulk metadata options:

Configuration Default
maxAwaitDurationMs 10000 (10s)
maxMessagesCount 80
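
As a sketch, a programmatic subscription enabling these options might look like the following; the pubsubname, topic, and route are placeholders, and the bulkSubscribe keys follow the standard Dapr subscription format:

from fastapi import FastAPI

app = FastAPI()

@app.get('/dapr/subscribe')
def subscribe():
    # Bulk subscribe options mirror the Kafka defaults listed above.
    return [{'pubsubname': 'pubsub',            # placeholder component name
             'topic': 'my-topic',               # placeholder topic
             'route': 'my_topic_subscriber',
             'bulkSubscribe': {'enabled': True,
                               'maxMessagesCount': 80,
                               'maxAwaitDurationMs': 10000}}]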

Per-call metadata fields

Partition Key

When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the metadata query parameter in the request URL.

The parameter name can be either partitionKey or __key.

Example:

curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'
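
The same publish can be issued with the Dapr Python SDK by passing the partition key through publish_metadata, mirroring the SDK usage shown later on this page. A minimal sketch:

import json
from dapr.clients import DaprClient

with DaprClient() as d:
    # Same payload as the curl example above.
    d.publish_event(
        pubsub_name='myKafka',
        topic_name='myTopic',
        data=json.dumps({'data': {'message': 'Hi'}}),
        publish_metadata={'partitionKey': 'key1'}  # or use the '__key' name
    )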

Message headers

All other metadata key/value pairs (that are not partitionKey or __key) are set as headers in the Kafka message. Here is an example setting a correlationId for the message.

curl -X POST "http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1" \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

Kafka pub/sub special message headers received on the consumer side

When consuming messages, special message metadata is automatically passed as headers. These are:

  • __key: the message key if available
  • __topic: the topic for the message
  • __partition: the partition number for the message
  • __offset: the offset of the message in the partition
  • __timestamp: the timestamp for the message

You can access them within the consumer endpoint as follows:

from typing import Annotated

from fastapi import APIRouter, Body, FastAPI, Header, Response, status

app = FastAPI()

router = APIRouter()


@router.get('/dapr/subscribe')
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'my-topic',
                      'route': 'my_topic_subscriber',
                      }]
    return subscriptions


@router.post('/my_topic_subscriber')
def my_topic_subscriber(
        key: Annotated[str, Header(alias="__key")],
        offset: Annotated[int, Header(alias="__offset")],
        event_data=Body()):
    print(f"key={key} - offset={offset} - data={event_data}", flush=True)
    return Response(status_code=status.HTTP_200_OK)

app.include_router(router)

Receiving message headers with special characters

The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors. HTTP header values must follow specifications, making some characters not allowed. Learn more about the protocols. In this case, you can enable the escapeHeaders configuration setting, which uses URL escaping to encode header values on the consumer side.

Set escapeHeaders to true to URL-escape header values.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-escape-headers
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "none"
  - name: escapeHeaders
    value: "true"

Avro Schema Registry serialization/deserialization

You can configure pub/sub to publish or consume data encoded using Avro binary serialization, leveraging an Apache Schema Registry (for example, Confluent Schema Registry, Apicurio).

Configuration

When configuring the Kafka pub/sub component metadata, you must define:

  • The schema registry URL
  • The API key/secret, if applicable

Schema subjects are automatically derived from topic names, using the standard naming convention. For example, for a topic named my-topic, the schema subject will be my-topic-value. When interacting with the message payload within the service, it is in JSON format. The payload is transparently serialized/deserialized within the Dapr component. Date/Datetime fields must be passed as their Unix epoch equivalent rather than as typical ISO 8601 strings (see the conversion sketch after the list below). For example:

  • 2024-01-10T04:36:05.986Z should be passed as 1704861365986 (the number of milliseconds since Jan 1st, 1970)
  • 2024-01-10 should be passed as 19732 (the number of days since Jan 1st, 1970)
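
A minimal conversion sketch using only the Python standard library; the values match the examples above:

from datetime import date, datetime, timezone

dt = datetime(2024, 1, 10, 4, 36, 5, 986000, tzinfo=timezone.utc)
millis = round(dt.timestamp() * 1000)                 # 1704861365986
days = (date(2024, 1, 10) - date(1970, 1, 1)).days    # 19732
print(millis, days)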

Publishing Avro messages

To indicate to the Kafka pub/sub component that the message should be serialized using Avro, the valueSchemaType metadata must be set to Avro.

curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/my-topic?metadata.rawPayload=true&metadata.valueSchemaType=Avro -H "Content-Type: application/json" -d '{"order_number": "345", "created_date": 1704861365986}'
from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {
        'order_number': '345',
        'created_date': 1704861365986
    }
    # Create a typed message with content type and body
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic_name='my-topic',
        data=json.dumps(req_data),
        publish_metadata={'rawPayload': 'true', 'valueSchemaType': 'Avro'}
    )
    # Print the request
    print(req_data, flush=True)

Subscribing to Avro topics

To indicate to the Kafka pub/sub component that the message should be deserialized using Avro, the valueSchemaType metadata must be set to Avro in the subscription metadata.

from fastapi import APIRouter, Body, FastAPI, Response, status

app = FastAPI()

router = APIRouter()


@router.get('/dapr/subscribe')
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'my-topic',
                      'route': 'my_topic_subscriber',
                      'metadata': {
                          'valueSchemaType': 'Avro',
                      }}]
    return subscriptions


@router.post('/my_topic_subscriber')
def my_topic_subscriber(event_data=Body()):
    print(event_data, flush=True)
    return Response(status_code=status.HTTP_200_OK)

app.include_router(router)

Overriding default consumer group rebalancing

In Kafka, rebalancing strategies determine how partitions are assigned to consumers within a consumer group. The default strategy is “range”, but “roundrobin” and “sticky” are also available.

  • Range: Partitions are assigned to consumers based on their lexicographical order. If you have three partitions (0, 1, 2) and two consumers (A, B), consumer A might get partitions 0 and 1, while consumer B gets partition 2.
  • RoundRobin: Partitions are assigned to consumers in a round-robin fashion. With the same example above, consumer A might get partitions 0 and 2, while consumer B gets partition 1.
  • Sticky: This strategy aims to preserve previous assignments as much as possible while still maintaining a balanced distribution. If a consumer leaves or joins the group, only the affected partitions are reassigned, minimizing disruption.

Choosing a Strategy:

  • Range: Simple to understand and implement, but can lead to uneven distribution if partition sizes vary significantly.
  • RoundRobin: Provides a good balance in many cases, but might not be optimal if message keys are unevenly distributed.
  • Sticky: Generally preferred for its ability to minimize disruption during rebalances, especially when dealing with a large number of partitions or frequent consumer group changes.
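
To override the default, set the consumerGroupRebalanceStrategy metadata field on the component, as in this excerpt of the component spec shown at the top of this page:

  - name: consumerGroupRebalanceStrategy
    value: "sticky"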

Create a Kafka instance

You can run Kafka locally using this Docker image. To run without Docker, see the getting started guide here.

To run Kafka on Kubernetes, you can use any Kafka operator, such as Strimzi.

2 - AWS SNS/SQS

Detailed documentation on the AWS SNS/SQS pubsub component

Component format

To set up AWS SNS/SQS pub/sub, create a component of type pubsub.aws.snssqs.

By default, the AWS SNS/SQS component:

  • Generates the SNS topics
  • Provisions the SQS queues
  • Configures a subscription of the queues to the topics
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: snssqs-pubsub
spec:
  type: pubsub.aws.snssqs
  version: v1
  metadata:
    - name: accessKey
      value: "AKIAIOSFODNN7EXAMPLE"
    - name: secretKey
      value: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    - name: region
      value: "us-east-1"
    # - name: consumerID # Optional. If not supplied, runtime will create one.
    #   value: "channel1"
    # - name: endpoint # Optional. 
    #   value: "http://localhost:4566"
    # - name: sessionToken  # Optional (mandatory if using AssignedRole; for example, temporary accessKey and secretKey)
    #   value: "TOKEN"
    # - name: messageVisibilityTimeout # Optional
    #   value: 10
    # - name: messageRetryLimit # Optional
    #   value: 10
    # - name: messageReceiveLimit # Optional
    #   value: 10
    # - name: sqsDeadLettersQueueName # Optional
    #   value: "myapp-dlq"
    # - name: messageWaitTimeSeconds # Optional
    #   value: 1
    # - name: messageMaxNumber # Optional
    #   value: 10
    # - name: fifo # Optional
    #   value: "true"
    # - name: fifoMessageGroupID # Optional
    #   value: "app1-mgi"
    # - name: disableEntityManagement # Optional
    #   value: "false"
    # - name: disableDeleteOnRetryLimit # Optional
    #   value: "false"
    # - name: assetsManagementTimeoutSeconds # Optional
    #   value: 5
    # - name: concurrencyMode # Optional
    #   value: "single"
    # - name: concurrencyLimit # Optional
    #   value: "0"

Spec metadata fields

Field Required Details Example
accessKey Y ID of the AWS account/role with appropriate permissions to SNS and SQS (see below) "AKIAIOSFODNN7EXAMPLE"
secretKey Y Secret for the AWS user/role. If using an AssumeRole access, you will also need to provide a sessionToken "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
region Y The AWS region where the SNS/SQS assets are located or will be created in. See this page for valid regions. Ensure that SNS and SQS are available in that region "us-east-1"
consumerID N Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Can be set to string value (such as "channel1" in the example above) or string format value (such as "{podName}", etc.). See all template tags you can use in your component metadata.
endpoint N AWS endpoint for the component to use. Only used for local development with, for example, localstack. The endpoint is unnecessary when running against production AWS "http://localhost:4566"
sessionToken N AWS session token to use. A session token is only required if you are using temporary security credentials "TOKEN"
messageReceiveLimit N Number of times a message can be received, after processing of that message fails, before it is removed from the queue. If sqsDeadLettersQueueName is specified, messageReceiveLimit is instead the number of failed receives after which the message is moved to the SQS dead-letters queue. Default: 10 10
sqsDeadLettersQueueName N Name of the dead letters queue for this application "myapp-dlq"
messageVisibilityTimeout N Amount of time in seconds that a message is hidden from receive requests after it is sent to a subscriber. Default: 10 10
messageRetryLimit N Number of times to resend a message after processing of that message fails before removing that message from the queue. Default: 10 10
messageWaitTimeSeconds N The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than messageWaitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. Default: 1 1
messageMaxNumber N Maximum number of messages to receive from the queue at a time. Default: 10, Maximum: 10 10
fifo N Use SQS FIFO queue to provide message ordering and deduplication. Default: "false". See further details about SQS FIFO "true", "false"
fifoMessageGroupID N If fifo is enabled, instructs Dapr to use a custom Message Group ID for the pubsub deployment. This is not mandatory as Dapr creates a custom Message Group ID for each producer, thus ensuring ordering of messages per a Dapr producer. Default: "" "app1-mgi"
disableEntityManagement N When set to true, SNS topics, SQS queues and the SQS subscriptions to SNS do not get created automatically. Default: "false" "true", "false"
disableDeleteOnRetryLimit N When set to true, after retrying and failing of messageRetryLimit times processing a message, reset the message visibility timeout so that other consumers can try processing, instead of deleting the message from SQS (the default behavior). Default: "false" "true", "false"
assetsManagementTimeoutSeconds N Amount of time in seconds for an AWS asset management operation before it times out and is cancelled. Asset management operations are any operations performed on STS, SNS and SQS, except message publish and consume operations that implement the default Dapr component retry behavior. The value can be set to any non-negative float/integer. Default: 5 0.5, 10
concurrencyMode N When messages are received in bulk from SQS, call the subscriber sequentially (“single” message at a time), or concurrently (in “parallel”). Default: "parallel" "single", "parallel"
concurrencyLimit N Defines the maximum number of concurrent workers handling messages. This value is ignored when concurrencyMode is set to "single". To avoid limiting the number of concurrent workers, set this to 0. Default: 0 100

Additional info

Conforming with AWS specifications

Dapr-created SNS topic and SQS queue names conform to AWS specifications. By default, Dapr creates an SQS queue name based on the consumer app-id, so Dapr might perform name standardization to meet AWS specifications.

SNS/SQS component behavior

When the pub/sub SNS/SQS component provisions SNS topics, the SQS queues and the subscription behave differently when the component is operating on behalf of a message producer only (with no subscriber app deployed) than when a subscriber app is present (with no publisher deployed).

Because of how SNS works without an SQS subscription in a publisher-only setup, the SQS queues and the subscription behave as a "classic" pub/sub system that relies on subscribers listening to topic messages. Without those subscribers, messages:

  • Cannot be passed onwards and are effectively dropped
  • Are not available for future subscribers (no replay of message when the subscriber finally subscribes)

SQS FIFO

Using SQS FIFO (fifo metadata field set to "true") per AWS specifications provides message ordering and deduplication, but incurs a lower SQS processing throughput, among other caveats.

Specifying fifoMessageGroupID limits the number of concurrent consumers of the FIFO queue used to only one but guarantees global ordering of messages published by the app’s Dapr sidecars. See this AWS blog post to better understand the topic of Message Group IDs and FIFO queues.

To avoid losing the order of messages delivered to consumers, the FIFO configuration for the SQS Component requires the concurrencyMode metadata field set to "single".
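
Putting these FIFO-related fields together, the relevant component metadata excerpt might look like the following (the Message Group ID is illustrative):

    - name: fifo
      value: "true"
    - name: fifoMessageGroupID # Optional. Illustrative custom Message Group ID.
      value: "app1-mgi"
    - name: concurrencyMode # Must be "single" to preserve FIFO ordering.
      value: "single"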

Default parallel concurrencyMode

Since v1.8.0, the component supports the "parallel" concurrencyMode as its default mode. In prior versions, the component default behavior was calling the subscriber a single message at a time and waiting for its response.

SQS dead-letter Queues

When configuring the PubSub component with SQS dead-letter queues, the metadata fields messageReceiveLimit and sqsDeadLettersQueueName must both be set. The messageReceiveLimit value must be greater than 0, and sqsDeadLettersQueueName must not be an empty string.
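
For example, a dead-letter configuration pairing the two fields (the queue name is illustrative):

    - name: messageReceiveLimit
      value: 10
    - name: sqsDeadLettersQueueName
      value: "myapp-dlq"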

SNS/SQS Contention with Dapr

Fundamentally, SNS aggregates messages from multiple publisher topics into a single SQS queue by creating SQS subscriptions to those topics. As a subscriber, the SNS/SQS pub/sub component consumes messages from that sole SQS queue.

However, like any SQS consumer, the component cannot selectively retrieve the messages published to the SNS topics to which it is specifically subscribed. This can result in the component receiving messages originating from topics without associated handlers. Typically, this occurs during:

  • Component initialization: If infrastructure subscriptions are ready before component subscription handlers, or
  • Shutdown: If component handlers are removed before infrastructure subscriptions.

Since this issue affects any SQS consumer of multiple SNS topics, the component cannot prevent consuming messages from topics lacking handlers. When this happens, the component logs an error indicating such messages were erroneously retrieved.

In these situations, the unhandled messages reappear in SQS with their receive count incremented after each pull. Thus, there is a risk that an unhandled message could exceed its messageReceiveLimit and be lost.

Create an SNS/SQS instance

For local development, the localstack project is used to integrate AWS SNS/SQS. Follow these instructions to run localstack.

To run localstack locally from the command line using Docker, run the following command:

docker run --rm -it -p 4566:4566 -p 4571:4571 -e SERVICES="sts,sns,sqs" -e AWS_DEFAULT_REGION="us-east-1" localstack/localstack

To use localstack with your pub/sub component, you need to provide the endpoint configuration in the component metadata. The endpoint is unnecessary when running against production AWS.

See Authenticating to AWS for information about authentication-related attributes.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: snssqs-pubsub
spec:
  type: pubsub.aws.snssqs
  version: v1
  metadata:
    - name: accessKey
      value: "anyString"
    - name: secretKey
      value: "anyString"
    - name: endpoint
      value: http://localhost:4566
    # Use us-east-1 or any other region if provided to localstack as defined by "AWS_DEFAULT_REGION" envvar
    - name: region
      value: us-east-1

To run localstack on Kubernetes, you can apply the configuration below. Localstack is then reachable at the DNS name http://localstack.default.svc.cluster.local:4566 (assuming this was applied to the default namespace), which should be used as the endpoint.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: localstack
spec:
  # Using the selector, we expose the running deployments.
  # This is how Kubernetes knows that a given service belongs to a deployment.
  selector:
    matchLabels:
      app: localstack
  replicas: 1
  template:
    metadata:
      labels:
        app: localstack
    spec:
      containers:
      - name: localstack
        image: localstack/localstack:latest
        ports:
          # Expose the edge endpoint
          - containerPort: 4566
---
kind: Service
apiVersion: v1
metadata:
  name: localstack
  labels:
    app: localstack
spec:
  selector:
    app: localstack
  ports:
  - protocol: TCP
    port: 4566
    targetPort: 4566
  type: LoadBalancer

In order to run in AWS, create or assign an IAM user with permissions to the SNS and SQS services, with a policy like:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "YOUR_POLICY_NAME",
      "Effect": "Allow",
      "Action": [
        "sns:CreateTopic",
        "sns:GetTopicAttributes",
        "sns:ListSubscriptionsByTopic",
        "sns:Publish",
        "sns:Subscribe",
        "sns:TagResource",
        "sqs:ChangeMessageVisibility",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ReceiveMessage",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue"
      ],
      "Resource": [
        "arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:*",
        "arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:*"
      ]
    }
  ]
}

Plug the AWS access key ID and secret access key into the accessKey and secretKey values in the component metadata, using Kubernetes secrets and secretKeyRef.

Alternatively, let’s say you want to provision the SNS and SQS assets using your own tool of choice (for example, Terraform) while preventing Dapr from doing so dynamically. You need to enable disableEntityManagement and assign your Dapr-using application with an IAM Role, with a policy like:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "YOUR_POLICY_NAME",
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sns:Publish",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes"

      ],
      "Resource": [
        "arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:APP_TOPIC_NAME",
        "arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:APP_ID"
      ]
    }
  ]
}

In the above example, you are running your applications on an EKS cluster with dynamic assets creation (the default Dapr behavior).

3 - Azure Event Hubs

Detailed documentation on the Azure Event Hubs pubsub component

Component format

To set up an Azure Event Hubs pub/sub, create a component of type pubsub.azure.eventhubs. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

Apart from the configuration metadata fields shown below, Azure Event Hubs also supports Azure Authentication mechanisms.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: eventhubs-pubsub
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
    # Either connectionString or eventHubNamespace is required
    # Use connectionString when *not* using Microsoft Entra ID
    - name: connectionString
      value: "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}"
    # Use eventHubNamespace when using Microsoft Entra ID
    - name: eventHubNamespace
      value: "namespace"
    - name: consumerID # Optional. If not supplied, the runtime will create one.
      value: "channel1"
    - name: enableEntityManagement
      value: "false"
    - name: enableInOrderMessageDelivery
      value: "false"
    # The following four properties are needed only if enableEntityManagement is set to true
    - name: resourceGroupName
      value: "test-rg"
    - name: subscriptionID
      value: "value of Azure subscription ID"
    - name: partitionCount
      value: "1"
    - name: messageRetentionInDays
      value: "3"
    # Checkpoint store attributes
    - name: storageAccountName
      value: "myeventhubstorage"
    - name: storageAccountKey
      value: "112233445566778899"
    - name: storageContainerName
      value: "myeventhubstoragecontainer"
    # Alternative to passing storageAccountKey
    - name: storageConnectionString
      value: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<account-key>"

Spec metadata fields

Field Required Details Example
connectionString Y* Connection string for the Event Hub or the Event Hub namespace.
* Mutually exclusive with eventHubNamespace field.
* Required when not using Microsoft Entra ID Authentication
"Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}" or "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key}"
eventHubNamespace Y* The Event Hub Namespace name.
* Mutually exclusive with connectionString field.
* Required when using Microsoft Entra ID Authentication
"namespace"
consumerID N Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. Can be set to string value (such as "channel1" in the example above) or string format value (such as "{podName}", etc.). See all template tags you can use in your component metadata.
enableEntityManagement N Boolean value to allow management of the EventHub namespace and storage account. Default: false "true", "false"
enableInOrderMessageDelivery N Input/Output Boolean value to allow messages to be delivered in the order in which they were posted. This assumes partitionKey is set when publishing or posting to ensure ordering across partitions. Default: false "true", "false"
storageAccountName Y Storage account name to use for the checkpoint store. "myeventhubstorage"
storageAccountKey Y* Storage account key for the checkpoint store account.
* When using Microsoft Entra ID, it’s possible to omit this if the service principal has access to the storage account too.
"112233445566778899"
storageConnectionString Y* Connection string for the checkpoint store, alternative to specifying storageAccountKey "DefaultEndpointsProtocol=https;AccountName=myeventhubstorage;AccountKey=<account-key>"
storageContainerName Y Storage container name for the storage account name. "myeventhubstoragecontainer"
resourceGroupName N Name of the resource group the Event Hub namespace is part of. Required when entity management is enabled "test-rg"
subscriptionID N Azure subscription ID value. Required when entity management is enabled "azure subscription id"
partitionCount N Number of partitions for the new Event Hub namespace. Used only when entity management is enabled. Default: "1" "2"
messageRetentionInDays N Number of days to retain messages for in the newly created Event Hub namespace. Used only when entity management is enabled. Default: "1" "90"

Microsoft Entra ID authentication

The Azure Event Hubs pub/sub component supports authentication using all Microsoft Entra ID mechanisms. For further information and the relevant component metadata fields to provide depending on the choice of Microsoft Entra ID authentication mechanism, see the docs for authenticating to Azure.

Example Configuration

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: eventhubs-pubsub
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
    # Azure Authentication Used
    - name: azureTenantId
      value: "***"
    - name: azureClientId
      value: "***"
    - name: azureClientSecret
      value: "***"
    - name: eventHubNamespace 
      value: "namespace"
    - name: enableEntityManagement
      value: "false"
    # The following four properties are needed only if enableEntityManagement is set to true
    - name: resourceGroupName
      value: "test-rg"
    - name: subscriptionID
      value: "value of Azure subscription ID"
    - name: partitionCount
      value: "1"
    - name: messageRetentionInDays
      value: "3"
    # Checkpoint store attributes
    # In this case, we're using Microsoft Entra ID to access the storage account too
    - name: storageAccountName
      value: "myeventhubstorage"
    - name: storageContainerName
      value: "myeventhubstoragecontainer"

Sending and receiving multiple messages

Azure Event Hubs supports sending and receiving multiple messages in a single operation using the bulk pub/sub API.

Configuring bulk publish

To set the metadata for bulk publish operation, set the query parameters on the HTTP request or the gRPC metadata, as documented in the API reference.

Metadata Default
metadata.maxBulkPubBytes 1000000
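
As a sketch, a bulk publish over HTTP against a local sidecar might look like the following; the bulk publish route is part of Dapr's alpha bulk API (so it may change), and the component and topic names are placeholders:

curl -X POST "http://localhost:3500/v1.0-alpha1/publish/bulk/eventhubs-pubsub/orders?metadata.maxBulkPubBytes=500000" \
  -H "Content-Type: application/json" \
  -d '[
        {"entryId": "1", "event": {"message": "first"}, "contentType": "application/json"},
        {"entryId": "2", "event": {"message": "second"}, "contentType": "application/json"}
      ]'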

Configuring bulk subscribe

When subscribing to a topic, you can configure bulkSubscribe options. Refer to Subscribing messages in bulk for more details and to learn more about the bulk subscribe API.

Configuration Default
maxMessagesCount 100
maxAwaitDurationMs 10000
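
A declarative subscription enabling these options might look like the following sketch; the names are placeholders, and bulkSubscribe is the standard subscription option:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes:
    default: /checkout
  pubsubname: order-pub-sub
  bulkSubscribe:
    enabled: true
    maxMessagesCount: 100
    maxAwaitDurationMs: 10000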

Configuring checkpoint frequency

When subscribing to a topic, you can configure the checkpointing frequency in a partition by setting the metadata in the HTTP or gRPC subscribe request. This metadata enables checkpointing after the configured number of events within a partition event sequence. Disable checkpointing by setting the frequency to 0.

Learn more about checkpointing.

Metadata Default
metadata.checkPointFrequencyPerPartition 1

The following example shows a sample subscription file for a declarative subscription using the checkPointFrequencyPerPartition metadata. You can pass the same metadata in programmatic subscriptions as well.

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes: 
    default: /checkout
  pubsubname: order-pub-sub
  metadata:
    checkPointFrequencyPerPartition: 1
scopes:
- orderprocessing
- checkout

Create an Azure Event Hub

Follow the instructions on the documentation to set up Azure Event Hubs.

Because this component uses Azure Storage as checkpoint store, you will also need an Azure Storage Account. Follow the instructions on the documentation to manage the storage account access keys.

See the documentation on how to get the Event Hubs connection string (note this is not for the Event Hubs namespace).

Create consumer groups for each subscriber

For every Dapr app that wants to subscribe to events, create an Event Hubs consumer group with the name of the Dapr app ID. For example, a Dapr app running on Kubernetes with dapr.io/app-id: "myapp" will need an Event Hubs consumer group named myapp.

Note: Dapr passes the name of the consumer group to the Event Hub, so this is not supplied in the metadata.

Entity Management

When entity management is enabled in the metadata, as long as the application has the right role and permissions to manipulate the Event Hub namespace, Dapr can automatically create the Event Hub and consumer group for you.

The Event Hub name is the topic field in the incoming request to publish or subscribe to, while the consumer group name is the name of the Dapr app that subscribes to a given Event Hub. For example, a Dapr app running on Kubernetes with the annotation dapr.io/app-id: "myapp" requires an Event Hubs consumer group named myapp.

Entity management is only possible when using Microsoft Entra ID Authentication and not using a connection string.

Dapr passes the name of the consumer group to the Event Hub, so this is not supplied in the metadata.

Receiving custom properties

By default, Dapr does not forward custom properties. However, by setting the subscription metadata requireAllProperties to "true", you can receive custom properties as HTTP headers.

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes: 
    default: /checkout
  pubsubname: order-pub-sub
  metadata:
    requireAllProperties: "true"

The same can be achieved using the Dapr SDK:

[Topic("order-pub-sub", "orders")]
[TopicMetadata("requireAllProperties", "true")]
[HttpPost("checkout")]
public ActionResult Checkout(Order order, [FromHeader] int priority)
{
    return Ok();
}

Subscribing to Azure IoT Hub Events

Azure IoT Hub provides an endpoint that is compatible with Event Hubs, so the Azure Event Hubs pubsub component can also be used to subscribe to Azure IoT Hub events.

The device-to-cloud events created by Azure IoT Hub devices will contain additional IoT Hub System Properties, and the Azure Event Hubs pubsub component for Dapr will return the following as part of the response metadata:

System Property Name Description & Routing Query Keyword
iothub-connection-auth-generation-id The connectionDeviceGenerationId of the device that sent the message. See IoT Hub device identity properties.
iothub-connection-auth-method The connectionAuthMethod used to authenticate the device that sent the message.
iothub-connection-device-id The deviceId of the device that sent the message. See IoT Hub device identity properties.
iothub-connection-module-id The moduleId of the device that sent the message. See IoT Hub device identity properties.
iothub-enqueuedtime The enqueuedTime in RFC3339 format that the device-to-cloud message was received by IoT Hub.
message-id The user-settable AMQP messageId.

For example, the headers of a delivered HTTP subscription message would contain:

{
  'user-agent': 'fasthttp',
  'host': '127.0.0.1:3000',
  'content-type': 'application/json',
  'content-length': '120',
  'iothub-connection-device-id': 'my-test-device',
  'iothub-connection-auth-generation-id': '637618061680407492',
  'iothub-connection-auth-method': '{"scope":"module","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}',
  'iothub-connection-module-id': 'my-test-module-a',
  'iothub-enqueuedtime': '2021-07-13T22:08:09Z',
  'message-id': 'my-custom-message-id',
  'x-opt-sequence-number': '35',
  'x-opt-enqueued-time': '2021-07-13T22:08:09Z',
  'x-opt-offset': '21560',
  'traceparent': '00-4655608164bc48b985b42d39865f3834-ed6cf3697c86e7bd-01'
}

4 - Azure Service Bus Queues

Detailed documentation on the Azure Service Bus Queues pubsub component

Component format

To set up Azure Service Bus Queues pub/sub, create a component of type pubsub.azure.servicebus.queues. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

This component uses queues on Azure Service Bus; see the official documentation for the differences between topics and queues. For using topics, see the Azure Service Bus Topics pubsub component.

Connection String Authentication

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.queues
  version: v1
  metadata:
  # Required when not using Microsoft Entra ID Authentication
  - name: connectionString
    value: "Endpoint=sb://{ServiceBusNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={ServiceBus}"
  # - name: consumerID # Optional
  #   value: channel1
  # - name: timeoutInSec # Optional
  #   value: 60
  # - name: handlerTimeoutInSec # Optional
  #   value: 60
  # - name: disableEntityManagement # Optional
  #   value: "false"
  # - name: maxDeliveryCount # Optional
  #   value: 3
  # - name: lockDurationInSec # Optional
  #   value: 60
  # - name: lockRenewalInSec # Optional
  #   value: 20
  # - name: maxActiveMessages # Optional
  #   value: 10000
  # - name: maxConcurrentHandlers # Optional
  #   value: 10
  # - name: defaultMessageTimeToLiveInSec # Optional
  #   value: 10
  # - name: autoDeleteOnIdleInSec # Optional
  #   value: 3600
  # - name: minConnectionRecoveryInSec # Optional
  #   value: 2
  # - name: maxConnectionRecoveryInSec # Optional
  #   value: 300
  # - name: maxRetriableErrorsPerSec # Optional
  #   value: 10
  # - name: publishMaxRetries # Optional
  #   value: 5
  # - name: publishInitialRetryIntervalInMs # Optional
  #   value: 500

Spec metadata fields

Field Required Details Example
connectionString Y Shared access policy connection string for the Service Bus. Required unless using Microsoft Entra ID authentication. See example above
consumerID N Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. Can be set to a string value (such as "channel1" in the example above) or a string format value (such as "{podName}"). See all template tags you can use in your component metadata.
namespaceName N Parameter to set the address of the Service Bus namespace, as a fully-qualified domain name. Required if using Microsoft Entra ID authentication. "namespace.servicebus.windows.net"
timeoutInSec N Timeout for sending messages and for management operations. Default: 60 30
handlerTimeoutInSec N Timeout for invoking the app’s handler. Default: 60 30
lockRenewalInSec N Defines the frequency at which buffered message locks will be renewed. Default: 20. 20
maxActiveMessages N Defines the maximum number of messages to be processing or in the buffer at once. This should be at least as big as the maximum concurrent handlers. Default: 1000 2000
maxConcurrentHandlers N Defines the maximum number of concurrent message handlers. Default: 0 (unlimited) 10
disableEntityManagement N When set to true, queues and subscriptions do not get created automatically. Default: "false" "true", "false"
defaultMessageTimeToLiveInSec N Default message time to live, in seconds. Used during subscription creation only. 10
autoDeleteOnIdleInSec N Time in seconds to wait before auto deleting idle subscriptions. Used during subscription creation only. Must be 300s or greater. Default: 0 (disabled) 3600
maxDeliveryCount N Defines the number of attempts the server will make to deliver a message. Used during subscription creation only. Default set by server. 10
lockDurationInSec N Defines the length in seconds that a message will be locked for before expiring. Used during subscription creation only. Default set by server. 30
minConnectionRecoveryInSec N Minimum interval (in seconds) to wait before attempting to reconnect to Azure Service Bus in case of a connection failure. Default: 2 5
maxConnectionRecoveryInSec N Maximum interval (in seconds) to wait before attempting to reconnect to Azure Service Bus in case of a connection failure. After each attempt, the component waits a random number of seconds, increasing every time, between the minimum and the maximum. Default: 300 (5 minutes) 600
maxRetriableErrorsPerSec N Maximum number of retriable errors that are processed per second. If a message fails to be processed with a retriable error, the component adds a delay before it starts processing another message, to avoid immediately re-processing messages that have failed. Default: 10 10
publishMaxRetries N The maximum number of retries when Azure Service Bus responds with “too busy” in order to throttle messages. Default: 5 5
publishInitialRetryIntervalInMs N Time in milliseconds for the initial exponential backoff when Azure Service Bus throttles messages. Default: 500 500

Microsoft Entra ID authentication

The Azure Service Bus Queues pubsub component supports authentication using all Microsoft Entra ID mechanisms, including Managed Identities. For further information and the relevant component metadata fields to provide depending on the choice of Microsoft Entra ID authentication mechanism, see the docs for authenticating to Azure.

Example Configuration

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.queues
  version: v1
  metadata:
  - name: namespaceName
    # Required when using Azure Authentication.
    # Must be a fully-qualified domain name
    value: "servicebusnamespace.servicebus.windows.net"
  - name: azureTenantId
    value: "***"
  - name: azureClientId
    value: "***"
  - name: azureClientSecret
    value: "***"

Message metadata

Azure Service Bus messages extend the Dapr message format with additional contextual metadata. Some metadata fields are set by Azure Service Bus itself (read-only) and others can be set by the client when publishing a message.

Sending a message with metadata

To set Azure Service Bus metadata when sending a message, set the query parameters on the HTTP request or the gRPC metadata, as documented here; see the sketch after the following list.

  • metadata.MessageId
  • metadata.CorrelationId
  • metadata.SessionId
  • metadata.Label
  • metadata.ReplyTo
  • metadata.PartitionKey
  • metadata.To
  • metadata.ContentType
  • metadata.ScheduledEnqueueTimeUtc
  • metadata.ReplyToSessionId
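
As a minimal sketch, the following publish request sets some of these properties as query parameters; the component name servicebus-pubsub and topic orders are assumptions:

curl -X POST "http://localhost:3500/v1.0/publish/servicebus-pubsub/orders?metadata.MessageId=order-123&metadata.SessionId=session-1" \
  -H "Content-Type: application/json" \
  -d '{"orderId": "123"}'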

Receiving a message with metadata

When Dapr calls your application, it attaches Azure Service Bus message metadata to the request using either HTTP headers or gRPC metadata. In addition to the settable metadata listed above, you can also access the following read-only message metadata.

  • metadata.DeliveryCount
  • metadata.LockedUntilUtc
  • metadata.LockToken
  • metadata.EnqueuedTimeUtc
  • metadata.SequenceNumber

To find out more details on the purpose of any of these metadata properties refer to the official Azure Service Bus documentation.

In addition, all entries of ApplicationProperties from the original Azure Service Bus message are appended as metadata.<application property's name>.

Sending and receiving multiple messages

Azure Service Bus supports sending and receiving multiple messages in a single operation using the bulk pub/sub API.

Configuring bulk publish

To set the metadata for the bulk publish operation, set the query parameters on the HTTP request or the gRPC metadata, as documented here.

Metadata Default
metadata.maxBulkPubBytes 131072 (128 KiB)

Configuring bulk subscribe

When subscribing to a topic, you can configure bulkSubscribe options. Refer to Subscribing messages in bulk for more details. Learn more about the bulk subscribe API.

Configuration Default
maxMessagesCount 100

Create an Azure Service Bus broker for queues

Follow the instructions here on setting up Azure Service Bus Queues.

Retry policy and dead-letter queues

By default, an Azure Service Bus Queue has a dead-letter queue. Messages are retried the number of times configured in maxDeliveryCount, which defaults to 10 but can be set up to 2000. These retries happen very rapidly, and the message is put in the dead-letter queue if no success is returned.

Dapr Pub/sub offers its own dead-letter queue concept that lets you control the retry policy and subscribe to the dead-letter queue through Dapr.

  1. Set up a separate queue as the dead-letter queue in the Azure Service Bus namespace, and a resiliency policy that defines how to retry.
  2. Subscribe to the topic to get the failed messages and deal with them.

For example, setting up a dead-letter queue orders-dlq in the subscription and a resiliency policy lets you subscribe to the topic orders-dlq to handle failed messages.
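
As a minimal sketch, a declarative subscription that routes failed messages to that dead-letter topic could look like this (names are illustrative):

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-subscription
spec:
  topic: orders
  routes:
    default: /checkout
  pubsubname: servicebus-pubsub
  deadLetterTopic: orders-dlq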

For more details on setting up dead-letter queues, see the dead-letter article.

5 - Azure Service Bus Topics

Detailed documentation on the Azure Service Bus Topics pubsub component

Component format

To set up Azure Service Bus Topics pub/sub, create a component of type pubsub.azure.servicebus.topics. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

This component uses topics on Azure Service Bus; see the official documentation for the differences between topics and queues.
For using queues, see the Azure Service Bus Queues pubsub component.

Connection String Authentication

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
  # Required when not using Microsoft Entra ID Authentication
  - name: connectionString
    value: "Endpoint=sb://{ServiceBusNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={ServiceBus}"
  # - name: consumerID # Optional: defaults to the app's own ID
  #   value: channel1 
  # - name: timeoutInSec # Optional
  #   value: 60
  # - name: handlerTimeoutInSec # Optional
  #   value: 60
  # - name: disableEntityManagement # Optional
  #   value: "false"
  # - name: maxDeliveryCount # Optional
  #   value: 3
  # - name: lockDurationInSec # Optional
  #   value: 60
  # - name: lockRenewalInSec # Optional
  #   value: 20
  # - name: maxActiveMessages # Optional
  #   value: 10000
  # - name: maxConcurrentHandlers # Optional
  #   value: 10
  # - name: defaultMessageTimeToLiveInSec # Optional
  #   value: 10
  # - name: autoDeleteOnIdleInSec # Optional
  #   value: 3600
  # - name: minConnectionRecoveryInSec # Optional
  #   value: 2
  # - name: maxConnectionRecoveryInSec # Optional
  #   value: 300
  # - name: maxRetriableErrorsPerSec # Optional
  #   value: 10
  # - name: publishMaxRetries # Optional
  #   value: 5
  # - name: publishInitialRetryIntervalInMs # Optional
  #   value: 500

NOTE: The above settings are shared across all topics that use this component.

Spec metadata fields

Field Required Details Example
connectionString Y Shared access policy connection string for the Service Bus. Required unless using Microsoft Entra ID authentication. See example above
namespaceName N Parameter to set the address of the Service Bus namespace, as a fully-qualified domain name. Required if using Microsoft Entra ID authentication. "namespace.servicebus.windows.net"
consumerID N Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. Can be set to a string value (such as "channel1" in the example above) or a string format value (such as "{podName}"). See all template tags you can use in your component metadata.
timeoutInSec N Timeout for sending messages and for management operations. Default: 60 30
handlerTimeoutInSec N Timeout for invoking the app’s handler. Default: 60 30
lockRenewalInSec N Defines the frequency at which buffered message locks will be renewed. Default: 20. 20
maxActiveMessages N Defines the maximum number of messages to be processing or in the buffer at once. This should be at least as big as the maximum concurrent handlers. Default: 1000 2000
maxConcurrentHandlers N Defines the maximum number of concurrent message handlers. Default: 0 (unlimited) 10
disableEntityManagement N When set to true, queues and subscriptions do not get created automatically. Default: "false" "true", "false"
defaultMessageTimeToLiveInSec N Default message time to live, in seconds. Used during subscription creation only. 10
autoDeleteOnIdleInSec N Time in seconds to wait before auto deleting idle subscriptions. Used during subscription creation only. Must be 300s or greater. Default: 0 (disabled) 3600
maxDeliveryCount N Defines the number of attempts the server makes to deliver a message. Used during subscription creation only. Default set by server. 10
lockDurationInSec N Defines the length in seconds that a message will be locked for before expiring. Used during subscription creation only. Default set by server. 30
minConnectionRecoveryInSec N Minimum interval (in seconds) to wait before attempting to reconnect to Azure Service Bus in case of a connection failure. Default: 2 5
maxConnectionRecoveryInSec N Maximum interval (in seconds) to wait before attempting to reconnect to Azure Service Bus in case of a connection failure. After each attempt, the component waits a random number of seconds, increasing every time, between the minimum and the maximum. Default: 300 (5 minutes) 600
maxRetriableErrorsPerSec N Maximum number of retriable errors that are processed per second. If a message fails to be processed with a retriable error, the component adds a delay before it starts processing another message, to avoid immediately re-processing messages that have failed. Default: 10 10
publishMaxRetries N The maximum number of retries when Azure Service Bus responds with “too busy” in order to throttle messages. Default: 5 5
publishInitialRetryIntervalInMs N Time in milliseconds for the initial exponential backoff when Azure Service Bus throttles messages. Default: 500 500

Microsoft Entra ID authentication

The Azure Service Bus Topics pubsub component supports authentication using all Microsoft Entra ID mechanisms, including Managed Identities. For further information and the relevant component metadata fields to provide depending on the choice of Microsoft Entra ID authentication mechanism, see the docs for authenticating to Azure.

Example Configuration

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: servicebus-pubsub
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
  - name: namespaceName
    # Required when using Azure Authentication.
    # Must be a fully-qualified domain name
    value: "servicebusnamespace.servicebus.windows.net"
  - name: azureTenantId
    value: "***"
  - name: azureClientId
    value: "***"
  - name: azureClientSecret
    value: "***"

Message metadata

Azure Service Bus messages extend the Dapr message format with additional contextual metadata. Some metadata fields are set by Azure Service Bus itself (read-only) and others can be set by the client when publishing a message.

Sending a message with metadata

To set Azure Service Bus metadata when sending a message, set the query parameters on the HTTP request or the gRPC metadata as documented here.

  • metadata.MessageId
  • metadata.CorrelationId
  • metadata.SessionId
  • metadata.Label
  • metadata.ReplyTo
  • metadata.PartitionKey
  • metadata.To
  • metadata.ContentType
  • metadata.ScheduledEnqueueTimeUtc
  • metadata.ReplyToSessionId

Note: The metadata.MessageId property does not set the id property of the cloud event returned by Dapr and should be treated in isolation.

NOTE: If the metadata.SessionId property is not set but the topic requires sessions, then an empty session ID is used.

NOTE: The metadata.ScheduledEnqueueTimeUtc property supports the RFC1123 and RFC3339 timestamp formats.

Receiving a message with metadata

When Dapr calls your application, it will attach Azure Service Bus message metadata to the request using either HTTP headers or gRPC metadata. In addition to the settable metadata listed above, you can also access the following read-only message metadata.

  • metadata.DeliveryCount
  • metadata.LockedUntilUtc
  • metadata.LockToken
  • metadata.EnqueuedTimeUtc
  • metadata.SequenceNumber

To find out more details on the purpose of any of these metadata properties refer to the official Azure Service Bus documentation.

In addition, all entries of ApplicationProperties from the original Azure Service Bus message are appended as metadata.<application property's name>.

Note that all times are populated by the server and are not adjusted for clock skew.

Subscribe to a session enabled topic

To subscribe to a topic that has sessions enabled, you can provide the following properties in the subscription metadata; see the sketch after this list.

  • requireSessions (default: false)
  • sessionIdleTimeoutInSec (default: 60)
  • maxConcurrentSessions (default: 8)
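
As a minimal sketch, a declarative subscription for a session-enabled topic could look like this (names and values are illustrative):

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes:
    default: /checkout
  pubsubname: servicebus-pubsub
  metadata:
    requireSessions: "true"
    maxConcurrentSessions: "4"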

Create an Azure Service Bus broker for topics

Follow the instructions here on setting up Azure Service Bus Topics.

6 - GCP

Detailed documentation on the GCP Pub/Sub component

Create a Dapr component

To set up GCP pub/sub, create a component of type pubsub.gcp.pubsub. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: gcp-pubsub
spec:
  type: pubsub.gcp.pubsub
  version: v1
  metadata:
  - name: type
    value: service_account
  - name: projectId
    value: <PROJECT_ID> # replace
  - name: endpoint # Optional.
    value: "http://localhost:8085"
  - name: consumerID # Optional - defaults to the app's own ID
    value: <CONSUMER_ID>
  - name: identityProjectId
    value: <IDENTITY_PROJECT_ID> # replace
  - name: privateKeyId
    value: <PRIVATE_KEY_ID> #replace
  - name: clientEmail
    value: <CLIENT_EMAIL> #replace
  - name: clientId
    value: <CLIENT_ID> # replace
  - name: authUri
    value: https://accounts.google.com/o/oauth2/auth
  - name: tokenUri
    value: https://oauth2.googleapis.com/token
  - name: authProviderX509CertUrl
    value: https://www.googleapis.com/oauth2/v1/certs
  - name: clientX509CertUrl
    value: https://www.googleapis.com/robot/v1/metadata/x509/<PROJECT_NAME>.iam.gserviceaccount.com #replace PROJECT_NAME
  - name: privateKey
    value: <PRIVATE_KEY> # replace x509 cert
  - name: disableEntityManagement
    value: "false"
  - name: enableMessageOrdering
    value: "false"
  - name: orderingKey # Optional
    value: <ORDERING_KEY>
  - name: maxReconnectionAttempts # Optional
    value: 30
  - name: connectionRecoveryInSec # Optional
    value: 2
  - name: deadLetterTopic # Optional
    value: <EXISTING_PUBSUB_TOPIC>
  - name: maxDeliveryAttempts # Optional
    value: 5
  - name: maxOutstandingMessages # Optional
    value: 1000
  - name: maxOutstandingBytes # Optional
    value: 1000000000
  - name: maxConcurrentConnections # Optional
    value: 10

Spec metadata fields

Field Required Details Example
projectId Y GCP project ID myproject-123
endpoint N GCP endpoint for the component to use. Only used for local development, for example with the GCP Pub/Sub Emulator. The endpoint is unnecessary when running against the GCP production API. "http://localhost:8085"
consumerID N The Consumer ID organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. The consumerID, along with the topic provided as part of the request, is used to build the Pub/Sub subscription ID. Can be set to a string value (such as "channel1") or a string format value (such as "{podName}"). See all template tags you can use in your component metadata.
identityProjectId N If the GCP pubsub project is different from the identity project, specify the identity project using this attribute "myproject-123"
privateKeyId N If using explicit credentials, this field should contain the private_key_id field from the service account json document "my-private-key"
privateKey N If using explicit credentials, this field should contain the private_key field from the service account json -----BEGIN PRIVATE KEY-----MIIBVgIBADANBgkqhkiG9w0B
clientEmail N If using explicit credentials, this field should contain the client_email field from the service account json "myservice@myproject-123.iam.gserviceaccount.com"
clientId N If using explicit credentials, this field should contain the client_id field from the service account json 106234234234
authUri N If using explicit credentials, this field should contain the auth_uri field from the service account json https://accounts.google.com/o/oauth2/auth
tokenUri N If using explicit credentials, this field should contain the token_uri field from the service account json https://oauth2.googleapis.com/token
authProviderX509CertUrl N If using explicit credentials, this field should contain the auth_provider_x509_cert_url field from the service account json https://www.googleapis.com/oauth2/v1/certs
clientX509CertUrl N If using explicit credentials, this field should contain the client_x509_cert_url field from the service account json https://www.googleapis.com/robot/v1/metadata/x509/myserviceaccount%40myproject.iam.gserviceaccount.com
disableEntityManagement N When set to "true", topics and subscriptions do not get created automatically. Default: "false" "true", "false"
enableMessageOrdering N When set to "true", subscribed messages will be received in order, depending on publishing and permissions configuration. "true", "false"
orderingKey N The key provided in the request. It’s used when enableMessageOrdering is set to true to order messages by that key. “my-orderingkey”
maxReconnectionAttempts N Defines the maximum number of reconnect attempts. Default: 30 30
connectionRecoveryInSec N Time in seconds to wait between connection recovery attempts. Default: 2 2
deadLetterTopic N Name of the GCP Pub/Sub Topic. This topic must exist before using this component. "myapp-dlq"
maxDeliveryAttempts N Maximum number of attempts to deliver the message. If deadLetterTopic is specified, maxDeliveryAttempts is the maximum number of attempts for failed processing of messages. Once that number is reached, the message will be moved to the dead-letter topic. Default: 5 5
type N DEPRECATED GCP credentials type. Only service_account is supported. Defaults to service_account service_account
maxOutstandingMessages N Maximum number of outstanding messages a given streaming-pull connection can have. Default: 1000 50
maxOutstandingBytes N Maximum number of outstanding bytes a given streaming-pull connection can have. Default: 1000000000 1000000000
maxConcurrentConnections N Maximum number of concurrent streaming-pull connections to be maintained. Default: 10 2
ackDeadline N Message acknowledgement duration deadline. Default: 20s 1m

GCP Credentials

Since the GCP Pub/Sub component uses the GCP Go Client Libraries, by default it authenticates using Application Default Credentials. This is explained further in the Authenticate to GCP Cloud services using client libraries guide.

Create a GCP Pub/Sub

For local development, the GCP Pub/Sub Emulator is used to test the GCP Pub/Sub Component. Follow these instructions to run the GCP Pub/Sub Emulator.

To run the GCP Pub/Sub Emulator locally using Docker, use the following docker-compose.yaml:

version: '3'
services:
  pubsub:
    image: gcr.io/google.com/cloudsdktool/cloud-sdk:422.0.0-emulators
    ports:
      - "8085:8085"
    container_name: gcp-pubsub
    entrypoint: gcloud beta emulators pubsub start --project local-test-prj --host-port 0.0.0.0:8085

In order to use the GCP Pub/Sub Emulator with your pub/sub component, you need to provide the endpoint configuration in the component metadata. The endpoint is unnecessary when running against the GCP production API.

The projectId attribute must match the --project used in either the docker-compose.yaml or Docker command.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: gcp-pubsub
spec:
  type: pubsub.gcp.pubsub
  version: v1
  metadata:
  - name: projectId
    value: "local-test-prj"
  - name: consumerID
    value: "testConsumer"
  - name: endpoint
    value: "localhost:8085"

You can use either “explicit” or “implicit” credentials to configure access to your GCP pubsub instance. If using explicit, most fields are required. Implicit relies on Dapr running under a Kubernetes service account (KSA) mapped to a Google service account (GSA) that has the necessary permissions to access pubsub. In implicit mode, only the projectId attribute is needed; all others are optional.
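
As a minimal sketch of implicit mode, assuming the sidecar runs under a KSA that is mapped (for example, via Workload Identity) to a GSA with the necessary Pub/Sub permissions, the component can be as small as:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: gcp-pubsub
spec:
  type: pubsub.gcp.pubsub
  version: v1
  metadata:
  - name: projectId
    value: "myproject-123"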

Follow the instructions here on setting up a Google Cloud Pub/Sub system.

7 - In-memory

Detailed documentation on the In Memory pubsub component

The in-memory pub/sub component operates within a single Dapr sidecar. This is primarily meant for development purposes. State is not replicated across multiple sidecars and is lost when the Dapr sidecar is restarted.

Component format

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.in-memory
  version: v1
  metadata: []

Note: in-memory does not require any specific metadata for the component to work; however, spec.metadata is a required field.

8 - JetStream

Detailed documentation on the NATS JetStream component

Component format

To set up JetStream pub/sub, create a component of type pubsub.jetstream. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: jetstream-pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: jwt # Optional. Used for decentralized JWT authentication.
    value: "eyJhbGciOiJ...6yJV_adQssw5c"
  - name: seedKey # Optional. Used for decentralized JWT authentication.
    value: "SUACS34K232O...5Z3POU7BNIL4Y"
  - name: tls_client_cert # Optional. Used for TLS Client authentication.
    value: "/path/to/tls.crt"
  - name: tls_client_key # Optional. Used for TLS Client authentication.
    value: "/path/to/tls.key"
  - name: token # Optional. Used for token based authentication.
    value: "my-token"
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"
  - name: startSequence
    value: 1
  - name: startTime # In Unix format
    value: 1630349391
  - name: flowControl
    value: false
  - name: ackWait
    value: 10s
  - name: maxDeliver
    value: 5
  - name: backOff
    value: "50ms, 1s, 10s"
  - name: maxAckPending
    value: 5000
  - name: replicas
    value: 1
  - name: memoryStorage
    value: false
  - name: rateLimit
    value: 1024
  - name: heartbeat
    value: 15s
  - name: ackPolicy
    value: explicit
  - name: deliverPolicy
    value: all
  - name: domain
    value: hub
  - name: apiPrefix
    value: PREFIX

Spec metadata fields

Field Required Details Example
natsURL Y NATS server address URL "nats://localhost:4222"
jwt N NATS decentralized authentication JWT "eyJhbGciOiJ...6yJV_adQssw5c"
seedKey N NATS decentralized authentication seed key "SUACS34K232O...5Z3POU7BNIL4Y"
tls_client_cert N NATS TLS Client Authentication Certificate "/path/to/tls.crt"
tls_client_key N NATS TLS Client Authentication Key "/path/to/tls.key"
token N NATS token based authentication "my-token"
name N NATS connection name "my-conn-name"
streamName N Name of the JetStream Stream to bind to "my-stream"
durableName N Durable name "my-durable"
queueGroupName N Queue group name "my-queue"
startSequence N Start Sequence 1
startTime N Start Time in Unix format 1630349391
flowControl N Flow Control true
ackWait N Ack Wait 10s
maxDeliver N Max Deliver 15
backOff N BackOff "50ms, 1s, 5s, 10s"
maxAckPending N Max Ack Pending 5000
replicas N Replicas 3
memoryStorage N Memory Storage false
rateLimit N Rate Limit 1024
heartbeat N Heartbeat 10s
ackPolicy N Ack Policy explicit
deliverPolicy N One of: all, last, new, sequence, time all
domain N [JetStream Leafnodes] HUB
apiPrefix N [JetStream Leafnodes] PREFIX

Create a NATS server

You can run a NATS Server with JetStream enabled locally using Docker:

docker run -d -p 4222:4222 nats:latest -js

You can then interact with the server using the client port: localhost:4222.

Install NATS JetStream on Kubernetes by using Helm:

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install --set nats.jetstream.enabled=true my-nats nats/nats

This installs a single NATS server into the default namespace. To interact with NATS, find the service with:

kubectl get svc my-nats

For more information on helm chart settings, see the Helm chart documentation.

Create JetStream

It is essential to create a NATS JetStream stream for a specific subject. For example, for a NATS server running locally, use:

nats -s localhost:4222 stream add myStream --subjects mySubject
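
To verify that the stream was created, you can inspect it with the same CLI:

nats -s localhost:4222 stream info myStream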

Example: Competing consumers pattern

Let’s say you’d like each message to be processed by only one application or pod with the same app-id. Typically, the consumerID metadata spec helps you define competing consumers.

Since consumerID is not supported in NATS JetStream, you need to specify durableName and queueGroupName to achieve the competing consumers pattern. For example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"

9 - KubeMQ

Detailed documentation on the KubeMQ pubsub component

Component format

To set up KubeMQ pub/sub, create a component of type pubsub.kubemq. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kubemq-pubsub
spec:
  type: pubsub.kubemq
  version: v1
  metadata:
    - name: address
      value: localhost:50000
    - name: store
      value: false
    - name: consumerID
      value: channel1

Spec metadata fields

Field Required Details Example
address Y Address of the KubeMQ server "localhost:50000"
store N Type of pub/sub. true: persisted (EventsStore); false: in-memory (Events) true or false (default is false)
consumerID N Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. Can be set to a string value (such as "channel1" in the example above) or a string format value (such as "{podName}"). See all template tags you can use in your component metadata.
clientID N Name for the client ID connection sub-client-12345
authToken N Auth JWT token for connection Check out KubeMQ Authentication ew...
group N Subscriber group for load balancing g1
disableReDelivery N Set if message should be re-delivered in case of error coming from application true or false (default is false)

Create a KubeMQ broker

  1. Obtain a KubeMQ Key.
  2. Wait for an email confirmation with your Key.

You can run a KubeMQ broker with Docker:

docker run -d -p 8080:8080 -p 50000:50000 -p 9090:9090 -e KUBEMQ_TOKEN=<your-key> kubemq/kubemq

You can then interact with the server using the client port: localhost:50000

Alternatively, to run KubeMQ on Kubernetes, run the following kubectl commands:

kubectl apply -f https://deploy.kubemq.io/init
kubectl apply -f https://deploy.kubemq.io/key/<your-key>

Install KubeMQ CLI

Go to KubeMQ CLI and download the latest version of the CLI.

Browse KubeMQ Dashboard

Open a browser and navigate to http://localhost:8080

With the KubeMQ CLI (kubemqctl) installed, run the following command:

kubemqctl get dashboard

Or, with kubectl installed, run the port-forward command:

kubectl port-forward svc/kubemq-cluster-api -n kubemq 8080:8080

KubeMQ Documentation

Visit KubeMQ Documentation for more information.

10 - MQTT

Detailed documentation on the MQTT pubsub component

Component format

To set up MQTT pub/sub, create a component of type pubsub.mqtt. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "tcp://[username][:password]@host.domain[:port]"
  - name: qos
    value: 1
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: consumerID
    value: "channel1"

Spec metadata fields

Field Required Details Example
url Y Address of the MQTT broker. Can be secretKeyRef to use a secret reference.
Use the tcp:// URI scheme for non-TLS communication.
Use the ssl:// URI scheme for TLS communication.
"tcp://[username][:password]@host.domain[:port]"
consumerID N The client ID used to connect to the MQTT broker for the consumer connection. Defaults to the Dapr app ID.
Note: if producerID is not set, -consumer is appended to this value for the consumer connection.
Can be set to a string value (such as "channel1" in the example above) or a string format value (such as "{podName}"). See all template tags you can use in your component metadata.
producerID N The client ID used to connect to the MQTT broker for the producer connection. Defaults to {consumerID}-producer. "myMqttProducerApp"
qos N Indicates the Quality of Service Level (QoS) of the message (more info). Defaults to 1. 0, 1, 2
retain N Defines whether the message is saved by the broker as the last known good value for a specified topic. Defaults to "false". "true", "false"
cleanSession N Sets the clean_session flag in the connection message to the MQTT broker if "true" (more info). Defaults to "false". "true", "false"
caCert Required for using TLS Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates. "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert Required for using TLS TLS client certificate in PEM format. Must be used with clientKey. "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey Required for using TLS TLS client key in PEM format. Must be used with clientCert. Can be secretKeyRef to use a secret reference. "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"

Enabling message delivery retries

The MQTT pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once. If the service marks the message as not processed, the message won’t be acknowledged back to the broker. Only if the broker resends the message will it be retried.

To make Dapr use more sophisticated retry policies, you can apply a retry resiliency policy to the MQTT pub/sub component; see the sketch after the following list.

There is a crucial difference between these two retry approaches:

  1. Re-delivery of unacknowledged messages is completely dependent on the broker. Dapr does not guarantee it. Some brokers, such as emqx and vernemq, support it, but it is not part of the MQTT3 spec.

  2. Using a retry resiliency policy makes the same Dapr sidecar retry redelivering the messages, so it is the same Dapr sidecar and the same app receiving the same message.
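
As a minimal sketch, such a resiliency policy applied to the mqtt-pubsub component from the examples above could look like this (the policy name and retry values are illustrative):

apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: mqtt-retries
spec:
  policies:
    retries:
      mqttRetry:
        policy: constant
        duration: 5s
        maxRetries: 3
  targets:
    components:
      mqtt-pubsub:
        inbound:
          retry: mqttRetry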

Communication using TLS

To configure communication using TLS, ensure that the MQTT broker (for example, mosquitto) is configured to support certificates and provide the caCert, clientCert, clientKey metadata in the component configuration. For example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "ssl://host.domain[:port]"
  - name: qos
    value: 1
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: caCert
    value: ${{ myLoadedCACert }}
  - name: clientCert
    value: ${{ myLoadedClientCert }}
  - name: clientKey
    secretKeyRef:
      name: myMqttClientKey
      key: myMqttClientKey
auth:
  secretStore: <SECRET_STORE_NAME>

Note that while the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.

Consuming a shared topic

When consuming a shared topic, each consumer must have a unique identifier. By default, the application ID is used to uniquely identify each consumer and publisher. In self-hosted mode, invoking each dapr run with a different application ID is sufficient to have them consume from the same shared topic. However, on Kubernetes, multiple instances of an application pod will share the same application ID, prohibiting all instances from consuming the same topic. To overcome this, configure the component’s consumerID metadata with a {uuid} tag, which will give each instance a randomly generated consumerID value on start up. For example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
    - name: consumerID
      value: "{uuid}"
    - name: url
      value: "tcp://admin:public@localhost:1883"
    - name: qos
      value: 1
    - name: retain
      value: "false"
    - name: cleanSession
      value: "true"

Note that in this case, the value of the consumer ID is random every time Dapr restarts, so we are setting cleanSession to true as well.

Create a MQTT broker

You can run an MQTT broker locally using Docker:

docker run -d -p 1883:1883 -p 9001:9001 --name mqtt eclipse-mosquitto:1.6

You can then interact with the server using the client port: mqtt://localhost:1883

You can run an MQTT broker on Kubernetes using the following YAML:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app-name: mqtt-broker
  template:
    metadata:
      labels:
        app-name: mqtt-broker
    spec:
      containers:
        - name: mqtt
          image: eclipse-mosquitto:1.6
          imagePullPolicy: IfNotPresent
          ports:
            - name: default
              containerPort: 1883
              protocol: TCP
            - name: websocket
              containerPort: 9001
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  type: ClusterIP
  selector:
    app-name: mqtt-broker
  ports:
    - port: 1883
      targetPort: default
      name: default
      protocol: TCP
    - port: 9001
      targetPort: websocket
      name: websocket
      protocol: TCP

You can then interact with the server using the client port: tcp://mqtt-broker.default.svc.cluster.local:1883

11 - MQTT3

Detailed documentation on the MQTT3 pubsub component

Component format

To set up an MQTT3 pub/sub, create a component of type pubsub.mqtt3. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: url
      value: "tcp://[username][:password]@host.domain[:port]"
    # Optional
    - name: retain
      value: "false"
    - name: cleanSession
      value: "false"
    - name: qos
      value: "1"
    - name: consumerID
      value: "channel1"

Spec metadata fields

Field Required Details Example
url Y Address of the MQTT broker. Can be secretKeyRef to use a secret reference.
Use the tcp:// URI scheme for non-TLS communication.
Use the ssl:// URI scheme for TLS communication.
"tcp://[username][:password]@host.domain[:port]"
consumerID N The client ID used to connect to the MQTT broker. Defaults to the Dapr app ID. Can be set to a string value (such as "channel1" in the example above) or a string format value (such as "{podName}"). See all template tags you can use in your component metadata.
retain N Defines whether the message is saved by the broker as the last known good value for a specified topic. Defaults to "false". "true", "false"
cleanSession N Sets the clean_session flag in the connection message to the MQTT broker if "true" (more info). Defaults to "false". "true", "false"
caCert Required for using TLS Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates. See example below
clientCert Required for using TLS TLS client certificate in PEM format. Must be used with clientKey. See example below
clientKey Required for using TLS TLS client key in PEM format. Must be used with clientCert. Can be secretKeyRef to use a secret reference. See example below
qos N Indicates the Quality of Service Level (QoS) of the message (more info). Defaults to 1. 0, 1, 2

Communication using TLS

To configure communication using TLS, ensure that the MQTT broker (for example, emqx) is configured to support certificates and provide the caCert, clientCert, clientKey metadata in the component configuration. For example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: url
      value: "ssl://host.domain[:port]"
  # TLS configuration
    - name: caCert
      value: |
        -----BEGIN CERTIFICATE-----
        ...
        -----END CERTIFICATE-----
    - name: clientCert
      value: |
        -----BEGIN CERTIFICATE-----
        ...
        -----END CERTIFICATE-----
    - name: clientKey
      secretKeyRef:
        name: myMqttClientKey
        key: myMqttClientKey
    # Optional
    - name: retain
      value: "false"
    - name: cleanSession
      value: "false"
    - name: qos
      value: 1

Note that while the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.

Consuming a shared topic

When consuming a shared topic, each consumer must have a unique identifier. By default, the application ID is used to uniquely identify each consumer and publisher. In self-hosted mode, invoking each dapr run with a different application ID is sufficient to have them consume from the same shared topic. However, on Kubernetes, multiple instances of an application pod will share the same application ID, prohibiting all instances from consuming the same topic. To overcome this, configure the component’s consumerID metadata with a {uuid} tag (which will give each instance a randomly generated value on start up) or {podName} (which will use the Pod’s name on Kubernetes). For example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: consumerID
      value: "{uuid}"
    - name: cleanSession
      value: "true"
    - name: url
      value: "tcp://admin:public@localhost:1883"
    - name: qos
      value: 1
    - name: retain
      value: "false"

Note that in this case, the value of the consumer ID is random every time Dapr restarts, so you should set cleanSession to true as well.

It is recommended to use StatefulSets with shared subscriptions.

Create a MQTT3 broker

You can run an MQTT broker, such as emqx, locally using Docker:

docker run -d -p 1883:1883 --name mqtt emqx:latest

You can then interact with the server using the client port: tcp://localhost:1883

You can run an MQTT3 broker on Kubernetes using the following YAML:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app-name: mqtt-broker
  template:
    metadata:
      labels:
        app-name: mqtt-broker
    spec:
      containers:
        - name: mqtt
          image: emqx:latest
          imagePullPolicy: IfNotPresent
          ports:
            - name: default
              containerPort: 1883
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: mqtt-broker
  labels:
    app-name: mqtt-broker
spec:
  type: ClusterIP
  selector:
    app-name: mqtt-broker
  ports:
    - port: 1883
      targetPort: default
      name: default
      protocol: TCP

You can then interact with the server using the client port: tcp://mqtt-broker.default.svc.cluster.local:1883

12 - Pulsar

Detailed documentation on the Pulsar pubsub component

Component format

To set up Apache Pulsar pub/sub, create a component of type pubsub.pulsar. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

For more information on Apache Pulsar, read the official docs.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pulsar-pubsub
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: enableTLS
    value: "false"
  - name: tenant
    value: "public"
  - name: token
    value: "eyJrZXlJZCI6InB1bHNhci1wajU0cXd3ZHB6NGIiLCJhbGciOiJIUzI1NiJ9.eyJzd"
  - name: consumerID
    value: "channel1"
  - name: namespace
    value: "default"
  - name: persistent
    value: "true"
  - name: disableBatching
    value: "false"
  - name: receiverQueueSize
    value: "1000"
  - name: <topic-name>.jsonschema # sets a json schema validation for the configured topic
    value: |
      {
        "type": "record",
        "name": "Example",
        "namespace": "test",
        "fields": [
          {"name": "ID","type": "int"},
          {"name": "Name","type": "string"}
        ]
      }
  - name: <topic-name>.avroschema # sets an avro schema validation for the configured topic
    value: |
      {
        "type": "record",
        "name": "Example",
        "namespace": "test",
        "fields": [
          {"name": "ID","type": "int"},
          {"name": "Name","type": "string"}
        ]
      }

Spec metadata fields

Field Required Details Example
host Y Address of the Pulsar broker. Default is "localhost:6650" "localhost:6650" OR "http://pulsar-pj54qwwdpz4b-pulsar.ap-sg.public.pulsar.com:8080"
enableTLS N Enable TLS. Default: "false" "true", "false"
tenant N The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar and are spread across clusters. Default: "public" "public"
consumerID N Used to set the subscription name or consumer ID. Can be set to string value (such as "channel1" in the example above) or string format value (such as "{podName}", etc.). See all of template tags you can use in your component metadata.
namespace N The administrative unit of the topic, which acts as a grouping mechanism for related topics. Default: "default" "default"
persistent N Pulsar supports two kinds of topics: persistent and non-persistent. With persistent topics, all messages are durably persisted on disks (if the broker is not standalone, messages are durably persisted on multiple disks), whereas data for non-persistent topics is not persisted to storage disks.
disableBatching N Disable batching. When batching is enabled, the default batch delay is 10 ms and the default batch size is 1000 messages. Setting disableBatching: true makes the producer send messages individually. Default: "false" "true", "false"
receiverQueueSize N Sets the size of the consumer receiver queue. Controls how many messages can be accumulated by the consumer before it is explicitly called to read messages by Dapr. Default: "1000" "1000"
batchingMaxPublishDelay N batchingMaxPublishDelay sets the time period within which sent messages are batched, if batching is enabled. If set to a non-zero value, messages are queued until this time interval elapses, or until batchingMaxMessages (see below) or batchingMaxSize (see below) is reached. There are two valid formats: a fraction with a unit suffix, or a pure number, which is processed as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Default: "10ms" "10ms", "10"
batchingMaxMessages N batchingMaxMessages sets the maximum number of messages permitted in a batch. If set to a value greater than 1, messages are queued until this threshold is reached, batchingMaxSize (see below) has been reached, or the batch interval has elapsed. Default: "1000" "1000"
batchingMaxSize N batchingMaxSize sets the maximum number of bytes permitted in a batch. If set to a value greater than 1, messages are queued until this threshold is reached, batchingMaxMessages (see above) has been reached, or the batch interval has elapsed. Default: "128KB" "131072"
<topic-name>.jsonschema N Enforces JSON schema validation for the configured topic.
<topic-name>.avroschema N Enforces Avro schema validation for the configured topic.
publicKey N A public key to be used for publisher and consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value
privateKey N A private key to be used for consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value
keys N A comma delimited string containing names of Pulsar session keys. Used in conjunction with publicKey for publisher encryption
processMode N Enable processing multiple messages at once. Default: "async" "async", "sync"
subscribeType N Pulsar supports four kinds of subscription types. Default: "shared" "shared", "exclusive", "failover", "key_shared"
subscribeInitialPosition N Subscription position is the initial position at which the cursor is set when consumption starts. Default: "latest" "latest", "earliest"
subscribeMode N Subscription mode indicates cursor persistence; a durable subscription retains messages and persists the current position. Default: "durable" "durable", "non_durable"
partitionKey N Sets the key of the message for routing policy. Default: ""
maxConcurrentHandlers N Defines the maximum number of concurrent message handlers. Default: 100 10

Authenticate using Token

To authenticate to Pulsar using a static JWT token, you can use the following metadata field:

Field Required Details Example
token N Token used for authentication. How to create Pulsar token

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "pulsar.example.com:6650"
  - name: token
    secretKeyRef:
      name: pulsar
      key:  token

Authenticate using OIDC

Since v3.0, Pulsar supports OIDC authentication. To enable OIDC authentication, you need to provide the following OAuth2 parameters to the component spec. OAuth2 authentication cannot be used in combination with token authentication. It is recommended that you use a secret reference for the client secret. The Pulsar OAuth2 authenticator is not specifically compliant with OIDC, so it is your responsibility to ensure fields are compliant. For example, the issuer URL must use the https protocol and the requested scopes must include openid. If the oauth2TokenCAPEM field is omitted, the system’s certificate pool is used for connecting to the OAuth2 issuer when using https.

Field Required Details Example
oauth2TokenURL N URL to request the OIDC client_credentials token from. Must not be empty. "https://oauth.example.com/o/oauth2/token"
oauth2TokenCAPEM N CA PEM certificate bundle to connect to the OAuth2 issuer. If not defined, the system’s certificate pool will be used. "---BEGIN CERTIFICATE---\n...\n---END CERTIFICATE---"
oauth2ClientID N OIDC client ID. Must not be empty. "my-client-id"
oauth2ClientSecret N OIDC client secret. Must not be empty. "my-client-secret"
oauth2Audiences N Comma separated list of audiences to request for. Must not be empty. "my-audience-1,my-audience-2"
oauth2Scopes N Comma separated list of scopes to request. Must not be empty. "openid,profile,email"

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "pulsar.example.com:6650"
  - name: oauth2TokenURL
    value: https://oauth.example.com/o/oauth2/token
  - name: oauth2TokenCAPEM
    value: "---BEGIN CERTIFICATE---\n...\n---END CERTIFICATE---"
  - name: oauth2ClientID
    value: my-client-id
  - name: oauth2ClientSecret
    secretKeyRef:
      name: pulsar-oauth2
      key:  my-client-secret
  - name: oauth2Audiences
    value: "my.pulsar.example.com,another.pulsar.example.com"
  - name: oauth2Scopes
    value: "openid,profile,email"

Enabling message delivery retries

The Pulsar pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once, and the delivery is not retried in case of failure. To make Dapr use more sophisticated retry policies, you can apply a retry resiliency policy to the Pulsar pub/sub component. Note that it will be the same Dapr sidecar retrying redelivery of the message to the same app instance, not other instances.
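
For example, a minimal retry policy sketch targeting the messagebus component defined above (the policy name and values are illustrative):

apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: pulsar-retries
spec:
  policies:
    retries:
      pubsubRetry:
        policy: constant
        duration: 5s
        maxRetries: 10
  targets:
    components:
      messagebus:
        inbound:
          retry: pubsubRetry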

Delay queue

When invoking the Pulsar pub/sub, it’s possible to provide an optional delay queue by using the metadata query parameters in the request URL.

These optional parameter names are metadata.deliverAt or metadata.deliverAfter:

  • deliverAt: Delay message to deliver at a specified time (RFC3339 format); for example, "2021-09-01T10:00:00Z"
  • deliverAfter: Delay message to deliver after a specified amount of time; for example, "4h5m3s"

Examples:

curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAt='2021-09-01T10:00:00Z' \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

Or

curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAfter='4h5m3s' \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

E2E Encryption

Dapr supports setting public and private key pairs to enable Pulsar’s end-to-end encryption feature.

Enabling publisher encryption from file certs

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: publicKey
    value: ./public.key
  - name: keys
    value: myapp.key

Enabling consumer encryption from file certs

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: publicKey
    value: ./public.key
  - name: privateKey
    value: ./private.key

Enabling publisher encryption from value

Note: It is recommended to reference the public key from a secret.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: publicKey
    value:  "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
  - name: keys
    value: myapp.key

Enabling consumer encryption from value

Note: It is recommended to reference the public and private keys from a secret.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: publicKey
    value: "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
  - name: privateKey
    value: "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA1KDAM4L8RtJ+nLaXBrBhzVpvTemsKVZoAct8A+ShepOHT9lg\nHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDdruXSflvSdmYeFAw3Ypphc1A5oM53\nwSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/a3golb36GYFrY0MLFTv7wZ87pmMI\nPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eKjpwcg35gccvR6o/UhbKAuc60V1J9\nWof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0QiCdpIrXvYtANq0Id6gP8zJvUEdPIg\nNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ3QIDAQABAoIBAQCKuHnM4ac/eXM7\nQPDVX1vfgyHc3hgBPCtNCHnXfGFRvFBqavKGxIElBvGOcBS0CWQ+Rg1Ca5kMx3TQ\njSweSYhH5A7pe3Sa5FK5V6MGxJvRhMSkQi/lJZUBjzaIBJA9jln7pXzdHx8ekE16\nBMPONr6g2dr4nuI9o67xKrtfViwRDGaG6eh7jIMlEqMMc6WqyhvI67rlVDSTHFKX\njlMcozJ3IT8BtTzKg2Tpy7ReVuJEpehum8yn1ZVdAnotBDJxI07DC1cbOP4M2fHM\ngfgPYWmchauZuTeTFu4hrlY5jg0/WLs6by8r/81+vX3QTNvejX9UdTHMSIfQdX82\nAfkCKUVhAoGBAOvGv+YXeTlPRcYC642x5iOyLQm+BiSX4jKtnyJiTU2s/qvvKkIu\nxAOk3OtniT9NaUAHEZE9tI71dDN6IgTLQlAcPCzkVh6Sc5eG0MObqOO7WOMCWBkI\nlaAKKBbd6cGDJkwGCJKnx0pxC9f8R4dw3fmXWgWAr8ENiekMuvjSfjZ5AoGBAObd\ns2L5uiUPTtpyh8WZ7rEvrun3djBhzi+d7rgxEGdditeiLQGKyZbDPMSMBuus/5wH\nwfi0xUq50RtYDbzQQdC3T/C20oHmZbjWK5mDaLRVzWS89YG/NT2Q8eZLBstKqxkx\ngoT77zoUDfRy+CWs1xvXzgxagD5Yg8/OrCuXOqWFAoGAPIw3r6ELknoXEvihASxU\nS4pwInZYIYGXpygLG8teyrnIVOMAWSqlT8JAsXtPNaBtjPHDwyazfZrvEmEk51JD\nX0tA8M5ah1NYt+r5JaKNxp3P/8wUT6lyszyoeubWJsnFRfSusuq/NRC+1+KDg/aq\nKnSBu7QGbm9JoT2RrmBv5RECgYBRn8Lj1I1muvHTNDkiuRj2VniOSirkUkA2/6y+\nPMKi+SS0tqcY63v4rNCYYTW1L7Yz8V44U5mJoQb4lvpMbolGhPljjxAAU3hVkItb\nvGVRlSCIZHKczADD4rJUDOS7DYxO3P1bjUN4kkyYx+lKUMDBHFzCa2D6Kgt4dobS\n5qYajQKBgQC7u7MFPkkEMqNqNGu5erytQkBq1v1Ipmf9rCi3iIj4XJLopxMgw0fx\n6jwcwNInl72KzoUBLnGQ9PKGVeBcgEgdI+a+tq+1TJo6Ta+hZSx+4AYiKY18eRKG\neNuER9NOcSVJ7Eqkcw4viCGyYDm2vgNV9HJ0VlAo3RDh8x5spEN+mg==\n-----END RSA PRIVATE KEY-----\n"

Partition Key

When invoking the Pulsar pub/sub, it’s possible to provide an optional partition key by using the metadata query parameter in the request URL.

The parameter name is partitionKey.

Example:

curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

Message headers

All other metadata key/value pairs (that are not partitionKey) are set as headers in the Pulsar message. For example, set a correlationId for the message:

curl -X POST 'http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1' \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

Order guarantee

To ensure that messages arrive in order for each consumer subscribed to a specific key, three conditions must be met (a component sketch follows this list).

  1. subscribeType should be set to key_shared.
  2. partitionKey must be set.
  3. processMode should be set to sync.
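
For example, a component satisfying conditions 1 and 3 might look like the following sketch, with the partition key (condition 2) supplied per publish request as described in the Partition Key section above:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.pulsar
  version: v1
  metadata:
  - name: host
    value: "localhost:6650"
  - name: subscribeType
    value: key_shared
  - name: processMode
    value: sync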

Create a Pulsar instance

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.5.1 \
  bin/pulsar standalone

To run Pulsar on Kubernetes, refer to the Pulsar Helm chart documentation.

13 - RabbitMQ

Detailed documentation on the RabbitMQ pubsub component

Component format

To set up RabbitMQ pub/sub, create a component of type pubsub.rabbitmq. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbitmq-pubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: connectionString
    value: "amqp://localhost:5672"
  - name: protocol
    value: amqp  
  - name: hostname
    value: localhost 
  - name: username
    value: username
  - name: password
    value: password  
  - name: consumerID
    value: channel1
  - name: durable
    value: false
  - name: deletedWhenUnused
    value: false
  - name: autoAck
    value: false
  - name: deliveryMode
    value: 0
  - name: requeueInFailure
    value: false
  - name: prefetchCount
    value: 0
  - name: reconnectWait
    value: 0
  - name: concurrencyMode
    value: parallel
  - name: publisherConfirm
    value: false
  - name: enableDeadLetter # Optional enable dead Letter or not
    value: true
  - name: maxLen # Optional max message count in a queue
    value: 3000
  - name: maxLenBytes # Optional maximum length in bytes of a queue.
    value: 10485760
  - name: exchangeKind
    value: fanout
  - name: saslExternal
    value: false
  - name: ttlInSeconds
    value: 60
  - name: clientName
    value: "{podName}"
  - name: heartBeat
    value: 10s

Spec metadata fields

Field Required Details Example
connectionString Y* The RabbitMQ connection string. *Mutually exclusive with the protocol, hostname, username, password fields amqp://user:pass@localhost:5672
protocol N* The RabbitMQ protocol. *Mutually exclusive with the connectionString field amqp
hostname N* The RabbitMQ hostname. *Mutually exclusive with the connectionString field localhost
username N* The RabbitMQ username. *Mutually exclusive with the connectionString field username
password N* The RabbitMQ password. *Mutually exclusive with the connectionString field password
consumerID N Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. Can be set to string value (such as "channel1" in the example above) or string format value (such as "{podName}", etc.). See all of the template tags you can use in your component metadata.
durable N Whether or not to use durable queues. Defaults to "false" "true", "false"
deletedWhenUnused N Whether or not the queue should be configured to auto-delete. Defaults to "true" "true", "false"
autoAck N Whether or not the queue consumer should auto-ack messages. Defaults to "false" "true", "false"
deliveryMode N Persistence mode when publishing messages. Defaults to "0". RabbitMQ treats "2" as persistent, all other numbers as non-persistent "0", "2"
requeueInFailure N Whether or not to requeue when sending a negative acknowledgement in case of a failure. Defaults to "false" "true", "false"
prefetchCount N Number of messages to prefetch. Consider changing this to a non-zero value for production environments. Defaults to "0", which means that all available messages will be pre-fetched. "2"
publisherConfirm N If enabled, client waits for publisher confirms after publishing a message. Defaults to "false" "true", "false"
reconnectWait N How long to wait (in seconds) before reconnecting if a connection failure occurs "0"
concurrencyMode N parallel is the default, and allows processing multiple messages in parallel (limited by the app-max-concurrency annotation, if configured). Set to single to disable parallel processing. In most situations there’s no reason to change this. parallel, single
enableDeadLetter N Enable forwarding messages that cannot be handled to a dead-letter topic. Defaults to "false" "true", "false"
maxLen N The maximum number of messages of a queue and its dead letter queue (if dead letter enabled). If both maxLen and maxLenBytes are set then both will apply; whichever limit is hit first will be enforced. Defaults to no limit. "1000"
maxLenBytes N Maximum length in bytes of a queue and its dead letter queue (if dead letter enabled). If both maxLen and maxLenBytes are set then both will apply; whichever limit is hit first will be enforced. Defaults to no limit. "1048576"
exchangeKind N Exchange kind of the RabbitMQ exchange. Defaults to "fanout". "fanout", "topic"
saslExternal N With TLS, should the username be taken from an additional field (for example, CN). See RabbitMQ Authentication Mechanisms. Defaults to "false". "true", "false"
ttlInSeconds N Set message TTL at the component level, which can be overwritten by message level TTL per request. "60"
caCert Required for using TLS Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates. "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert Required for using TLS TLS client certificate in PEM format. Must be used with clientKey. "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey Required for using TLS TLS client key in PEM format. Must be used with clientCert. Can be secretKeyRef to use a secret reference. "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"
clientName N This RabbitMQ client-provided connection name is a custom identifier. If set, the identifier is mentioned in RabbitMQ server log entries and management UI. Can be set to {uuid}, {podName}, or {appID}, which is replaced by Dapr runtime to the real value. "app1", {uuid}, {podName}, {appID}
heartBeat N Defines the heartbeat interval with the server, detecting the aliveness of the peer TCP connection with the RabbitMQ server. Defaults to "10s". "10s"

Communication using TLS

To configure communication using TLS, ensure that the RabbitMQ nodes have TLS enabled and provide the caCert, clientCert, clientKey metadata in the component configuration. For example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbitmq-pubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqps://localhost:5671"
  - name: consumerID
    value: myapp
  - name: durable
    value: false
  - name: deletedWhenUnused
    value: false
  - name: autoAck
    value: false
  - name: deliveryMode
    value: 0
  - name: requeueInFailure
    value: false
  - name: prefetchCount
    value: 0
  - name: reconnectWait
    value: 0
  - name: concurrencyMode
    value: parallel
  - name: publisherConfirm
    value: false
  - name: enableDeadLetter # Optional enable dead Letter or not
    value: true
  - name: maxLen # Optional max message count in a queue
    value: 3000
  - name: maxLenBytes # Optional maximum length in bytes of a queue.
    value: 10485760
  - name: exchangeKind
    value: fanout
  - name: saslExternal
    value: false
  - name: caCert
    value: ${{ myLoadedCACert }}
  - name: clientCert
    value: ${{ myLoadedClientCert }}
  - name: clientKey
    secretKeyRef:
      name: myRabbitMQClientKey
      key: myRabbitMQClientKey

Note that while the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.

Enabling message delivery retries

The RabbitMQ pub/sub component has no built-in support for retry strategies. This means that the sidecar sends a message to the service only once. When the service returns a result, the message will be marked as consumed regardless of whether it was processed correctly or not. Note that this is common among all Dapr PubSub components and not just RabbitMQ. Dapr can try redelivering a message a second time, when autoAck is set to false and requeueInFailure is set to true.

To make Dapr use more sophisticated retry policies, you can apply a retry resiliency policy to the RabbitMQ pub/sub component.

There is a crucial difference between the two ways to retry messages:

  1. When using autoAck = false and requeueInFailure = true, RabbitMQ is the one responsible for re-delivering messages and any subscriber can get the redelivered message. If you have more than one instance of your consumer, then it’s possible that another consumer will get it. This is usually the better approach because if there’s a transient failure, it’s more likely that a different worker will be in a better position to successfully process the message. (See the metadata sketch after this list.)
  2. Using Resiliency makes the same Dapr sidecar retry redelivering the messages. So it will be the same Dapr sidecar and the same app receiving the same message.
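
For the first approach, the relevant entries in the component spec shown earlier would be (a minimal sketch):

  - name: autoAck
    value: "false"
  - name: requeueInFailure
    value: "true"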

Create a RabbitMQ server

You can run a RabbitMQ server locally using Docker:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3

You can then interact with the server using the client port: localhost:5672.

The easiest way to install RabbitMQ on Kubernetes is by using the Helm chart:

helm install rabbitmq stable/rabbitmq

Look at the chart output and get the username and password.

This will install RabbitMQ into the default namespace. To interact with RabbitMQ, find the service with: kubectl get svc rabbitmq.

For example, if installing using the example above, the RabbitMQ server client address would be:

rabbitmq.default.svc.cluster.local:5672
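
A component can then point at that in-cluster address through its connection string; for example (a sketch with placeholder credentials):

  - name: connectionString
    value: "amqp://user:password@rabbitmq.default.svc.cluster.local:5672"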

Use topic exchange to route messages

Setting exchangeKind to "topic" uses the topic exchange type, which is commonly used for multicast routing of messages. To route messages using a topic exchange, you must set the following metadata:

  • routingKey:
    Messages with a routing key are routed to one or many queues based on the routing key defined in the metadata when subscribing.

  • queueName:
    If you don’t set the queueName, only one queue is created, and all routing keys will route to that queue. This means all subscribers will bind to that queue, which won’t give the desired results.

For example, if an app is configured with a routing key keyA and queueName of queue-A:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: orderspubsub
spec:
  topic: B
  routes: 
    default: /B
  pubsubname: pubsub
  metadata:
    routingKey: keyA
    queueName: queue-A

It receives messages with routing key keyA; messages with other routing keys are not received.

// publish messages with routing key `keyA`, and these will be received by the above example.
client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is a message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyA"}))
// publish messages with routing key `keyB`, and these will not be received by the above example.
client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is another message"), dapr.PublishEventWithMetadata(map[string]string{"routingKey": "keyB"}))

Bind multiple routingKey

Multiple routing keys can be separated by commas.
The example below binds three routing keys: keyA, keyB, and "" (the empty key). Note the binding method for empty keys.

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: orderspubsub
spec:
  topic: B
  routes: 
    default: /B
  pubsubname: pubsub
  metadata:
    routingKey: keyA,keyB,

For more information see rabbitmq exchanges.

Use priority queues

Dapr supports RabbitMQ priority queues. To set a priority for a queue, use the maxPriority topic subscription metadata.

Declarative priority queue example

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: pubsub
spec:
  topic: checkout
  routes: 
    default: /orders
  pubsubname: order-pub-sub
  metadata:
    maxPriority: 3

Programmatic priority queue example

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [
      {
        'pubsubname': 'pubsub',
        'topic': 'checkout',
        'routes': {
          'default': '/orders'
        },
        'metadata': {'maxPriority': '3'}
      }
    ]
    return jsonify(subscriptions)

const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

app.get('/dapr/subscribe', (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "checkout",
      routes: {
        default: '/orders'
      },
      metadata: {
        maxPriority: '3'
      }
    }
  ]);
})

app.listen(port)

package main

import (
	"encoding/json"
	"net/http"
	"strconv"
)

const appPort = 3000

type subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Routes     routes            `json:"routes"`
}

type routes struct {
	Rules   []rule `json:"rules,omitempty"`
	Default string `json:"default,omitempty"`
}

// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "checkout",
			Routes: routes{
				Default: "/orders",
			},
			Metadata: map[string]string{
				"maxPriority": "3",
			},
		},
	}

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(t)
}

// Serve the /dapr/subscribe endpoint that Dapr calls on startup.
func main() {
	http.HandleFunc("/dapr/subscribe", configureSubscribeHandler)
	http.ListenAndServe(":"+strconv.Itoa(appPort), nil)
}

Setting a priority when publishing a message

To set a priority on a message, add the publish metadata key priority to the publish endpoint or SDK method, as the examples below show.

curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders?metadata.priority=3 -H "Content-Type: application/json" -d '{"orderId": "100"}'
with DaprClient() as client:
        result = client.publish_event(
            pubsub_name=PUBSUB_NAME,
            topic_name=TOPIC_NAME,
            data=json.dumps(orderId),
            data_content_type='application/json',
            metadata= { 'priority': '3' })
await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId, { 'priority': '3' });
client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId)), map[string]string{"priority": "3"})

Use quorum queues

By default, Dapr creates classic queues. To create quorum queues, add the following metadata to your pub/sub subscription:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: pubsub
spec:
  topic: checkout
  routes: 
    default: /orders
  pubsubname: order-pub-sub
  metadata:
    queueType: quorum

Time-to-live

You can set a time-to-live (TTL) value at either the message or component level. Set default component-level TTL using the component spec ttlInSeconds field in your component.
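
A message-level TTL set through publish metadata overrides the component default; for example (the topic and payload here are illustrative):

curl -X POST http://localhost:3500/v1.0/publish/rabbitmq-pubsub/orders?metadata.ttlInSeconds=120 \
  -H "Content-Type: application/json" \
  -d '{"orderId": "100"}'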

Single Active Consumer

The RabbitMQ Single Active Consumer setup ensures that only one consumer at a time processes messages from a queue and switches to another registered consumer if the active one is canceled or fails. This approach might be required when it is crucial for messages to be consumed in the exact order they arrive in the queue and if distributed processing with multiple instances is not supported. When this option is enabled on a queue by Dapr, an instance of the Dapr runtime will be the single active consumer. To allow another application instance to take over in case of failure, Dapr runtime must probe the application’s health and unsubscribe from the pub/sub component.

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: pubsub
spec:
  topic: orders
  routes:
    default: /orders
  pubsubname: order-pub-sub
  metadata:
    singleActiveConsumer: "true"

14 - Redis Streams

Detailed documentation on the Redis Streams pubsub component

Component format

To set up Redis Streams pub/sub, create a component of type pubsub.redis. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: "KeFg23!"
  - name: consumerID
    value: "channel1"
  - name: useEntraID
    value: "true"
  - name: enableTLS
    value: "false"

Spec metadata fields

Field Required Details Example
redisHost Y Connection string for the Redis host. If "redisType" is "cluster", it can be multiple hosts separated by commas or just a single host localhost:6379, redis-master.default.svc.cluster.local:6379
redisPassword N Password for Redis host. No Default. Can be secretKeyRef to use a secret reference "", "KeFg23!"
redisUsername N Username for Redis host. Defaults to empty. Make sure your Redis server version is 6 or above and that you have created the ACL rule correctly. "", "default"
consumerID N The consumer group ID. Can be set to string value (such as "channel1" in the example above) or string format value (such as "{podName}", etc.). See all of the template tags you can use in your component metadata.
useEntraID N Implements EntraID support for Azure Cache for Redis. Before enabling this:
  • The redisHost name must be specified in the form of "server:port"
  • TLS must be enabled
Learn more about this setting under Create a Redis instance > Azure Cache for Redis
"true", "false"
enableTLS N If the Redis instance supports TLS with public certificates, can be configured to be enabled or disabled. Defaults to "false" "true", "false"
clientCert N The content of the client certificate, used for Redis instances that require client-side certificates. Must be used with clientKey and enableTLS must be set to true. It is recommended to use a secret store as described here "----BEGIN CERTIFICATE-----\nMIIC..."
clientKey N The content of the client private key, used in conjunction with clientCert for authentication. It is recommended to use a secret store as described here "----BEGIN PRIVATE KEY-----\nMIIE..."
redeliverInterval N The interval between checking for pending messages to redeliver. Can be either a Go duration string (for example "ms", "s", "m") or a number of milliseconds. Defaults to "60s". "0" disables redelivery. "30s", "5000"
processingTimeout N The amount of time that a message must be pending before attempting to redeliver it. Can be either a Go duration string (for example "ms", "s", "m") or a number of milliseconds. Defaults to "15s". "0" disables redelivery. "60s", "600000"
queueDepth N The size of the message queue for processing. Defaults to "100". "1000"
concurrency N The number of concurrent workers that are processing messages. Defaults to "10". "15"
redisType N The type of redis. There are two valid values, one is "node" for single node mode, the other is "cluster" for redis cluster mode. Defaults to "node". "cluster"
redisDB N Database selected after connecting to redis. If "redisType" is "cluster" this option is ignored. Defaults to "0". "0"
redisMaxRetries N Maximum number of times to retry commands before giving up. Default is to not retry failed commands. "5"
redisMinRetryInterval N Minimum backoff for redis commands between each retry. Default is "8ms"; "-1" disables backoff. "8ms"
redisMaxRetryInterval N Maximum backoff for redis commands between each retry. Default is "512ms";"-1" disables backoff. "5s"
dialTimeout N Dial timeout for establishing new connections. Defaults to "5s". "5s"
readTimeout N Timeout for socket reads. If reached, redis commands will fail with a timeout instead of blocking. Defaults to "3s", "-1" for no timeout. "3s"
writeTimeout N Timeout for socket writes. If reached, redis commands will fail with a timeout instead of blocking. Defaults is readTimeout. "3s"
poolSize N Maximum number of socket connections. Default is 10 connections per every CPU as reported by runtime.NumCPU. "20"
poolTimeout N Amount of time client waits for a connection if all connections are busy before returning an error. Default is readTimeout + 1 second. "5s"
maxConnAge N Connection age at which the client retires (closes) the connection. Default is to not close aged connections. "30m"
minIdleConns N Minimum number of idle connections to keep open in order to avoid the performance degradation associated with creating new connections. Defaults to "0". "2"
idleCheckFrequency N Frequency of idle checks made by idle connections reaper. Default is "1m". "-1" disables idle connections reaper. "-1"
idleTimeout N Amount of time after which the client closes idle connections. Should be less than server’s timeout. Default is "5m". "-1" disables idle timeout check. "10m"
failover N Property to enable failover configuration. Requires sentinelMasterName to be set. Defaults to "false" "true", "false"
sentinelMasterName N The sentinel master name. See Redis Sentinel Documentation "mymaster"
maxLenApprox N Maximum number of items inside a stream. The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited. "10000"
streamTTL N TTL duration for stream entries. Entries older than this duration will be evicted. This is an approximate value, as it’s implemented using Redis stream’s MINID trimming with the ‘~’ modifier. The actual retention may include slightly more entries than strictly defined by the TTL, as Redis optimizes the trimming operation for efficiency by potentially keeping some additional entries. "30d"

Create a Redis instance

Dapr can use any Redis instance - containerized, running on your local dev machine, or a managed cloud service, provided the version of Redis is 5.x or 6.x.

The Dapr CLI automatically creates and sets up a Redis Streams instance for you. The Redis instance is installed via Docker when you run dapr init, and the component file is created in the default components directory: $HOME/.dapr/components on Mac/Linux, or %USERPROFILE%\.dapr\components on Windows.

You can use Helm to quickly create a Redis instance in your Kubernetes cluster. This approach requires installing Helm.

  1. Install Redis into your cluster.

    helm repo add bitnami https://charts.bitnami.com/bitnami
    helm install redis bitnami/redis --set image.tag=6.2
    
  2. Run kubectl get pods to see the Redis containers now running in your cluster.

  3. Add redis-master:6379 as the redisHost in your redis.yaml file. For example:

        metadata:
        - name: redisHost
          value: redis-master:6379
    
  4. Next, we’ll get our Redis password, which is slightly different depending on the OS we’re using:

    • Windows: Run kubectl get secret --namespace default redis -o jsonpath="{.data.redis-password}" > encoded.b64, which will create a file with your encoded password. Next, run certutil -decode encoded.b64 password.txt, which will put your redis password in a text file called password.txt. Copy the password and delete the two files.

    • Linux/MacOS: Run kubectl get secret --namespace default redis -o jsonpath="{.data.redis-password}" | base64 --decode and copy the outputted password.

    Add this password as the redisPassword value in your redis.yaml file. For example:

        - name: redisPassword
          value: "lhDOkwTlp0"
    
  1. Create an Azure Cache for Redis instance using the official Microsoft documentation.

  2. Once your instance is created, grab the Host name (FQDN) and your access key from the Azure portal.

    • For the Host name:
      • Navigate to the resource’s Overview page.
      • Copy the Host name value.
    • For your access key:
      • Navigate to Settings > Access Keys.
      • Copy and save your key.
  3. Add your key and your host name to a redis.yaml file that Dapr can apply to your cluster.

    • If you’re running a sample, add the host and key to the provided redis.yaml.
    • If you’re creating a project from the ground up, create a redis.yaml file as specified in the Component format section.
  4. Set the redisHost key to [HOST NAME FROM PREVIOUS STEP]:6379 and the redisPassword key to the key you saved earlier.

    Note: In a production-grade application, follow secret management instructions to securely manage your secrets.

  5. Enable EntraID support:

    • Enable Entra ID authentication on your Azure Redis server. This may take a few minutes.
    • Set useEntraID to "true" to implement EntraID support for Azure Cache for Redis.
  6. Set enableTLS to "true" to support TLS.

Note: useEntraID assumes that either your UserPrincipal (via AzureCLICredential) or the SystemAssigned managed identity has the RedisDataOwner role permission. If a user-assigned identity is used, you need to specify the azureClientID property.
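
Putting those settings together, the relevant metadata fragment might look like the following sketch (the client ID value is a placeholder, only needed for a user-assigned identity):

  - name: useEntraID
    value: "true"
  - name: enableTLS
    value: "true"
  - name: azureClientID
    value: "<your-managed-identity-client-id>"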

15 - RocketMQ

Detailed documentation on the RocketMQ pubsub component

Component format

To set up RocketMQ pub/sub, create a component of type pubsub.rocketmq. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rocketmq-pubsub
spec:
  type: pubsub.rocketmq
  version: v1
  metadata:
    - name: instanceName
      value: dapr-rocketmq-test
    - name: consumerGroup
      value: dapr-rocketmq-test-g-c
    - name: producerGroup 
      value: dapr-rocketmq-test-g-p
    - name: consumerID
      value: channel1
    - name: nameSpace
      value: dapr-test
    - name: nameServer
      value: "127.0.0.1:9876,127.0.0.2:9876"
    - name: retries
      value: 3
    - name: consumerModel
      value: "clustering"
    - name: consumeOrderly
      value: false

Spec metadata fields

Field Required Details Default Example
instanceName N Instance name time.Now().String() dapr-rocketmq-test
consumerGroup N Consumer group name. Recommended. If producerGroup is null, groupName is used. dapr-rocketmq-test-g-c
producerGroup (consumerID) N Producer group name. Recommended. If producerGroup is null, consumerID is used. If consumerID is also null, groupName is used. dapr-rocketmq-test-g-p
consumerID N Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. Can be set to string value (such as "channel1" in the example above) or string format value (such as "{podName}", etc.). See all of the template tags you can use in your component metadata.
groupName N Consumer/Producer group name. Deprecated. dapr-rocketmq-test-g
nameSpace N RocketMQ namespace dapr-rocketmq
nameServerDomain N RocketMQ name server domain https://my-app.net:8080/nsaddr
nameServer N RocketMQ name server, separated by “,” or “;” 127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877
accessKey N Access Key (Username) "admin"
secretKey N Secret Key (Password) "password"
securityToken N Security Token
retries N Number of retries to send a message to the broker 3 3
producerQueueSelector (queueSelector) N Producer Queue selector. There are five implementations of queue selector: hash, random, manual, roundRobin, dapr. dapr hash
consumerModel N Message model that defines how messages are delivered to each consumer client. RocketMQ supports two message models: clustering and broadcasting. clustering broadcasting, clustering
fromWhere (consumeFromWhere) N Consuming point on consumer booting. There are three consuming points: CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP CONSUME_FROM_LAST_OFFSET CONSUME_FROM_LAST_OFFSET
consumeTimestamp N Backtracks consumption time with second precision. Time format is yyyymmddhhmmss. For example, 20131223171201 implies the time of 17:12:01 and date of December 23, 2013 time.Now().Add(time.Minute * (-30)).Format("20060102150405") 20131223171201
consumeOrderly N Determines if it’s an ordered message using FIFO order. false false
consumeMessageBatchMaxSize N Batch consumption size, in the range [1, 1024] 512 10
consumeConcurrentlyMaxSpan N Concurrently max span offset. This has no effect on sequential consumption. Range: [1, 65535] 1000 1000
maxReconsumeTimes N Max re-consume times. -1 means 16 times. If messages are re-consumed more than maxReconsumeTimes before success, they’re directed to a deletion queue. Orderly messages: MaxInt32; concurrent messages: 16 16
autoCommit N Enable auto commit true false
consumeTimeout N Maximum amount of time a message may block the consuming thread. Time unit: Minute 15 15
consumerPullTimeout N The socket timeout in milliseconds
pullInterval N Message pull interval 100 100
pullBatchSize N The number of messages pulled from the broker at a time. If pullBatchSize is null, ConsumerBatchSize is used. pullBatchSize is in the range [1, 1024] 32 10
pullThresholdForQueue N Flow control threshold on queue level. Each message queue will cache a maximum of 1000 messages by default. Consider the PullBatchSize - the instantaneous value may exceed the limit. Range: [1, 65535] 1024 1000
pullThresholdForTopic N Flow control threshold on topic level. The value of pullThresholdForQueue will be overwritten and calculated based on pullThresholdForTopic if it isn’t unlimited. For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer, then pullThresholdForQueue will be set to 100. Range: [1, 6553500] -1(Unlimited) 10
pullThresholdSizeForQueue N Limit the cached message size on queue level. Consider the pullBatchSize - the instantaneous value may exceed the limit. The size of a message is only measured by message body, so it’s not accurate. Range: [1, 1024] 100 100
pullThresholdSizeForTopic N Limit the cached message size on topic level. The value of pullThresholdSizeForQueue will be overwritten and calculated based on pullThresholdSizeForTopic if it isn’t unlimited. For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB. Range: [1, 102400] -1 100
content-type N Message content type. "text/plain" "application/cloudevents+json; charset=utf-8", "application/octet-stream"
logLevel N Log level warn info
sendTimeOut N Send message timeout to connect RocketMQ’s broker, measured in nanoseconds. Deprecated. 3 seconds 10000000000
sendTimeOutSec N Timeout duration for publishing a message in seconds. If sendTimeOutSec is null, sendTimeOut is used. 3 seconds 3
mspProperties N The RocketMQ message properties in this collection are passed to the app in Data. Separate multiple properties with "," key,mkey

For backwards-compatibility reasons, the following values in the metadata are supported, although their use is discouraged.

Field (supported but deprecated) Required Details Example
groupName N Producer group name for RocketMQ publishers "my_unique_group_name"
sendTimeOut N Timeout duration for publishing a message in nanoseconds 0
consumerBatchSize N The number of messages pulled from the broker at a time 32

Setup RocketMQ

See https://rocketmq.apache.org/docs/quick-start/ to set up a local RocketMQ instance.

Per-call metadata fields

Partition Key

When invoking the RocketMQ pub/sub, it’s possible to provide an optional partition key by using the metadata query parameters in the request URL.

You need to specify rocketmq-tag, rocketmq-key, rocketmq-shardingkey, and rocketmq-queue in the metadata.

Example:

curl -X POST 'http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-tag=?&metadata.rocketmq-key=?&metadata.rocketmq-shardingkey=key&metadata.rocketmq-queue=1' \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

QueueSelector

The RocketMQ component contains a total of five queue selectors. The RocketMQ client provides the following queue selectors:

  • HashQueueSelector
  • RandomQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

To learn more about these RocketMQ client queue selectors, read the RocketMQ documentation.

The Dapr RocketMQ component implements the following queue selector:

  • DaprQueueSelector

This article focuses on the design of DaprQueueSelector.

DaprQueueSelector

DaprQueueSelector integrates three queue selectors:

  • HashQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

DaprQueueSelector gets the queue id from the request parameter. You can set the queue id by running the following:

http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-queue=1

The ManualQueueSelector is implemented using the method above.

Next, the DaprQueueSelector tries to:

  • Get a ShardingKey
  • Hash the ShardingKey to determine the queue id.

You can set the ShardingKey by doing the following:

http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-shardingkey=key

If the ShardingKey does not exist, the RoundRobin algorithm is used to determine the queue id.
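
The resulting fallback order can be sketched in Go as follows. This is only an illustration of the behavior described above, not the actual Dapr implementation; selectQueue and its signature are hypothetical.

package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
	"sync/atomic"
)

var rrCounter uint32

// selectQueue mirrors the fallback order described above: an explicit
// rocketmq-queue id wins, then a hash of rocketmq-shardingkey, then round-robin.
func selectQueue(metadata map[string]string, queueCount int) int {
	// 1. Manual selection: honor an explicit queue id if it is valid.
	if q, ok := metadata["rocketmq-queue"]; ok {
		if id, err := strconv.Atoi(q); err == nil && id >= 0 && id < queueCount {
			return id
		}
	}
	// 2. Hash selection: map the sharding key onto a queue.
	if key, ok := metadata["rocketmq-shardingkey"]; ok && key != "" {
		h := fnv.New32a()
		h.Write([]byte(key))
		return int(h.Sum32() % uint32(queueCount))
	}
	// 3. Fallback: round-robin across the queues.
	return int(atomic.AddUint32(&rrCounter, 1) % uint32(queueCount))
}

func main() {
	md := map[string]string{"rocketmq-shardingkey": "key"}
	fmt.Println(selectQueue(md, 8)) // the same key always maps to the same queue
}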

16 - Solace-AMQP

Detailed documentation on the Solace-AMQP pubsub component

Component format

To set up Solace-AMQP pub/sub, create a component of type pubsub.solace.amqp. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: solace
spec:
  type: pubsub.solace.amqp
  version: v1
  metadata:
    - name: url
      value: 'amqp://localhost:5672'
    - name: username
      value: 'default'
    - name: password
      value: 'default'
    - name: consumerID
      value: 'channel1'

Spec metadata fields

Field Required Details Example
url Y Address of the AMQP broker. Can be secretKeyRef to use a secret reference.
Use the amqp:// URI scheme for non-TLS communication.
Use the amqps:// URI scheme for TLS communication.
"amqp://host.domain[:port]"
username Y The username to connect to the broker. Only required if anonymous is not specified or set to false. default
password Y The password to connect to the broker. Only required if anonymous is not specified or set to false. default
consumerID N Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the consumerID is not provided, the Dapr runtime set it to the Dapr application ID (appID) value. Can be set to string value (such as "channel1" in the example above) or string format value (such as "{podName}", etc.). See all of template tags you can use in your component metadata.
anonymous N To connect to the broker without credential validation. Only works if enabled on the broker. A username and password would not be required if this is set to true. true
caCert Required for using TLS Certificate Authority (CA) certificate in PEM format for verifying server TLS certificates. "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert Required for using TLS TLS client certificate in PEM format. Must be used with clientKey. "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey Required for using TLS TLS client key in PEM format. Must be used with clientCert. Can be secretKeyRef to use a secret reference. "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"

Communication using TLS

To configure communication using TLS:

  1. Ensure that the Solace broker is configured to support certificates.
  2. Provide the caCert, clientCert, and clientKey metadata in the component configuration.

For example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: solace
spec:
  type: pubsub.solace.amqp
  version: v1
  metadata:
  - name: url
    value: "amqps://host.domain[:port]"
  - name: username
    value: 'default'
  - name: password
    value: 'default'
  - name: caCert
    value: ${{ myLoadedCACert }}
  - name: clientCert
    value: ${{ myLoadedClientCert }}
  - name: clientKey
    secretKeyRef:
      name: mySolaceClientKey
      key: mySolaceClientKey
auth:
  secretStore: <SECRET_STORE_NAME>

While the caCert and clientCert values may not be secrets, they can be referenced from a Dapr secret store as well for convenience.

Publishing/subscribing to topics and queues

By default, messages are published and subscribed over topics. If you would like your destination to be a queue, prefix the topic with queue: and the Solace AMQP component will connect to a queue.
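
For example, a declarative subscription that consumes from a queue named orders rather than a topic might look like the following sketch (the names are illustrative):

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: orders-queue-sub
spec:
  topic: queue:orders
  routes:
    default: /orders
  pubsubname: solace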

Create a Solace broker

You can run a Solace broker locally using Docker:

docker run -d -p 8080:8080 -p 55554:55555 -p 8008:8008 -p 1883:1883 -p 8000:8000 -p 5672:5672 -p 9000:9000 -p 2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard

You can then interact with the server using the client port: amqp://localhost:5672.

You can also sign up for a free SaaS broker on Solace Cloud.